I recently stumbled upon some surprising behavior while using Celery with NumPy's random number generator, and decided to write up how I dealt with it, in hopes that someone else finds this before spending as much time as I did figuring it out.
Here's a small example illustrating the problem. First, we have a Celery task that picks a random number using `numpy.random.randint`:
```python
from celery import Celery
import numpy as np

app = Celery('tasks',
             broker='redis://localhost:6379',
             backend='redis://localhost:6379')

@app.task
def pick_random(high):
    # randint's upper bound is exclusive, so add 1 to include `high`
    return np.random.randint(1, high + 1)
```
Calling this task will return a random integer between 1 and `high`, inclusive.
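One subtlety worth remembering (a standalone check, not part of the task code): `np.random.randint`'s lower bound is inclusive, but its upper bound is exclusive.

```python
import numpy as np

np.random.seed(0)  # fixed seed just for a repeatable check
draws = [np.random.randint(1, 3) for _ in range(1000)]

# The lower bound is inclusive, the upper bound exclusive:
# randint(1, 3) only ever yields 1 or 2.
print(sorted(set(draws)))  # [1, 2]
```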
Save the file as `tasks.py`. Here's the script I used to run it (saved as `randomtest.py`):
```python
#!/usr/bin/env python
import tasks

def expected_collisions(n, d):
    # https://en.wikipedia.org/wiki/Birthday_problem#Collision_counting
    n = float(n)
    d = float(d)
    return n - d + d * (((d - 1) / d) ** n)

def main():
    num = 100
    high = 100
    results = set()
    for i in range(num):
        task = tasks.pick_random.delay(high)
        n = task.get()
        results.add(n)
    actual_unique = len(results)
    expected_unique = num - expected_collisions(num, high)
    print('Unique results:')
    print('Expected: %.1f' % expected_unique)
    print('Actual: %d' % actual_unique)

if __name__ == '__main__':
    main()
```
This script calls the task 100 times in a row, adding the returned values to a `set` so that we keep track of only the unique results. It also calculates the expected number of unique results we should see, assuming our random number generator is working correctly (see the Birthday Problem).
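As a quick sanity check of that formula (a standalone snippet repeating `expected_collisions` from the script): with n = d = 100 it predicts about 63.4 unique values.

```python
def expected_collisions(n, d):
    # Expected collisions when drawing n values uniformly from d
    # possibilities: n - d + d * ((d - 1) / d) ** n
    n, d = float(n), float(d)
    return n - d + d * (((d - 1) / d) ** n)

unique = 100 - expected_collisions(100, 100)
print('%.1f' % unique)  # 63.4
```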
I started the Celery workers in one terminal window with:

```
$ celery -A tasks worker
```
And ran the script in another with:

```
$ python randomtest.py
```
The output was:

```
Unique results:
Expected: 63.4
Actual: 16
```
So, based on probability, after picking 100 random numbers from a pool of 100, we should have seen about 63 unique values, but for some reason we only got 16. How could this be?
The problem is in the way Celery creates its pool of workers. As I understand it, it first imports `tasks.py`, then forks a separate child process for each worker. Any global state created before the fork is shared by all of the worker processes, and in this case that state includes the state of the random number generator. When `numpy` is imported, its random number generator is seeded once, in the parent process. Each forked worker then starts with an identical copy of that RNG state, meaning they will all produce exactly the same sequence of "random" numbers. This can be useful if you want repeatable results across all of your workers, but potentially disastrous if you aren't expecting it.
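To see the effect outside of Celery, here's a minimal sketch using `os.fork` directly (a standalone illustration, not part of the task code): the parent seeds NumPy's RNG once, and every forked child then draws the same "random" number.

```python
import os
import numpy as np

def draw_in_children(n_children=3):
    # Fork n_children processes; each child draws one number and
    # writes it back to the parent through a pipe.
    read_fd, write_fd = os.pipe()
    for _ in range(n_children):
        pid = os.fork()
        if pid == 0:
            # Child: its RNG state is a copy of the parent's at fork time.
            draw = int(np.random.randint(1, 100))
            os.write(write_fd, ('%d\n' % draw).encode())
            os._exit(0)
        os.waitpid(pid, 0)
    os.close(write_fd)
    with os.fdopen(read_fd) as f:
        return [int(line) for line in f]

np.random.seed()  # the parent seeds once, as effectively happens on import
print(draw_in_children())  # all children report the identical number
```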
How to fix it
The fix is relatively easy: we want to reseed the random number generator in each worker process after the fork. To do this, we can bind a function to one of Celery's signals. A natural choice is the `worker_process_init` signal, which the Celery documentation describes as:

> **worker_process_init**
>
> Dispatched in all pool child processes when they start.

We can make this change in `tasks.py` by connecting a `seed_rng` function to that signal:
```python
from celery import Celery
import celery.signals
import numpy as np

app = Celery('tasks',
             broker='redis://localhost:6379',
             backend='redis://localhost:6379')

@celery.signals.worker_process_init.connect()
def seed_rng(**_):
    """
    Seeds the numpy random number generator.
    """
    np.random.seed()

@app.task
def pick_random(high):
    # randint's upper bound is exclusive, so add 1 to include `high`
    return np.random.randint(1, high + 1)
```
I chose to declare the function's parameters as `**_`, which simply ignores all of the keyword arguments passed to it by Celery.
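Here's a standalone `os.fork` sketch (an illustration, not Celery code) of what the reseed buys us: each child calls `np.random.seed()` after the fork, just like the signal handler does, so their sequences diverge.

```python
import os
import numpy as np

def draw_after_reseed(n_children=3, n_draws=4):
    # Fork n_children processes; each reseeds its RNG (as the
    # worker_process_init handler does) and draws n_draws numbers.
    read_fd, write_fd = os.pipe()
    for _ in range(n_children):
        pid = os.fork()
        if pid == 0:
            np.random.seed()  # fresh OS-entropy seed, per child
            draws = [int(np.random.randint(1, 100)) for _ in range(n_draws)]
            os.write(write_fd, (','.join(map(str, draws)) + '\n').encode())
            os._exit(0)
        os.waitpid(pid, 0)
    os.close(write_fd)
    with os.fdopen(read_fd) as f:
        return [tuple(map(int, line.split(','))) for line in f]

np.random.seed()
print(draw_after_reseed())  # each child now produces its own sequence
```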
After making this change, the output of my simple script is now:
```
Unique results:
Expected: 63.4
Actual: 64
```
Finally, random numbers!