# -*- coding: utf-8 -*-
"""
This module provides :py:class:`PoolOfPoolMixin` only. Basically you need to
mix it into :py:class:`concurrent.futures.Executor` implementation and it will
be possible to use it with :py:class:`PoolOfPools`.
"""
###############################################################################
from collections import deque
from itertools import islice
from sys import exc_info
from six import reraise
# noinspection PyUnresolvedReferences
from six.moves import zip as izip
###############################################################################
[docs]class PoolOfPoolsMixin(object):
"""
Mixin to support :py:class:`streams.poolofpools.PoolOfPools` execution
properly.
Basically it replaces map implementation and provides some additional
interface which helps :py:class:`streams.poolofpools.PoolOfPools` to
manage executor instance. Current implementation supports expanding only
(dynamically increasing, on the fly) the number of workers.
"""
@staticmethod
[docs] def dummy_callback(*args, **kwargs):
"""
Just a dummy callback if no
:py:meth:`streams.poolofpools.PoolOfPools.worker_finished` is supplied
for the mapper. Basically does nothing. Literally nothing. Good thing
though, no bugs.
"""
pass
# noinspection PyBroadException
@staticmethod
[docs] def get_first(queue):
"""
Extracts the result of the execution from the first element of the
queue (to support order since a ``map`` is ordering function). Also
it tries to handle exceptions if presented in the same way as
:py:class:`concurrent.futures.ThreadPoolExecutor` or
:py:class:`concurrent.futures.ProcessPoolExecutor` do.
.. note::
It relies on given implementation of ``map`` method in both
:py:class:`concurrent.futures.ThreadPoolExecutor` and
:py:class:`concurrent.futures.ProcessPoolExecutor` so if you
see some differences in behaviour please create an issue.
"""
first_future = queue.popleft()
try:
result = first_future.result()
except:
for future in queue:
future.cancel()
reraise(*exc_info())
else:
return result
finally:
first_future.cancel()
# noinspection PyUnresolvedReferences
[docs] def expand(self, expand_to):
"""
The hack to increase an amount of workers in executor.
:param int expand_to: The amount of worker we need to add to the
executor.
.. note::
It works perfect with
:py:class:`streams.executors._gevent.GeventExecutor` and
:py:class:`concurrent.futures.ThreadPoolExecutor` but has some
issues with :py:class:`concurrent.futures.ProcessPoolExecutor`.
It increases the amount of workers who manage task queue but
it is not possible to expand queue itself in a good way (current
implementation has a limit of tasks in the queue).
"""
assert expand_to >= 0
self._max_workers += expand_to
# noinspection PyUnresolvedReferences
[docs] def map(self, fn, *iterables, **kwargs):
"""
New implementation of concurrent mapper.
It has 2 new arguments: ``callback`` and ``required_workers``
:param Callable callback: Callback to execute after map is done
:param int required_workers: The amount of workers we have to use
for this map procedure.
It differs from default implementation in 2 ways:
1. It uses the limit of workers (``required_workers``). It can be
less than max workers defined on executor initialization
hence it is possible to utilize the same executor for several
tasks more efficient.
2. It doesn't create a list of futures in memory. Actually it
creates only ``required_workers`` amount of futures and tries
to keep this count the same during whole procedure. Yes, it is
not naturally concurrent execution because it just submits
task by task but on big iterables it utilizes as less memory
as possible providing reasonable concurrency.
"""
callback = kwargs.get("callback", self.dummy_callback)
worker_count = kwargs.get("required_workers", self._max_workers)
worker_count = max(worker_count, 1)
queue = deque()
args_iterator = izip(*iterables)
for args in islice(args_iterator, worker_count):
queue.append(self.submit(fn, *args))
for args in args_iterator:
yield self.get_first(queue)
queue.append(self.submit(fn, *args))
while queue:
yield self.get_first(queue)
callback(self, worker_count)