Yes, Celery is amazing in its own way: it is the most widely used distributed task queue library for Python, and that did not happen by accident.
But there are cases when you don’t need the whole feature set Celery offers, such as multi-broker support. Let’s say you just want to execute your tasks asynchronously, without adding a lot of extra bulk to your Python dependencies, and you happen to have a Redis instance lying around. Then Huey might be the one for you.
It’s a lightweight task queue. Lightweight, you say? Yes: its only dependency is the Python Redis client. Really. But despite the skinny requirements list, it can do lots of amazing things, as you’ll see in this short article. We used Huey actively at our e-commerce start-up Endticket, and it turned out to be a good design decision.
```python
from huey import RedisHuey

huey = RedisHuey('my-queue', host='your.redis.host')
```
Of course, you can define multiple Huey instances in your code. You can run one consumer per queue, which gives you control over how many consumers each queue gets, and it also lets you organize tasks at the queue level: important tasks go into one queue, less important ones into another. Easy, right?
```python
# config.py
from huey import RedisHuey

importanthuey = RedisHuey('my-important-queue', host='your.redis.host')
lowpriohuey = RedisHuey('my-lowprio-queue', host='your.redis.host')
```
You just have to import the Huey instance you defined and use its task decorator on any regular function. Once the decorator is applied, calling the function no longer executes it directly; instead, an instance of the QueueTask class is created.
```python
# tasks.py
from config import huey  # import the huey we instantiated in config.py

@huey.task()
def write_post(quality):
    print("I'm writing a(n) %s post" % quality)
    return True
```
As I mentioned before, a decorated function won’t be executed instantly when you call it; instead, a QueueTask object is created and the task is enqueued for later execution. Enqueueing a task involves no magic formulas or hard-to-remember syntax, though: you simply call the function as you normally would.
```python
# main.py
from tasks import write_post  # import our task
from config import huey

if __name__ == '__main__':
    quality = raw_input("What quality post are you writing?")

    # QueueTask instance created and enqueued; a worker will execute it
    # as soon as it picks the task up
    write_post(quality)

    # Task enqueued and scheduled to be executed 120 seconds later
    write_post.schedule(args=(quality,), delay=120)

    print('Enqueued job to write a(n) %s post' % quality)
```
OK, you successfully enqueued your task, and it’s resting in the Redis queue waiting to be executed. But how will the system execute it? You just have to fire up a consumer!
```shell
$ huey_consumer.py main.huey
```
If you did everything right in the previous steps, the consumer should be running and executing the tasks you enqueue.
If you have multiple queues defined, you must run a consumer for every queue.
```shell
$ huey_consumer.py main.importanthuey
$ huey_consumer.py main.lowpriohuey
```
If your system is distributed and there are multiple nodes processing async tasks, that’s not a problem either: you can run a consumer on every host, and Huey handles multiple consumers on a single queue without issue.
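As a sketch of the scaling options: you can either start the same consumer command on each processing node, or give a single consumer several worker threads via its `-w`/`--workers` flag (the worker count of 4 below is just an illustrative value):

```shell
# One consumer per host, all pointed at the same queue:
$ huey_consumer.py main.huey

# Or scale a single consumer with multiple worker threads:
$ huey_consumer.py main.huey -w 4
```

Both approaches can be combined, since the Redis queue is the single source of truth the consumers pull from.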
Yes, you can also run tasks periodically, and to be honest, in a simple way: you just have to use the periodic_task decorator of your Huey instance, and the system does the rest.
```python
from huey import crontab  # needed for the schedule definition

from config import huey  # import the huey we instantiated in config.py

@huey.periodic_task(crontab(minute='0', hour='3'))
def update_caches():
    update_all_cache_entries()
```
You just have to make sure you have at least one consumer running for the given queue, and your tasks will be executed periodically.
When you enqueue a task, it returns an AsyncData object:
```python
# main.py
from tasks import write_post  # import our task
from huey.exceptions import DataStoreTimeout
from config import huey

if __name__ == '__main__':
    quality = raw_input("What quality post are you writing?")
    result = write_post.schedule(args=(quality,), delay=120)
    print(type(result))  # huey.api.AsyncData

    # Try to get the result: if the job has already finished, the result
    # is returned, otherwise you get None.
    result.get()

    try:
        # You can also block until the result arrives, with an optional
        # timeout. Be aware that if the timeout is reached and there is
        # still no result, a huey.exceptions.DataStoreTimeout is raised.
        result.get(blocking=True, timeout=120)
    except DataStoreTimeout:
        print("Too slow mate :(")
```
We run Huey using Supervisord inside a Docker container; your Supervisor config should look something like this. If your consumer happens to die, you will see it in the Supervisor logs, and Supervisor will also restart it automatically. Nice and easy.
```ini
[program:my-queue]
command=bash -c "echo starting main.huey && sleep $(( $RANDOM / 2500 + 4)) && exec huey_consumer.py main.huey"
environment=PYTHONPATH=%(here)s/..
numprocs=1
process_name=%(program_name)s-%(process_num)d
stopwaitsecs=5
stdout_logfile=%(here)s/huey.log
redirect_stderr=true
```
About the author
Bálint Csergő is a software engineer from Budapest, currently working as an infrastructure engineer at Hortonworks. He loves Unix systems, PHP, Python, Ruby, the Oracle database, Arduino, Java, C#, music, and beer.