from typing import Generator, Callable
from cobald import interfaces
from usim import eternity, Scope, interval
from .drone import Drone
[docs]class Pool(interfaces.Pool):
"""
A pool encapsulating a number of pools or drones. Given a specific demand,
allocation and utilisation, the pool is able to adapt in terms of number of
drones providing the given resources.
:param capacity: Maximum number of pools that can be instantiated within the pool
:param init: Number of pools to instantiate at creation time of the pool
:param name: Name of the pool
:param make_drone: Callable to create a drone with specific properties for this pool
"""
def __init__(
self,
make_drone: Callable,
*,
capacity: int = float("inf"),
init: int = 0,
name: str = None,
):
super(Pool, self).__init__()
assert init <= capacity
self.make_drone = make_drone
self._drones = []
self._demand = 1
self._level = init
self._capacity = capacity
self._name = name
async def init_pool(self, scope: Scope, init: int = 0):
"""
Initialisation of existing drones at creation time of pool.
:param init: Number of drones to create.
"""
for _ in range(init):
drone = self.make_drone(0)
scope.do(drone.run())
self._drones.append(drone)
# TODO: the run method currently needs to be called manually
async def run(self):
"""
Pool periodically checks the current demand and provided drones.
If demand is higher than the current level, the pool takes care of
initialising new drones. Otherwise drones get removed.
"""
async with Scope() as scope:
await self.init_pool(scope=scope, init=self._level)
async for _ in interval(1):
drones_required = min(self._demand, self._capacity) - self._level
while drones_required > 0:
drones_required -= 1
# start a new drone
drone = self.make_drone(10)
scope.do(drone.run())
self._drones.append(drone)
self._level += 1
if drones_required < 0:
for drone in self.drones:
if drone.jobs == 0:
drones_required += 1
self._level -= 1
self._drones.remove(drone)
scope.do(drone.shutdown())
if drones_required == 0:
break
@property
def drones(self) -> Generator[Drone, None, None]:
for drone in self._drones:
if drone.supply > 0:
yield drone
def drone_demand(self) -> int:
return len(self._drones)
@property
def allocation(self) -> float:
allocations = []
for drone in self._drones:
allocations.append(drone.allocation)
try:
return sum(allocations) / len(allocations)
except ZeroDivisionError:
return 1
@property
def utilisation(self) -> float:
utilisations = []
for drone in self._drones:
utilisations.append(drone.utilisation)
try:
return sum(utilisations) / len(utilisations)
except ZeroDivisionError:
return 1
@property
def supply(self) -> float:
supply = 0
for drone in self._drones:
supply += drone.supply
return supply
@property
def demand(self) -> float:
return self._demand
@demand.setter
def demand(self, value: float):
if value > 0:
self._demand = value
else:
self._demand = 0
def __repr__(self):
return "<%s: %s>" % (self.__class__.__name__, self._name or id(self))
[docs]class StaticPool(Pool):
"""
A static pool does not react on changing conditions regarding demand,
allocation and utilisation but instead initialises the `capacity` of given
drones with initialised `resources`.
:param capacity: Maximum number of pools that can be instantiated within
the pool
:param resources: Dictionary of resources available for each pool
instantiated within the pool
"""
def __init__(self, make_drone: Callable, capacity: int = 0):
assert capacity > 0, "Static pool was initialised without any resources..."
super(StaticPool, self).__init__(
capacity=capacity, init=capacity, make_drone=make_drone
)
self._demand = capacity
async def run(self):
"""
Pool runs forever and does not check if number of drones needs to be adapted.
"""
async with Scope() as scope:
await self.init_pool(scope=scope, init=self._level)
await eternity