As articles state everywhere, we're living in a fast-paced digital age. Project complexity, or business growth, challenges existing development patterns. That's why many developers are moving from monolithic applications toward microservices. Facebook is moving away from its big blue app. SoundCloud is embracing microservices.
Yet this can be a daunting process, so why bother?
Once upon a time, there was a fat code block called Intuition, my algorithmic trading platform. In this post, we will engineer a simplified version, divided into well-defined components.
First, we're going to write the business logic, following the single responsibility principle, and one of my favorite code mantras:
Prefer composition over inheritance
The point is to identify the key components of the problem and code a specific solution for each of them. This structures our application around the collaboration of clear abstractions.
As an illustration, start with the RandomAlgo class. Python tends to be the go-to language for data analysis and rapid prototyping. It is a great fit for our purpose.
import random


class RandomAlgo(object):
    """ Represent the algorithm flow.
    Heavily inspired from quantopian.com and processing.org """

    def initialize(self, params):
        """ Called once to prepare the algo. """
        self.threshold = params.get('threshold', 0.5)
        # As we will see later, we return here data channels we're interested in
        return ['quotes']

    def event(self, data):
        """ This method is called every time a new batch of data is ready.
        :param data: {'sid': 'GOOG', 'quote': '345'} """
        # randomly choose to invest or not
        if random.random() > self.threshold:
            print('buying {0} of {1}'.format(data['quote'], data['sid']))
This implementation focuses on a single thing: detecting buy signals. But once you get such a signal, how do you invest your portfolio? This is the responsibility of a new component.
class Portfolio(object):

    def __init__(self, amount):
        """ Starting amount of cash we have. """
        self.cash = amount

    def optimize(self, data):
        """ We have a buy signal on this data. Tell us how much cash we should bet. """
        # We're still baby traders and we randomly choose what fraction of our cash available to invest
        to_invest = random.random() * self.cash
        self.cash = self.cash - to_invest
        return to_invest
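As a quick sanity check, the component can be driven by hand, without any algorithm around it. The sketch below repeats the class so that it runs on its own; the seed and the number of bets are arbitrary choices made for this example.

```python
import random


class Portfolio(object):
    def __init__(self, amount):
        self.cash = amount

    def optimize(self, data):
        # Bet a random fraction of the cash we still hold
        to_invest = random.random() * self.cash
        self.cash = self.cash - to_invest
        return to_invest


random.seed(42)  # arbitrary seed, only to make the run repeatable
portfolio = Portfolio(10000)
bets = [portfolio.optimize({'sid': 'GOOG', 'quote': '345'}) for _ in range(5)]
for bet in bets:
    print('betting {0:.2f}'.format(bet))
print('cash left: {0:.2f}'.format(portfolio.cash))
```

Because each bet is a fraction of the remaining cash, the portfolio can never be overdrawn, whatever the sequence of signals.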
Then we can improve our previous algorithm's event method, taking advantage of composition.
class RandomAlgo(object):

    def initialize(self, params):
        # ...
        self.portfolio = Portfolio(params.get('starting_cash', 10000))

    def event(self, data):
        # ...
        print('buying {0} of {1}'.format(self.portfolio.optimize(data), data['sid']))
Here are two simple components that produce readable and efficient code. Now we can develop more sophisticated portfolio optimizations without touching the algorithm internals. This is also a huge gain early in a project when we're not sure how things will evolve.
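To illustrate that last point, here is a minimal sketch of swapping in a different allocation strategy. `FixedFractionPortfolio` and the `portfolio_factory` argument are hypothetical names introduced for this example, not part of Intuition. The only contract assumed is an `optimize(data)` method that returns the amount to invest. For the sake of a testable example, `event` returns the amount instead of printing it.

```python
import random


class FixedFractionPortfolio(object):
    """Hypothetical alternative strategy: always bet a fixed fraction of cash."""

    def __init__(self, amount, fraction=0.1):
        self.cash = amount
        self.fraction = fraction

    def optimize(self, data):
        # Same interface as Portfolio.optimize: return the cash to bet
        to_invest = self.fraction * self.cash
        self.cash -= to_invest
        return to_invest


class RandomAlgo(object):
    """Same flow as before, except the portfolio class is injected
    (an assumption made for this sketch) rather than hard-coded."""

    def __init__(self, portfolio_factory):
        self.portfolio_factory = portfolio_factory

    def initialize(self, params):
        self.threshold = params.get('threshold', 0.5)
        self.portfolio = self.portfolio_factory(params.get('starting_cash', 10000))
        return ['quotes']

    def event(self, data):
        if random.random() > self.threshold:
            return self.portfolio.optimize(data)
        return 0.0


algo = RandomAlgo(FixedFractionPortfolio)
# A negative threshold forces a buy signal, which keeps the example deterministic
channels = algo.initialize({'threshold': -1.0, 'starting_cash': 1000})
invested = algo.event({'sid': 'GOOG', 'quote': '345'})
print('invested {0:.2f}, cash left {1:.2f}'.format(invested, algo.portfolio.cash))
```

The algorithm never looks inside the portfolio, so any object honoring `optimize(data)` can be dropped in.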
Developers should only focus on this core logic. In the next section, we're going to unfold a separate part of the system. The communication layer will solve one question: how do we produce and consume events?
Let's state the problem. We want each algorithm to receive interesting events and publish its own data. This is the kind of challenge the Internet of Things (IoT) is tackling. We will find empirically that our modular approach allows us to pick the right tool, even from a priori unrelated fields.
The code below leverages MQTT to bring M2M messaging to the application. Notice we're diversifying our stack with Node.js. Indeed, it's one of the most convenient platforms for dealing with event-oriented systems (JavaScript, in general, is gaining some traction in the IoT space).
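Stripped of networking, the produce/consume contract boils down to topic-based routing. The sketch below is a toy in-memory stand-in, not MQTT: `TinyBroker` and its methods are names made up for illustration, and a real broker adds network transport on top of this idea.

```python
class TinyBroker(object):
    """Toy in-memory broker (hypothetical): routes messages by topic."""

    def __init__(self):
        self.subscribers = {}

    def subscribe(self, topic, callback):
        # Register a callback interested in a given topic
        self.subscribers.setdefault(topic, []).append(callback)

    def publish(self, topic, message):
        # Deliver the message to every callback registered on this topic
        callbacks = self.subscribers.get(topic, [])
        for callback in callbacks:
            callback(topic, message)
        return len(callbacks)


received = []
broker = TinyBroker()
broker.subscribe('quotes', lambda topic, message: received.append((topic, message)))

delivered = broker.publish('quotes', {'sid': 'GOOG', 'quote': '345'})
ignored = broker.publish('news', 'nobody listens here')
print('delivered to', delivered, 'subscriber(s); received:', received)
```

Producers and consumers only share topic names, never references to each other, which is what lets each component evolve on its own.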
var mqtt = require('mqtt');

// connect to the broker, responsible for routing messages
// (thanks mosquitto)
var conn = mqtt.connect('mqtt://test.mosquitto.org');

conn.on('connect', function () {
  // we're up! Time to initialize the algorithm
  // and subscribe to interesting messages
});

// triggered on topics we're listening to
conn.on('message', function (topic, message) {
  console.log('received data:', message.toString());
  // Here, pass it to the algo for processing
});
That's neat! But we still need to connect this messaging layer with the actual Python algorithm. A Remote Procedure Call (RPC) protocol comes in handy for the task, especially with zerorpc. Here is the full implementation with more explanations.
// command-line interfaces made easy
var program = require('commander');
// the MQTT client for Node.js and the browser
var mqtt = require('mqtt');
// a communication layer for distributed systems
var zerorpc = require('zerorpc');
// import project properties
var pkg = require('./package.json');

// define the cli
program
  .version(pkg.version)
  .description(pkg.description)
  .option('-m, --mqtt [url]', 'mqtt broker address', 'mqtt://test.mosquitto.org')
  .option('-r, --rpc [url]', 'rpc server address', 'tcp://127.0.0.1:4242')
  .parse(process.argv);

// connect to mqtt broker
var conn = mqtt.connect(program.mqtt);
// connect to rpc peer, the actual python algorithm
var algo = new zerorpc.Client();
algo.connect(program.rpc);

conn.on('connect', function () {
  // connections are ready, initialize the algorithm
  // (the Python side reads the `starting_cash` key)
  var conf = { starting_cash: 50000 };
  algo.invoke('initialize', conf, function (err, channels, more) {
    if (err) { return console.error('initialize failed:', err); }
    // the method returns an array of data channels the algorithm needs
    for (var i = 0; i < channels.length; i++) {
      console.log('subscribing to channel', channels[i]);
      conn.subscribe(channels[i]);
    }
  });
});

conn.on('message', function (topic, message) {
  console.log('received data:', message.toString());
  // make the algorithm process the incoming data
  algo.invoke('event', JSON.parse(message.toString()), function (err, res, more) {
    if (err) { console.error('event failed:', err); }
    console.log('algo output:', res);
    // we're done
    algo.close();
    conn.end();
  });
});
The code above calls our algorithm's methods. Here is how to expose them over RPC.
import click
import zerorpc

# ... algo code ...


@click.command()
@click.option('--addr', default='tcp://127.0.0.1:4242', help='address to bind rpc server')
def serve(addr):
    server = zerorpc.Server(RandomAlgo())
    server.bind(addr)
    click.echo(click.style('serving on {} ...'.format(addr), bold=True, fg='cyan'))
    # listen and serve
    server.run()


if __name__ == '__main__':
    serve()
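Conceptually, exposing an object over RPC means turning an incoming method name and its arguments into a call on that object, then sending the result back. The sketch below shows that dispatch step with the standard library only. It is not how zerorpc is implemented (zerorpc builds on ZeroMQ and MessagePack): `dispatch`, `EchoAlgo` and the JSON request shape are all assumptions made for illustration.

```python
import json


class EchoAlgo(object):
    """Hypothetical stand-in for RandomAlgo, kept deterministic for the demo."""

    def initialize(self, params):
        self.threshold = params.get('threshold', 0.5)
        return ['quotes']

    def event(self, data):
        return 'saw {0}'.format(data['sid'])


def dispatch(target, request):
    """Decode a JSON request, call the named public method, encode the reply."""
    call = json.loads(request)
    name = call['method']
    # Refuse private names and anything that is not a callable attribute
    if name.startswith('_') or not callable(getattr(target, name, None)):
        return json.dumps({'error': 'unknown method: {0}'.format(name)})
    result = getattr(target, name)(*call.get('args', []))
    return json.dumps({'result': result})


algo = EchoAlgo()
reply_init = dispatch(algo, json.dumps(
    {'method': 'initialize', 'args': [{'threshold': 0.2}]}))
reply_event = dispatch(algo, json.dumps(
    {'method': 'event', 'args': [{'sid': 'GOOG', 'quote': '345'}]}))
reply_bad = dispatch(algo, json.dumps({'method': '__init__', 'args': []}))
print(reply_init, reply_event, reply_bad)
```

The caller only needs to know method names and argument shapes, which is why the Node.js side can drive a Python object without sharing any code with it.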
At this point we are ready to run the app. Let's fire up three terminals, install the requirements, and make the machines trade.
sudo apt-get install curl libpython-dev libzmq-dev
# Install pip
curl https://bootstrap.pypa.io/get-pip.py | python
# Algorithm requirements
pip install zerorpc click
# Messaging requirements
npm init
npm install --save commander mqtt zerorpc
# Activate backend
python ma.py --addr tcp://127.0.0.1:4242
# Manipulate algorithm and serve messaging system
node app.js --rpc tcp://127.0.0.1:4242
# Publish messages
node_modules/.bin/mqtt pub -t 'quotes' -h 'test.mosquitto.org' -m '{"sid": "GOOG", "quote": "345"}'
In this state, our implementation is over-engineered. But we designed a sustainable architecture to wire up small components. And from here we can extend the system.
Given our API definition, a contributor can hack on any component without breaking the project as a whole. In a fast-paced environment, with constant iterations, this architecture can make or break products.
This is especially true in the rising container world. Assuming we package each component into specialized containers, we smooth the way to a scalable infrastructure that we can test, distribute, deploy and grow.
Xavier Bruhiere is the CEO of Hive Tech. He contributes to many community projects, including Oculus Rift, Myo, Docker and Leap Motion. In his spare time he enjoys playing tennis, the violin and the guitar. You can reach him at @XavierBruhiere.