Python generators

Neatly manage stream processing pipelines for medium data

In these days of medium data [data that is too big to fit in memory on a single machine, but could otherwise be processed by one], it's important to know what features your programming language offers to help you process data using streaming. Generator functions in Python are one such feature. [For brevity, this post will refer to generator functions as generators].

Generators vs functions

Generators differ from functions in that while a function can only return a value once, generators allow you to "return" multiple times. The keyword return is not used for this, however; instead, yield is.

Basic examples

def generate_words():
    yield 'first'
    yield 'second'
    yield 'third'

# Will print out: "first, second, third, "
words = generate_words()
for word in words:
    print(word, end=', ')

Often the keyword yield is itself in a loop. For example, the above is equivalent to the below.

def generate_words():
    for word in ['first', 'second', 'third']:
        yield word

# Will print out: "first, second, third, "
words = generate_words()
for word in words:
    print(word, end=', ')

Generators can then be chained together to make a pipeline.

def generate_words():
    for word in ['first', 'second', 'third']:
        yield word

def process_words(words):
    for word in words:
        yield f'{word} apple'

# Will print out: "first apple, second apple, third apple, "
words = generate_words()
processed_words = process_words(words)
for word in processed_words:
    print(word, end=', ')

Declaring a generator

A generator is declared like a function, i.e. with def function_name():, but somewhere in its body it contains at least one instance of the keyword yield. [See criticism]
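
If you ever need to check programmatically whether something is a generator function rather than a plain function, the standard library's inspect module can tell you [a small aside, not part of the original examples; the function names below are made up]:

import inspect

def a_generator():  # made-up name for illustration
    yield 'value'

def a_function():  # made-up name for illustration
    return 'value'

print(inspect.isgeneratorfunction(a_generator))  # True
print(inspect.isgeneratorfunction(a_function))   # False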

Using a generator

As in the above example, you call the generator and iterate over its return value: each yield encountered results in an iteration.
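
Calling the generator doesn't run any of its body straight away: it returns a generator object, which you can also step through manually with next [using generate_words from the earlier examples]:

words = generate_words()
print(next(words))  # first
print(next(words))  # second
print(next(words))  # third
# A further next(words) would raise StopIteration, which is
# what a for loop uses to know when to stop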

Purpose of generators

They allow you to

  • process data in a streaming way [i.e. without loading it all into memory at once];
  • while separating code with different responsibilities [which is often seen as a good thing], into a pipeline of chained generators;
  • without having to write custom classes that implement __iter__ and __next__ [which is quite boilerplate heavy, and so can be hard to see the data processing that's going on].

The earlier basic examples show some of this: the data generation code is separate from the processing code, which is separate from the printing code.
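
To make the third point concrete, here is roughly what the process_words step from earlier would look like without a generator, as a class implementing __iter__ and __next__ [a sketch for comparison only; the class name is made up]:

class ProcessWords:
    def __init__(self, words):
        self.words = iter(words)

    def __iter__(self):
        return self

    def __next__(self):
        # StopIteration from the underlying iterator propagates,
        # which is what ends any for loop over this object
        word = next(self.words)
        return f'{word} apple'

# Behaves like process_words(words), but with noticeably more ceremony
words = generate_words()
for word in ProcessWords(words):
    print(word, end=', ')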

Real-world example

A more real-world, and more complex, example is constructing a CSV dump of a table in a Postgres database with psycopg2, using a single database query [not using an ORM], and streaming it from a Django view. You could do something like the following, which uses a chain of generators.

import csv
import os
import psycopg2
from django.http import StreamingHttpResponse

def fetch_db_rows(dsn, query):
    # Named cursor => server side cursor. Without this, _all_ data from the
    # query will be fetched into memory, which defeats the purpose of streaming
    with \
            psycopg2.connect(dsn) as conn, \
            conn.cursor(name='my_csv_dump') as cur:
        cur.itersize = 5000
        cur.arraysize = 5000
        cur.execute(query)

        while True:
            rows = cur.fetchmany()
            for row in rows:
                yield row
            if not rows:
                break

def to_csv_bytes(rows):
    # Python's csv.writer expects a file-like object with a `write` method
    # We give it one, but the `write` method just returns the bytes passed
    class DummyFile:
        def write(self, value):
            return value

    csv_writer = csv.writer(DummyFile(), quoting=csv.QUOTE_NONNUMERIC)

    for row in rows:
        yield csv_writer.writerow(row).encode('utf-8')

def my_streaming_csv_view(_):
    dsn = os.environ['DSN']
    query = 'SELECT * FROM my_table ORDER BY id'

    # Define a pipeline of two steps
    db_rows = fetch_db_rows(dsn, query)
    csv_bytes = to_csv_bytes(db_rows)

    # Django/the web-server adds to this pipeline: adding HTTP chunked
    # headers, and sending data to the underlying socket
    return StreamingHttpResponse(csv_bytes, content_type='text/csv')

This is ok, especially in terms of separation of components in the pipeline: fetching from the database is separate from conversion to CSV, which in turn is separate from any HTTP concerns.
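
That separation also makes each step easy to exercise on its own. For example, to_csv_bytes doesn't care whether its rows come from Postgres or from a plain in-memory list [a quick hypothetical check, not part of the view]:

rows = [(1, 'Alice'), (2, 'Bob')]
print(list(to_csv_bytes(rows)))
# Expected to print something like: [b'1,"Alice"\r\n', b'2,"Bob"\r\n']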

However, StreamingHttpResponse has some hidden behaviour [depending on configuration/web-server]. Every time it iterates over the value passed, it results in an HTTP chunk [which includes a short header] being sent through various layers of code, to the socket, and maybe even over the wire. Especially if each CSV line is small, this has quite a high overhead per byte of actual data.

We can do better.

We can send HTTP chunks of [for example] 8KB, even if each CSV row is smaller or bigger than 8KB. To do this we add a buffering step to the pipeline: we don't need to touch the database code, the CSV-writing code, or the code of StreamingHttpResponse.

def buffer(chunk_size, bytes_items):
    queue = []
    queue_length = 0

    for bytes_item in bytes_items:
        queue.append(bytes_item)
        queue_length += len(bytes_item)

        while queue_length >= chunk_size:
            to_send_bytes = b''.join(queue)
            chunk, to_send_bytes = \
                to_send_bytes[:chunk_size], to_send_bytes[chunk_size:]

            queue = \
                [to_send_bytes] if to_send_bytes else \
                []
            queue_length = len(to_send_bytes)

            yield chunk

    if queue_length:
        yield b''.join(queue)

def my_streaming_csv_view(_):
    dsn = os.environ['DSN']
    query = 'SELECT * FROM my_table ORDER BY id'

    # Our pipeline has an added step to buffer the data into 8KB chunks
    db_rows = fetch_db_rows(dsn, query)
    csv_bytes = to_csv_bytes(db_rows)
    chunked_csv_bytes = buffer(8192, csv_bytes)

    return StreamingHttpResponse(chunked_csv_bytes, content_type='text/csv')
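
And because buffer is just another generator, it can be sanity-checked in isolation, away from the database and Django [a quick hypothetical check using a tiny chunk size]:

chunks = list(buffer(5, [b'ab', b'cde', b'fgh']))
print(chunks)  # [b'abcde', b'fgh']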

Being able to define and maintain a pipeline like this, in code, with a minimum of boilerplate, and without needing any extra libraries, is in my opinion wonderful.

Possible limitations

Each component in the above pipeline runs on the same thread: none of them can do anything while another is processing. Specifically, between each of the fetchmany calls to the database, the database connection sits idle, when it could have been fetching the next batch of data. You may be tempted to do something about this by using threads[/similar] with another intermediate buffer.

My suspicion is that in a lot of situations this wouldn't increase performance. The downstream connection to the client would already have buffers, and its bandwidth is likely to be much lower than upstream to the database, so the buffers would never get empty: the downstream connection may already be sending as many bytes per second as it can. Threads[/similar] may do nothing but use more resources on the server, hurting performance and increasing costs. My recommendation is to KISS and only complicate the pipeline if you have good evidence it will help in your production cases.

Criticism

I find it odd/confusing that the only way to differentiate a function from a generator is to look inside its body for the keyword yield. Considering how asyncio functions are declared with async def my_func():, I think generator def my_func(): would have been clearer and more consistent.