User Guide

I suppose you’ve worked with Django and have used its ORM a lot. I will try to lead you to the idea of functional streams by example. Actually, I haven’t done Django for a while, so the syntax might be a bit outdated or I may confuse you; feel free to correct me with an issue or a pull request. Please do, I appreciate your feedback.

If you haven’t worked with any ORM, just try to follow the idea; I will explain what is going on and the things that really matter.

What is a Stream?

Let’s go back to the default Django example: libraries and books. Assume we have an app up and running which does some data management on top of your beloved database. Say you want to fetch some recent books.

from library.models import Book

books = Book.objects.filter(pub_date__year=2014)

Nice, isn’t it? You have a model called Book which presumably represents books in your app, and you want only those which were published in 2014. Figured out. Let’s go further: say you want to be more specific and keep only the bestsellers. That is fine too.

from library.models import Book

books = Book.objects.filter(pub_date__year=2014)
bestsellers = books.order_by("-sales_count")[:10]

You can do it like this. But why is it better than the following approach?

from operator import attrgetter
from library.models import Book

books = Book.objects.all()
books = [book for book in books if book.pub_date.year == 2014]
bestsellers = sorted(books, key=attrgetter("sales_count"), reverse=True)
bestsellers = bestsellers[:10]

You will get the same result, right? Actually, no. Look: in the filtering step you fetch all the objects from the database and process them in Python. That is fine if you have a dozen rows in your database, but it becomes a big bottleneck as your data grows. That’s why everyone tries to move as much filtering as possible into the database. The database knows how to manage your data and what to do in the most efficient way: it will use indexes and so on to speed up the whole process, so you do not need a full scan every time. It is best practice to fetch only the data you actually need from the database.

So instead of

SELECT * FROM book

you do

SELECT * FROM book
    WHERE EXTRACT(year FROM "pub_date") = 2014
    ORDER BY sales_count DESC
    LIMIT 10

Sounds legit. But let’s check out how it looks when you work with the ORM. Back to our example:

books = Book.objects.filter(pub_date__year=2014)
bestsellers = books.order_by("-sales_count")[:10]

or, in a shorter way:

bestsellers = Book.objects \
    .filter(pub_date__year=2014) \
    .order_by("-sales_count")[:10]

You may think of it as a data stream you are processing at every step. First you set the initial source of data, which is Book.objects.all(). Good. You may consider it an iterable flow of data, and you apply processing functions to that stream: the first is filtering, the second is sorting, the third is slicing. You process the flow, not every object; this is the crucial concept. Every time you execute some flow (or QuerySet) method, you get another instance of the same flow, but with your modifications applied.
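
To make the laziness concrete, here is roughly what happens (a sketch; the comments describe standard Django QuerySet behavior):

books = Book.objects.all()                 # no query executed yet
books = books.filter(pub_date__year=2014)  # still no query
books = books.order_by("-sales_count")     # still nothing
bestsellers = list(books[:10])             # the single SQL query runs here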

The Streams library provides you the same kind of functionality, but for any iterable. Of course, it is not as efficient as the Django ORM, which knows the context of the database and helps you execute your queries in the most efficient way.
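
For example, here is roughly how the bestsellers pipeline could look with a Stream over a plain iterable (a sketch: it uses only the filter method shown later in this guide, and sorting still has to materialize the filtered data):

from operator import attrgetter
from streams import Stream

# books is any iterable of objects with pub_date and sales_count
stream = Stream(books).filter(lambda book: book.pub_date.year == 2014)
bestsellers = sorted(stream, key=attrgetter("sales_count"), reverse=True)[:10]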

How to use Streams

Now you’ve got the idea of Streams: to manage the data flow itself, not every component. You can build your own toy map/reduce stuff with it if you really need to. Or you can just filter and process your data to exclude some Nones in parallel, or have a generic way to do it. It is up to you; I’ll just show you some examples, and if you want more information, go to the API documentation.
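
For instance, cleaning up an iterable could look something like this (a sketch; raw_values is a hypothetical iterable of strings, and the parallel keyword is the same one used in the big example below):

from streams import Stream

# drop the Nones, then normalize what is left; the mapping is done
# by a pool of workers since parallel is given
cleaned = Stream(raw_values) \
    .filter(lambda item: item is not None) \
    .map(lambda item: item.strip().lower(), parallel=4)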

So, for simplicity, let’s assume that you have a giant gzipped CSV, say 10 GB, and you can use only 1 GB of memory, so it is not possible to put everything in memory at once. This CSV has two columns: author_id and book_name.
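
A couple of its rows might look like this (the values are made up, of course):

1,To Kill a Mockingbird
2,The Catcher in the Rye
1,Go Set a Watchman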

Yeah, books again. Why not?

So your boss asked you to implement a function which reads this CSV file and does some optional filtering on it. You also must fetch data from predefined external sources, search for prices in different shops (Amazon at least), and write a big XML file with the average price of each book.

I’ll add some explanation as we go.

from csv import reader
from gzip import open as gzopen
from collections import namedtuple
try:
    from xml.etree import cElementTree as ET
except ImportError:
    from xml.etree import ElementTree as ET
from streams import Stream
from other_module import shop_prices_fetch, author_fetch, publisher_fetch


def extract_averages(csv_filename, xml_filename,
                     author_prefix=None, count=None, publisher=None, shops=None,
                     available=None):
    file_handler = gzopen(csv_filename, "r")
    try:
        csv_iterator = reader(file_handler)

        # great, we now have a CSV iterator which will read our
        # file line by line; let's convert it to a stream
        stream = Stream(csv_iterator)

        # now let's fetch the author names. Since every row looks like
        # a tuple of (key, value), where key is an author_id and value
        # is a book name, we can do key mapping here. And let's do it
        # in parallel since it is I/O bound
        stream = stream.key_map(author_fetch, parallel=True)

        # okay, now let's keep only the author name as the key
        stream = stream.key_map(lambda author: author["name"])

        # we may have an author prefix to filter on, right?
        if author_prefix is not None:
            stream = stream.filter(lambda (author, book): author.startswith(author_prefix))

        # let's fetch publisher now. Let's do it in 10 threads
        if publisher is not None:
            stream = stream.map(
                lambda (author, book): (author, book, publisher_fetch(author, book)),
                parallel=10
            )
            stream = stream.filter(lambda item: item[-1] == publisher)
            # we do not need the publisher anymore, let's remove it
            stream = stream.map(lambda item: item[:2])

        # good. Let's fetch the shop prices now. Note the assignment:
        # map returns a new stream, it does not modify the old one
        stream = stream.map(
            lambda (author, book): (author, book, shop_prices_fetch(author, book, shops))
        )

        # now let's make the averages: the price list is the third element
        stream = stream.map(
            lambda item: item[:2] + (sum(item[2]) / float(len(item[2])),)
        )

        # let's remove unavailable books now.
        if available is not None:
            if available:
                stream = stream.filter(lambda item: item[-1])
            else:
                stream = stream.filter(lambda item: not item[-1])

        # ok, great. Now we have only those entries we require;
        # let's compose the XML now. Remember, our whole data set
        # won't fit in memory.
        with open(xml_filename, "w") as xml:
            xml.write("<?xml version='1.0' encoding='UTF-8' standalone='yes'?>\n")
            xml.write("<books>\n")
            for author, book, average in stream:
                book_element = ET.Element("book")
                ET.SubElement(book_element, "name").text = unicode(book)
                ET.SubElement(book_element, "author").text = unicode(author)
                ET.SubElement(book_element, "average_price").text = unicode(average)
                xml.write(ET.tostring(book_element) + "\n")
            xml.write("</books>\n")
    finally:
        file_handler.close()
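
For completeness, a hypothetical invocation might look like this (the file names and shop identifiers are made up):

extract_averages("books.csv.gz", "averages.xml",
                 author_prefix="Leo", shops=["amazon", "bookdepository"],
                 available=True)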

That’s it. At every step we’ve manipulated the given stream to direct it the way we need. We’ve parallelized where necessary, and nothing was actually executed before we started to iterate the stream. A Stream is lazy and yields records one by one, so we never started swapping.
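
You can observe this laziness yourself with a tiny sketch (assuming only that Stream defers all work until iteration, which is its whole point):

from streams import Stream

def rows():
    for i in range(3):
        print "producing", i
        yield i

stream = Stream(rows()).map(lambda x: x * 2)  # nothing is printed yet
results = list(stream)                        # the "producing" lines appear now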

I guess it is time to proceed to the API documentation. Actually, you only need to check the documentation of the Stream class methods; the rest are utilities.