10.1. Celery

  • A task queue implementation for Python web applications

  • Asynchronously execute work outside the HTTP request-response cycle

  • Can run batch jobs in the background on a regular schedule

10.1.1. Install

$ pip install redis
$ pip install celery
$ pip install django-celery-results
$ pip install django-celery-beat
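
Here redis is the Python client for the Redis broker, celery is the task queue itself, django-celery-results provides the django-db result backend used in the configuration below (like django_celery_beat, it has to be added to INSTALLED_APPS and migrated), and django-celery-beat stores the periodic task schedule in the Django database.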

10.1.2. Configuration

  • https://docs.celeryq.dev/en/stable/userguide/configuration.html#new-lowercase-settings

  • Version 4.0 introduced new lower case settings and setting organization.

  • The major differences from previous versions, apart from the lower case names, are the renaming of some prefixes, like celerybeat_ to beat_ and celeryd_ to worker_; most of the top level celery_ settings have been moved into a new task_ prefix.

  • Celery will still be able to read old configuration files until Celery 6.0. Afterwards, support for the old configuration files will be removed. We provide the celery upgrade command that should handle plenty of cases (including Django).
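
For example, to rewrite an old-style Django settings module in place:

$ celery upgrade settings myproject/settings.py --django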

Create file: myproject/celery.py

>>> import os
>>> from celery import Celery
>>>
>>>
>>> os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
'myproject.settings'
>>>
>>> app = Celery(
...    main='myproject',
...    broker='redis://localhost:6379',
...    backend='django-db')
>>>
>>> app.conf.task_track_started = True
>>> app.conf.task_send_sent_event = True
>>>
>>> app.autodiscover_tasks()

Modify file: myproject/__init__.py

>>> from .celery import app as celery_app  
>>>
>>> __all__ = ['celery_app']
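
The later sections call an add task. A minimal sketch of such a task, defined in an application's tasks.py so that app.autodiscover_tasks() can find it (myapp is a placeholder app name):

Create file: myapp/tasks.py

>>> 
... from celery import shared_task
...
...
... @shared_task
... def add(x, y):
...     return x + y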

10.1.3. Worker

  • Executes tasks

  • Workers handle whatever tasks you put on the queue

  • Each worker performs one task at a time; when the task is completed, it picks up the next one

  • The cycle repeats continuously, with the worker waiting idly when there are no more tasks

In foreground:

$ celery --app=myproject worker --loglevel=INFO

By default celery multi creates pid and log files in the current directory. To protect against multiple workers launching on top of each other, you're encouraged to put these in a dedicated directory:

$ mkdir -p /var/run/celery
$ mkdir -p /var/log/celery

Run in background:

$ celery multi start worker1 \
    --app=myproject \
    --loglevel=INFO \
    --pidfile=/var/run/celery/%n.pid \
    --logfile=/var/log/celery/%n%I.log

Stop:

$ celery multi stop worker1 \
    --pidfile=/var/run/celery/%n.pid \
    --logfile=/var/log/celery/%n%I.log
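
To check which tasks the running workers are currently executing:

$ celery --app=myproject inspect active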

10.1.4. Run Tasks

  • T.delay(arg, kwarg=value) - Star arguments shortcut to .apply_async. (.delay(*args, **kwargs) calls .apply_async(args, kwargs)).

  • T.apply_async((arg,), {'kwarg': value})

  • T.apply_async(countdown=10) - executes in 10 seconds from now.

  • T.apply_async(eta=now + timedelta(seconds=10)) - executes in 10 seconds from now, specified using eta

  • T.apply_async(countdown=60, expires=120) - executes in one minute from now, but expires after 2 minutes.

  • T.apply_async(expires=now + timedelta(days=2)) - expires in 2 days, set using datetime.

Run task:

>>> add.delay(2, 2)  
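
The same call expressed through apply_async, together with a delayed variant from the list above:

>>> add.apply_async((2, 2))  
>>> add.apply_async((2, 2), countdown=10, expires=120)  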

10.1.5. Status

  • PENDING -> STARTED -> SUCCESS
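
Note that STARTED is only reported when the task_track_started setting is enabled, as done in the configuration above; otherwise the observed state jumps from PENDING directly to SUCCESS or FAILURE.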

Run task:

>>> res = add.delay(2, 2)  

If you have a result backend configured you can retrieve the return value of a task:

>>> res.get(timeout=1)  
4

Attributes:

>>> res.id  
'd6b3aea2-fb9b-4ebc-8da4-848818db9114'
>>> res.state  
'SUCCESS'

Methods:

>>> res.failed()  
False
>>> res.successful()  
True

10.1.6. Beat

Celery beat is a scheduler; it kicks off tasks at regular intervals, which are then executed by the available worker nodes in the cluster.

By default the entries are taken from the beat_schedule setting, but custom stores can also be used, like storing the entries in a SQL database.
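
A static schedule can be declared directly on the app configuration; a minimal sketch, assuming the add task from earlier (the dotted task name depends on where the task module actually lives):

>>> app.conf.beat_schedule = {  
...     'add-every-30-seconds': {
...         'task': 'myapp.tasks.add',
...         'schedule': 30.0,
...         'args': (2, 2),
...     },
... }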

Entries:

>>> 
... from celery import Celery
... from celery.schedules import crontab
...
... app = Celery()
...
... @app.on_after_configure.connect
... def setup_periodic_tasks(sender, **kwargs):
...     # Calls test('hello') every 10 seconds.
...     sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
...
...     # Calls test('world') every 30 seconds
...     sender.add_periodic_task(30.0, test.s('world'), expires=10)
...
...     # Executes every Monday morning at 7:30 a.m.
...     sender.add_periodic_task(
...         crontab(hour=7, minute=30, day_of_week=1),
...         test.s('Happy Mondays!'),
...     )
...
... @app.task
... def test(arg):
...     print(arg)
...
... @app.task
... def add(x, y):
...     z = x + y
...     print(z)

The test.s and add.s calls used here are called signatures. Sometimes you may want to pass the signature of a task invocation to another process or as an argument to another function. More information: https://docs.celeryq.dev/en/stable/userguide/canvas.html#signatures

>>> add.signature((2, 2), debug=True)  
tasks.add(2, 2, debug=True)

There's also a shortcut using star arguments (args unpacking):

>>> add.s(2, 2, debug=True)  
tasks.add(2, 2, debug=True)

Start the beat scheduler:

$ celery --app=myproject beat

There's also the django-celery-beat extension that stores the schedule in the Django database, and presents a convenient admin interface to manage periodic tasks at runtime.

$ pip install django-celery-beat

Modify MYPROJECT/settings.py:

>>> 
... INSTALLED_APPS = [
...     ...,
...     'django_celery_beat',
... ]

Apply the database migrations and start beat with the database scheduler:

$ python manage.py migrate
$ celery --app=myproject beat --loglevel=INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler

10.1.7. Retry
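
The autoretry_for option retries the task automatically when one of the listed exceptions is raised; retry_kwargs forwards options such as max_retries to each retry: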

>>> @app.task(  
...     autoretry_for=(ConnectionError,),
...     retry_kwargs={'max_retries': 5})
... def refresh_timeline(user):
...     return twitter.refresh_timeline(user)
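
Celery can also space the retries out with an exponential backoff; a sketch using the retry_backoff family of task options (fetch_feed and feedclient are placeholder names):

>>> @app.task(  
...     autoretry_for=(ConnectionError,),
...     retry_backoff=True,       # exponential delay between retries
...     retry_backoff_max=600,    # cap the delay at 10 minutes
...     retry_jitter=True)        # randomize delays to avoid thundering herds
... def fetch_feed(url):
...     return feedclient.get(url)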

10.1.8. Reject

The task may raise Reject to reject the task message using AMQP's basic_reject method. This won't have any effect unless Task.acks_late is enabled.

Rejecting a message has the same effect as acking it, but some brokers may implement additional functionality that can be used. For example, RabbitMQ supports the concept of Dead Letter Exchanges, where a queue can be configured to use a dead letter exchange to which rejected messages are redelivered.

Reject can also be used to re-queue messages, but please be very careful when using this as it can easily result in an infinite message loop.

>>> 
... import errno
... from celery.exceptions import Reject
...
... @app.task(bind=True, acks_late=True)
... def render_scene(self, path):
...     file = get_file(path)
...     try:
...         renderer.render_scene(file)
...
...     # if the file is too big to fit in memory
...     # we reject it so that it's redelivered to the dead letter exchange
...     # and we can manually inspect the situation.
...     except MemoryError as exc:
...         raise Reject(exc, requeue=False)
...     except OSError as exc:
...         if exc.errno == errno.ENOMEM:
...             raise Reject(exc, requeue=False)
...
...     # For any other error we retry after 10 seconds.
...     except Exception as exc:
...         raise self.retry(exc=exc, countdown=10)

Re-queue it:

>>> 
... from celery.exceptions import Reject
...
... @app.task(bind=True, acks_late=True)
... def requeues(self):
...     if not self.request.delivery_info['redelivered']:
...         raise Reject('no reason', requeue=True)
...     print('received two times')

10.1.9. Security

You can override how positional arguments and keyword arguments are represented in logs and monitoring events using the argsrepr and kwargsrepr calling arguments:

>>> 
... add.apply_async((2, 3), argsrepr='(<secret-x>, <secret-y>)')
>>> 
... charge.s(account, card='1234 5678 1234 5678').set(
...     kwargsrepr=repr({'card': '**** **** **** 5678'})
... ).delay()

Warning: Sensitive information will still be accessible to anyone able to read your task message from the broker, or otherwise able to intercept it. For this reason you should probably encrypt your message if it contains sensitive information; in this example with a credit card number, the actual number could be stored encrypted in a secure store that you retrieve and decrypt in the task itself.

10.1.10. Good Practices

https://docs.celeryq.dev/en/stable/userguide/tasks.html#avoid-launching-synchronous-subtasks

Having a task wait for the result of another task is really inefficient, and may even cause a deadlock if the worker pool is exhausted.

Make your design asynchronous instead, for example by using callbacks.

Bad:

>>> 
... @app.task
... def update_page_info(url):
...     page = fetch_page.delay(url).get()
...     info = parse_page.delay(page).get()
...     store_page_info.delay(url, info)
...
... @app.task
... def fetch_page(url):
...     return myhttplib.get(url)
...
... @app.task
... def parse_page(page):
...     return myparser.parse_document(page)
...
... @app.task
... def store_page_info(url, info):
...     return PageInfo.objects.create(url=url, info=info)

Good:

>>> 
... def update_page_info(url):
...     # fetch_page -> parse_page -> store_page
...     chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
...     chain()
...
... @app.task()
... def fetch_page(url):
...     return myhttplib.get(url)
...
... @app.task()
... def parse_page(page):
...     return myparser.parse_document(page)
...
... @app.task(ignore_result=True)
... def store_page_info(info, url):
...     PageInfo.objects.create(url=url, info=info)
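
In the good version each step is an independent task; the chain primitive (the | operator) passes the return value of one task as the first argument of the next, so no worker ever blocks waiting for another task's result.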

10.1.11. State

Since Celery is a distributed system, you can't know in which process, or on what machine, the task will be executed. You can't even know if the task will run in a timely manner.

The ancient async sayings tell us that "asserting the world is the responsibility of the task". What this means is that the world view may have changed since the task was requested, so the task is responsible for making sure the world is how it should be. If you have a task that re-indexes a search engine, and the search engine should only be re-indexed at maximum every 5 minutes, then it must be the task's responsibility to assert that, not the caller's.

Another gotcha is Django model objects. They shouldn't be passed on as arguments to tasks. It's almost always better to re-fetch the object from the database when the task is running instead, as using old data may lead to race conditions.

Imagine the following scenario where you have an article and a task that automatically expands some abbreviations in it:

>>> 
... class Article(models.Model):
...     title = models.CharField(max_length=200)
...     body = models.TextField()
...
... @app.task
... def expand_abbreviations(article):
...     article.body = article.body.replace('MyCorp', 'My Corporation')
...     article.save()

First, an author creates an article and saves it, then the author clicks on a button that initiates the abbreviation task:

>>> 
... article = Article.objects.get(id=102)
... expand_abbreviations.delay(article)

Now, the queue is very busy, so the task won't be run for another 2 minutes. In the meantime another author makes changes to the article, so when the task is finally run, the body of the article is reverted to the old version because the task had the old body in its argument.

Fixing the race condition is easy, just use the article id instead, and re-fetch the article in the task body:

>>> 
... @app.task
... def expand_abbreviations(article_id):
...     article = Article.objects.get(id=article_id)
...     article.body = article.body.replace('MyCorp', 'My Corporation')
...     article.save()
...
... expand_abbreviations.delay(article_id)

There might even be performance benefits to this approach, as sending large messages may be expensive.
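
A related Django gotcha: if the task is queued inside a database transaction, the worker may pick it up before the transaction commits and fail to find the row. Delaying the enqueue until after the commit avoids this; a sketch, with create_article as a placeholder view:

>>> 
... from django.db import transaction
...
...
... def create_article(request):
...     article = Article.objects.create(title='...', body='...')
...     # enqueue only once the row is visible to other connections
...     transaction.on_commit(
...         lambda: expand_abbreviations.delay(article.id))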

10.1.12. Further Reading