Streams API
This chapter documents the Streams API. As a rule, the documentation of the Stream class is all you need, but the internals are described as well if you want to look at them.
streams
The streams module contains just the Stream class. In practice you only need this class and nothing else from the module.
-
class streams.Stream(iterator)
The Stream class provides the basic functionality of Streams. See the documentation of its members for examples.
-
__init__(iterator)
Initializes the Stream.
It applies some special handling to iterator. If you give it an instance of dict or one of its derivatives (such as collections.OrderedDict), it iterates through its items (key and value pairs). Otherwise the iterator is used as is.
Parameters: iterator (Iterable) – Iterator which has to be converted into a Stream.
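The dict handling described above can be pictured with a small standard-library sketch. The helper name to_iterator is hypothetical and this is not the library's actual code; it only illustrates the documented behaviour.

```python
from collections import OrderedDict


def to_iterator(source):
    """Hypothetical helper: yield (key, value) pairs for dicts, elements otherwise."""
    if isinstance(source, dict):  # also true for subclasses such as OrderedDict
        return iter(source.items())
    return iter(source)


pairs = list(to_iterator(OrderedDict([("a", 1), ("b", 2)])))
plain = list(to_iterator([1, 2, 3]))
print(pairs)
print(plain)
```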
-
__reversed__()
Adds support for the reversed() built-in.
-
all(predicate=bool, **concurrency_kwargs)
Checks whether all elements in the stream match the given predicate. If predicate is not defined, bool() is used.
Parameters:
- predicate (function) – Predicate to apply to each element of the Stream.
- concurrency_kwargs (dict) – The same concurrency keywords as for Stream.map().
Returns: Whether all elements matched the predicate.
>>> stream = Stream.range(5)
>>> stream.all(lambda item: item > 100)
... False
-
any(predicate=bool, **concurrency_kwargs)
Checks whether any element in the stream matches the given predicate. If predicate is not defined, bool() is used.
Parameters:
- predicate (function) – Predicate to apply to each element of the Stream.
- concurrency_kwargs (dict) – The same concurrency keywords as for Stream.map().
Returns: Whether any element matched the predicate.
>>> stream = Stream.range(5)
>>> stream.any(lambda item: item < 100)
... True
-
average()
Calculates the average of the elements in the stream.
Returns: The average of the elements.
>>> stream = Stream.range(10000)
>>> stream.average()
... 4999.5
-
chain()
If the elements of the stream are iterable, tries to flatten the stream.
Returns: new processed Stream instance.
>>> stream = Stream.range(3)
>>> stream = stream.tuplify()
>>> stream = stream.chain()
>>> list(stream)
... [0, 0, 1, 1, 2, 2]
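As a rough standard-library analogy (not the library's implementation), flattening a stream of tuples behaves like itertools.chain.from_iterable. The pairs list below stands in for what tuplify() is documented to produce.

```python
from itertools import chain

# Stand-in for Stream.range(3).tuplify(): every element duplicated into a pair.
pairs = [(item, item) for item in range(3)]

# Flatten one level of nesting, lazily.
flattened = list(chain.from_iterable(pairs))
print(flattened)
```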
-
classmethod concat(*streams)
Lazily concatenates several streams into one. The same as Java 8 concat.
Parameters: streams – The Stream instances you want to concatenate.
Returns: new processed Stream instance.
>>> stream1 = Stream(range(2))
>>> stream2 = Stream(["2", "3", "4"])
>>> stream3 = Stream([list(), dict()])
>>> concatenated_stream = Stream.concat(stream1, stream2, stream3)
>>> list(concatenated_stream)
... [0, 1, "2", "3", "4", [], {}]
-
count(element=<object object>)
Returns the number of elements in the stream. If element is set, returns the count of that particular element in the stream.
Parameters: element (object) – The element to count in the stream.
Returns: The number of elements, or the count of the particular element.
-
decimals()
Tries to convert everything to decimal.Decimal and keeps only successful attempts.
Returns: new processed Stream instance.
>>> stream = Stream([1, 2.0, "3", "4.0", None, {}])
>>> stream = stream.decimals()
>>> list(stream)
... [Decimal('1'), Decimal('2'), Decimal('3'), Decimal('4.0')]
Note
It is not the same as stream.map(Decimal) because it removes failed attempts.
Note
It tries to use the cdecimal module if possible.
-
distinct()
Removes duplicates from the stream.
Returns: new processed Stream instance.
Note
All objects in the stream have to be hashable (support __hash__()).
Note
Use it carefully. It returns a new Stream but keeps every element in memory.
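Both notes follow from the usual way of removing duplicates lazily. The sketch below is an assumption about the general technique, not the library's code: it remembers every element it has seen in a set, which is why elements must be hashable and why memory grows with the number of distinct elements.

```python
def distinct(iterable):
    """Yield each element once, in first-seen order."""
    seen = set()  # grows with every new element, hence the memory warning
    for item in iterable:
        if item not in seen:  # requires item to be hashable
            seen.add(item)
            yield item


unique = list(distinct([1, 2, 1, 3, 2, 1]))
print(unique)
```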
-
divisible_by(number)
Filters the stream for the numbers divisible by the given one.
Parameters: number (int) – Number which every element should be divisible by.
Returns: new processed Stream instance.
>>> stream = Stream.range(6)
>>> stream = stream.divisible_by(2)
>>> list(stream)
... [0, 2, 4]
-
evens()
Filters and keeps only even numbers from the stream.
Returns: new processed Stream instance.
>>> stream = Stream.range(6)
>>> stream = stream.evens()
>>> list(stream)
... [0, 2, 4]
-
exclude(predicate, **concurrency_kwargs)
Excludes items from the Stream according to the predicate. The behaviour is the same as for itertools.ifilterfalse() (itertools.filterfalse() in Python 3).
Like Stream.filter(), it also supports parallelization. See the keyword arguments of Stream.map().
Parameters:
- predicate (function) – Predicate for filtering elements of the Stream.
- concurrency_kwargs (dict) – The same concurrency keywords as for Stream.map().
Returns: new processed Stream instance.
>>> stream = Stream.range(6)
>>> stream = stream.exclude(lambda item: item % 2 == 0)
>>> list(stream)
... [1, 3, 5]
-
exclude_nones()
Excludes None from the stream.
Returns: new processed Stream instance.
>>> stream = Stream([1, 2, None, 3, None, 4])
>>> stream = stream.exclude_nones()
>>> list(stream)
... [1, 2, 3, 4]
-
filter(predicate, **concurrency_kwargs)
Filters the stream according to the given predicate function. It also supports parallelization, which helps if the predicate is a heavy function.
You may consider it an equivalent of itertools.ifilter() (the built-in filter() in Python 3), but for a stream and with the possibility to parallelize the process.
Parameters:
- predicate (function) – Predicate for filtering elements of the Stream.
- concurrency_kwargs (dict) – The same concurrency keywords as for Stream.map().
Returns: new processed Stream instance.
>>> stream = Stream.range(5)
>>> stream = stream.filter(lambda item: item % 2 == 0)
>>> list(stream)
... [0, 2, 4]
-
first
Returns the first element from the iterator without changing the internals of the stream.
>>> stream = Stream.range(10)
>>> stream.first
... 0
>>> stream.first
... 0
>>> list(stream)
... [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
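Looking at the first element of a one-shot iterator without losing it usually means taking it and putting it back. The sketch below shows one common way to do that with itertools.chain; the helper name peek_first is hypothetical and the library may do this differently.

```python
from itertools import chain


def peek_first(iterable):
    """Return (first_element, iterator_that_still_yields_everything).

    Raises StopIteration if the iterable is empty.
    """
    iterator = iter(iterable)
    first = next(iterator)
    # Put the consumed element back in front of the remaining ones.
    return first, chain([first], iterator)


first, restored = peek_first(iter(range(10)))
remaining = list(restored)
print(first, remaining)
```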
-
floats()
Tries to convert everything to float() and keeps only successful attempts.
Returns: new processed Stream instance.
>>> stream = Stream([1, 2, "3", "4", None, {}, 5])
>>> stream = stream.floats()
>>> list(stream)
... [1.0, 2.0, 3.0, 4.0, 5.0]
Note
It is not the same as stream.map(float) because it removes failed attempts.
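The difference from a plain map is that failed conversions are dropped instead of raising. A minimal sketch of that idea follows, assuming that a failure shows up as TypeError or ValueError; the helper convert_or_skip is hypothetical and not part of the library.

```python
def convert_or_skip(iterable, converter):
    """Yield converter(item) for every item that converts cleanly."""
    for item in iterable:
        try:
            yield converter(item)
        except (TypeError, ValueError):
            # float(None) and float({}) raise TypeError, float("x") raises ValueError.
            continue


source = [1, 2, "3", "4", None, {}, 5]
floats = list(convert_or_skip(source, float))
print(floats)

# A plain map stops at the first element that cannot be converted.
try:
    list(map(float, source))
    map_failed = False
except TypeError:
    map_failed = True
```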
-
instances_of(cls)
Filters and keeps only instances of the given class.
Parameters: cls (class) – Class for filtering.
Returns: new processed Stream instance.
>>> int_stream = Stream.range(4)
>>> str_stream = Stream.range(4).strings()
>>> result_stream = Stream.concat(int_stream, str_stream)
>>> result_stream = result_stream.instances_of(str)
>>> list(result_stream)
... ['0', '1', '2', '3']
-
ints()
Tries to convert everything to int() and keeps only successful attempts.
Returns: new processed Stream instance.
>>> stream = Stream([1, 2, "3", "4", None, {}, 5])
>>> stream = stream.ints()
>>> list(stream)
... [1, 2, 3, 4, 5]
Note
It is not the same as stream.map(int) because it removes failed attempts.
-
classmethod iterate(function, seed_value)
Returns a seeded stream. The same as Java 8 iterate.
Returns an infinite sequential ordered Stream produced by iterative application of a function f to an initial element seed, producing a Stream consisting of seed, f(seed), f(f(seed)), etc.
The first element (position 0) in the Stream is the provided seed. For n > 0, the element at position n is the result of applying the function f to the element at position n - 1.
Parameters:
- function (function) – The function to apply to the seed.
- seed_value (object) – The seed value of the function.
Returns: new processed Stream instance.
>>> stream = Stream.iterate(lambda value: value ** 2, 2)
>>> iterator = iter(stream)
>>> next(iterator)
... 2
>>> next(iterator)
... 4
>>> next(iterator)
... 16
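The sequence seed, f(seed), f(f(seed)), ... can be written as a short generator. This is a plain-Python sketch of the definition, not the library's code; itertools.islice is used to take a finite prefix of the infinite sequence.

```python
from itertools import islice


def iterate(function, seed_value):
    """Yield seed, f(seed), f(f(seed)), ... forever."""
    value = seed_value
    while True:
        yield value
        value = function(value)


# Repeated squaring starting from 2.
prefix = list(islice(iterate(lambda value: value ** 2, 2), 4))
print(prefix)  # [2, 4, 16, 256]
```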
-
key_map(predicate, **concurrency_kwargs)
Maps only the key in a (key, value) pair. If an element is a single value, Stream.tuplify() is applied to it first.
Parameters:
- predicate (function) – Predicate to apply to the key of each element in the Stream.
- concurrency_kwargs (dict) – The same concurrency keywords as for Stream.map().
Returns: new processed Stream instance.
>>> stream = Stream.range(4)
>>> stream = stream.tuplify()
>>> stream = stream.key_map(lambda item: item ** 3)
>>> list(stream)
... [(0, 0), (1, 1), (8, 2), (27, 3)]
>>> stream = Stream.range(4)
>>> stream = stream.key_map(lambda item: item ** 3)
>>> list(stream)
... [(0, 0), (1, 1), (8, 2), (27, 3)]
-
keys()
Iterates only over keys from the stream (the first element of each tuple). If an element is a single value, it is used as is.
Returns: new processed Stream instance.
>>> stream = Stream.range(5)
>>> stream = stream.key_map(lambda item: item ** 3)
>>> stream = stream.keys()
>>> list(stream)
... [0, 1, 8, 27, 64]
-
largest(size)
Returns the size largest elements from the stream.
Returns: new processed Stream instance.
>>> stream = Stream.range(3000)
>>> stream = stream.largest(5)
>>> list(stream)
... [2999, 2998, 2997, 2996, 2995]
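For comparison, the standard library offers the same kind of selection through heapq, which avoids sorting the whole input. Whether Stream uses heapq internally is not stated in this documentation; the snippet only shows the equivalent result on plain iterables.

```python
import heapq

# Five largest elements, in descending order.
top_five = heapq.nlargest(5, range(3000))

# Five smallest elements, in ascending order.
bottom_five = heapq.nsmallest(5, range(3000))

print(top_five)
print(bottom_five)
```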
-
limit(size)
Limits the stream to the given size.
Parameters: size (int) – The size of the new Stream.
Returns: new processed Stream instance.
>>> stream = Stream.range(1000)
>>> stream = stream.limit(5)
>>> list(stream)
... [0, 1, 2, 3, 4]
-
longs()
Tries to convert everything to long() and keeps only successful attempts.
Returns: new processed Stream instance.
>>> stream = Stream([1, 2, "3", "4", None, {}, 5])
>>> stream = stream.longs()
>>> list(stream)
... [1L, 2L, 3L, 4L, 5L]
Note
It is not the same as stream.map(long) because it removes failed attempts.
-
map(predicate, **concurrency_kwargs)
The cornerstone method of Stream; the other methods are based on it. It supports parallelization out of the box. It works just like itertools.imap() (the built-in map() in Python 3).
Parameters:
- predicate (function) – Function to apply to each element of the Stream.
- concurrency_kwargs (dict) – The concurrency keywords described below.
Returns: new processed Stream instance.
Parallelization is configured by keywords. Two keywords are supported: parallel and process. If you set one of them to True, Stream tries to map everything concurrently. For finer tuning, set it to the number of workers you want.
For example, you have a list of URLs to fetch:
>>> stream = Stream(urls)
You can fetch them in parallel:
>>> stream.map(requests.get, parallel=True)
By default, the number of workers is the number of cores on your computer. If you want 64 workers, ask for them:
>>> stream.map(requests.get, parallel=64)
The same goes for process, which tries to use processes:
>>> stream.map(requests.get, process=True)
and
>>> stream.map(requests.get, process=64)
Note
Python multiprocessing has its caveats and pitfalls, so use it carefully (especially with regard to predicate). Read the documentation on multiprocessing and look up best practices.
Note
If you set both the parallel and process keywords, only parallel is used. To disable one type of concurrency, set it to None.
>>> stream.map(requests.get, parallel=None, process=64)
is equal to
>>> stream.map(requests.get, process=64)
The same goes for parallel:
>>> stream.map(requests.get, parallel=True, process=None)
is equal to
>>> stream.map(requests.get, parallel=True)
Note
By default no concurrency is used.
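This documentation does not say which backend Stream uses for parallel or process. As an illustration of the general idea only, the sketch below maps a function over a list with a fixed number of worker threads using concurrent.futures from the standard library. The function fetch_length is a hypothetical stand-in for a slow call such as requests.get, and the URLs are placeholders.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_length(url):
    """Hypothetical stand-in for a slow, I/O-bound call such as requests.get."""
    return len(url)


urls = [
    "http://example.com/a",
    "http://example.com/bb",
    "http://example.com/ccc",
]

# Two worker threads; Executor.map returns results in input order.
with ThreadPoolExecutor(max_workers=2) as pool:
    lengths = list(pool.map(fetch_length, urls))

print(lengths)
```

Swapping ThreadPoolExecutor for ProcessPoolExecutor gives the process-based variant, with the usual multiprocessing requirement that the mapped function and its arguments can be pickled.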
-
median()
Returns the median value from the stream.
Returns: The median of the stream.
>>> stream = Stream.range(10000)
>>> stream.median()
... 5000
Note
Be aware that all elements from the stream are fetched into memory.
-
nth(nth_element)
Returns the Nth element from the stream.
Parameters: nth_element (int) – Number of the element to return.
Returns: Nth element.
Note
Be aware that all elements from the stream are fetched into memory (except when nth_element == 1).
-
odds()
Filters and keeps only odd numbers from the stream.
Returns: new processed Stream instance.
>>> stream = Stream.range(6)
>>> stream = stream.odds()
>>> list(stream)
... [1, 3, 5]
-
only_falses()
Keeps only those elements where bool(item) == False.
Returns: new processed Stream instance.
>>> stream = Stream([1, 2, None, 0, {}, [], 3])
>>> stream = stream.only_falses()
>>> list(stream)
... [None, 0, {}, []]
Opposite to Stream.only_trues().
-
only_nones()
Keeps only None in the stream (for example, for counting).
Returns: new processed Stream instance.
>>> stream = Stream([1, 2, None, 3, None, 4])
>>> stream = stream.only_nones()
>>> list(stream)
... [None, None]
-
only_trues()
Keeps only those elements where bool(element) == True.
Returns: new processed Stream instance.
>>> stream = Stream([1, 2, None, 0, {}, [], 3])
>>> stream = stream.only_trues()
>>> list(stream)
... [1, 2, 3]
-
partly_distinct()
Removes some of the duplicates from the stream.
Returns: new processed Stream instance.
Note
All objects in the stream have to be hashable (support __hash__()).
Note
There is no guarantee that all duplicates will be removed, especially if your stream is large and its cardinality is high.
-
peek(predicate)
Does the same as Java 8 peek.
Returns a stream consisting of the elements of this stream, additionally performing the provided action on each element as elements are consumed from the resulting stream.
Parameters: predicate (function) – Predicate to apply on each element.
Returns: new processed Stream instance.
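The key point is that the action runs lazily, only when an element is actually consumed. A generator sketch of that behaviour on a plain iterable (not the library's implementation) could look like this:

```python
def peek(iterable, action):
    """Yield every element unchanged, calling action(element) as it passes."""
    for item in iterable:
        action(item)
        yield item


seen = []
peeked = peek(range(3), seen.append)

seen_before = list(seen)  # nothing has been consumed yet, so nothing was recorded
result = list(peeked)     # consuming the generator triggers the action per element

print(seen_before, result, seen)
```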
-
classmethod range(*args, **kwargs)
Creates a numerical iterator. Stream.range(10) is exactly the same as Stream(range(10)) (in Python 2: Stream(xrange(10))). All arguments go to range() (xrange()) directly.
Returns: new processed Stream instance.
>>> stream = Stream.range(6)
>>> list(stream)
... [0, 1, 2, 3, 4, 5]
>>> stream = Stream.range(1, 6)
>>> list(stream)
... [1, 2, 3, 4, 5]
>>> stream = Stream.range(1, 6, 2)
>>> list(stream)
... [1, 3, 5]
-
reduce(function, initial=<object object>)
Applies reduce() to the iterator.
Parameters:
- function (function) – Reduce function.
- initial (object) – Initial value. If it is not set, the first element is used.
>>> stream = Stream.range(5)
>>> stream.reduce(operator.add)
... 10
-
regexp(regexp, flags=0)
Filters the stream according to the regular expression using re.match(). It also supports the same flags as re.match().
Parameters:
- regexp – Regular expression for filtering.
- flags – The same flags as for re.match().
Returns: new processed Stream instance.
>>> stream = Stream.range(100)
>>> stream = stream.strings()
>>> stream = stream.regexp(r"^1")
>>> list(stream)
... ['1', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19']
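Because re.match() anchors at the start of the string, the filter keeps elements whose beginning matches the pattern. The following standard-library sketch reproduces the example above on a plain generator; the helper name regexp_filter is hypothetical.

```python
import re


def regexp_filter(iterable, pattern, flags=0):
    """Keep only the strings for which re.match(pattern, item, flags) succeeds."""
    compiled = re.compile(pattern, flags)
    return (item for item in iterable if compiled.match(item))


numbers_as_text = (str(number) for number in range(100))
matched = list(regexp_filter(numbers_as_text, r"^1"))
print(matched)
```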
-
reversed()
Reverses the stream.
Returns: new processed Stream instance.
Note
If the underlying iterator does not support reversing, everything has to be fetched into memory.
-
skip(size)
Skips the first size elements.
Parameters: size (int) – The number of elements to skip.
Returns: new processed Stream instance.
>>> stream = Stream.range(10)
>>> stream = stream.skip(5)
>>> list(stream)
... [5, 6, 7, 8, 9]
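On plain iterators, skipping and limiting are both expressible with itertools.islice. The snippet below is an analogy that shows the expected results, not a statement about how Stream implements skip() or limit().

```python
from itertools import islice

# Skip the first 5 elements: start at index 5, no stop.
skipped = list(islice(range(10), 5, None))

# Limit to the first 5 elements: stop at index 5.
limited = list(islice(range(1000), 5))

print(skipped)
print(limited)
```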
-
smallest(size)
Returns the size smallest elements from the stream.
Returns: new processed Stream instance.
>>> stream = Stream.range(3000)
>>> stream = stream.smallest(5)
>>> list(stream)
... [0, 1, 2, 3, 4]
-
sorted(key=None, reverse=False)
Sorts the stream elements.
Parameters:
- key (function) – Key function for sorting.
- reverse (bool) – Whether to sort in descending order.
Returns: new processed Stream instance.
Note
There is no magic here: all elements have to be fetched into memory for sorting.
-
strings()
Tries to convert everything to unicode() (str for Python 3) and keeps only successful attempts.
Returns: new processed Stream instance.
>>> stream = Stream([1, 2.0, "3", "4.0", None, {}])
>>> stream = stream.strings()
>>> list(stream)
... ['1', '2.0', '3', '4.0', 'None', '{}']
Note
It is not the same as stream.map(str) because it removes failed attempts.
Note
It tries to convert to unicode if possible, not bytes.
-
sum()
Returns the sum of elements in the stream.
>>> stream = Stream.range(10)
>>> stream = stream.decimals()
>>> stream.sum()
... Decimal('45')
Note
Do not use the built-in sum() here. This method adds elements according to the __add__() defined by their classes, so it can sum decimal.Decimal with int, for example.
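One way to add elements purely through their own __add__() is functools.reduce with operator.add. This is a sketch of that technique under the assumption that it matches the intent of the note; it is not taken from the library's source. Unlike the built-in sum(), it does not start from the integer 0, so it also works for types that cannot be added to an int.

```python
import operator
from decimal import Decimal
from functools import reduce

# Decimal and int mix freely through Decimal.__add__ / __radd__.
mixed = [Decimal("1.5"), 2, Decimal("3")]
total = reduce(operator.add, mixed)
print(total)  # Decimal('6.5')

# Lists define __add__ as concatenation; built-in sum() without a start value fails here.
chunks = [[1], [2, 3], [4]]
joined = reduce(operator.add, chunks)

try:
    sum(chunks)
    builtin_sum_failed = False
except TypeError:
    builtin_sum_failed = True
```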
-
tuplify(clones=2)
Tuplifies the iterator: turns every element into a tuple that repeats the element clones times.
Parameters: clones (int) – The number of elements in the resulting tuple.
Returns: new processed Stream instance.
>>> stream = Stream.range(2)
>>> stream = stream.tuplify(3)
>>> list(stream)
... [(0, 0, 0), (1, 1, 1)]
-
value_map(predicate, **concurrency_kwargs)
Maps only the value in a (key, value) pair. If an element is a single value, Stream.tuplify() is applied to it first.
Parameters:
- predicate (function) – Predicate to apply to the value of each element in the Stream.
- concurrency_kwargs (dict) – The same concurrency keywords as for Stream.map().
Returns: new processed Stream instance.
>>> stream = Stream.range(4)
>>> stream = stream.tuplify()
>>> stream = stream.value_map(lambda item: item ** 3)
>>> list(stream)
... [(0, 0), (1, 1), (2, 8), (3, 27)]
>>> stream = Stream.range(4)
>>> stream = stream.value_map(lambda item: item ** 3)
>>> list(stream)
... [(0, 0), (1, 1), (2, 8), (3, 27)]
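The relationship between tuplify(), key_map() and value_map() can be sketched with three small generator functions over plain iterables. These are illustrative stand-ins with the same names, written under the assumption that every element is already a (key, value) pair after tuplifying; they are not the library's implementation.

```python
def tuplify(iterable, clones=2):
    """Turn every element into a tuple repeating it `clones` times."""
    return ((item,) * clones for item in iterable)


def key_map(pairs, function):
    """Apply function to the key of each (key, value) pair."""
    return ((function(key), value) for key, value in pairs)


def value_map(pairs, function):
    """Apply function to the value of each (key, value) pair."""
    return ((key, function(value)) for key, value in pairs)


def cube(item):
    return item ** 3


by_value = list(value_map(tuplify(range(4)), cube))
by_key = list(key_map(tuplify(range(4)), cube))
print(by_value)
print(by_key)
```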
-
values()
Iterates only over values from the stream (the last element of each tuple). If an element is a single value, it is used as is.
Returns: new processed Stream instance.
>>> stream = Stream.range(5)
>>> stream = stream.key_map(lambda item: item ** 3)
>>> stream = stream.values()
>>> list(stream)
... [0, 1, 2, 3, 4]
-