Numpy Random Number Generators and Celery

May 10, 2014

I stumbled upon a surprising behavior while using Celery and random number generators in numpy recently, and decided to write up how I dealt with it in hopes that someone else will find this before spending as much time as I did to figure it out.

Here's a small example illustrating the problem. First, we have a celery task that picks a random number using numpy.random.randint:

from celery import Celery
import numpy as np

app = Celery('tasks',
             broker='redis://localhost:6379',
             backend='redis://localhost:6379')

@app.task
def pick_random(high):
    return np.random.randint(1, high + 1)

Calling this task will return a random integer between 1 and high, inclusive. (Note that numpy's randint excludes its upper bound, unlike the stdlib's random.randint, hence the high + 1.) Save the file as tasks.py. Here's the script I used to run it (saved as randomtest.py):
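As an aside, numpy's bound convention is easy to check directly (a quick standalone sanity check, not part of the task itself):

```python
import numpy as np

# np.random.randint draws from the half-open interval [low, high):
# the upper bound is excluded, unlike the stdlib's random.randint
draws = np.random.randint(1, 3, size=1000)
print(sorted(set(draws.tolist())))  # [1, 2] -- 3 never appears
```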

#!/usr/bin/env python
import tasks

def expected_collisions(n, d):
    # http://en.wikipedia.org/wiki/Birthday_problem#Collision_counting
    n = float(n)
    d = float(d)
    return n - d + d * (((d - 1) / d) ** n)

def main():
    num = 100
    high = 100

    results = set()
    for i in range(num):
        task = tasks.pick_random.delay(high)
        n = task.get()
        results.add(n)

    actual_unique = len(results)

    expected_unique = num - expected_collisions(num, high)
    print('Unique results:')
    print('Expected: %.1f' % expected_unique)
    print('Actual: %d' % actual_unique)

if __name__ == '__main__':
    main()

This script will call the task 100 times in a row, adding the returned values to a set, so that we can keep track of only the unique results. It also calculates the expected number of unique results we should get, assuming our random number generator is working correctly (see the Birthday Problem).
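For n = 100 draws from a pool of d = 100 values, the birthday-problem formula works out like this (a standalone check of the number the script prints):

```python
n, d = 100.0, 100.0

# expected number of collisions, per the birthday-problem collision-counting formula
collisions = n - d + d * (((d - 1) / d) ** n)
expected_unique = n - collisions
print('%.1f' % expected_unique)  # 63.4
```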

I started the celery workers in one terminal window with:

$ celery worker -A tasks

And ran the script in another with:

$ python randomtest.py

The output was:

Unique results:
Expected: 63.4
Actual: 16

So, based on probability, drawing 100 random numbers from a pool of 100, we should have seen about 63 unique values, but for some reason we only got 16. How could this be?

The problem is in the way celery creates its pool of workers. As I understand it, celery first imports tasks.py, then forks a separate process for each worker. Any global state created before the fork is shared by all of the worker processes, and in this case that state includes the state of numpy's random number generator: when we import numpy, its global RNG is set up with a random seed. After the fork, each worker has its own copy of the RNG, but every copy starts from the same seeded state, so they all produce exactly the same sequence of numbers. This can be useful if you want repeatable results across all of your workers, but potentially disastrous if you aren't expecting it.

How to fix it

The fix for this is relatively easy. We want to reseed the random number generator in each worker process again after they fork. To do this, we can bind a function to trigger on a certain celery signal. A natural choice for this is the worker_process_init signal:

worker_process_init

Dispatched in all pool child processes when they start.

This can be done by changing our tasks.py to bind a seed_rng function to that signal:

from celery import Celery
import celery.signals
import numpy as np

app = Celery('tasks',
             broker='redis://localhost:6379',
             backend='redis://localhost:6379')

@celery.signals.worker_process_init.connect()
def seed_rng(**_):
    """
    Seeds the numpy random number generator.
    """
    np.random.seed()

@app.task
def pick_random(high):
    return np.random.randint(1, high + 1)

I chose to name the function's only parameter **_, which simply swallows all of the keyword arguments celery passes to signal handlers. Calling np.random.seed() with no argument reseeds the generator from fresh OS entropy, so each worker ends up with its own, different state.
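The same reseed-after-fork pattern can be sketched without celery at all, using a multiprocessing pool initializer in place of the worker_process_init signal (the stdlib's random module stands in for numpy here):

```python
import multiprocessing as mp
import random

def reseed():
    # runs once in each child after the fork, like the worker_process_init
    # handler; seed() with no argument pulls fresh OS entropy per worker
    random.seed()

def draw(_):
    return random.random()

ctx = mp.get_context('fork')  # POSIX-only
with ctx.Pool(3, initializer=reseed) as pool:
    values = pool.map(draw, range(3))
print(len(set(values)))  # 3 -- no repeats once each worker reseeds itself
```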

After making this change, the output of my simple script is now:

Unique results:
Expected: 63.4
Actual: 64

Finally, random numbers!