Source code for streams.poolofpools

# -*- coding: utf-8 -*-


###############################################################################


from collections import defaultdict
from functools import partial
from multiprocessing import cpu_count
from threading import RLock

from six import iteritems, iterkeys, itervalues

from .executors import ParallelExecutor, ProcessPoolExecutor
from .executors.mixins import PoolOfPoolsMixin


###############################################################################


class ExecutorPool(object):
    """
    Executor pool for :py:class:`PoolOfPools` which does accurate and
    intelligent management of pools of predefined classes. Basically it
    tries to reuse existing executors where possible; only if that is not
    possible does it create new ones.

    An example: you have done a big mapping of data in 10 threads. As a
    rule, you would shut down and clean up this pool afterwards. But a bit
    later you find that you need a pool of 4 threads. Why not reuse the
    existing pool? This class allows you to do that, and it tracks that
    6 threads are idle, so a task that needs <= 6 threads will reuse that
    pool as well. The 4-thread task may keep working in parallel, but you
    still have 6 threads you can occupy. That is the main idea.

    It also tries to squash several idle pools into a single instance by
    expanding the number of workers in one of them and throwing out the
    others.
    """
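    # A minimal usage sketch of the reuse idea described above. ``fn`` and
    # ``items`` are hypothetical placeholders, not part of this module:
    #
    #   pool = ExecutorPool(ParallelExecutor)
    #   mapper = pool.get(10)    # creates a fresh executor with 10 workers
    #   results = list(mapper(fn, items))
    #   mapper = pool.get(4)     # reuses the same executor; the remaining
    #                            # 6 workers stay tracked as idle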
    def __init__(self, worker_class):
        """
        Constructor of the class.

        ``worker_class`` has to be a class which supports the required
        interface and behaviour: it has to be a subclass of
        :py:class:`streams.executors.mixins.PoolOfPoolsMixin`.

        :param PoolOfPoolsMixin worker_class: The class of executors this
            pool has to maintain.
        """
        assert issubclass(worker_class, PoolOfPoolsMixin)

        self.worker_class = worker_class
        self.workers = defaultdict(lambda: [])
        self.lock = RLock()
    def get_any(self):
        """
        Returns any map function; it is undetermined how many workers it
        has. As a rule, you get the executor with the minimal amount of
        available workers in the pool.
        """
        with self.lock:
            if not self.workers:
                # Nothing is tracked yet, so fall back to a fresh
                # single-worker executor instead of failing on ``min()``.
                return self.get(1)
            return self.get(min(iterkeys(self.workers)))
    def get(self, required_workers):
        """
        Returns a mapper which guarantees that you can utilize the given
        number of workers.

        :param int required_workers: The number of workers you need to
            utilize for your task.
        """
        assert required_workers > 0

        with self.lock:
            self.squash()
            worker, availability = self.get_suitable_worker(required_workers)
            if worker is None:
                worker = self.worker_class(required_workers)
                availability = required_workers
            availability -= required_workers
            if availability > 0:
                self.workers[availability].append(worker)
            return partial(worker.map,
                           required_workers=required_workers,
                           callback=self.worker_finished)
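    # The mapper returned by :py:meth:`get` is just a ``functools.partial``
    # around the executor's ``map``, so the caller invokes it like ``map``
    # itself (a sketch; ``fn`` and ``iterable`` are placeholders):
    #
    #   mapper = pool.get(4)
    #   for result in mapper(fn, iterable):
    #       ...
    #
    # The ``callback`` argument lets the executor hand its 4 workers back
    # through :py:meth:`worker_finished` once the mapping is done.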
    def squash(self):
        """
        Squashes pools, trying to minimize the number of pools kept around
        and so avoid unnecessary fragmentation and complexity.
        """
        if not self.workers:
            return
        with self.lock:
            for avail in list(iterkeys(self.workers)):
                if not self.workers[avail]:
                    self.workers.pop(avail)
            self.squash_workers(self.name_to_worker_mapping(),
                                self.real_worker_availability())
    def get_suitable_worker(self, required_workers):
        """
        Returns a suitable executor which has the required amount of
        workers, or ``None`` if nothing appropriate is available.

        Actually it returns a tuple of the worker and the count of workers
        available for utilization within that pool. The count may be more
        than ``required_workers``, but it cannot be less.

        :param int required_workers: The amount of workers the user
            requires.
        """
        with self.lock:
            min_available, suspected_workers = None, None
            for availability, workers in iteritems(self.workers):
                if availability >= required_workers:
                    # Prefer the tightest fit: the smallest availability
                    # which still satisfies the request.
                    if min_available is None or availability < min_available:
                        min_available = availability
                        suspected_workers = workers
            if min_available is not None:
                return suspected_workers.pop(), min_available
            return None, 0
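    # For example, with ``self.workers == {3: [...], 8: [...]}`` a request
    # for 2 workers picks the bucket of 3 and reports an availability
    # of 3; :py:meth:`get` then re-files the single spare worker under
    # availability 1.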
    def worker_finished(self, worker, required_workers):
        """
        The callback used by
        :py:class:`streams.executors.mixins.PoolOfPoolsMixin` to hand
        ``required_workers`` workers of the given executor back to the
        pool.
        """
        with self.lock:
            self.workers[required_workers].append(worker)
    def name_to_worker_mapping(self):
        """
        Maps worker names (the result of applying :py:func:`id` to the
        executor) to executor instances.
        """
        with self.lock:
            name_to_workers = {}
            for workers in itervalues(self.workers):
                name_to_workers.update(
                    (id(worker), worker) for worker in workers
                )
            return name_to_workers
    def real_worker_availability(self):
        """
        Returns a mapping of real availability to the names of the
        executors which have it.

        Since :py:meth:`worker_finished` does not do any defragmentation
        of availability, the internal structure may accumulate several
        contradictory records about a worker's availability. This method
        is intended to restore the truth.
        """
        with self.lock:
            real_availability = defaultdict(lambda: [])
            for avail, workers in iteritems(self.workers):
                for wrk in workers:
                    real_availability[id(wrk)].append(avail)
            for name in iterkeys(real_availability):
                real_availability[name] = max(real_availability[name])

            availability_to_workers = defaultdict(lambda: [])
            for worker_name, avail in iteritems(real_availability):
                availability_to_workers[avail].append(worker_name)
            return availability_to_workers
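    # A sketch: if ``self.workers`` has degraded to
    # ``{2: [w1], 3: [w1, w2]}``, the maximum wins for each executor, so
    # ``w1`` really has 3 available workers and the method returns
    # ``{3: [id(w1), id(w2)]}``.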
    def squash_workers(self, names, avails):
        """
        Does the actual squashing/defragmentation of the internal
        structure.
        """
        self.workers = defaultdict(lambda: [])
        avails_to_traverse = set(iterkeys(avails))
        while avails_to_traverse:
            minimal_avail = min(avails_to_traverse)
            avails_to_traverse.discard(minimal_avail)
            workers = avails[minimal_avail]
            selected_worker = names[workers[0]]
            if len(workers) == 1:
                self.workers[minimal_avail] = [selected_worker]
            else:
                # Merge the idle capacity of the whole bucket into the
                # first executor; the other executors are thrown away.
                selected_worker.expand(minimal_avail * (len(workers) - 1))
                extended_avail = minimal_avail * len(workers)
                # Re-file the merged executor under its new availability
                # so the following iterations can pick it up again.
                avails[extended_avail].append(workers[0])
                avails_to_traverse.add(extended_avail)
            avails.pop(minimal_avail)
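    # A worked sketch: with ``avails == {2: [id(a), id(b)], 5: [id(c)]}``
    # the bucket of 2 is processed first: ``a`` is expanded by 2 workers
    # and re-filed under 4. The buckets of 4 and 5 are then singletons, so
    # the result is ``self.workers == {4: [a], 5: [c]}`` and ``b`` is
    # thrown away.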
class PoolOfPools(object):
    """
    Just a convenient interface to a set of :py:class:`ExecutorPool`
    instances, nothing more.
    """

    @staticmethod
    def get_from_pool(pool, required_workers):
        """
        Fetches a mapper from the given pool.

        :param ExecutorPool pool: The pool you want to fetch the mapper
            from.
        :param int required_workers: The amount of workers you require.
            It can be ``None``; in that case
            :py:meth:`ExecutorPool.get_any` is executed.
        """
        if required_workers is None:
            return pool.get_any()
        return pool.get(required_workers)
    def __init__(self):
        self.parallels = ExecutorPool(ParallelExecutor)
        self.processes = ExecutorPool(ProcessPoolExecutor)
        self.default_count = cpu_count()
    def parallel(self, required_workers):
        """
        Fetches a parallel executor mapper from the underlying
        :py:class:`ExecutorPool`.

        :param int required_workers: The amount of workers you require.
            It can be ``None``; in that case
            :py:meth:`ExecutorPool.get_any` is executed.
        """
        return self.get_from_pool(self.parallels, required_workers)
    def process(self, required_workers):
        """
        Fetches a process executor mapper from the underlying
        :py:class:`ExecutorPool`.

        :param int required_workers: The amount of workers you require.
            It can be ``None``; in that case
            :py:meth:`ExecutorPool.get_any` is executed.
        """
        return self.get_from_pool(self.processes, required_workers)
    def get(self, kwargs):
        """
        Returns the mapper.

        :param dict kwargs: Keyword arguments for the mapper. Please check
            the :py:meth:`streams.Stream.map` documentation to understand
            what this dict has to contain.
        """
        if "parallel" in kwargs:
            parallel = kwargs["parallel"]
            if parallel in (1, True):
                return self.parallel(self.default_count)
            if parallel is not None:
                return self.parallel(parallel)
        if "process" in kwargs:
            process = kwargs["process"]
            if process in (1, True):
                return self.process(self.default_count)
            if process is not None:
                return self.process(process)
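# A minimal usage sketch (``fn`` and ``items`` are hypothetical
# placeholders, not part of this module):
#
#   pools = PoolOfPools()
#   mapper = pools.get({"parallel": 4})    # a 4-worker parallel executor
#   results = list(mapper(fn, items))
#   mapper = pools.get({"process": True})  # one process per CPU
#
# Note that ``get`` returns ``None`` when neither "parallel" nor
# "process" is given; the caller is expected to fall back to sequential
# mapping in that case.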