7 min read

On the Confluent website, you can find this title:

Stream data changes everything

From the createors of Kafka, a real-time messaging system, this is not a surprising assertion. Yet, data streaming infrastructures have gained in popularity and many projects require the data to be processed as soon as it shows up. This contributed to the development of famous technologies like Spark Stremaing, Apache Storm and more broadly websockets.

This latest piece of software in particular brought real-time data feeds to web applications, trying to solve low-latency connections. Coupled with the asynchronous Node.js, you can build a powerful event-based reactive system. But what about Python? Given the popularity of the language in data science, would it be possible to bring the benefits of this kind of data ingestion? As this two-part post series will show, it turns out that modern Python (Python 3.4 or later) supports asynchronous data streaming apps.

Introducing asyncio

Python 3.4 introduced in the standard library the module asyncio to provision the language with:

Asynchronous I/O, event loop, coroutines and tasks

While Python treats functions as first-class objects (meaning you can assign them to variables and pass them as arguments), most developers follow an imperative programming style. It seems on purpose:

It requires super human discipline to write readable code in callbacks and if you don’t believe me look at any piece of JavaScript code. – Guido van Rossum

So Asyncio is the pythonic answer to asynchronous programming. This paradigm makes a lot of sense for otherwise costly I/O operations or when we need events to trigger code.


For fun and profit, let’s build such a project. We will simulate a dummy electrical circuit composed of three components:

  • A clock regularly ticking
  • A board I/O pin randomly choosing to toggle its binary state on clock events
  • A buzzer buzzing when the I/O pin flips to one

This set us up with an interesting machine-to-machine communication problem to solve.

Note that the code snippets in this post make use of features like async and await introduced in Python 3.5. While it would be possible to backport to Python 3.4, I highly recommend that you follow along with the same version or newer. Anaconda or Pyenv can ease the installation process if necessary.

$ python --version
Python 3.5.1

$ pip --version
pip 8.1.2

Asynchronous webscoket Client/Server

Our first step, the clock, will introduce both asyncio and websocket basics. We need a straightforward method that fires tick signals through a websocket and wait for acknowledgement.

# filename: sketch.py

async def clock(socket, port, tacks=3, delay=1)

The async keyword is sugar syntaxing introduced in Python 3.5 to replace the previous @asyncio.coroutine. The official pep 492 explains it all but the tldr : API quality.

To simplify websocket connection plumbing, we can take advantage of the eponymous package: pip install websockets==3.5.1. It hides the protocol’s complexity behind an elegant context manager.

# filename: sketch.py

# the path "datafeed" in this uri will be a parameter available in the other side but we won't use it for this example
uri = 'ws://{socket}:{port}/datafeed'.format(socket=socket, port=port)

# manage asynchronously the connection
async with websockets.connect(uri) as ws:
for payload in range(tacks):
     print('[ clock ] > {}'.format(payload))
     # send payload and wait for acknowledgement
     await ws.send(str(payload))
     print('[ clock ] < {}'.format(await ws.recv()))

The keyword await was introduced with async and replaces the old yield from to read values from asynchronous functions. Inside the context manager the connection stays open and we can stream data to the server we contacted.

The server: IOPin

At the core of our application are entities capable of speaking to each other directly. To make things fun, we will expose the same API as Arduino sketches, or a setup method that runs once at startup and a loop called when new data is available.

# -*- coding: utf-8 -*-
# vim_fenc=utf-8
# filename: factory.py

import abc
import asyncio

import websockets

class FactoryLoop(object):
   """ Glue components to manage the evented-loop model. """

   __metaclass__ = abc.ABCMeta

   def__init__(self, *args, **kwargs):
       # call user-defined initialization
       self.setup(*args, **kwargs)

   def out(self, text):
       print('[ {} ] {}'.format(type(self).__name__, text))

   def setup(self, *args, **kwargs):

   async def loop(self, channel, data):

   def run(self, host, port):
           server = websockets.serve(self.loop, host, port)
           self.out('serving on {}:{}'.format(host, port))
           self.out('Cannot bind to this port! Is the server already running?')
           self.out('Keyboard interruption, aborting.')

The child objects will be required to implement setup and loop, while this class will take care of:

  • Initializing the sketch
  • Registering a websocket server based on a asynchronous callback (loop)
  • Telling the event loop to poll for… events

The websockets states the server callback is expected to have the signature on_connection(websocket, path). This is too low-level for our purpose. Instead, we can write a decorator to manage asyncio details, message passing, or error handling. We will only call self.loop with application-level-relevant information: the actual message and the websocket path.

# filename: factory.py

import functools

import websockets

def reactive(fn):

   async def on_connection(klass, websocket, path):
       """Dispatch events and wrap execution."""
       klass.out('** new client connected, path={}'.format(path))
       # process messages as long as the connection is opened or
       # an error is raised

               message = await websocket.recv()
               aknowledgement = await fn(klass, path, message)
               await websocket.send(aknowledgement or 'n/a')
           except websockets.exceptions.ConnectionClosed as e:
               klass.out('done processing messages: {}n'.format(e))
   return on_connection

Now we can develop a readable IOPin object.

# filename: sketch.py

import factory

class IOPin(factory.FactoryLoop):
   """Set an IO pin to 0 or 1 randomly."""

   def setup(self, chance=0.5, sequence=3):
       self.chance = chance
       self.sequence = chance

   def state(self):
       """Toggle state, sometimes."""
       return0if random.random() < self.chance else1

   async def loop(self, channel, msg):
       """Callback on new data."""
       self.out('new tick triggered on {}: {}'.format(channel, msg))
       bits_stream = [self.state() for _ in range(self.sequence)]
       self.out('toggling pin state: {}'.format(bits_stream))
       # ...
      # ... toggle pin state here
       # ...

We finally need some glue to run both the clock and IOPin and test if the latter toggles its state when the former fires new ticks. The following snippet uses a convenient library, click 6.6, to parse command-line arguments.

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim_fenc=utf-8
# filename: arduino.py

import sys
import asyncio

import click

import sketchs

@click.option('-s', '--socket', default='localhost', help='Websocket to bind to')
@click.option('-p', '--port', default=8765, help='Websocket port to bind to')
@click.option('-t', '--tacks', default=5, help='Number of clock ticks')
@click.option('-d', '--delay', default=1, help='Clock intervals')
def main(sketch, **flags):
   if sketch == 'clock':
       # delegate the asynchronous execution to the event loop
   elif sketch == 'iopin':
       # arguments in the constructor go as is to our `setup` method
       sketchs.IOPin(chance=0.6).run(flags['socket'], flags['port'])
       print('unknown sketch, please choose clock, iopin or buzzer')


if__name__ == '__main__':

Don’t forget to chmod +x the script and start the server in a first terminal ./arduino.py iopin. When it is listening for connections, start the clock with ./arduino.py clock and watch them communicate! Note that we used here common default host and port so they can find each other.

We have a good start with our app, and now in Part 2 we will further explore peer-to-peer communication, service discovery, and the streaming machine-to-machine concept.

About the author

Xavier Bruhiere is a lead developer at AppTurbo in Paris, where he develops innovative prototypes to support company growth. He is addicted to learning, hacking on intriguing hot techs (both soft and hard), and practicing high intensity sports.


Please enter your comment!
Please enter your name here