Internal modules¶
You normally do not need this API, but if you are curious, feel free to check it out.
streams.executors¶
This module provides several implementations of concurrent executors suitable for use with streams.poolofpools.PoolOfPools. If Gevent is available, you can also import streams.executors._gevent.GeventExecutor from here.
The module also has a class called streams.executors.ParallelExecutor. This is a dynamically chosen class for default concurrent execution. If the code is monkey patched by Gevent, it is streams.executors._gevent.GeventExecutor. Otherwise it is streams.executors.executors.ThreadPoolExecutor.
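The selection rule for the default executor can be sketched roughly as follows. This is only an illustration and not the library's actual code: the helper name `choose_default_executor` and its `gevent_executor` parameter are hypothetical, and the sketch assumes that `gevent.monkey.is_module_patched` (present in recent Gevent releases) is an acceptable way to detect monkey patching.

```python
from concurrent.futures import ThreadPoolExecutor


def choose_default_executor(gevent_executor=None):
    """Return the executor class to use by default (hypothetical helper)."""
    try:
        from gevent import monkey  # Gevent is an optional dependency
    except ImportError:
        return ThreadPoolExecutor
    # Assumption: a patched ``threading`` module means the code is monkey patched.
    if gevent_executor is not None and monkey.is_module_patched("threading"):
        return gevent_executor
    return ThreadPoolExecutor


# Without Gevent monkey patching the thread-based executor is selected.
ParallelExecutor = choose_default_executor()
```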
streams.executors.executors¶
This module implements executors which mix in streams.executors.mixins.PoolOfPoolsMixin and are therefore suitable for use with streams.poolofpools.PoolOfPools.
All of them are thin extensions of classes from concurrent.futures.
- class streams.executors.executors.ProcessPoolExecutor(max_workers=None)¶
  Implementation of concurrent.futures.ProcessPoolExecutor suitable for use with streams.poolofpools.PoolOfPools.
- class streams.executors.executors.SequentalExecutor(*args, **kwargs)¶
  Debug executor. It provides no concurrency and just yields elements one by one.
- class streams.executors.executors.ThreadPoolExecutor(max_workers)¶
  Implementation of concurrent.futures.ThreadPoolExecutor suitable for use with streams.poolofpools.PoolOfPools.
streams.executors.mixins¶
This module provides PoolOfPoolsMixin only. Mix it into a concurrent.futures.Executor implementation and it will be possible to use that executor with PoolOfPools.
- class streams.executors.mixins.PoolOfPoolsMixin¶
  Mixin to support streams.poolofpools.PoolOfPools execution properly.
  It replaces the map implementation and provides an additional interface which helps streams.poolofpools.PoolOfPools to manage the executor instance. The current implementation supports only expanding the number of workers (increasing it dynamically, on the fly).
  - static dummy_callback(*args, **kwargs)¶
    Just a dummy callback used if no streams.poolofpools.PoolOfPools.worker_finished() is supplied for the mapper. Basically does nothing. Literally nothing. Good thing though, no bugs.
  - expand(expand_to)¶
    The hack to increase the number of workers in the executor.
    Parameters: expand_to (int) – The number of workers we need to add to the executor.
    Note
    It works perfectly with streams.executors._gevent.GeventExecutor and concurrent.futures.ThreadPoolExecutor but has some issues with concurrent.futures.ProcessPoolExecutor.
    It increases the number of workers which manage the task queue, but it is not possible to expand the queue itself in a good way (the current implementation has a limit on the number of tasks in the queue).
  - static get_first(queue)¶
    Extracts the result of the execution from the first element of the queue (to preserve order, since map is an ordering function). It also tries to handle exceptions, if present, in the same way as concurrent.futures.ThreadPoolExecutor or concurrent.futures.ProcessPoolExecutor do.
    Note
    It relies on the given implementation of the map method in both concurrent.futures.ThreadPoolExecutor and concurrent.futures.ProcessPoolExecutor, so if you see some differences in behaviour please create an issue.
  - map(fn, *iterables, **kwargs)¶
    New implementation of the concurrent mapper.
    It has 2 new arguments: callback and required_workers.
    Parameters:
    - callback (Callable) – Callback to execute after map is done.
    - required_workers (int) – The number of workers we have to use for this map procedure.
    It differs from the default implementation in 2 ways:
    - It uses a limit of workers (required_workers). The limit can be lower than the max workers defined on executor initialization, hence it is possible to use the same executor for several tasks more efficiently.
    - It doesn’t create a list of futures in memory. It creates only required_workers futures and tries to keep this count the same during the whole procedure. Yes, it is not naturally concurrent execution because it just submits task by task, but on big iterables it uses as little memory as possible while providing reasonable concurrency.
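The two differences listed for map can be illustrated with a small sketch. It is not the mixin's real code: `bounded_map` is a hypothetical free function, and the callback signature `(executor, required_workers)` is an assumption modelled on worker_finished(). It keeps at most required_workers futures alive and always takes the result of the oldest future first, which preserves order and re-raises exceptions the way Future.result() does.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor


def bounded_map(executor, fn, iterable, required_workers, callback=None):
    """Ordered map keeping at most ``required_workers`` futures in memory."""
    pending = deque()
    try:
        for item in iterable:
            if len(pending) >= required_workers:
                # Take the oldest future first to preserve the input order.
                yield pending.popleft().result()
            pending.append(executor.submit(fn, item))
        while pending:
            yield pending.popleft().result()
    finally:
        # Assumed callback signature, similar to worker_finished().
        if callback is not None:
            callback(executor, required_workers)


finished = []
with ThreadPoolExecutor(max_workers=4) as pool:
    # Only 2 of the 4 workers are used for this particular map.
    squares = list(bounded_map(pool, lambda x: x * x, range(10), 2,
                               callback=lambda ex, n: finished.append(n)))
```

Because only a bounded window of futures exists at any moment, memory use stays flat even for very large iterables, at the cost of submitting tasks one by one.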
streams.executors._gevent¶
This module provides streams.executors._gevent.GreenletFuture (a thin wrapper around concurrent.futures.Future) and streams.executors._gevent.GeventExecutor.
You can simply use concurrent.futures.ThreadPoolExecutor; it is fine and will work, but to use the power of greenlets more carefully it makes sense to use a custom executor.
streams.iterators¶
This module contains some useful iterators. Consider it a small ad-hoc extension pack for itertools.
- streams.iterators.accumulate(iterable, function=<built-in function add>)¶
  Implementation of itertools.accumulate() from Python 3.3.
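For reference, this is how the standard itertools.accumulate() behaves on Python 3; the backport is expected to mirror it, although that is an assumption based only on the description above.

```python
import operator
from itertools import accumulate

# Default function is addition: running totals.
running_sum = list(accumulate([1, 2, 3, 4]))

# Any binary function can be supplied, here multiplication.
running_product = list(accumulate([1, 2, 3, 4], operator.mul))
```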
- streams.iterators.distinct(iterable)¶
  Filters items from iterable and returns only distinct ones. Keeps order.
  Parameters: iterable (Iterable) – Something iterable we have to filter.
  >>> list(distinct([1, 2, 3, 2, 1, 2, 3, 4]))
  [1, 2, 3, 4]
  Note
  This is a fair (exact) implementation, so it has to keep all items in memory.
  Note
  All items have to be hashable.
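Both notes follow naturally from a set-based approach. The sketch below is one plausible way to write such a filter, not necessarily how the library does it; `distinct_sketch` is a hypothetical name.

```python
def distinct_sketch(iterable):
    """Yield each item once, in first-seen order."""
    seen = set()  # grows with the number of distinct items; items must be hashable
    for item in iterable:
        if item not in seen:
            seen.add(item)
            yield item


result = list(distinct_sketch([1, 2, 3, 2, 1, 2, 3, 4]))
```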
- streams.iterators.partly_distinct(iterable)¶
  Filters items from iterable and tries to return only distinct ones. Keeps order.
  Parameters: iterable (Iterable) – Something iterable we have to filter.
  >>> list(partly_distinct([1, 2, 3, 2, 1, 2, 3, 4]))
  [1, 2, 3, 4]
  Note
  Unlike distinct(), it does not guarantee that all elements will be distinct. But if the cardinality of the stream is rather small, it will work.
  Note
  The current implementation guarantees support for 10000 distinct values. If your cardinality is bigger, there might be some duplicates.
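One way to get this bounded-memory behaviour is to remember only a fixed number of recently seen items. The sketch below assumes a least-recently-used eviction policy, which the documentation does not confirm; `partly_distinct_sketch` and its `capacity` parameter are hypothetical.

```python
from collections import OrderedDict


def partly_distinct_sketch(iterable, capacity=10000):
    """Drop duplicates while remembering at most ``capacity`` items."""
    recent = OrderedDict()
    for item in iterable:
        if item in recent:
            recent.move_to_end(item)  # refresh recency and skip the duplicate
            continue
        recent[item] = True
        if len(recent) > capacity:
            recent.popitem(last=False)  # forget the least recently seen item
        yield item


small_cardinality = list(partly_distinct_sketch([1, 2, 3, 2, 1, 2, 3, 4]))
# With a tiny capacity the first item is forgotten, so its repeat leaks through.
leaky = list(partly_distinct_sketch([1, 2, 3, 1], capacity=2))
```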
- streams.iterators.peek(iterable, function)¶
  Does the same as Java 8 peek does.
  Parameters:
  - iterable (Iterable) – Iterable we want to peek.
  - function (function) – Peek function.
  >>> def peek_func(item):
  ...     print "peek", item
  >>> list(peek([1, 2, 3], peek_func))
  peek 1
  peek 2
  peek 3
  [1, 2, 3]
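A generator with this behaviour can be sketched in a few lines. `peek_sketch` is a hypothetical name; the side-effect function here collects items instead of printing them so the result is easy to check.

```python
def peek_sketch(iterable, function):
    """Call ``function`` on every item for its side effect, then pass the item on."""
    for item in iterable:
        function(item)
        yield item


observed = []
passed_through = list(peek_sketch([1, 2, 3], observed.append))
```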
- streams.iterators.seed(function, seed_value)¶
  Does the same as Java 8 iterate.
  Parameters:
  - function (function) – Function applied to the previous value to produce the next one.
  - seed_value – The initial value, which is returned first.
  >>> iterator = seed(lambda x: x * 10, 1)
  >>> next(iterator)
  1
  >>> next(iterator)
  10
  >>> next(iterator)
  100
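The behaviour in the example corresponds to an infinite generator that keeps feeding the previous value back into the function. A minimal sketch, with the hypothetical name `seed_sketch`:

```python
from itertools import islice


def seed_sketch(function, seed_value):
    """Yield seed_value, function(seed_value), function(function(seed_value)), ..."""
    value = seed_value
    while True:
        yield value
        value = function(value)


# The iterator never ends, so take a finite slice of it.
first_three = list(islice(seed_sketch(lambda x: x * 10, 1), 3))
```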
streams.poolofpools¶
- class streams.poolofpools.ExecutorPool(worker_class)¶
  Executor pool for PoolOfPools which does accurate and intelligent management of pools of predefined classes.
  It tries to reuse existing executors if possible. If that is not possible, it creates new ones.
  Just an example: you’ve done a big mapping of the data in 10 threads. As a rule, you would need to shut down and clean up this pool. But a bit later you see that you need a pool of 4 threads. Why not reuse the existing pool? This class allows you to do that, and it tracks that 6 threads are idle. So if you get a task which needs <= 6 threads, it will reuse that pool as well. The task with 4 threads may continue to work in parallel, but you still have 6 threads you can occupy. That is the main idea.
  It also tries to squash idle pools into a single instance by expanding the number of workers in one instance and throwing out the other.
  - __init__(worker_class)¶
    Constructor of the class. worker_class has to be a class which supports the required interface and behaviour; it has to be a subclass of streams.executors.mixins.PoolOfPoolsMixin.
    Parameters: worker_class (PoolOfPoolsMixin) – The class of executors this pool has to maintain.
  - __weakref__¶
    List of weak references to the object (if defined).
  - get(required_workers)¶
    Returns a mapper which guarantees that you can utilize the given number of workers.
    Parameters: required_workers (int) – The number of workers you need to utilize for your task.
  - get_any()¶
    Returns any map function; it is undetermined how many workers it has. As a rule, you get the minimal number of workers within a pool of executors.
  - get_suitable_worker(required_workers)¶
    Returns a suitable executor which has the required number of workers. Returns None if nothing is available.
    Actually it returns a tuple of the worker and the count of workers available for utilization within the given pool. The count may be more than required_workers but it can’t be less.
    Parameters: required_workers (int) – The number of workers the user requires.
  - name_to_worker_mapping()¶
    Maps worker names (the result of applying id() to the executor) to executor instances.
  - real_worker_availability()¶
    Returns a mapping from the name of the executor to its real availability. Since worker_finished() does not do any defragmentation of availability, the internal structure may contain multiple contradictory records about worker availability. This method is intended to restore the truth.
  - squash()¶
    Squashes pools and tries to minimize the number of pools available to avoid unnecessary fragmentation and complexity.
  - worker_finished(worker, required_workers)¶
    The callback used by streams.executors.mixins.PoolOfPoolsMixin.
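The reuse idea behind ExecutorPool (10 threads, a task that takes 4, 6 left idle) can be modelled with a tiny ledger of idle workers. This is a simplified illustration only: the class `IdleWorkerLedger`, its methods and the executor name "threads-a" are all hypothetical, and the real class additionally creates, expands and squashes executors.

```python
class IdleWorkerLedger:
    """Hypothetical bookkeeping: executor name -> number of idle workers."""

    def __init__(self):
        self.idle = {}

    def register(self, name, workers):
        self.idle[name] = workers

    def acquire(self, required_workers):
        """Reserve workers on the first executor with enough idle capacity."""
        for name, available in self.idle.items():
            if available >= required_workers:
                self.idle[name] = available - required_workers
                return name
        return None  # a real pool would create a new executor here

    def release(self, name, workers):
        """Counterpart of worker_finished(): give the workers back."""
        self.idle[name] += workers


ledger = IdleWorkerLedger()
ledger.register("threads-a", 10)
first = ledger.acquire(4)    # takes 4 workers, 6 stay idle
second = ledger.acquire(6)   # the same executor is reused for the idle 6
third = ledger.acquire(1)    # nothing is idle any more
ledger.release(first, 4)     # the first task finished
```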
- class streams.poolofpools.PoolOfPools¶
  Just a convenient interface to the set of multiple ExecutorPool instances, nothing more.
  - __weakref__¶
    List of weak references to the object (if defined).
  - get(kwargs)¶
    Returns the mapper.
    Parameters: kwargs (dict) – Keyword arguments for the mapper. Please check out the streams.Stream.map() documentation to understand what this dict has to contain.
  - static get_from_pool(pool, required_workers)¶
    Fetches a mapper from the pool.
    Parameters:
    - pool (ExecutorPool) – The pool you want to fetch the mapper from.
    - required_workers (int) – The number of workers you require. It can be None, in which case ExecutorPool.get_any() is executed.
  - parallel(required_workers)¶
    Fetches a parallel executor mapper from the underlying ExecutorPool.
    Parameters: required_workers (int) – The number of workers you require. It can be None, in which case ExecutorPool.get_any() is executed.
  - process(required_workers)¶
    Fetches a process executor mapper from the underlying ExecutorPool.
    Parameters: required_workers (int) – The number of workers you require. It can be None, in which case ExecutorPool.get_any() is executed.
streams.utils¶
This module contains some utility functions for Streams.
You may wonder why we need such simple filter-* functions. The reason is simple: it is about how multiprocessing, and therefore concurrent.futures.ProcessPoolExecutor, works. It can’t pickle lambdas, so we need complete picklable functions.
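The pickling limitation is easy to observe directly. The helper `can_pickle` below is hypothetical and exists only for this demonstration; it shows that a lambda is rejected by pickle while a named, importable callable such as the built-in `int` is accepted.

```python
import pickle


def can_pickle(obj):
    """Return True if the standard pickle module can serialize ``obj``."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


# A lambda has no importable name, so pickle cannot reference it.
lambda_ok = can_pickle(lambda x: x > 0)
# A named callable is pickled by reference to its module and name.
named_ok = can_pickle(int)
```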
- class streams.utils.MaxHeapItem(value)¶
  This is a small wrapper around an item which makes it possible to use heaps from heapq as max-heaps. Unfortunately this module provides min-heaps only.
  Guys, come on. We need max-heaps too.
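The usual trick for such a wrapper is to invert the comparison that heapq relies on. The sketch below assumes that approach; `MaxHeapItemSketch` is a hypothetical stand-in and may differ from the real class.

```python
import heapq


class MaxHeapItemSketch:
    """Wrap a value so that heapq, a min-heap, pops the largest value first."""

    def __init__(self, value):
        self.value = value

    def __lt__(self, other):
        # heapq only uses "<", so inverting it turns the min-heap into a max-heap.
        return self.value > other.value


heap = [MaxHeapItemSketch(v) for v in [3, 1, 4, 1, 5]]
heapq.heapify(heap)
largest = heapq.heappop(heap).value
second_largest = heapq.heappop(heap).value
```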
- streams.utils.apply_to_tuple(*funcs, **kwargs)¶
  Applies several functions to one item and returns a tuple of results.
  >>> apply_to_tuple(int, float, item="1")
  (1, 1.0)
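Judging by the signature and the example, the item is passed as the keyword argument `item`. A minimal sketch under that assumption, with the hypothetical name `apply_to_tuple_sketch`:

```python
def apply_to_tuple_sketch(*funcs, **kwargs):
    """Apply every function in ``funcs`` to ``kwargs['item']``."""
    item = kwargs["item"]  # assumed to be the only supported keyword
    return tuple(func(item) for func in funcs)


converted = apply_to_tuple_sketch(int, float, item="1")
```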
- streams.utils.decimal_or_none(item)¶
  Tries to convert item to decimal.Decimal. If it is not possible, returns None.
  Parameters: item (object) – Element to convert into decimal.Decimal.
  >>> decimal_or_none(1)
  Decimal("1")
  >>> decimal_or_none("1")
  Decimal("1")
  >>> decimal_or_none("smth")
  None
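All the *_or_none helpers in this module follow the same convert-or-return-None pattern. A sketch for the decimal case is shown below; `decimal_or_none_sketch` is a hypothetical name, and the exact set of exceptions the library catches is an assumption.

```python
from decimal import Decimal, InvalidOperation


def decimal_or_none_sketch(item):
    """Convert ``item`` to Decimal, or return None if that is impossible."""
    try:
        return Decimal(item)
    except (InvalidOperation, TypeError, ValueError):
        # Decimal("smth") raises InvalidOperation, Decimal(None) raises TypeError.
        return None


from_int = decimal_or_none_sketch(1)
from_text = decimal_or_none_sketch("1")
from_garbage = decimal_or_none_sketch("smth")
```

Because the function is defined with def at module level, it can be pickled and therefore used with process-based executors.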
- streams.utils.filter_false(argument)¶
  Opposite to streams.utils.filter_true().
  Parameters: argument (tuple) – Argument consists of a predicate function and the item itself.
  >>> filter_false((lambda x: x <= 5, 5))
  (False, 5)
  >>> filter_false((lambda x: x > 100, 1))
  (True, 1)
- streams.utils.filter_keys(item)¶
  Returns the first element of the tuple or item itself.
  Parameters: item (object) – It can be a tuple, a list or just an object.
  >>> filter_keys(1)
  1
  >>> filter_keys((1, 2))
  1
- streams.utils.filter_true(argument)¶
  Returns the predicate value of the given item and the item itself.
  Parameters: argument (tuple) – Argument consists of a predicate function and the item itself.
  >>> filter_true((lambda x: x <= 5, 5))
  (True, 5)
  >>> filter_true((lambda x: x > 100, 1))
  (False, 1)
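Both filter helpers take a single (predicate, item) tuple so that they can be mapped over a stream of such pairs. A sketch of the pair, under the assumption that the predicate result is coerced to bool; the `_sketch` names and `is_small` are hypothetical.

```python
def filter_true_sketch(argument):
    """Return (predicate(item), item) for a (predicate, item) tuple."""
    predicate, item = argument
    return bool(predicate(item)), item


def filter_false_sketch(argument):
    """Same as filter_true_sketch but with the predicate value negated."""
    predicate, item = argument
    return not predicate(item), item


def is_small(x):
    # A named predicate, since lambdas cannot be pickled for process pools.
    return x <= 5


kept = filter_true_sketch((is_small, 5))
dropped = filter_false_sketch((is_small, 5))
```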
- streams.utils.filter_values(item)¶
  Returns the last element of the tuple or item itself.
  Parameters: item (object) – It can be a tuple, a list or just an object.
  >>> filter_values(1)
  1
  >>> filter_values((1, 2))
  2
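filter_keys and filter_values are mirror images of each other. The sketch below assumes a plain isinstance check on tuple and list and does not handle empty sequences; the `_sketch` names are hypothetical.

```python
def filter_keys_sketch(item):
    """Return the first element of a tuple or list, or the item itself."""
    if isinstance(item, (tuple, list)):
        return item[0]
    return item


def filter_values_sketch(item):
    """Return the last element of a tuple or list, or the item itself."""
    if isinstance(item, (tuple, list)):
        return item[-1]
    return item


key = filter_keys_sketch(("name", "value"))
value = filter_values_sketch(("name", "value"))
```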
- streams.utils.float_or_none(item)¶
  Tries to convert item to float(). If it is not possible, returns None.
  Parameters: item (object) – Element to convert into float().
  >>> float_or_none(1)
  1.0
  >>> float_or_none("1")
  1.0
  >>> float_or_none("smth")
  None
- streams.utils.int_or_none(item)¶
  Tries to convert item to int(). If it is not possible, returns None.
  Parameters: item (object) – Element to convert into int().
  >>> int_or_none(1)
  1
  >>> int_or_none("1")
  1
  >>> int_or_none("smth")
  None
- streams.utils.key_mapper(argument)¶
  Maps predicate only to the key (first element) of an item. If item is not a tuple() then tuplifies it first.
  Parameters: argument (tuple) – The tuple of (predicate, item).
  >>> key_mapper((lambda x: x + 10, (1, 2)))
  (11, 2)
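A sketch of the key-mapping step is given below. It assumes that "tuplifies" means wrapping a non-tuple item into a one-element tuple; `key_mapper_sketch` and `add_ten` are hypothetical names.

```python
def key_mapper_sketch(argument):
    """Apply ``predicate`` to the first element of ``item`` only."""
    predicate, item = argument
    if not isinstance(item, tuple):
        item = (item,)  # assumed meaning of "tuplify"
    return (predicate(item[0]),) + item[1:]


def add_ten(x):
    return x + 10


mapped_pair = key_mapper_sketch((add_ten, (1, 2)))
mapped_scalar = key_mapper_sketch((add_ten, 5))
```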
- streams.utils.long_or_none(item)¶
  Tries to convert item to long(). If it is not possible, returns None.
  Parameters: item (object) – Element to convert into long().
  >>> long_or_none(1)
  1L
  >>> long_or_none("1")
  1L
  >>> long_or_none("smth")
  None
- streams.utils.make_list(iterable)¶
  Makes a list from the given iterable, but won’t create a new one if iterable is a list() or tuple() itself.
  Parameters: iterable (Iterable) – Some iterable entity we need to convert into list().
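The described behaviour amounts to materializing only what is not already a sequence. A minimal sketch with the hypothetical name `make_list_sketch`:

```python
def make_list_sketch(iterable):
    """Return lists and tuples unchanged; materialize anything else."""
    if isinstance(iterable, (list, tuple)):
        return iterable  # no copy is made
    return list(iterable)


original = [1, 2, 3]
same_object = make_list_sketch(original)
from_generator = make_list_sketch(x * 2 for x in range(3))
```

Note that a tuple is returned as a tuple, so callers get a reusable sequence rather than strictly a list.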
- streams.utils.unicode_or_none(item)¶
  Tries to convert item to unicode(). If it is not possible, returns None.
  Parameters: item (object) – Element to convert into unicode().
  >>> unicode_or_none(1)
  u"1"
  >>> unicode_or_none("1")
  u"1"
  >>> unicode_or_none("smth")
  u"smth"
  Note
  This is relevant for Python 2 only. Python 3 will use native str().