The Fanout Pattern Explained
In my work at AspirEDU, we use Celery to capture education information on a daily basis. We’ve gone through a few different strategies for designing our Celery task signatures[1]. The most powerful of these has been the fanout pattern.
In the fanout pattern, a single task replaces itself with a variable number of other tasks.
Let’s look at some code. If we have the task signature:
task1.si() | task2.si() | task3.si()
Each task must complete before the next one starts. However, let’s assume that in task2 we are fetching some collection and need to run another task for each item in the collection. The fanout pattern would involve replacing task2 with

item1, item2, ..., itemN

where N is the number of items in the collection.
Here’s that same point again, but in code:
# We schedule this:
(
    task1.si()
    | task2.si()
    | task3.si()
).apply_async()

# But this is what runs:
(
    task1.si()
    | task2.si()
    | group([item1.si(), item2.si(), itemN.si()])
    | task3.si()
).apply_async()
Keep in mind that you don’t need to replace the task with a group of other tasks. In Celery, you could replace the currently running task with any other task signature, such as a single task or a complex task signature.
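To make the mechanics concrete without a broker, here is a toy, pure-Python model of a chain in which a step can hand back a replacement. It is not how Celery implements Task.replace; run_chain, run_steps and make_item_task are hypothetical names, and the "group" here runs its steps one after another, whereas Celery workers can run group members in parallel.

```python
from typing import Callable, List, Optional, Sequence

# Toy model, not Celery: a step appends to a shared log and may return a
# list of replacement steps. One step in the list stands in for "replace
# with a single task", several for "replace with a group", None for
# "no replacement".
Step = Callable[[List[str]], Optional[list]]


def run_steps(steps: Sequence[Step], log: List[str]) -> List[str]:
    for step in steps:
        replacement = step(log)
        if replacement:
            # The replacement runs where the original step was, and it
            # finishes before the chain moves on to the next step.
            run_steps(replacement, log)
    return log


def run_chain(steps: Sequence[Step]) -> List[str]:
    return run_steps(steps, [])


def task1(log: List[str]) -> None:
    log.append("task1")


def make_item_task(name: str) -> Step:
    def item_task(log: List[str]) -> None:
        log.append(name)
    return item_task


def task2(log: List[str]) -> List[Step]:
    log.append("task2")
    collection = ["item1", "item2", "item3"]  # hypothetical collection
    # Fan out: one replacement step per item in the collection.
    return [make_item_task(name) for name in collection]


def task3(log: List[str]) -> None:
    log.append("task3")


print(run_chain([task1, task2, task3]))
# ['task1', 'task2', 'item1', 'item2', 'item3', 'task3']
```

Note that task2 still does its own work before handing over to its replacement, and task3 only starts once every item step has run.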
Below is a deeper dive into an example and the pattern, but you could probably get away with the above example and reading the docs on Task.replace.
Realistic Taco Bell example problem
A more realistic example: you’re tasked with capturing all the menu options across every Taco Bell[2] franchise, then sending an email with the least frequently listed items[3]. The API endpoints you need to consume data from are:
fetch_franchises -> list[Franchise]
fetch_menu(franchise_id: int) -> Menu
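If you want to follow along locally, the two endpoints can be stubbed with in-memory data. Everything in this sketch is hypothetical: the Franchise and Menu fields, the sample data, and the assumption that menu_items is a list of item names.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Franchise:
    id: int
    name: str


@dataclass
class Menu:
    franchise_id: int
    menu_items: List[str] = field(default_factory=list)


# Hypothetical sample data standing in for the real API responses.
_FRANCHISES = [Franchise(1, "Downtown"), Franchise(2, "Airport"), Franchise(3, "Campus")]
_MENU_ITEMS: Dict[int, List[str]] = {
    1: ["Crunchwrap Supreme", "Cheesy Gordita Crunch"],
    2: ["Crunchwrap Supreme", "Chicken Quesadilla"],
    3: ["Crunchwrap Supreme", "Cheesy Gordita Crunch"],
}


def fetch_franchises() -> List[Franchise]:
    """Stub for the fetch_franchises endpoint."""
    return list(_FRANCHISES)


def fetch_menu(franchise_id: int) -> Menu:
    """Stub for the fetch_menu endpoint; raises KeyError for unknown ids."""
    # Copy the list so callers can't mutate the shared sample data.
    return Menu(franchise_id, list(_MENU_ITEMS[franchise_id]))
```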
Let’s imagine we make a single Celery task responsible for each of these API endpoints.
from celery import shared_task

# fetch_franchises and fetch_menu are the API client functions listed above.


@shared_task
def capture_franchises():
    for franchise in fetch_franchises():
        ...  # Do something


@shared_task
def capture_menu_for_franchise(franchise_id: int):
    for menu_item in fetch_menu(franchise_id).menu_items:
        ...  # Do something
Now our task for sending the report:
from celery import shared_task
from django.core.mail import send_mail

from .models import MenuItem  # Our imaginary django model


@shared_task
def send_report():
    least_listed_menu_items = MenuItem.objects.least_listed().values_list('name', flat=True)
    send_mail(
        subject="Least listed items report",
        message=", ".join(least_listed_menu_items),
        recipient_list=["menu_design@taco-bell.better-simple.com"],
        from_email="fanout_pattern@better-simple",
    )
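MenuItem.objects.least_listed() is an imaginary manager method. One way to read "least frequently listed" is: count how many franchise menus each item appears on, then keep the items with the lowest count. Here is a plain-Python sketch of that calculation; least_listed is a hypothetical helper that takes franchise ids mapped to lists of item names.

```python
from collections import Counter
from typing import Dict, List


def least_listed(menus: Dict[int, List[str]]) -> List[str]:
    """Return the item names that appear on the fewest franchise menus."""
    counts: Counter = Counter()
    for items in menus.values():
        # set() so an item listed twice on one menu only counts once there.
        counts.update(set(items))
    if not counts:
        return []
    fewest = min(counts.values())
    # Sorted so ties come back in a stable, readable order.
    return sorted(name for name, n in counts.items() if n == fewest)


menus = {
    1: ["Crunchwrap Supreme", "Cheesy Gordita Crunch"],
    2: ["Crunchwrap Supreme", "Chicken Quesadilla"],
    3: ["Crunchwrap Supreme", "Cheesy Gordita Crunch"],
}
print(", ".join(least_listed(menus)))  # Chicken Quesadilla
```

Because every count can still change until the last franchise’s menu is in, the report can only be correct once all menus have been captured.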
The challenge is how to create a Celery task signature that runs capture_franchises, then runs capture_menu_for_franchise for every franchise found, and then, after all of those tasks finish, runs the send_report task.
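The ordering requirement is essentially a barrier: every per-franchise capture has to finish before the report runs. Outside of Celery, the same shape can be sketched with the standard library’s concurrent.futures. This is only an analogy for the ordering, not how Celery schedules work, and the functions below are hypothetical stand-ins that return data instead of saving it.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


def capture_franchises() -> List[int]:
    return [1, 2, 3]  # hypothetical franchise ids


def capture_menu_for_franchise(franchise_id: int) -> List[str]:
    return [f"item-{franchise_id}"]  # hypothetical menu


def send_report(menus: Dict[int, List[str]]) -> str:
    return f"report covers {len(menus)} franchises"


def run_workflow() -> str:
    # Step 1: runs alone, like the first task in a chain.
    franchise_ids = capture_franchises()
    # Step 2: fan out one job per franchise; these may run concurrently.
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = {fid: pool.submit(capture_menu_for_franchise, fid) for fid in franchise_ids}
        # Barrier: .result() blocks until each job has finished.
        menus = {fid: future.result() for fid, future in futures.items()}
    # Step 3: only runs once every per-franchise job is done.
    return send_report(menus)


print(run_workflow())  # report covers 3 franchises
```

In Celery, chaining a group into a following task gives the same "wait for all of them" behavior, because Celery upgrades that combination to a chord.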
One solution is the fanout pattern.
The fanout pattern in practice
The key to the fanout pattern is the ability to replace a task with another task signature.
In Celery, to replace a task you must create a bound task via @shared_task(bind=True) or @app.task(bind=True)[4]. Binding passes the task instance to your function as its first argument, self, which also gives you access to the task request via self.request. On self there’s the replace method.
That’s a lot of documentation, so here’s some code that should highlight my point:
from celery import group, shared_task

from .models import Franchise  # Our imaginary django model


@shared_task(bind=True)
def capture_franchises(self):
    for franchise in fetch_franchises():
        ...  # Do something

    # We now have fetched all franchises, let's replace this
    # task with another task signature
    self.replace(
        group([
            capture_menu_for_franchise.si(franchise_id=franchise.id)
            for franchise in Franchise.objects.all()
        ])
    )
You can see that we’re replacing the capture_franchises task with a group of capture_menu_for_franchise tasks via the self.replace() line.
Now we need to chain the report task to the above.
def run_least_listed_report():
    signature = capture_franchises.si() | send_report.si()
    signature.apply_async()
A word on task design
If you’re like me, the above doesn’t sit well with you. The signature definition, capture_franchises.si() | send_report.si(), doesn’t make sense to a reader. The franchises are captured, then the report is sent out? If the menus are also being captured, does that imply other data is also being captured in capture_franchises?
Instead, I think a more appropriate solution would be:
from celery import group, shared_task

from .models import Franchise  # Our imaginary django model


@shared_task
def capture_franchises():
    for franchise in fetch_franchises():
        ...  # Do something


@shared_task(bind=True)
def capture_franchises_menu_fanout(self):
    franchise_ids = list(Franchise.objects.all().values_list("id", flat=True))
    if franchise_ids:
        # Celery doesn't handle group([]) with an empty collection well
        # so don't replace if there are no franchises stored
        self.replace(
            group([
                capture_menu_for_franchise.si(franchise_id=franchise_id)
                for franchise_id in franchise_ids
            ])
        )
The new signature would be:
capture_franchises.si() | capture_franchises_menu_fanout.si() | send_report.si()
This is more descriptive, and it’s easier to extend or reproduce for a similar workflow elsewhere.
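The empty-collection guard in capture_franchises_menu_fanout is easy to get backwards, so it can help to isolate the decision in a small pure function that is testable without Celery. In this hypothetical sketch, plan_menu_fanout returns the (task name, kwargs) pairs the fanout would schedule, or None when there is nothing to replace the task with. Passing plain integer ids, as the task above does, also keeps the task arguments easy to serialize.

```python
from typing import Dict, Iterable, List, Optional, Tuple

# (task name, keyword arguments) describing one task the fanout would schedule.
TaskCall = Tuple[str, Dict[str, int]]


def plan_menu_fanout(franchise_ids: Iterable[int]) -> Optional[List[TaskCall]]:
    """Describe the per-franchise tasks to fan out to, or None if there are none."""
    calls = [
        ("capture_menu_for_franchise", {"franchise_id": franchise_id})
        for franchise_id in franchise_ids
    ]
    # Mirror the guard: an empty plan means "don't call self.replace()".
    return calls or None


print(plan_menu_fanout([4, 7]))
# [('capture_menu_for_franchise', {'franchise_id': 4}), ('capture_menu_for_franchise', {'franchise_id': 7})]
print(plan_menu_fanout([]))  # None
```

Inside the bound task, you would only build the group and call self.replace() when the plan is not None.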
Recap
The fanout pattern is a great way to implement a dynamic workflow in your Celery task signatures. You can use it to reduce the size of your signatures and to programmatically select the next tasks based on external data.
I hope you found this helpful. If you have questions, don’t hesitate to reach out to me. You can find me on the Fediverse, Django Discord server or via email.
[1] By Celery task signature, I mean one or more Celery tasks linked together.
[2] Cheesy gordita crunch and chicken quesadilla. It’s the answer to the question you didn’t know you had.
[3] We’re using the least frequently listed items because it requires you to search every franchise no matter what.
[4] I use shared_task since it’s recommended for Django, but you can use app.task if you prefer.