Streams API

This chapter documents the Streams API. As a rule, the documentation on the Stream class is all you need, but you can also check the internals if you want.

streams

The streams module contains just the Stream class. In practice, this class is the only thing you need from the module.

class streams.Stream(iterator)

The Stream class provides the basic functionality of Streams. Please check out the documentation of its members for examples.

__init__(iterator)

Initializes the Stream.

It handles the given iterator in a smart way. If you give it an instance of dict or one of its derivatives (such as collections.OrderedDict), it iterates through its items (key and value pairs). Otherwise, the iterator is used as is.

Parameters:iterator (Iterable) – Iterator which has to be converted into Stream.
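
The dict handling described above can be pictured with a minimal sketch. The helper name to_iterator is hypothetical and is not part of the streams module; the sketch only assumes that dicts are detected with isinstance() and iterated through their items.

```python
from collections import OrderedDict


def to_iterator(source):
    """Return an iterator over source, using (key, value) pairs for dicts."""
    if isinstance(source, dict):
        # Subclasses such as OrderedDict are covered by isinstance().
        return iter(source.items())
    return iter(source)


print(list(to_iterator({"a": 1})))          # [('a', 1)]
print(list(to_iterator(OrderedDict(b=2))))  # [('b', 2)]
print(list(to_iterator([1, 2, 3])))         # [1, 2, 3]
```
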
__iter__()

Supports the iteration protocol.

__len__()

Supports the len() function if the given iterator supports it.

__reversed__()

Supports the reversed() function.

all(predicate=<type 'bool'>, **concurrency_kwargs)

Checks whether all elements of the stream match the given predicate. If predicate is not defined, bool() is used.

Parameters:
  • predicate (function) – Predicate to apply to each element of the Stream.
  • concurrency_kwargs (dict) – The same concurrency keywords as for Stream.map().
Returns:

True if all elements match the predicate, False otherwise.

>>> stream = Stream.range(5)
>>> stream.all(lambda item: item > 100)
... False
any(predicate=<type 'bool'>, **concurrency_kwargs)

Checks whether any element of the stream matches the given predicate. If predicate is not defined, bool() is used.

Parameters:
  • predicate (function) – Predicate to apply to each element of the Stream.
  • concurrency_kwargs (dict) – The same concurrency keywords as for Stream.map().
Returns:

True if at least one element matches the predicate, False otherwise.

>>> stream = Stream.range(5)
>>> stream.any(lambda item: item < 100)
... True
average()

Calculates the average of elements in the stream.

Returns:The average of elements.
>>> stream = Stream.range(10000)
>>> stream.average()
... 4999.5
chain()

If the elements of the stream are iterable, tries to flatten the stream.

Returns:new processed Stream instance.
>>> stream = Stream.range(3)
>>> stream = stream.tuplify()
>>> stream = stream.chain()
>>> list(stream)
... [0, 0, 1, 1, 2, 2]
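
The flattening step can be pictured with the standard library alone. This is a minimal sketch, assuming the behaviour matches itertools.chain.from_iterable(); it is not the library's actual implementation, and the pairs list merely stands in for the output of tuplify().

```python
import itertools

# Stand-in for Stream.range(3).tuplify(): each element paired with itself.
pairs = [(item, item) for item in range(3)]

# Flatten exactly one level of nesting, lazily.
flattened = list(itertools.chain.from_iterable(pairs))
print(flattened)  # [0, 0, 1, 1, 2, 2]
```
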
classmethod concat(*streams)

Lazily concatenates several streams into one. The same as Java 8 concat.

Parameters:streams – The Stream instances you want to concatenate.
Returns:new processed Stream instance.
>>> stream1 = Stream(range(2))
>>> stream2 = Stream(["2", "3", "4"])
>>> stream3 = Stream([list(), dict()])
>>> concatenated_stream = Stream.concat(stream1, stream2, stream3)
>>> list(concatenated_stream)
... [0, 1, "2", "3", "4", [], {}]
count(element=<object object>)

Returns the number of elements in the stream. If element is set, returns the count of particular element in the stream.

Parameters:element (object) – The element we need to count in the stream
Returns:The number of elements or the count of a particular element.
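
The default shown in the signature as <object object> suggests a sentinel object, which lets None itself be counted. The following is a sketch of that pattern under this assumption; count_elements and _NOT_SET are hypothetical names, not the library's code.

```python
# Unique marker meaning "no element was given".
_NOT_SET = object()


def count_elements(iterable, element=_NOT_SET):
    """Count all items, or only the items equal to element if it is given."""
    if element is _NOT_SET:
        return sum(1 for _ in iterable)
    return sum(1 for item in iterable if item == element)


data = [1, None, 1, 2]
print(count_elements(data))        # 4
print(count_elements(data, 1))     # 2
print(count_elements(data, None))  # 1
```
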
decimals()

Tries to convert everything to decimal.Decimal and keeps only successful attempts.

Returns:new processed Stream instance.
>>> stream = Stream([1, 2.0, "3", "4.0", None, {}])
>>> stream = stream.decimals()
>>> list(stream)
... [Decimal('1'), Decimal('2'), Decimal('3'), Decimal('4.0')]

Note

It is not the same as stream.map(Decimal) because it removes failed attempts.

Note

It tries to use cdecimal module if possible.
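
The difference from a plain map(Decimal) can be sketched as follows. This is an illustration built on the standard decimal module, not the library's implementation; keep_decimals is a hypothetical helper.

```python
from decimal import Decimal, InvalidOperation


def keep_decimals(iterable):
    """Yield Decimal versions of the items that can be converted."""
    for item in iterable:
        try:
            yield Decimal(item)
        except (TypeError, ValueError, InvalidOperation):
            # Skip items that cannot be converted instead of raising.
            continue


converted = list(keep_decimals([1, 2.0, "3", "4.0", None, {}]))
print(converted)
# [Decimal('1'), Decimal('2'), Decimal('3'), Decimal('4.0')]
```

A plain map(Decimal, ...) over the same input would raise TypeError on None.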

distinct()

Removes duplicates from the stream.

Returns:new processed Stream instance.

Note

All objects in the stream have to be hashable (support __hash__()).

Note

Please use it carefully. It returns a new Stream but keeps every element in memory.
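
Both notes follow from the usual way of removing duplicates lazily. The sketch below assumes a set of already seen elements; it is an illustration rather than the library's code, and distinct_items is a hypothetical name.

```python
def distinct_items(iterable):
    """Yield each element once, in order of first appearance."""
    seen = set()  # grows with the number of different elements
    for item in iterable:
        if item not in seen:  # membership test requires hashable items
            seen.add(item)
            yield item


print(list(distinct_items([1, 2, 1, 3, 2])))  # [1, 2, 3]
```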

divisible_by(number)

Filters stream for the numbers divisible by the given one.

Parameters:number (int) – Number which every element should be divisible by.
Returns:new processed Stream instance.
>>> stream = Stream.range(6)
>>> stream = stream.divisible_by(2)
>>> list(stream)
... [0, 2, 4]
evens()

Filters and keeps only even numbers from the stream.

Returns:new processed Stream instance.
>>> stream = Stream.range(6)
>>> stream = stream.evens()
>>> list(stream)
... [0, 2, 4]
exclude(predicate, **concurrency_kwargs)

Excludes items from the Stream according to the predicate. The behaviour is the same as for itertools.ifilterfalse() (itertools.filterfalse() in Python 3).

Like Stream.filter(), it also supports parallelization. Please check out the keyword arguments of Stream.map().

Parameters:
  • predicate (function) – Predicate for filtering elements of the Stream.
  • concurrency_kwargs (dict) – The same concurrency keywords as for Stream.map().
Returns:

new processed Stream instance.

>>> stream = Stream.range(6)
>>> stream = stream.exclude(lambda item: item % 2 == 0)
>>> list(stream)
... [1, 3, 5]
exclude_nones()

Excludes None from the stream.

Returns:new processed Stream instance.
>>> stream = Stream([1, 2, None, 3, None, 4])
>>> stream = stream.exclude_nones()
>>> list(stream)
... [1, 2, 3, 4]
filter(predicate, **concurrency_kwargs)

Filters the stream according to the given predicate function. It also supports parallelization (useful if the predicate is a heavy function).

You may consider it an equivalent of itertools.ifilter() (the built-in filter() in Python 3) for streams, with the possibility to parallelize the process.

Parameters:
  • predicate (function) – Predicate for filtering elements of the Stream.
  • concurrency_kwargs (dict) – The same concurrency keywords as for Stream.map().
Returns:

new processed Stream instance.

>>> stream = Stream.range(5)
>>> stream = stream.filter(lambda item: item % 2 == 0)
>>> list(stream)
... [0, 2, 4]
first

Returns the first element from the iterator without changing the internals of the stream.

>>> stream = Stream.range(10)
>>> stream.first
... 0
>>> stream.first
... 0
>>> list(stream)
... [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
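
One way to read the first element of an arbitrary iterator without losing it is to take it off and chain it back. This is only a sketch of that idea, assuming nothing about how the library does it; peek_first is a hypothetical helper.

```python
import itertools


def peek_first(iterable):
    """Return (first element, iterator that still yields every element)."""
    iterator = iter(iterable)
    head = next(iterator)  # raises StopIteration on an empty iterable
    return head, itertools.chain([head], iterator)


head, restored = peek_first(iter(range(10)))
print(head)            # 0
print(list(restored))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```
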
floats()

Tries to convert everything to float() and keeps only successful attempts.

Returns:new processed Stream instance.
>>> stream = Stream([1, 2, "3", "4", None, {}, 5])
>>> stream = stream.floats()
>>> list(stream)
... [1.0, 2.0, 3.0, 4.0, 5.0]

Note

It is not the same as stream.map(float) because it removes failed attempts.

instances_of(cls)

Filters and keeps only instances of the given class.

Parameters:cls (class) – Class for filtering.
Returns:new processed Stream instance.
>>> int_stream = Stream.range(4)
>>> str_stream = Stream.range(4).strings()
>>> result_stream = Stream.concat(int_stream, str_stream)
>>> result_stream = result_stream.instances_of(str)
>>> list(result_stream)
... ['0', '1', '2', '3']
ints()

Tries to convert everything to int() and keeps only successful attempts.

Returns:new processed Stream instance.
>>> stream = Stream([1, 2, "3", "4", None, {}, 5])
>>> stream = stream.ints()
>>> list(stream)
... [1, 2, 3, 4, 5]

Note

It is not the same as stream.map(int) because it removes failed attempts.

classmethod iterate(function, seed_value)

Returns a stream generated from a seed. The same as Java 8 iterate.

Returns an infinite sequential ordered Stream produced by iterative application of a function f to an initial element seed, producing a Stream consisting of seed, f(seed), f(f(seed)), etc.

The first element (position 0) in the Stream will be the provided seed. For n > 0, the element at position n, will be the result of applying the function f to the element at position n - 1.

Parameters:
  • function (function) – The function to apply to the seed.
  • seed_value (object) – The seed value of the function.
Returns:

new processed Stream instance.

>>> stream = Stream.iterate(lambda value: value ** 2, 2)
>>> iterator = iter(stream)
>>> next(iterator)
... 2
>>> next(iterator)
... 4
>>> next(iterator)
... 16
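
The sequence seed, f(seed), f(f(seed)) can be reproduced with a plain generator. This is a minimal sketch of the definition above, not the library's implementation; because the sequence is infinite, itertools.islice() is used to take a finite prefix.

```python
import itertools


def iterate(function, seed_value):
    """Yield seed_value, function(seed_value), function(function(seed_value)), ..."""
    value = seed_value
    while True:
        yield value
        value = function(value)


first_three = list(itertools.islice(iterate(lambda value: value ** 2, 2), 3))
print(first_three)  # [2, 4, 16]
```
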
key_map(predicate, **concurrency_kwargs)

Maps only the key in a (key, value) pair. If an element is a single value, Stream.tuplify() is applied to it first.

Parameters:
  • predicate (function) – Predicate to apply to the key of element in the Stream.
  • concurrency_kwargs (dict) – The same concurrency keywords as for Stream.map().
Returns:

new processed Stream instance.

>>> stream = Stream.range(4)
>>> stream = stream.tuplify()
>>> stream = stream.key_map(lambda item: item ** 3)
>>> list(stream)
... [(0, 0), (1, 1), (8, 2), (27, 3)]
>>> stream = Stream.range(4)
>>> stream = stream.key_map(lambda item: item ** 3)
>>> list(stream)
... [(0, 0), (1, 1), (8, 2), (27, 3)]
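
Both examples give the same result because single elements are turned into pairs before the key is mapped. A sketch of that logic follows; it assumes that a "single" element is anything that is not a tuple, which the documentation does not state, and map_keys is a hypothetical name.

```python
def map_keys(iterable, function):
    """Apply function to the key of each (key, value) pair."""
    for item in iterable:
        if not isinstance(item, tuple):
            # Assumption: non-tuples are duplicated into (item, item).
            item = (item, item)
        key, value = item
        yield function(key), value


print(list(map_keys(range(4), lambda item: item ** 3)))
# [(0, 0), (1, 1), (8, 2), (27, 3)]
```
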
keys()

Iterates only over the keys of the stream (the first element of each tuple). If an element is a single value, it is used as is.

Returns:new processed Stream instance.
>>> stream = Stream.range(5)
>>> stream = stream.key_map(lambda item: item ** 3)
>>> stream = stream.keys()
>>> list(stream)
... [0, 1, 8, 27, 64]
largest(size)

Returns the size largest elements from the stream.

Returns:new processed Stream instance.
>>> stream = Stream.range(3000)
>>> stream = stream.largest(5)
>>> list(stream)
... [2999, 2998, 2997, 2996, 2995]
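
For comparison, the standard library offers the same kind of selection through heapq. Whether the library itself relies on heapq is an assumption; the sketch only shows the expected ordering of the result, and the same idea applies to smallest().

```python
import heapq

# The five largest values, in descending order.
print(heapq.nlargest(5, range(3000)))   # [2999, 2998, 2997, 2996, 2995]

# The five smallest values, in ascending order.
print(heapq.nsmallest(5, range(3000)))  # [0, 1, 2, 3, 4]
```
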
limit(size)

Limits the stream to the given size.

Parameters:size (int) – The size of new Stream.
Returns:new processed Stream instance.
>>> stream = Stream.range(1000)
>>> stream = stream.limit(5)
>>> list(stream)
... [0, 1, 2, 3, 4]
longs()

Tries to convert everything to long() and keeps only successful attempts.

Returns:new processed Stream instance.
>>> stream = Stream([1, 2, "3", "4", None, {}, 5])
>>> stream = stream.longs()
>>> list(stream)
... [1L, 2L, 3L, 4L, 5L]

Note

It is not the same as stream.map(long) because it removes failed attempts.

map(predicate, **concurrency_kwargs)

The cornerstone method of Stream; the other methods are based on it. It supports parallelization out of the box. In essence, it works just like itertools.imap() (the built-in map() in Python 3).

Parameters:
  • predicate (function) – Predicate to map each element of the Stream.
  • concurrency_kwargs (dict) – The concurrency keywords described below.
Returns:

new processed Stream instance.

Parallelization is configurable by keywords. Two keywords are supported: parallel and process. If you set one of them to True, Stream tries to map everything concurrently. For finer tuning, set it to the number of workers you want.

For example, you have a list of URLs to fetch:

>>> stream = Stream(urls)

You can fetch them in parallel:

>>> stream.map(requests.get, parallel=True)

By default, the number of workers is the number of cores on your computer. But if you want 64 workers, you are free to do so:

>>> stream.map(requests.get, parallel=64)

The same goes for process, which tries to use processes.

>>> stream.map(requests.get, process=True)

and

>>> stream.map(requests.get, process=64)

Note

Python multiprocessing has its caveats and pitfalls, so please use it carefully (especially with regard to the predicate). Read the documentation on multiprocessing and look up best practices.

Note

If you set both the parallel and process keywords, only parallel is used. To disable a type of concurrency, set it to None.

>>> stream.map(requests.get, parallel=None, process=64)

is equal to

>>> stream.map(requests.get, process=64)

The same for parallel

>>> stream.map(requests.get, parallel=True, process=None)

is equal to

>>> stream.map(requests.get, parallel=True)

Note

By default no concurrency is used.
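
The keyword rules above can be summarized in a short sketch built on concurrent.futures. It assumes that parallel corresponds to a thread pool and process to a process pool, which the documentation implies but does not state. The names concurrent_map and resolve_workers are hypothetical, and unlike a Stream the sketch returns a list eagerly.

```python
import os
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def resolve_workers(setting):
    """Translate a keyword value into a worker count (None means disabled)."""
    if setting is None or setting is False:
        return None
    if setting is True:
        # Assumed default: one worker per CPU core.
        return os.cpu_count() or 1
    return int(setting)


def concurrent_map(function, items, parallel=None, process=None):
    """Map function over items; parallel takes precedence over process."""
    thread_workers = resolve_workers(parallel)
    if thread_workers is not None:
        with ThreadPoolExecutor(max_workers=thread_workers) as executor:
            return list(executor.map(function, items))
    process_workers = resolve_workers(process)
    if process_workers is not None:
        # The function and the items must be picklable for a process pool.
        with ProcessPoolExecutor(max_workers=process_workers) as executor:
            return list(executor.map(function, items))
    # No keyword set: plain sequential mapping.
    return list(map(function, items))


print(concurrent_map(abs, [-1, -2, 3], parallel=2))  # [1, 2, 3]
print(concurrent_map(abs, [-1, -2, 3]))              # [1, 2, 3]
```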

median()

Returns the median value of the stream.

Returns:The median of the stream.
>>> stream = Stream.range(10000)
>>> stream.median()
... 5000

Note

Note that all elements of the stream are fetched into memory.

nth(nth_element)

Returns Nth element from the stream.

Parameters:nth_element (int) – Number of element to return.
Returns:Nth element.
>>> stream = Stream.range(10000)
>>> stream.nth(1)
... 0

Note

Note that all elements of the stream are fetched into memory (except when nth_element == 1).

odds()

Filters and keeps only odd numbers from the stream.

Returns:new processed Stream instance.
>>> stream = Stream.range(6)
>>> stream = stream.odds()
>>> list(stream)
... [1, 3, 5]
only_falses()

Keeps only those elements where bool(item) == False.

Returns:new processed Stream instance.
>>> stream = Stream([1, 2, None, 0, {}, [], 3])
>>> stream = stream.only_falses()
>>> list(stream)
... [None, 0, {}, []]

Opposite to Stream.only_trues().

only_nones()

Keeps only None in the stream (for example, for counting).

Returns:new processed Stream instance.
>>> stream = Stream([1, 2, None, 3, None, 4])
>>> stream = stream.only_nones()
>>> list(stream)
... [None, None]
only_trues()

Keeps only those elements where bool(element) == True.

Returns:new processed Stream instance.
>>> stream = Stream([1, 2, None, 0, {}, [], 3])
>>> stream = stream.only_trues()
>>> list(stream)
... [1, 2, 3]
partly_distinct()

Excludes some, but not necessarily all, duplicates from the stream.

Returns:new processed Stream instance.

Note

All objects in the stream have to be hashable (support __hash__()).

Note

It does not guarantee that all duplicates are removed, especially if your stream is large and its cardinality is huge.
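
One way such a trade-off can arise is a bounded cache of recently seen elements. The sketch below is an assumption about the general idea, not a description of the library's implementation; partly_distinct_items and cache_size are hypothetical names.

```python
from collections import OrderedDict


def partly_distinct_items(iterable, cache_size=1024):
    """Drop duplicates that are still remembered in a bounded cache."""
    seen = OrderedDict()
    for item in iterable:
        if item in seen:
            seen.move_to_end(item)  # mark as recently seen
            continue
        yield item
        seen[item] = None
        if len(seen) > cache_size:
            seen.popitem(last=False)  # forget the oldest element


print(list(partly_distinct_items([1, 2, 3, 1, 1], cache_size=2)))
# [1, 2, 3, 1]
```

With cache_size=2 the first 1 has already been forgotten when it reappears, so one duplicate slips through.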

peek(predicate)

Does the same as Java 8 peek.

Parameters:predicate (function) – Predicate to apply on each element.
Returns:new processed Stream instance.

Returns a stream consisting of the elements of this stream, additionally performing the provided action on each element as elements are consumed from the resulting stream.
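
The key point is that the action runs only as elements are consumed. A generator shows this directly; the following is a sketch of the semantics, not the library's code, and peek_items is a hypothetical name.

```python
def peek_items(iterable, action):
    """Yield every element unchanged, calling action on it first."""
    for item in iterable:
        action(item)
        yield item


log = []
peeked = peek_items([1, 2, 3], log.append)
print(log)           # [] because nothing has been consumed yet
print(next(peeked))  # 1
print(log)           # [1]
print(list(peeked))  # [2, 3]
print(log)           # [1, 2, 3]
```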

classmethod range(*args, **kwargs)

Creates a numerical iterator. Stream.range(10) is exactly the same as Stream(range(10)) (in Python 2: Stream(xrange(10))). All arguments go directly to range() (xrange()).

Returns:new processed Stream instance.
>>> stream = Stream.range(6)
>>> list(stream)
... [0, 1, 2, 3, 4, 5]
>>> stream = Stream.range(1, 6)
>>> list(stream)
... [1, 2, 3, 4, 5]
>>> stream = Stream.range(1, 6, 2)
>>> list(stream)
... [1, 3, 5]
reduce(function, initial=<object object>)

Applies reduce() to the iterator.

Parameters:
  • function (function) – Reduce function.
  • initial (object) – Initial value. If it is not set, the first element is used.
>>> import operator
>>> stream = Stream.range(5)
>>> stream.reduce(operator.add)
... 10
regexp(regexp, flags=0)

Filters stream according to the regular expression using re.match(). It also supports the same flags as re.match().

Parameters:
  • regexp (str) – Regular expression for filtering.
  • flags (int) – Flags from re.
Returns:

new processed Stream instance.

>>> stream = Stream.range(100)
>>> stream = stream.strings()
>>> stream = stream.regexp(r"^1")
>>> list(stream)
... ['1', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19']
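
Because re.match() only matches at the beginning of a string, the pattern does not need the ^ anchor to give the same result. The sketch below illustrates this with the standard re module; regexp_filter is a hypothetical helper, not the library's implementation.

```python
import re


def regexp_filter(iterable, pattern, flags=0):
    """Keep only the strings that match pattern at their beginning."""
    compiled = re.compile(pattern, flags)
    return (item for item in iterable if compiled.match(item))


numbers = [str(number) for number in range(100)]
print(list(regexp_filter(numbers, r"1")))
# ['1', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19']
```
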
reversed()

Reverses the stream.

Returns:new processed Stream instance.

Note

If the underlying iterator does not support reversing, everything has to be fetched into memory.

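
The fallback mentioned in the note can be sketched as follows. It assumes that an unsupported iterator is detected through the TypeError raised by the built-in reversed(); reverse_iterable is a hypothetical helper.

```python
def reverse_iterable(iterable):
    """Reverse lazily when possible, otherwise load everything first."""
    try:
        return reversed(iterable)
    except TypeError:
        # Generators and similar iterators cannot be reversed in place.
        return reversed(list(iterable))


print(list(reverse_iterable(range(4))))              # [3, 2, 1, 0]
print(list(reverse_iterable(x for x in range(4))))   # [3, 2, 1, 0]
```
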
skip(size)

Skips the first size elements.

Parameters:size (int) – The amount of elements to skip.
Returns:new processed Stream instance.
>>> stream = Stream.range(10)
>>> stream = stream.skip(5)
>>> list(stream)
... [5, 6, 7, 8, 9]
smallest(size)

Returns the size smallest elements from the stream.

Returns:new processed Stream instance.
>>> stream = Stream.range(3000)
>>> stream = stream.smallest(5)
>>> list(stream)
... [0, 1, 2, 3, 4]
sorted(key=None, reverse=False)

Sorts the stream elements.

Parameters:
  • key (function) – Key function for sorting
  • reverse (bool) – Do we need to sort in descending order?
Returns:

new processed Stream instance.

Note

There is no magic here: all elements have to be fetched into memory for sorting.

strings()

Tries to convert everything to unicode() (str for Python 3) and keeps only successful attempts.

Returns:new processed Stream instance.
>>> stream = Stream([1, 2.0, "3", "4.0", None, {}])
>>> stream = stream.strings()
>>> list(stream)
... ['1', '2.0', '3', '4.0', 'None', '{}']

Note

It is not the same as stream.map(str) because it removes failed attempts.

Note

It tries to convert to unicode if possible, not bytes.

sum()

Returns the sum of elements in the stream.

>>> stream = Stream.range(10)
>>> stream = stream.decimals()
>>> stream.sum()
... Decimal('45')

Note

This method does not rely on the built-in sum(). It adds elements according to the __add__() defined by their classes, so it can, for example, sum decimal.Decimal with int.

tuplify(clones=2)

Tuplifies the iterator: turns each element into a tuple that contains clones copies of that element.

Parameters:clones (int) – The count of elements in result tuple.
Returns:new processed Stream instance.
>>> stream = Stream.range(2)
>>> stream = stream.tuplify(3)
>>> list(stream)
... [(0, 0, 0), (1, 1, 1)]
value_map(predicate, **concurrency_kwargs)

Maps only the value in a (key, value) pair. If an element is a single value, Stream.tuplify() is applied to it first.

Parameters:
  • predicate (function) – Predicate to apply to the value of element in the Stream.
  • concurrency_kwargs (dict) – The same concurrency keywords as for Stream.map().
Returns:

new processed Stream instance.

>>> stream = Stream.range(4)
>>> stream = stream.tuplify()
>>> stream = stream.value_map(lambda item: item ** 3)
>>> list(stream)
... [(0, 0), (1, 1), (2, 8), (3, 27)]
>>> stream = Stream.range(4)
>>> stream = stream.value_map(lambda item: item ** 3)
>>> list(stream)
... [(0, 0), (1, 1), (2, 8), (3, 27)]
values()

Iterates only over the values of the stream (the last element of each tuple). If an element is a single value, it is used as is.

Returns:new processed Stream instance.
>>> stream = Stream.range(5)
>>> stream = stream.key_map(lambda item: item ** 3)
>>> stream = stream.values()
>>> list(stream)
... [0, 1, 2, 3, 4]