Flask by example 9 (Running background tasks with celery)
What is celery?
Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well.
There are three main components in celery:
- The workers
- The brokers
- The task queue
Let’s think of our web application as a big hotel. At any point in time there are customers in the hotel, and those customers can order food and refreshments from the hotel’s restaurant. Let’s also assume that we have only 3 cooks and numerous “room service staff”.
Customer X calls the restaurant for rice, Y calls for salad and Z calls for a glass of orange juice (which is relatively easy to prepare). These customers have created three Tasks, and the cooks start preparing the food. One of the cooks (a Worker) finishes his task (obviously the glass of orange juice). The cook tells the room service staff that he’s done with the juice, and they in turn tell customer Z via the desk phone (the broker) that the juice is ready and ask if they can bring it to his room.
The cook then enters an idle state and waits for the next order (Task) to come in.
The broker’s job is simply to serve as a communication pipeline between the workers and the creator of the Task. In this case the broker is a combination of the “room service staff” and the phone line.
On a typical day, more than 3 customers might order food at the same time, which exhausts the number of available hands to handle their requests. The cooks then execute the tasks in the order in which they came in. The cooks have a central computer where they can see all the customers’ orders; this is the Task queue. As each cook finishes his current task, he picks the next available task from the queue. They all repeat this procedure until the queue is empty.
Tasks can also be scheduled to be executed at a particular time. For example, a customer might call the restaurant and tell them he wants a bottle of wine by 6:00pm. Celery also supports this and it’s exactly what we’re going to implement in this part of the series.
This is basically how celery works: registered tasks are enqueued, and available workers pick a task from the queue and execute it. When they’re through with the task, they send a signal (through the broker) that the task has been completed. The result may also be returned or stored (if needed).
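To make the hotel analogy concrete, here is a toy sketch of the same idea using only Python’s standard library. It is not celery and there is no broker involved: the "cooks" are just threads in one process pulling orders off a shared queue. The names run_kitchen and cook are made up for this illustration.

```python
import queue
import threading


def run_kitchen(orders, cooks=3):
    """Process every order with a fixed pool of worker threads.

    Returns a list of (cook_name, order) pairs in completion order.
    """
    task_queue = queue.Queue()
    for order in orders:
        task_queue.put(order)  # enqueue the tasks in arrival order

    completed = []
    lock = threading.Lock()

    def cook(name):
        while True:
            try:
                order = task_queue.get_nowait()  # pick the next available task
            except queue.Empty:
                return  # nothing left to do, the cook goes idle
            with lock:
                completed.append((name, order))
            task_queue.task_done()

    threads = [
        threading.Thread(target=cook, args=(f"cook-{i}",))
        for i in range(1, cooks + 1)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return completed


if __name__ == "__main__":
    for cook_name, order in run_kitchen(["rice", "salad", "orange juice", "wine", "soup"]):
        print(f"{cook_name} finished {order}")
```

Which cook ends up with which order depends on thread scheduling, so the output order can differ between runs. What celery adds on top of this picture is that the queue lives in a separate broker process, so the workers can run in other processes or on other machines.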
The explanation above describes a process (a set of rules) more than a specific program, so you can think of celery as a protocol and not only as a piece of software.
The same protocol can be implemented in pretty much any modern programming language. node_celery is an example.
Installation
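Celery can be installed from pip with:
pip install celery
RabbitMQ, which we’ll use as the broker, is not a Python package. Install it separately through your operating system’s package manager.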
Creating the celery object
Flask used to have an integration for celery, but from celery 3.0 that integration was no longer necessary.
Include this at the top of votr.py
What this code does is subclass celery’s Task class and make each celery task aware of flask’s application context (i.e. the task body runs inside with app.app_context()).
Without this you’ll get an error that the application was not registered on the db or that you’re working outside of the application context.
Don’t forget to update the config.py file to include the new variable CELERY_BROKER:
CELERY_BROKER = 'amqp://guest@localhost//'
Registering the Task(s)
Now let’s create a new file called tasks.py to hold our celery tasks.
Our task is very simple: it accepts a topic’s id, gets the topic associated with that id and sets its status to False, which closes it.
To make celery aware of this function as a task, we have to decorate it with @celery.task.
The connect function uses plain SQLAlchemy to connect to the database and create a session for us. This is just one of the little things Flask-SQLAlchemy does for us behind the scenes.
Running the task
Now, at the point where a poll is created, we want to dispatch the task to celery and tell it to run close_poll at a time we provide.
Edit the api_polls method to look like this
Just after the new poll has been created, we import the close_poll function from tasks.py (inside the method, to avoid circular imports) and then call its apply_async method, passing the time we want the task to execute (as a datetime object) as eta.
eta is formed from a UTC timestamp that we’ve not passed in yet. We’ll add that in the next section.
Adding a time picker to the react front-end
Since most polls are short-lived and don’t last more than a few hours or days, it makes more sense to use a time picker than a full-blown calendar, so I decided to build one: react-simple-time-picker.
It can be installed from npm, but since we’re not using Node, I’ve created a bundle that can be included in the html file.
Download the bundle, place it alongside polls.js and include it in polls.html with:
Finally, edit the PollForm component in polls.js to use react-simple-time-picker. (Note that the unimportant methods have been cut away.)
We imported the time picker at the top of the file, so we don’t get an exception when we try to use it in a file that doesn’t include the bundle.
We also added a new state, close_date, to track the closing date of the poll. By default we close each poll after 24 hours if no close date was specified. We’re also converting the time to a UTC timestamp expressed in seconds (e.getTime() / 1000), so we can easily use python’s datetime.utcfromtimestamp to convert it to a datetime object that celery’s apply_async method expects as the eta argument.
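On the Python side the conversion is a one-liner. The sketch below uses datetime.fromtimestamp with an explicit UTC timezone instead of datetime.utcfromtimestamp, because the latter is deprecated since Python 3.12; both describe the same instant. The function name eta_from_timestamp is made up for this example.

```python
from datetime import datetime, timezone


def eta_from_timestamp(close_timestamp):
    """Convert a UTC timestamp in seconds into a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(float(close_timestamp), tz=timezone.utc)


if __name__ == "__main__":
    # The browser sends e.getTime() / 1000, i.e. seconds since the Unix epoch.
    eta = eta_from_timestamp(1500000000)
    print(eta.isoformat())  # 2017-07-14T02:40:00+00:00
```

If the front end ever sends milliseconds by mistake (getTime() without the division), the resulting date lands tens of thousands of years in the future or the conversion fails outright, so a far-future eta is a good hint to check the units.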
The time picker is really simple. We just declared it with:
<SimpleTimePicker days="7" onChange={this.onDateChange} />
and passed in the number of days we want by default, along with our method onDateChange as a prop called onChange, which helps us keep track of the time generated by react-simple-time-picker.
Your poll form should look like this now:
Finally, we have to run celery so it can accept and run the tasks.
Open a new terminal and run celery with:
celery -A tasks.celery worker --loglevel=info
Output
In the sample output, you can see that I already have a task running. By default, unexecuted tasks stay in the queue even when celery is restarted. This means that tasks can survive a server reboot.
Also, if a task’s eta has already passed, celery executes the task immediately when it’s restarted.
Results are disabled because we don’t care about the result of our task; we just want to close polls at a particular time.
Displaying the time left on a poll
It would be nice for our users to know how long a poll stays active before it gets closed. We should also display the time in a nice format, so that they see the time left in days, hours or minutes depending on how much time is left.
To do that, we’re going to add another field, close_date, to the Topics model and also modify its to_json method to return the new column to us.
Save the close_date in api_polls.
Make sure you run the migrations before you proceed.
On the react side, we’ll have to modify the render method of LivePreviewProps to calculate the date
and then we can display the date in LivePreview with:
The poll form should show the time beside the vote count now
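In the app this calculation happens in the React component, but the logic is easy to follow in Python too. Here is a sketch of one way to pick the largest sensible unit (days, then hours, then minutes); the function name time_left and the exact wording of the labels are assumptions, not what the front end prints.

```python
from datetime import datetime, timedelta, timezone


def time_left(close_date, now=None):
    """Describe how long a poll stays open, using the largest non-zero unit."""
    now = now or datetime.now(timezone.utc)
    seconds = int((close_date - now).total_seconds())
    if seconds <= 0:
        return "closed"

    days, remainder = divmod(seconds, 24 * 60 * 60)
    hours, remainder = divmod(remainder, 60 * 60)
    minutes = remainder // 60

    # Report only the biggest unit, so the label stays short.
    for amount, unit in ((days, "day"), (hours, "hour"), (minutes, "minute")):
        if amount:
            plural = "" if amount == 1 else "s"
            return f"{amount} {unit}{plural} left"
    return "less than a minute left"


if __name__ == "__main__":
    now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
    print(time_left(now + timedelta(days=2, hours=3), now))   # 2 days left
    print(time_left(now + timedelta(hours=1, minutes=30), now))  # 1 hour left
    print(time_left(now + timedelta(minutes=5), now))         # 5 minutes left
```

Passing now explicitly makes the function easy to test; in normal use you would leave it out and let it default to the current UTC time.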
There is an important thing to note here: don’t put too much computation on the client side (though our date calculation should not cause any major slowdown). You want to keep your UI as fast as possible.
If you need to do heavy computation, you’re better off doing it on the server or dispatching the task to celery to run in the background. Nobody likes a slow UI.
We’ve come to the end of this part. You’ve learnt how to use celery to schedule background tasks and how to run those tasks at a later date.
If you’re looking for other things celery can help you achieve, here are some examples of how a lot of companies use celery (or a similar solution) in production:
- Sending emails in the background when you register on their website
- Generating complex reports and graphs
- CSV export of a large dataset
In summary, you should use celery for any long-running, user-facing tasks you have in your application, so you don’t block or slow down the UI and make your users unhappy.
Please read the celery docs to find out more about the advanced features of celery.
I’d also like to let you know about Flower, which is a nice web app for monitoring your celery tasks in real time.
In the next part, we’re going to write some unit tests and discuss the importance of testing and code coverage.
untested code >= broken code
See you in the next part. Thanks for reading