M0AGX / LB9MG

Amateur radio and embedded systems

Multiprocessing - Python's best kept secret

I use Python for most of my offline data processing needs. Recently I had to analyze a big set of data from an experiment. The unit task was not that difficult: open a file, run a couple of algorithms, and calculate several metrics from the signal. It took around ten seconds, all done in SciPy and NumPy. The "only" problem was that I had thousands of files with a total volume of around 20 GB. It should be trivial to parallelize - or so I thought.

As all measurements are in independent files, and the results are fully independent of each other, this is an embarrassingly parallel problem. The processing algorithm itself is serial, so the only way to parallelize is to process multiple files at once, each on its own CPU core.

Even though CPython is essentially single-threaded (due to the GIL) I hoped that maybe SciPy is internally multithreaded or that my signal processing code is memory-bound or IO-bound.

First attempt - threads

I structured my script around a main thread that would spin up the worker threads, enumerate the data files in a directory, pass the file paths (as plain strings) to a request queue, and fetch the results (each a list of ints) from a results queue - all using the standard threading.Thread and queue.Queue.

The worker thread basically looks like this:

def worker_thread_func():
    while True:
        file_name_to_process = request_queue.get()
        if file_name_to_process is not None:
            result = do_the_big_math(file_name_to_process)
            result_queue.put(result)
        else:  # None means that there are no more files to process
            result_queue.put(None)
            break

A None value put into the request queue tells a worker to quit, and a quitting worker puts None into the results queue to tell the main thread that it is done.

The main thread looks like this:

import os
import queue
import sys
import threading
import time

request_queue = queue.Queue()
result_queue = queue.Queue()
worker_count = 5

for i in range(worker_count):
    handle = threading.Thread(target=worker_thread_func)
    handle.start()

directory = sys.argv[1]
list_of_files = os.listdir(directory)
print("Files to process %d" % len(list_of_files))

start_timestamp = time.time()

for file in list_of_files:
    file_name = os.path.join(directory, file)
    request_queue.put(file_name)

# Tell workers to quit at the end
for i in range(worker_count):
    request_queue.put(None)

results = []
print("Waiting for results...")

info_interval = 500  # Print status message every N results
prev_timestamp = time.time()
quit_worker_count = 0
c = 0

while True:
    result = result_queue.get()
    if result is None:
        quit_worker_count += 1
        if quit_worker_count == worker_count:
            break
        continue

    c += 1

    if c % info_interval == 0:
        current_timestamp = time.time()
        delta_timestamp = current_timestamp - prev_timestamp
        prev_timestamp = current_timestamp
        rate_files_per_second = info_interval / delta_timestamp
        eta_seconds = (len(list_of_files) - c) / rate_files_per_second
        print("%05d/%05d done, %05.02f files/s, ETA %d s" % (c, len(list_of_files), rate_files_per_second, eta_seconds))

    results.append(result)

stop_timestamp = time.time()
processing_time = stop_timestamp - start_timestamp
processing_rate = len(list_of_files) / processing_time
print("Processing time %d s, avg %.02f files/s" % (processing_time, processing_rate))

Results

After starting the script I noticed that only one CPU core was busy at 100%. htop was showing the main Python process and its threads, but each thread consumed only a fraction of a core, and together they added up to exactly one busy core - the GIL was serializing them, so the processing code was definitely CPU-bound rather than IO-bound. The processing rate was about 80 files/second.
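A quick way to double-check that suspicion (not something I had in the script, just a sketch) is to compare wall-clock time with CPU time for a single file; do_the_big_math is the unit task from above and the file name is a made-up placeholder:

import time

start_wall = time.perf_counter()
start_cpu = time.process_time()

do_the_big_math("some_measurement_file.dat")  # placeholder file name

wall_seconds = time.perf_counter() - start_wall
cpu_seconds = time.process_time() - start_cpu

# CPU time close to wall-clock time -> the task is CPU-bound;
# wall-clock time much larger -> the task mostly waits on IO
print("wall %.2f s, cpu %.2f s" % (wall_seconds, cpu_seconds))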

I had to break my application into independent processes to get around the Python GIL. Starting independent processes has a somewhat "bad reputation" for being slow and heavyweight. This was valid (and still is) for the C10k problem and network servers in general - starting a new process for every network request is indeed wasteful. However, in my case I start the processes only once and keep them busy 100% of the time anyway. Plus, modern Linux handles the fork call very efficiently using copy-on-write (CoW). I was about to start implementing os.fork() in my Python code the same way I would in plain C but... I came across the multiprocessing module!
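For reference, the hand-rolled version I almost wrote might have looked roughly like this - a sketch only, with process_files standing in for the real per-file work, each child handling a slice of the file list and sending pickled results back through a pipe:

import os
import pickle
import sys

def process_files(files):
    # stand-in for the real work (open file, run algorithms, compute metrics)
    return [len(f) for f in files]

files = sorted(os.listdir(sys.argv[1]))
worker_count = 4
read_ends = []

for i in range(worker_count):
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:  # child: process every worker_count-th file and exit
        os.close(read_fd)
        chunk = files[i::worker_count]
        os.write(write_fd, pickle.dumps(process_files(chunk)))
        os.close(write_fd)
        os._exit(0)
    os.close(write_fd)  # parent keeps only the read end
    read_ends.append(read_fd)

results = []
for read_fd in read_ends:
    with os.fdopen(read_fd, "rb") as pipe:
        results.extend(pickle.loads(pipe.read()))

for i in range(worker_count):
    os.wait()  # reap the children

print("collected %d results" % len(results))

That is a fair amount of plumbing (pipes, serialization, reaping children) for something the standard library already provides.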

Second attempt - multiprocessing

The Python multiprocessing module essentially reuses the threading module's semantics and APIs, but for independent processes. It means that you don't have to deal with os.fork() or implement your own inter-process communication.

What is the important difference? Threads share the address space of their process, which means that a global variable changed from one thread is also visible to all the other threads (though even with the global interpreter lock that kind of sharing is not automatically safe). With independent processes, the state of all variables and objects is copied from the parent process to the child process when the child is started. From that moment on they are fully independent. To exchange data between processes you have to use IPC.
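A tiny experiment (not part of my script) shows that independence: a global changed in a child process is invisible to the parent:

import multiprocessing

counter = 0

def child():
    global counter
    counter += 100  # modifies only the child's own copy
    print("child sees", counter)  # prints 100

if __name__ == "__main__":
    p = multiprocessing.Process(target=child)
    p.start()
    p.join()
    print("parent still sees", counter)  # prints 0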

The Linux Programming Interface book is a great resource on good ol' Unix IPC. I could perhaps use Unix sockets, pipes, or shared memory to exchange the file paths and the results, but fortunately there is a synchronized queue, multiprocessing.Queue, that behaves just like queue.Queue does between threads. Initialization and "connection" to the queue are handled automatically, and I can pass any picklable Python object through it, just like between threads.
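A minimal example of that convenience (again not from the actual script; the file name and metrics are made up): a picklable object survives the trip through the queue unchanged:

import multiprocessing

def worker(q):
    # any picklable object can go on the queue
    q.put(("measurement_0001.dat", {"rms": 0.42, "peak": 1.7}))

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    file_name, metrics = q.get()  # arrives as a regular tuple and dict
    p.join()
    print(file_name, metrics)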

The code stays basically the same:

import multiprocessing
import os
import sys
import time

request_queue = multiprocessing.Queue()
result_queue = multiprocessing.Queue()
worker_count = 5

for i in range(worker_count):
    handle = multiprocessing.Process(target=worker_thread_func)
    handle.start()

directory = sys.argv[1]
list_of_files = os.listdir(directory)
print("Files to process %d" % len(list_of_files))

start_timestamp = time.time()

for file in list_of_files:
    file_name = os.path.join(directory, file)
    request_queue.put(file_name)

# Tell workers to quit at the end
for i in range(worker_count):
    request_queue.put(None)

results = []
print("Waiting for results...")

info_interval = 500  # Print status message every N results
prev_timestamp = time.time()
quit_worker_count = 0
c = 0

while True:
    result = result_queue.get()
    if result is None:
        quit_worker_count += 1
        if quit_worker_count == worker_count:
            break
        continue

    c += 1

    if c % info_interval == 0:
        current_timestamp = time.time()
        delta_timestamp = current_timestamp - prev_timestamp
        prev_timestamp = current_timestamp
        rate_files_per_second = info_interval / delta_timestamp
        eta_seconds = (len(list_of_files) - c) / rate_files_per_second
        print("%05d/%05d done, %05.02f files/s, ETA %d s" % (c, len(list_of_files), rate_files_per_second, eta_seconds))

    results.append(result)

stop_timestamp = time.time()
processing_time = stop_timestamp - start_timestamp
processing_rate = len(list_of_files) / processing_time
print("Processing time %d s, avg %.02f files/s" % (processing_time, processing_rate))

Apart from the imports, only three lines of code are different: the two queues and multiprocessing.Process instead of threading.Thread. The overall behaviour stayed identical, but now I could see the workers as independent processes in htop, and each of them was consuming 100% of the CPU core it was running on. I bumped up the number of worker processes to 24 (the number of logical cores that my CPU provides). I also added a call to os.nice(15) in the main process so that the workers inherit the low priority and my system stays responsive.
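Those two tweaks amount to a couple of lines at the top of the main process; roughly like this (a sketch, not the verbatim script):

import multiprocessing
import os

os.nice(15)  # lower the priority; workers started afterwards inherit it

worker_count = multiprocessing.cpu_count()  # 24 logical cores in my case

for i in range(worker_count):
    handle = multiprocessing.Process(target=worker_thread_func)
    handle.start()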

Using all 24 workers I was able to fully load my CPU and increase the processing speed to 1400 files/second!