Using Execnet for Parallel and Distributed Processing with NLTK


Python Text Processing with NLTK 2.0 Cookbook

Use Python’s NLTK suite of libraries to maximize your Natural Language Processing capabilities.

  • Quickly get to grips with Natural Language Processing – with Text Analysis, Text Mining, and beyond
  • Learn how machines and crawlers interpret and process natural languages
  • Easily work with huge amounts of data and learn how to handle distributed processing
  • Part of Packt’s Cookbook series: Each recipe is a carefully organized sequence of instructions to complete the task as efficiently as possible    


NLTK is great for in-memory single-processor natural language processing. However, there are times when you have a lot of data to process and want to take advantage of multiple CPUs, multi-core CPUs, and even multiple computers. Or perhaps you want to store frequencies and probabilities in a persistent, shared database so multiple processes can access it simultaneously. For the first case, we’ll be using execnet to do parallel and distributed processing with NLTK. For the second case, you’ll learn how to use the Redis data structure server/database to store frequency distributions and more.

Distributed tagging with execnet

Execnet is a distributed execution library for Python. It allows you to create gateways and channels for remote code execution. A gateway is a connection from the calling process to a remote environment. The remote environment can be a local subprocess or an SSH connection to a remote node. A channel is created from a gateway and handles communication between the channel creator and the remote code.

Since many NLTK processes require 100 percent CPU utilization during computation, execnet is an ideal way to distribute that computation for maximum resource usage. You can create one gateway per CPU core, and it doesn’t matter whether the cores are in your local computer or spread across remote machines. In many situations, you only need to have the trained objects and data on a single machine, and can send the objects and data to the remote nodes as needed.

Getting ready

You’ll need to install execnet for this to work. It should be as simple as sudo pip install execnet or sudo easy_install execnet. The current version of execnet, as of this writing, is 1.0.8. The execnet homepage has API documentation and examples.

How to do it…

We start by importing the required modules, as well as an additional module, remote_tag.py, that will be explained in the next section. We also need to import pickle so we can serialize the tagger. Execnet does not natively know how to deal with complex objects such as a part-of-speech tagger, so we must dump the tagger to a string using pickle.dumps(). We’ll use the default tagger that’s used by the nltk.tag.pos_tag() function, but you could load and dump any pre-trained part-of-speech tagger as long as it implements the TaggerI interface.
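The serialization step can be tried on its own, without execnet or NLTK. The following is a minimal sketch written for Python 3, where cPickle has been folded into the pickle module. The Counter of word/tag counts is a hypothetical stand-in for a trained tagger, not the object used in this recipe.

```python
import pickle
from collections import Counter

# Hypothetical stand-in for a trained model; the recipe pickles a real tagger.
model = Counter({("the", "DT"): 3, ("dog", "NN"): 1})

# dumps() turns the object into a byte string, a simple type that is easy to send.
payload = pickle.dumps(model)

# loads() is what the receiving side would call to rebuild the object.
restored = pickle.loads(payload)

print(type(payload).__name__, restored == model)
```

The restored object is an equal copy, not the same object, which is why the remote side can work on it independently.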

Once we have a serialized tagger, we start execnet by making a gateway with execnet.makegateway(). The default gateway creates a Python subprocess, and we can call the remote_exec() method with the remote_tag module to create a channel. With an open channel, we send over the serialized tagger and then the first tokenized sentence of the treebank corpus.

You don’t have to do any special serialization of simple types such as lists and tuples, since execnet already knows how to handle serializing the built-in types.

Now if we call channel.receive(), we get back a tagged sentence that is equivalent to the first tagged sentence in the treebank corpus, so we know the tagging worked. We end by exiting the gateway, which closes the channel and kills the subprocess.

>>> import execnet, remote_tag, nltk.tag, nltk.data
>>> from nltk.corpus import treebank
>>> import cPickle as pickle
>>> tagger = pickle.dumps(nltk.data.load(nltk.tag._POS_TAGGER))
>>> gw = execnet.makegateway()
>>> channel = gw.remote_exec(remote_tag)
>>> channel.send(tagger)
>>> channel.send(treebank.sents()[0])
>>> tagged_sentence = channel.receive()
>>> tagged_sentence == treebank.tagged_sents()[0]
True
>>> gw.exit()

Visually, the communication process looks like this:

[Figure: diagram of the communication process; image not available]

How it works…

The gateway’s remote_exec() method takes a single argument that can be one of the following three types:

  1. A string of code to execute remotely.
  2. The name of a pure function that will be serialized and executed remotely.
  3. The name of a pure module whose source will be executed remotely.

We use the third option with the remote_tag.py module, which is defined as follows:

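import cPickle as pickle

if __name__ == '__channelexec__':
    # The first message on the channel is the pickled tagger.
    tagger = pickle.loads(channel.receive())

    # Tag each sentence received until the channel is closed.
    for sentence in channel:
        channel.send(tagger.tag(sentence))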

A pure module is a module that is self-contained. It can only access Python modules that are available where it executes, and does not have access to any variables or states that exist wherever the gateway is initially created. To detect that the module is being executed by execnet, you can look at the __name__ variable. If it’s equal to '__channelexec__', then it is being used to create a remote channel. This is similar to doing if __name__ == '__main__' to check if a module is being executed on the command line.

The first thing we do is call channel.receive() to get the serialized tagger, which we load using pickle.loads(). You may notice that channel is not imported anywhere; that’s because it is included in the global namespace of the module. Any module that execnet executes remotely has access to the channel variable in order to communicate with the channel creator.

Once we have the tagger, we iteratively tag() each tokenized sentence that we receive from the channel. This allows us to tag as many sentences as the sender wants to send, as iteration will not stop until the channel is closed. What we’ve essentially created is a compute node for part-of-speech tagging that dedicates 100 percent of its resources to tagging whatever sentences it receives. As long as the channel remains open, the node is available for processing.

There’s more…

This is a simple example that opens a single gateway and channel. But execnet can do a lot more, such as opening multiple channels to increase parallel processing, as well as opening gateways to remote hosts over SSH to do distributed processing.

Multiple channels

We can create multiple channels, one per gateway, to make the processing more parallel. Each gateway creates a new subprocess (or remote interpreter if using an SSH gateway) and we use one channel per gateway for communication. Once we’ve created two channels, we can combine them using the MultiChannel class, which allows us to iterate over the channels, and make a receive queue to receive messages from each channel.

After creating each channel and sending the tagger, we cycle through the channels to send an even number of sentences to each channel for tagging. Then we collect all the responses from the queue. A call to queue.get() will return a 2-tuple of (channel, message) in case you need to know which channel the message came from.

If you don’t want to wait forever, you can also pass a timeout keyword argument with the maximum number of seconds you want to wait, as in queue.get(timeout=4). This can be a good way to handle network errors.
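Timeout handling might look like the following Python 3 sketch. It uses a standard library queue.Queue as a stand-in for the receive queue and assumes that a timed-out get() raises queue.Empty, as the standard library queue does. The collect() helper and its parameters are hypothetical names introduced for illustration.

```python
import queue

def collect(receive_queue, expected, timeout=4):
    """Gather up to `expected` messages, giving up once a get() times out."""
    messages = []
    for _ in range(expected):
        try:
            channel, message = receive_queue.get(timeout=timeout)
        except queue.Empty:
            break  # nothing arrived in time; return what we have so far
        messages.append(message)
    return messages

# Stand-in queue holding one (channel, message) pair although two are expected.
stand_in = queue.Queue()
stand_in.put(("ch1", [("Hello", "UH")]))
partial = collect(stand_in, expected=2, timeout=0.1)
print(len(partial))  # prints 1
```

Returning a partial result lets the caller decide whether to resend the missing sentences or report the failure.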

Once all the tagged sentences have been collected, we can exit the gateways. Here’s the code:

>>> import itertools
>>> gw1 = execnet.makegateway()
>>> gw2 = execnet.makegateway()
>>> ch1 = gw1.remote_exec(remote_tag)
>>> ch1.send(tagger)
>>> ch2 = gw2.remote_exec(remote_tag)
>>> ch2.send(tagger)
>>> mch = execnet.MultiChannel([ch1, ch2])
>>> queue = mch.make_receive_queue()
>>> channels = itertools.cycle(mch)
>>> for sentence in treebank.sents()[:4]:
...     channel = next(channels)
...     channel.send(sentence)
>>> tagged_sentences = []
>>> for i in range(4):
...     channel, tagged_sentence = queue.get()
...     tagged_sentences.append(tagged_sentence)
>>> len(tagged_sentences)
4
>>> gw1.exit()
>>> gw2.exit()
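The round-robin distribution that itertools.cycle() provides can be seen in isolation with a small Python 3 sketch. The two lists below are hypothetical stand-ins for channels, so no gateways are needed to run it.

```python
import itertools

sentences = [["one"], ["two"], ["three"], ["four"]]

# Stand-ins for two channels: each list records what would have been sent.
outboxes = {"ch1": [], "ch2": []}

# cycle() repeats the keys forever: "ch1", "ch2", "ch1", "ch2", ...
targets = itertools.cycle(outboxes)

for sentence in sentences:
    outboxes[next(targets)].append(sentence)

print(outboxes)
```

With four sentences and two targets, each stand-in channel ends up with two sentences.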

Local versus remote gateways

The default gateway spec is popen, which creates a Python subprocess on the local machine. This means execnet.makegateway() is equivalent to execnet.makegateway('popen'). If you have passwordless SSH access to a remote machine, then you can create a remote gateway using execnet.makegateway('ssh=remotehost') where remotehost should be the hostname of the machine. An SSH gateway spawns a new Python interpreter for executing the code remotely. As long as the code you’re using for remote execution is pure, you only need a Python interpreter on the remote machine.

Channels work exactly the same no matter what kind of gateway is used; the only difference will be communication time. This means you can mix and match local subprocesses with remote interpreters to distribute your computations across many machines in a network. There are many more details on gateways in the execnet API documentation.
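One way to plan such a mix is to build the list of gateway specs up front, one popen spec per local worker plus one ssh spec per remote host. The following Python 3 sketch only builds the spec strings. The gateway_specs() helper and the hostnames node1 and node2 are hypothetical, and each resulting string would then be passed to execnet.makegateway().

```python
import os

def gateway_specs(remote_hosts=(), local_workers=None):
    """Return gateway spec strings: local popen workers, then SSH hosts."""
    if local_workers is None:
        # Default to one local subprocess per CPU core.
        local_workers = os.cpu_count() or 1
    local = ["popen"] * local_workers
    remote = ["ssh=%s" % host for host in remote_hosts]
    return local + remote

# node1 and node2 are placeholder hostnames.
specs = gateway_specs(["node1", "node2"], local_workers=2)
print(specs)  # prints ['popen', 'popen', 'ssh=node1', 'ssh=node2']
```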

