from typing import Dict
from usim import Scope, interval, Resources
from lapis.drone import Drone
from lapis.monitor import sampling_required
class JobQueue(list):
pass
[docs]class CondorJobScheduler(object):
"""
Goal of the htcondor job scheduler is to have a scheduler that somehow
mimics how htcondor does schedule jobs.
Htcondor does scheduling based on a priority queue. The priorities itself
are managed by operators of htcondor.
So different instances can apparently behave very different.
In my case I am going to try building a priority queue that sorts job slots
by increasing cost. The cost itself is calculated based on the current
strategy that is used at GridKa. The scheduler checks if a job either
exactly fits a slot or if it does fit into it several times. The cost for
putting a job at a given slot is given by the amount of resources that
might remain unallocated.
:return:
"""
def __init__(self, job_queue):
self._stream_queue = job_queue
self.drone_cluster = []
self.interval = 60
self.job_queue = JobQueue()
self._collecting = True
self._processing = Resources(jobs=0)
@property
def drone_list(self):
for cluster in self.drone_cluster:
for drone in cluster:
yield drone
def register_drone(self, drone: Drone):
self._add_drone(drone)
def unregister_drone(self, drone: Drone):
for cluster in self.drone_cluster:
try:
cluster.remove(drone)
except ValueError:
pass
else:
if len(cluster) == 0:
self.drone_cluster.remove(cluster)
def _add_drone(self, drone: Drone, drone_resources: Dict = None):
minimum_distance_cluster = None
distance = float("Inf")
if len(self.drone_cluster) > 0:
for cluster in self.drone_cluster:
current_distance = 0
for key in {*cluster[0].pool_resources, *drone.pool_resources}:
if drone_resources:
current_distance += abs(
cluster[0].theoretical_available_resources.get(key, 0)
- drone_resources.get(key, 0)
)
else:
current_distance += abs(
cluster[0].theoretical_available_resources.get(key, 0)
- drone.theoretical_available_resources.get(key, 0)
)
if current_distance < distance:
minimum_distance_cluster = cluster
distance = current_distance
if distance < 1:
minimum_distance_cluster.append(drone)
else:
self.drone_cluster.append([drone])
else:
self.drone_cluster.append([drone])
def update_drone(self, drone: Drone):
self.unregister_drone(drone)
self._add_drone(drone)
async def run(self):
async with Scope() as scope:
scope.do(self._collect_jobs())
async for _ in interval(self.interval):
for job in self.job_queue.copy():
best_match = self._schedule_job(job)
if best_match:
await best_match.schedule_job(job)
self.job_queue.remove(job)
await sampling_required.put(self.job_queue)
self.unregister_drone(best_match)
left_resources = best_match.theoretical_available_resources
left_resources = {
key: value - job.resources.get(key, 0)
for key, value in left_resources.items()
}
self._add_drone(best_match, left_resources)
if (
not self._collecting
and not self.job_queue
and self._processing.levels.jobs == 0
):
break
await sampling_required.put(self)
async def _collect_jobs(self):
async for job in self._stream_queue:
self.job_queue.append(job)
await self._processing.increase(jobs=1)
# TODO: logging happens with each job
await sampling_required.put(self.job_queue)
self._collecting = False
async def job_finished(self, job):
if job.successful:
await self._processing.decrease(jobs=1)
else:
await self._stream_queue.put(job)
def _schedule_job(self, job) -> Drone:
priorities = {}
for cluster in self.drone_cluster:
drone = cluster[0]
cost = 0
resources = drone.theoretical_available_resources
for resource_type in job.resources:
if resources.get(resource_type, 0) < job.resources[resource_type]:
# Inf for all job resources that a drone does not support
# and all resources that are too small to even be considered
cost = float("Inf")
break
else:
try:
cost += 1 / (
resources[resource_type] // job.resources[resource_type]
)
except KeyError:
pass
for additional_resource_type in [
key for key in drone.pool_resources if key not in job.resources
]:
cost += resources[additional_resource_type]
cost /= len((*job.resources, *drone.pool_resources))
if cost <= 1:
# directly start job
return drone
try:
priorities[cost].append(drone)
except KeyError:
priorities[cost] = [drone]
try:
minimal_key = min(priorities)
if minimal_key < float("Inf"):
return priorities[minimal_key][0]
except ValueError:
pass
return None