I stumbled upon a surprising behavior while using Celery and random number generators in numpy recently, and decided to write up how I dealt with it in hopes that someone else will find this before spending as much time as I did to figure it out.
Here's a small example illustrating the problem. First, we have a celery task that picks a random number using numpy:
```python
from celery import Celery
import numpy as np

app = Celery('tasks', broker='redis://localhost:6379',
             backend='redis://localhost:6379')

@app.task
def pick_random(high):
    return np.random.randint(1, high)
```
Calling this task will return a random integer between 1 and `high`. Save the file as `tasks.py`. Here's the script I used to run it (saved as `randomtest.py`):
```python
#!/usr/bin/env python
import tasks

def expected_collisions(n, d):
    # http://en.wikipedia.org/wiki/Birthday_problem#Collision_counting
    n = float(n)
    d = float(d)
    return n - d + d * (((d - 1) / d) ** n)

def main():
    num = 100
    high = 100
    results = set()
    for i in range(num):
        task = tasks.pick_random.delay(high)
        n = task.get()
        results.add(n)
    actual_unique = len(results)
    expected_unique = num - expected_collisions(num, high)
    print('Unique results:')
    print('Expected: %.1f' % expected_unique)
    print('Actual: %d' % actual_unique)

if __name__ == '__main__':
    main()
```
This script will call the task 100 times in a row, adding the returned values to a set so that we keep track of only the unique results. It also calculates the expected number of unique results we should get, assuming our random number generator is working correctly (see the Birthday Problem).
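As a quick sanity check on that expectation, the same formula can be evaluated by hand for our parameters (a small sketch; `n` is the number of draws and `d` the size of the pool):

```python
# Expected unique values when drawing n times from a pool of d,
# using the birthday-problem collision-counting formula from above.
n = d = 100.0
collisions = n - d + d * (((d - 1) / d) ** n)
print('%.1f' % (n - collisions))  # -> 63.4
```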
I started the celery workers in one terminal window with:
```
$ celery worker -A tasks
```
And ran the script in another with:
```
$ python randomtest.py
```
The output was:
```
Unique results:
Expected: 63.4
Actual: 16
```
So, based on probability, after picking 100 numbers at random from a pool of 100, we should have received about 63 unique values, but for some reason we only got 16. How could this be?
The problem is in the way celery creates the pool of workers. As I understand it, it first imports `tasks.py`, then forks a separate process for each of the workers. Any global state that is created before the fork happens is shared between all of the worker processes. In this case, that state includes the state of the random number generator: when we `import numpy`, it sets up its random number generator with a random seed. When we then fork off the worker processes, each of them has its own RNG, but they were all seeded with the same random seed, meaning they will produce exactly the same sequence of results. This can be useful if you want repeatable results across all of your workers, but potentially disastrous if you aren't expecting it.
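You don't need celery to reproduce this, by the way; the same thing happens with a bare `os.fork()`. Here's a minimal sketch of the failure mode (Unix-only, written just for illustration):

```python
import os
import numpy as np  # importing numpy seeds its global RNG once, pre-fork

pid = os.fork()
# Parent and child both inherited the RNG state that existed before
# the fork, so this draws the same "random" number in each process.
n = np.random.randint(1, 100)
if pid == 0:
    print('child:  %d' % n)
    os._exit(0)
else:
    os.waitpid(pid, 0)
    print('parent: %d' % n)
```

Both lines print the same value every time, which is exactly what's happening across the celery pool.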
How to fix it
The fix for this is relatively easy. We want to reseed the random number generator in each worker process after it forks. To do this, we can bind a function to trigger on a certain celery signal. A natural choice for this is the `worker_process_init` signal, which the celery docs describe as: "Dispatched in all pool child processes when they start."
This can be done by changing our `tasks.py` to bind a `seed_rng` function to that signal:
```python
from celery import Celery
import celery.signals
import numpy as np

app = Celery('tasks', broker='redis://localhost:6379',
             backend='redis://localhost:6379')

@celery.signals.worker_process_init.connect()
def seed_rng(**_):
    """
    Seeds the numpy random number generator.
    """
    np.random.seed()

@app.task
def pick_random(high):
    return np.random.randint(1, high)
```
I chose to call the argument of the new function `**_`, which simply ignores all of the keyword arguments passed to it by celery.
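If you'd rather be explicit about those arguments, an equivalent handler could spell them out; a quick sketch (the `sender` parameter name here is my assumption about what celery passes, so treat it as illustrative):

```python
import celery.signals
import numpy as np

@celery.signals.worker_process_init.connect()
def seed_rng(sender=None, **kwargs):
    """Same fix, just with celery's context accepted explicitly."""
    np.random.seed()
```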
After making this change, the output of my simple script is now:
```
Unique results:
Expected: 63.4
Actual: 64
```
Finally, random numbers!
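One last note: numpy's generator isn't the only one with this pitfall. Python's built-in `random` module is also seeded once at import time, so a forked pool will replay the same sequence from it too. If your tasks use it, the same handler can reseed both; a sketch (the `random.seed()` line is my addition, not part of the fix above):

```python
import random

import celery.signals
import numpy as np

@celery.signals.worker_process_init.connect()
def seed_rng(**_):
    """Reseed both global RNGs in each freshly forked worker."""
    np.random.seed()  # numpy draws a fresh seed from OS entropy
    random.seed()     # with no argument, the stdlib does the same
```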