Run Chord callbacks in Celery (4.3.0) regardless of the exit status of its tasks
You’re probably here because you’ve stumbled upon a problem with Celery chords: the chord callback isn’t executed if one of the tasks in the chord’s header (the group) fails.
This sucks because in most cases, the callback’s execution is what confirms that all the tasks have been executed by the workers.
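As a quick refresher, a chord is a group of tasks (the header) whose results are collected and passed to a single callback (the body). The snippet below is a minimal sketch of the two equivalent ways to build one, using the task names from the reproduction file further down:

from celery import chord, group

# Explicit form: chord(header)(body)
chord([good_task.s(), other_good_task.s()])(tasks_completed.s())

# Equivalent shorthand: a group chained into a callback is upgraded to a chord
(group(good_task.s(), other_good_task.s()) | tasks_completed.s()).delay()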
We can quickly reproduce this problem in a small Python file (I’m using the Redis backend with Docker, but you can use any backend that supports chords).
Start a new Redis container with:
docker run -p 6379:6379 redis
Create a new file called test_celery.py
from celery import Celery, group

redis_url = "redis://localhost:6379"

app = Celery('test_celery', backend=redis_url, broker=redis_url)


@app.task
def good_task():
    return 'Good task'


@app.task
def other_good_task():
    return 'Other good task'


@app.task
def bad_task():
    raise Exception('Bad Task!!!')


@app.task
def tasks_completed(results):
    print('Tasks completed', results)


if __name__ == '__main__':
    # A chained group of tasks is automatically upgraded to a chord
    (group(good_task.s(), bad_task.s(), other_good_task.s()) | tasks_completed.s()).delay()
Let’s start the Celery worker with:
celery worker -A test_celery --loglevel=info
Now, execute the chord:
python test_celery.py
If you look at the logs, you’ll see the exception raised in the bad_task task, but the callback function (i.e. tasks_completed) isn’t executed.
line 352, in _unpack_chord_result
raise ChordError('Dependency {0} raised {1!r}'.format(tid, retval))
celery.exceptions.ChordError: Dependency 4d39d906-0118-4984-b4a6-68fa97f220b4 raised Exception('Bad Task!!!')
[2019-07-06 18:51:03,297: INFO/ForkPoolWorker-1] Task test_celery.other_good_task[ee3d4a1c-7232-4344-8703-1ff750bf9023] succeeded in 0.005841678001161199s: 'Other good task'
The recommendation from the Celery team is to make your tasks bulletproof. But this is almost impossible, especially when your tasks communicate with external APIs or depend on external resources.
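Making a task “bulletproof” usually boils down to catching everything inside the task so the chord never sees a failure state. A minimal sketch of that pattern (call_external_api is a hypothetical helper) shows why it quickly becomes boilerplate you have to repeat in every task:

@app.task
def resilient_task():
    # "Bulletproof" in practice means never letting an exception escape the task.
    try:
        return call_external_api()  # hypothetical call to an external service
    except Exception as exc:
        return 'failed: {}'.format(exc)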
In Celery <= 3.1 there used to be a setting called CELERY_CHORD_PROPAGATES to control this behaviour, but it was removed. This is all I could find concerning the removal:
CELERY_CHORD_PROPAGATES setting will eventually be removed, as it was just added to be backward compatible with the old undefined behavior, which was just accidental and never considered to be a feature.
Anyway, to achieve this we have to monkey patch the Redis backend to return a custom value instead of raising a ChordError.
The traceback already gives us a clue on where to look (line 352).
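For comparison, the method we’re about to override looks roughly like this in Celery 4.3’s Redis backend (reconstructed from the traceback above; check the source of your installed version for the exact code):

# celery/backends/redis.py (abridged)
def _unpack_chord_result(self, tup, decode,
                         EXCEPTION_STATES=states.EXCEPTION_STATES,
                         PROPAGATE_STATES=states.PROPAGATE_STATES):
    _, tid, state, retval = decode(tup)
    if state in EXCEPTION_STATES:
        retval = self.exception_to_python(retval)
    if state in PROPAGATE_STATES:
        # line 352 in the traceback above
        raise ChordError('Dependency {0} raised {1!r}'.format(tid, retval))
    return retval

Our patch keeps everything except the raise. Here it is: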
import celery
from celery import Celery, group, states
from celery.backends.redis import RedisBackend


def patch_celery():
    """Patch the Redis backend."""

    def _unpack_chord_result(
        self, tup, decode,
        EXCEPTION_STATES=states.EXCEPTION_STATES,
        PROPAGATE_STATES=states.PROPAGATE_STATES,
    ):
        _, tid, state, retval = decode(tup)
        if state in EXCEPTION_STATES:
            retval = self.exception_to_python(retval)
        if state in PROPAGATE_STATES:
            # retval is an Exception; return it as a string instead of raising ChordError
            return '{}: {}'.format(retval.__class__.__name__, str(retval))
        return retval

    celery.backends.redis.RedisBackend._unpack_chord_result = _unpack_chord_result
    return celery
patch_celery is a factory function that returns the celery module with a patched Redis backend. Instead of raising a ChordError, the patched method returns the exception’s class name alongside its message.
Putting everything together, we have this:
import celery
from celery import Celery, group, states
from celery.backends.redis import RedisBackend


def patch_celery():
    """Patch the Redis backend."""

    def _unpack_chord_result(
        self, tup, decode,
        EXCEPTION_STATES=states.EXCEPTION_STATES,
        PROPAGATE_STATES=states.PROPAGATE_STATES,
    ):
        _, tid, state, retval = decode(tup)
        if state in EXCEPTION_STATES:
            retval = self.exception_to_python(retval)
        if state in PROPAGATE_STATES:
            # retval is an Exception; return it as a string instead of raising ChordError
            return '{}: {}'.format(retval.__class__.__name__, str(retval))
        return retval

    celery.backends.redis.RedisBackend._unpack_chord_result = _unpack_chord_result
    return celery


redis_url = "redis://localhost:6379"

app = patch_celery().Celery('test_celery', backend=redis_url, broker=redis_url)


@app.task
def good_task():
    return 'Good task'


@app.task
def other_good_task():
    return 'Other good task'


@app.task
def bad_task():
    raise Exception('Bad Task!!!')


@app.task
def tasks_completed(results):
    print('Tasks completed', results)


if __name__ == '__main__':
    # A chained group of tasks is automatically upgraded to a chord
    (group(good_task.s(), bad_task.s(), other_good_task.s()) | tasks_completed.s()).delay()
Now restart the Celery worker process, run python test_celery.py again, and you should see something like this in the logs:
raise Exception('Bad Task!!!')
Exception: Bad Task!!!
[2019-07-06 19:08:28,909: INFO/MainProcess] Received task: test_celery.other_good_task[8a13fc4f-ca80-44a8-b8b6-d0433e34166b]
[2019-07-06 19:08:28,923: INFO/ForkPoolWorker-1] Task test_celery.other_good_task[8a13fc4f-ca80-44a8-b8b6-d0433e34166b] succeeded in 0.009584861996700056s: 'Other good task'
[2019-07-06 19:08:28,923: INFO/MainProcess] Received task: test_celery.tasks_completed[d3cbd175-c477-4eaa-b008-eb87efe9dbc1]
[2019-07-06 19:08:28,925: WARNING/ForkPoolWorker-2] Tasks completed
[2019-07-06 19:08:28,925: WARNING/ForkPoolWorker-2] ['Good task', 'Exception: Bad Task!!!', 'Other good task']
[2019-07-06 19:08:28,926: INFO/ForkPoolWorker-2] Task test_celery.tasks_completed[d3cbd175-c477-4eaa-b008-eb87efe9dbc1] succeeded in 0.0014016950008226559s: None
We can see that the exception is still raised, but now tasks_completed is executed and we also see the exception returned as one of the results!
[2019-07-06 19:08:28,925: WARNING/ForkPoolWorker-2] ['Good task', 'Exception: Bad Task!!!', 'Other good task']
You can also do some interesting things with the task id, like retrieving the task name or retrying the failed task (see the sketch below). Hopefully, CELERY_CHORD_PROPAGATES makes its way back to Celery.
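As a sketch of that idea (and only a sketch), the patch could return a small dict instead of a plain string, so the callback keeps the failing task’s id and can decide what to do with it. Note that retrieving the task name and arguments from an AsyncResult relies on the result_extended setting (new in Celery 4.3); without it you only have the id and the error text:

def _unpack_chord_result(
    self, tup, decode,
    EXCEPTION_STATES=states.EXCEPTION_STATES,
    PROPAGATE_STATES=states.PROPAGATE_STATES,
):
    _, tid, state, retval = decode(tup)
    if state in EXCEPTION_STATES:
        retval = self.exception_to_python(retval)
    if state in PROPAGATE_STATES:
        # Keep the failed task's id so the callback can inspect or retry it
        return {'failed_task_id': tid, 'error': repr(retval)}
    return retval


@app.task
def tasks_completed(results):
    failures = [r for r in results if isinstance(r, dict) and 'failed_task_id' in r]
    for failure in failures:
        result = app.AsyncResult(failure['failed_task_id'])
        # With result_extended = True, result.name / result.args / result.kwargs
        # are populated here, which is enough information to re-queue the task.
        print('Failed:', failure['failed_task_id'], failure['error'])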