4 min read

This two-part series explores asynchronous programming with Python using Asyncio. In Part 1 of this series, we started by building a project that shows how you can use Reactive Python in asynchronous programming. Let’s pick it back up here by exploring peer-to-peer communication and then just touching on service discovery before examining the streaming machine-to-machine concept.

Peer-to-peer communication

So far we’ve established a websocket connection to process clock events asynchronously. Now that one pin swings between 1’s and 0’s, let’s wire a buzzer and pretend it buzzes on high states (1) and remains silent on low ones (0). We can rephrase that in Python, like so:

# filename: sketches.py

import factory

class Buzzer(factory.FactoryLoop):
    """Buzz on light changes."""

    def setup(self, sound):
        # customize buzz sound
        self.sound = sound

    async def loop(self, channel, signal):
        behavior = self.sound if signal == '1' else '...'
        self.out('signal {} received -> {}'.format(signal, behavior))
        return behavior

So how do we make them to communicate? Since they share a common parent class, we implement a stream method to send arbitrary data and acknowledge reception with, also, arbitrary data. To sum up, we want IOPin to use this API:

class IOPin(factory.FactoryLoop):

    # [ ... ]

    async def loop(self, channel, msg):
        # [ ... ]
        await self.stream('buzzer', bits_stream)
        return 'acknowledged'

Service discovery

The first challenge to solve is service discovery. We need to target specific nodes within a fleet of reactive workers.

This topic, however, goes past the scope of this post series. The shortcut below will do the job (that is, hardcode the nodes we will start), while keeping us focused on reactive messaging.

# -*- coding: utf-8 -*-
# vim_fenc=utf-8
# filename: mesh.py

"""Provide nodes network knowledge."""

import websockets

class Node(object):

    def __init__(self, name, socket, port):
        print('[ mesh ] registering new node: {}'.format(name))
        self.name = name
        self._socket = socket
        self._port = port

    def uri(self, path):
        return 'ws://{socket}:{port}/{path}'.format(socket=self._socket,

    def connection(self, path=''):
        # instanciate the same connection as `clock` method
        return websockets.connect(self.uri(path))

# TODO service discovery
def grid():
    """Discover and build nodes network."""
    # of course a proper service discovery should be used here
    # see consul or zookkeeper for example

    # note: clock is not a server so it doesn't need a port
    return [
        Node('clock', 'localhost', None),
        Node('blink', 'localhost', 8765),
        Node('buzzer', 'localhost', 8765 + 1)

Streaming machine-to-machine chat

Let’s provide FactoryLoop with the knowledge of the grid and implement an asynchronous communication channel.

# filename: factory.py (continued)

import mesh

class FactoryLoop(object):

  def __init__(self, *args, **kwargs):
      # now every instance will know about the other ones
      self.grid = mesh.grid()

    # ...

  def node(self, name):
      """Search for the given node in the grid."""
      return next(filter(lambda x: x.name == name, self.grid))

  async def stream(self, target, data, channel):
      self.out('starting to stream message to {}'.format(target))

      # use the node webscoket connection defined in mesh.py
      # the method is exactly the same as the clock
      async with self.node(target).connection(channel) as ws:
          for partial in data:
              self.out('> sending payload: {}'.format(partial))
              # websockets requires bytes or strings
              await ws.send(str(partial))

We added a bit of debugging lines to better understand how the data flows through the network. Every implementation of the FactoryLoop can both react to events and communicate with other nodes it is aware of.

Wrapping up

Time to update arduino.py and run our cluster of three reactive workers in three

# [ ... ]def main(sketch, **flags):
# [ ... ] elif sketch == 'buzzer':
sketchs.Buzzer(sound='buzz buzz buzz').run(flags['socket'], flags['port'])

Launch three terminals or use a tool such as foreman to spawn multiple processes. Either way, keep in mind that you will need to track the scripts output.

way, keep in mind that you will need to track the scripts output.
$ # start IOPin and Buzzer on the same ports we hardcoded in mesh.py
$ ./arduino.py buzzer --port 8766
$ ./arduino.py iopin --port 8765

$ # now that they listen, trigger actions with the clock (targetting IOPin port)
$ ./arduino.py clock --port 8765
[ ... ]

$ # Profit !

We just saw one worker reacting to a clock and another reacting to randomly generated events. The websocket protocol allowed us to exchange streaming data and receive arbitrary responses, unlocking sophisticated fleet orchestration. While we limited this example to two nodes, a powerful service discovery mechanism could bring to life a distributed network of microservices.

By completing this post series, you should now have a better understanding of how to use Python with Asyncio for asynchronous programming.

About the author

Xavier Bruhiere is a lead developer at AppTurbo in Paris, where he develops innovative prototypes to support company growth. He is addicted to learning, hacking on intriguing hot techs (both soft and hard), and practicing high-intensity sports.

Subscribe to the weekly Packt Hub newsletter. We'll send you this year's Skill Up Developer Skills Report.

* indicates required


Please enter your comment!
Please enter your name here