
Yes, Celery is amazing in its own way; it is the most commonly used “Distributed Task Queue” library for Python, and this did not happen by accident.

But there are cases when you don’t need the whole feature set offered by Celery, like multi-broker support. Let’s say you just want to execute your tasks asynchronously, without adding a lot of extra bulk to your Python dependencies, and let’s say you happen to have a Redis instance lying around. Then Huey might be the one for you.

What does Huey offer?

It’s a lightweight task queue. Lightweight, you say? Yes, the only dependency is the Python Redis client. Yes, really. But despite the skinny requirements list, it can do lots of amazing things, and believe me, you’ll see that in this short article. We used Huey actively at our e-commerce start-up Endticket, and it turned out to be a good design decision.

Defining your Huey instances

from huey import RedisHuey
huey = RedisHuey('my-queue', host='your.redis.host')

Of course you can define multiple instances in your code. You can run a Huey consumer per queue, giving you control over how many consumers you want per queue, and you can also organize tasks at the queue level. So important tasks can be put into one queue and less important ones into another. Easy, right?

# config.py
from huey import RedisHuey

importanthuey = RedisHuey('my-important-queue', host='your.redis.host')
lowpriohuey = RedisHuey('my-lowprio-queue', host='your.redis.host')
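Binding a task to a queue is then just a matter of which instance’s decorator you use. A quick sketch (the task names here are made up for illustration):

# tasks.py
from config import importanthuey, lowpriohuey

@importanthuey.task()
def charge_customer(order_id):
    # Payment processing goes on the important queue
    print("Charging order %s" % order_id)

@lowpriohuey.task()
def send_newsletter(user_id):
    # Bulk mail can wait its turn on the low-priority queue
    print("Sending newsletter to user %s" % user_id)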

We have a Huey instance! But how do I define a task?

You just have to import your defined Huey instance and use its task decorator on any regular function. Once a function is decorated, calling it no longer executes it directly; instead, an instance of the QueueTask class is created.

# tasks.py
from config import huey # import the huey we instantiated in config.py

@huey.task()
def write_post(quality):
    print("I'm writing a(n) %s post" % quality)
    return True

Tasks are supposed to be called, right?

As I mentioned before, a decorated function won’t be executed immediately when you call it; instead, a QueueTask object is created and the task is enqueued for later execution. However, enqueueing a task involves no magic formulas or hard-to-remember syntax. You simply call the function as you normally would.

# main.py
from tasks import write_post  # import our task
from config import huey


if __name__ == '__main__':
    quality = raw_input("What quality post are you writing? ")
    write_post(quality)  # QueueTask instance created and enqueued; a worker will execute it as soon as possible
    write_post.schedule(args=(quality,), delay=120)  # Task enqueued, scheduled to execute 120 seconds later
    print('Enqueued job to write a(n) %s post' % quality)

Task execution in Huey

Ok, you successfully managed to enqueue your task, and it’s resting in the Redis queue waiting to be executed. But how will the system execute it? You just have to fire up a consumer!

$ huey_consumer.py config.huey

If you did everything right in the previous steps, the consumer should be running and executing the tasks you enqueue.

If you have multiple queues defined, you must run a consumer for every queue.

$ huey_consumer.py config.importanthuey
$ huey_consumer.py config.lowpriohuey

If your system is distributed and there are multiple nodes processing async tasks, that’s not a problem. You can run one consumer on every host; Huey can handle multiple consumers for a single queue without a problem.
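You can also scale within a single host. If I recall the consumer’s flags correctly, -w sets the number of workers a single consumer runs (check huey_consumer.py --help for your version):

$ huey_consumer.py config.huey -w 4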

Periodic tasks, using Huey in a crontab-like manner

Yes, you can do this as well, and quite simply at that. You just have to use the periodic_task decorator of your huey instance, and the system does the rest.

from config import huey # import the huey we instantiated in config.py
from huey import crontab

@huey.periodic_task(crontab(minute='0', hour='3'))
def update_caches():
    update_all_cache_entries()

You just have to make sure you have at least one consumer running for the given queue, and your tasks will be executed periodically.
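The crontab helper accepts the usual cron-style fields, including step values, so interval schedules are easy too. A small sketch (the task body is made up; this assumes your Huey version supports the */5 step syntax):

from config import huey
from huey import crontab

@huey.periodic_task(crontab(minute='*/5'))
def heartbeat():
    # Runs every five minutes on any consumer for this queue
    print("Still alive!")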

I want to see results, not promises!

When you enqueue a task, it returns an AsyncData object:

# main.py
from tasks import write_post  # import our task
from huey.exceptions import DataStoreTimeout
from config import huey

if __name__ == '__main__':
    quality = raw_input("What quality post are you writing? ")
    result = write_post.schedule(args=(quality,), delay=120)
    print(type(result))  # huey.api.AsyncData
    result.get()  # Non-blocking: returns the result if the job has already finished, None otherwise
    try:
        # Block until the result arrives, with an optional timeout. If the timeout is
        # reached and there is still no result, huey.exceptions.DataStoreTimeout is raised.
        result.get(blocking=True, timeout=120)
    except DataStoreTimeout:
        print("Too slow mate :(")

Running in production

We run Huey using Supervisord inside a Docker container; your supervisor config should look something like this (the random sleep before exec staggers startup so all consumers don’t hit Redis at once). If your queue happens to die, you can see it in the supervisor logs, and supervisor will also auto-restart the consumer. Nice and easy.

[program:my-queue]
command=bash -c "echo starting config.huey && sleep $(( $RANDOM / 2500 + 4)) && exec huey_consumer.py config.huey"
environment=PYTHONPATH=%(here)s/..
numprocs=1
process_name=%(program_name)s-%(process_num)d
stopwaitsecs=5
stdout_logfile=%(here)s/huey.log
redirect_stderr=true
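If you run multiple queues in production, the same pattern simply repeats: one [program] entry per consumer. A sketch for the hypothetical low-priority queue from earlier:

[program:my-lowprio-queue]
command=bash -c "echo starting config.lowpriohuey && sleep $(( $RANDOM / 2500 + 4)) && exec huey_consumer.py config.lowpriohuey"
environment=PYTHONPATH=%(here)s/..
numprocs=1
process_name=%(program_name)s-%(process_num)d
stopwaitsecs=5
stdout_logfile=%(here)s/huey-lowprio.log
redirect_stderr=true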

About the author

Bálint Csergő is a software engineer from Budapest, currently working as an infrastructure engineer at Hortonworks. He loves Unix systems, PHP, Python, Ruby, the Oracle database, Arduino, Java, C#, music, and beer.
