Internal modules

You generally do not need this API, but feel free to check it out if you are curious.

streams.executors

This module provides different implementations of concurrent executors suitable for use with streams.poolofpools.PoolOfPools. If Gevent is available, streams.executors._gevent.GeventExecutor can also be imported from here.

It also defines streams.executors.ParallelExecutor, a dynamically chosen class used for concurrent execution by default. If the code is monkey patched by Gevent, it resolves to streams.executors._gevent.GeventExecutor; otherwise to streams.executors.executors.ThreadPoolExecutor.
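
For illustration, a minimal sketch of how such a selection could look (it assumes gevent.monkey.is_module_patched() is available; the real module may detect monkey patching differently):

try:
    from gevent import monkey
except ImportError:
    monkey = None

from streams.executors.executors import ThreadPoolExecutor

if monkey is not None and monkey.is_module_patched("threading"):
    # The process is monkey patched by Gevent: use the greenlet executor.
    from streams.executors._gevent import GeventExecutor as ParallelExecutor
else:
    # No Gevent or no monkey patching: fall back to plain threads.
    ParallelExecutor = ThreadPoolExecutor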

streams.executors.executors

This module contains implementations of executors wrapped with streams.executors.mixins.PoolOfPoolsMixin and suitable for use with streams.poolofpools.PoolOfPools.

All of them are essentially thin extensions of classes from concurrent.futures.

class streams.executors.executors.ProcessPoolExecutor(max_workers=None)[source]

Implementation of concurrent.futures.ProcessPoolExecutor suitable for use with streams.poolofpools.PoolOfPools.

class streams.executors.executors.SequentalExecutor(*args, **kwargs)[source]

Debug executor. No concurrency, it just yields elements one by one.

class streams.executors.executors.ThreadPoolExecutor(max_workers)[source]

Implementation of concurrent.futures.ThreadPoolExecutor suitable for use with streams.poolofpools.PoolOfPools.

streams.executors.mixins

This module provides PoolOfPoolsMixin only. Mix it into a concurrent.futures.Executor implementation and it becomes usable with PoolOfPools.

class streams.executors.mixins.PoolOfPoolsMixin[source]

Mixin to support streams.poolofpools.PoolOfPools execution properly.

It replaces the map implementation and provides some additional interface that helps streams.poolofpools.PoolOfPools to manage the executor instance. The current implementation supports only expanding (dynamically increasing, on the fly) the number of workers.

static dummy_callback(*args, **kwargs)[source]

A dummy callback used when no streams.poolofpools.PoolOfPools.worker_finished() is supplied for the mapper. It does nothing at all.

expand(expand_to)[source]

Increases the number of workers in the executor on the fly.

Parameters:expand_to (int) – The number of workers to add to the executor.

Note

It works perfectly with streams.executors._gevent.GeventExecutor and concurrent.futures.ThreadPoolExecutor but has some issues with concurrent.futures.ProcessPoolExecutor.

It increases the number of workers that consume the task queue, but there is no good way to expand the queue itself (the current implementation limits the number of tasks in the queue).

static get_first(queue)[source]

Extracts the execution result from the first element of the queue (to preserve ordering, since map is an order-preserving function). It also tries to handle exceptions, if any, in the same way as concurrent.futures.ThreadPoolExecutor and concurrent.futures.ProcessPoolExecutor do.

Note

It relies on the current implementation of the map method in both concurrent.futures.ThreadPoolExecutor and concurrent.futures.ProcessPoolExecutor, so if you notice any difference in behaviour, please create an issue.

map(fn, *iterables, **kwargs)[source]

New implementation of concurrent mapper.

It has two new arguments: callback and required_workers.

Parameters:
  • callback (Callable) – Callback to execute after the map is done.
  • required_workers (int) – The number of workers to use for this map procedure.
It differs from the default implementation in two ways:
  1. It uses a worker limit (required_workers). It can be lower than the maximum number of workers defined at executor initialization, so the same executor can be utilized for several tasks more efficiently.
  2. It does not create a list of futures in memory. It creates only required_workers futures and tries to keep that count constant during the whole procedure. This is not fully concurrent execution, because tasks are submitted one by one, but on big iterables it uses as little memory as possible while providing reasonable concurrency.
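
A hypothetical usage sketch (the callback below is illustrative; the exact arguments PoolOfPools passes to it are not shown here):

from streams.executors.executors import ThreadPoolExecutor

def on_done(*args, **kwargs):
    # streams.poolofpools.PoolOfPools normally passes its worker_finished()
    # here; a permissive no-op function is enough for illustration.
    print("mapping finished")

executor = ThreadPoolExecutor(8)

# Use only 4 of the 8 workers for this mapping and keep at most 4 futures
# in flight at any moment.
results = executor.map(str, range(100), callback=on_done, required_workers=4)
print(list(results))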

streams.executors._gevent

This module provides streams.executors._gevent.GreenletFuture (a thin wrapper around concurrent.futures.Future) and streams.executors._gevent.GeventExecutor.

You can use concurrent.futures.ThreadPoolExecutor and it will work, but to make better use of greenlets it makes sense to use a custom executor.
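
A minimal usage sketch, assuming Gevent is installed, the process is monkey patched, and GeventExecutor can be constructed without arguments:

from gevent import monkey
monkey.patch_all()

import gevent

from streams.executors._gevent import GeventExecutor

def task(number):
    # gevent.sleep() yields control, so the greenlets run cooperatively.
    gevent.sleep(0.01)
    return number * number

executor = GeventExecutor()
print(list(executor.map(task, range(10))))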

class streams.executors._gevent.GeventExecutor(*args, **kwargs)[source]

Implementation of Gevent executor fully compatible with concurrent.futures.Executor.

class streams.executors._gevent.GreenletFuture(greenlet)[source]

Just a thin wrapper around a concurrent.futures.Future to support greenlets.

streams.iterators

This module contains some useful iterators. Consider it as a small ad-hoc extension pack for itertools.

streams.iterators.accumulate(iterable, function=<built-in function add>)[source]

Implementation of itertools.accumulate() from Python 3.3.
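
For example (running sums with the default function, running products with operator.mul):

>>> import operator
>>> list(accumulate([1, 2, 3, 4]))
... [1, 3, 6, 10]
>>> list(accumulate([1, 2, 3, 4], operator.mul))
... [1, 2, 6, 24]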

streams.iterators.distinct(iterable)[source]

Filters items from iterable and returns only distinct ones. Keeps order.

Parameters:iterable (Iterable) – Something iterable we have to filter.
>>> list(distinct([1, 2, 3, 2, 1, 2, 3, 4]))
... [1, 2, 3, 4]

Note

This is an exact implementation, so all items have to be kept in memory.

Note

All items have to be hashable.

streams.iterators.partly_distinct(iterable)[source]

Filters items from iterable and tries to return only distinct ones. Keeps order.

Parameters:iterable (Iterable) – Something iterable we have to filter.
>>> list(partly_distinct([1, 2, 3, 2, 1, 2, 3, 4]))
... [1, 2, 3, 4]

Note

Unlike distinct(), it does not guarantee that all elements will be distinct. But if the cardinality of the stream is rather small, it works well.

Note

The current implementation guarantees support for 10000 distinct values. If your cardinality is bigger, there might be some duplicates.
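
The general idea can be sketched with a bounded cache of recently seen items (a hypothetical illustration, not the actual implementation):

from collections import OrderedDict

def bounded_distinct(iterable, cache_size=10000):
    # Remember at most cache_size recently seen distinct items; once an item
    # has been evicted from the cache, a later duplicate of it slips through.
    seen = OrderedDict()
    for item in iterable:
        if item in seen:
            continue
        if len(seen) >= cache_size:
            seen.popitem(last=False)  # drop the oldest remembered item
        seen[item] = True
        yield item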

streams.iterators.peek(iterable, function)[source]

Does the same as Java 8 peek does.

Parameters:
  • iterable (Iterable) – Iterable we want to peek
  • function (function) – Peek function
>>> def peek_func(item):
...     print "peek", item
>>> list(peek([1, 2, 3], peek_func))
... peek 1
... peek 2
... peek 3
... [1, 2, 3]
streams.iterators.seed(function, seed_value)[source]

Does the same as Java 8 iterate.

Parameters:
  • function (function) – Function applied to the previous value to produce the next one.
  • seed_value (object) – Initial value of the sequence.
>>> iterator = seed(lambda x: x * 10, 1)
>>> next(iterator)
... 1
>>> next(iterator)
... 10
>>> next(iterator)
... 100

streams.poolofpools

class streams.poolofpools.ExecutorPool(worker_class)[source]

Executor pool for PoolOfPools which does accurate and intelligent management of pools of a predefined executor class.

It tries to reuse existing executors where possible; if that is not possible, it creates new ones.

Just an example: you have done a big mapping of the data in 10 threads. Normally you would shut down and clean up this pool, but a bit later you find that you need a pool of 4 threads. Why not reuse the existing pool? This class allows you to do that, and it keeps track of the fact that 6 threads are left idle. So if you have a task which needs <= 6 threads, it will reuse that pool as well: the 4-thread task may continue to work in parallel while 6 threads remain available to occupy. That is the main idea.

It also tries to squash several idle pools into a single instance by expanding the number of workers in one of them and throwing the other ones away.
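
A hypothetical usage sketch of the idea above (it assumes the returned mapper is called like the built-in map; this is an assumption, check streams.Stream.map() for the real calling convention):

from streams.executors.executors import ThreadPoolExecutor
from streams.poolofpools import ExecutorPool

pool = ExecutorPool(ThreadPoolExecutor)

big_mapper = pool.get(10)       # a mapping which needs 10 threads
print(list(big_mapper(str, range(10))))

small_mapper = pool.get(4)      # likely reuses the executor created above,
                                # leaving 6 of its threads for other tasks
print(list(small_mapper(int, ["1", "2", "3"])))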

__init__(worker_class)[source]

Constructor of the class. worker_class has to be a class which supports the required interface and behaviour, i.e. it has to mix in streams.executors.mixins.PoolOfPoolsMixin.

Parameters:worker_class (PoolOfPoolsMixin) – The class of executors this pool has to maintain.
__weakref__

list of weak references to the object (if defined)

get(required_workers)[source]

Returns a mapper which guarantees that you can utilize the given number of workers.

Parameters:required_workers (int) – The number of workers you need to utilize for your task.
get_any()[source]

Returns any map function; it is undetermined how many workers it has. As a rule, you get the minimal number of workers within the pool of executors.

get_suitable_worker(required_workers)[source]

Returns a suitable executor which has the required number of workers, or None if nothing is available.

Actually it returns a tuple of the worker and the count of workers available for utilization within that pool. This count may be more than required_workers but never less.

Parameters:required_workers (int) – The number of workers the user requires.
name_to_worker_mapping()[source]

Maps worker names (the result of applying id() to the executor) to executor instances.

real_worker_availability()[source]

Returns a mapping from each executor name to its real availability. Since worker_finished() does not do any defragmentation of availability, the internal structure may contain conflicting information about worker availability. This method is intended to restore the truth.

squash()[source]

Squashes pools, trying to minimize the number of pools to avoid unnecessary fragmentation and complexity.

squash_workers(names, avails)[source]

Does actual squashing/defragmentation of internal structure.

worker_finished(worker, required_workers)[source]

The callback used by streams.executors.mixins.PoolOfPoolsMixin.

class streams.poolofpools.PoolOfPools[source]

Just a convenient interface to the set of multiple ExecutorPool instances, nothing more.

__weakref__

list of weak references to the object (if defined)

get(kwargs)[source]

Returns the mapper.

Parameters:kwargs (dict) – Keyword arguments for the mapper. Please check the streams.Stream.map() documentation to see what this dict has to contain.
static get_from_pool(pool, required_workers)[source]

Fetches mapper from the pool.

Parameters:
  • pool (ExecutorPool) – The pool you want to fetch mapper from.
  • required_workers (int) – The number of workers you require. If it is None, ExecutorPool.get_any() is executed.
parallel(required_workers)[source]

Fetches parallel executor mapper from the underlying ExecutorPool.

Parameters:required_workers (int) – The number of workers you require. If it is None, ExecutorPool.get_any() is executed.
process(required_workers)[source]

Fetches process executor mapper from the underlying ExecutorPool.

Parameters:required_workers (int) – The number of workers you require. If it is None, ExecutorPool.get_any() is executed.

streams.utils

This module contains some utility functions for Streams.

You may wonder why such simple filter-* functions are needed at all. The reason lies in how multiprocessing, and therefore concurrent.futures.ProcessPoolExecutor, works: it cannot pickle lambdas, so proper picklable module-level functions are required.
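
For example, the standard pickle module refuses a lambda but happily pickles a module-level function:

import pickle

def add_one(x):  # a module-level function: picklable
    return x + 1

pickle.dumps(add_one)  # works

try:
    pickle.dumps(lambda x: x + 1)
except Exception as exc:
    print("lambdas cannot be pickled:", exc)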

class streams.utils.MaxHeapItem(value)[source]

This is a small wrapper around an item which makes it possible to use heaps from heapq as max-heaps, since that module unfortunately provides min-heaps only.
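
The usual trick is a wrapper with an inverted comparison, roughly along these lines (a sketch, not necessarily the real MaxHeapItem):

import heapq

class MaxItem(object):
    # Inverting __lt__ makes heapq (a min-heap) behave as a max-heap.
    def __init__(self, value):
        self.value = value

    def __lt__(self, other):
        return self.value > other.value

heap = []
for number in (3, 1, 4, 1, 5, 9):
    heapq.heappush(heap, MaxItem(number))

print(heapq.heappop(heap).value)  # 9, the largest value comes out first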

streams.utils.apply_to_tuple(*funcs, **kwargs)[source]

Applies several functions to one item and returns a tuple of the results.

Parameters:
  • funcs (list) – The list of functions to apply.
  • kwargs (dict) – Keyword arguments with only one mandatory key, item. The functions are applied to this item.
>>> apply_to_tuple(int, float, item="1")
... (1, 1.0)
streams.utils.decimal_or_none(item)[source]

Tries to convert item to decimal.Decimal. If it is not possible, returns None.

Parameters:item (object) – Element to convert into decimal.Decimal.
>>> decimal_or_none(1)
... Decimal("1")
>>> decimal_or_none("1")
... Decimal("1")
>>> decimal_or_none("smth")
... None
streams.utils.filter_false(argument)[source]

Opposite of streams.utils.filter_true().

Parameters:argument (tuple) – Argument consists of the predicate function and the item itself.
>>> filter_false((lambda x: x <= 5, 5))
... False, 5
>>> filter_false((lambda x: x > 100, 1))
... True, 1
streams.utils.filter_keys(item)[source]

Returns first element of the tuple or item itself.

Parameters:item (object) – It can be tuple, list or just an object.
>>> filter_keys(1)
... 1
>>> filter_keys((1, 2))
... 1
streams.utils.filter_true(argument)[source]

Returns the predicate value of the given item and the item itself.

Parameters:argument (tuple) – Argument consists of the predicate function and the item itself.
>>> filter_true((lambda x: x <= 5, 5))
... True, 5
>>> filter_true((lambda x: x > 100, 1))
... False, 1
streams.utils.filter_values(item)[source]

Returns last element of the tuple or item itself.

Parameters:item (object) – It can be tuple, list or just an object.
>>> filter_values(1)
... 1
>>> filter_values((1, 2))
... 2
streams.utils.float_or_none(item)[source]

Tries to convert item to float(). If it is not possible, returns None.

Parameters:item (object) – Element to convert into float().
>>> float_or_none(1)
... 1.0
>>> float_or_none("1")
... 1.0
>>> float_or_none("smth")
... None
streams.utils.int_or_none(item)[source]

Tries to convert item to int(). If it is not possible, returns None.

Parameters:item (object) – Element to convert into int().
>>> int_or_none(1)
... 1
>>> int_or_none("1")
... 1
>>> int_or_none("smth")
... None
streams.utils.key_mapper(argument)[source]

Maps the predicate only to the key (the first element) of an item. If the item is not a tuple(), it is tuplified first.

Parameters:argument (tuple) – The tuple of (predicate and item).
>>> key_mapper((lambda x: x + 10, (1, 2)))
... (11, 2)
streams.utils.long_or_none(item)[source]

Tries to convert item to long(). If it is not possible, returns None.

Parameters:item (object) – Element to convert into long().
>>> long_or_none(1)
... 1L
>>> long_or_none("1")
... 1L
>>> long_or_none("smth")
... None
streams.utils.make_list(iterable)[source]

Makes a list from the given iterable, but does not create a new one if the iterable is already a list() or tuple().

Parameters:iterable (Iterable) – Some iterable entity we need to convert into list().
streams.utils.unicode_or_none(item)[source]

Tries to convert item to unicode(). If it is not possible, returns None.

Parameters:item (object) – Element to convert into unicode().
>>> unicode_or_none(1)
... u"1"
>>> unicode_or_none("1")
... u"1"
>>> unicode_or_none("smth")
... u"smth"

Note

This is relevant for Python 2 only. Python 3 will use native str().

streams.utils.value_mapper(argument)[source]

Maps the predicate only to the value (the last element) of an item. If the item is not a tuple(), it is tuplified first.

Parameters:argument (tuple) – The tuple of (predicate and item).
>>> value_mapper((lambda x: x + 10, (1, 2)))
... (1, 12)