Concurrency And Parallelism In Python

Introduction

Most of us have come across terms like multithreading, parallel processing, multiprocessing, and concurrency. They sound alike, though each has its own meaning.

Broadly, we need these techniques to avoid some kind of delay (or at least create the illusion of avoiding it) in the execution of "regular" programs. To do that, we write the program so that it doesn't execute strictly sequentially. All of these terms boil down to two concepts: concurrency and parallelism.

Concurrency

I came across concurrency while trying to download ~5000 images from the web, trying hard to meet a deadline. These images were supposed to be passed on to a team doing annotation. I had already collected the image URLs from Flickr.

This is what a sequential program to download images would look like:

import requests

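def download_image(URL):
    # Fetch the image bytes over HTTP
    image = requests.get(URL)
    # Use the last path segment of the URL as the filename
    filename = URL.split('/')[-1]

    # The name already ends in .jpg, so write it as-is
    with open(filename, 'wb') as f:
        f.write(image.content)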


flickr_URLs = [
    'https://live.staticflickr.com/6022/5941812700_0f634b136e_b.jpg',
    'https://live.staticflickr.com/3379/3492581209_485d2bfafc_b.jpg',
    'https://live.staticflickr.com/7309/27729662905_e896a3f604_b.jpg',
    'https://live.staticflickr.com/8479/8238430093_eb19b654e0_b.jpg',
    'https://live.staticflickr.com/5064/5618934898_659bc060cd_b.jpg',
    'https://live.staticflickr.com/3885/14877957549_ccb7e55494_b.jpg',
    'https://live.staticflickr.com/5473/11720191564_76f3f56f12_b.jpg',
    'https://live.staticflickr.com/2837/13546560344_835fc79871_b.jpg',
    'https://live.staticflickr.com/140/389494506_55bcdc3664_b.jpg',
    'https://live.staticflickr.com/5597/15253681909_0cc91c77d5_b.jpg',
]

for url in flickr_URLs:
    download_image(url)

This program does the job, but we spend a big chunk of time waiting for the Flickr servers to respond to each request. Scaled to 5000 images, this waiting time becomes enormous.

The program also writes each downloaded image to disk, and the for loop waits for that write to finish before sending the next request.

Rather than waiting for these operations (getting a response from the server and writing it to disk) to finish, shouldn't we send a new request in the meantime? Once a response to an earlier request arrives, we can write the corresponding image to disk. This way, latency doesn't block execution.

We can achieve this by starting a new "thread" alongside the main thread, using Python's built-in threading module. The main thread is responsible for executing the program, and the new thread does not block the main thread or other threads.

Here’s how you create a thread:

thread = threading.Thread(target=download_image, args=[url])
thread.start()

So that's one thread. We need many threads, so we'll create them in a loop. Here's what the multi-threaded version of this program would look like:

import threading
import requests

def download_image(URL):
    ...

flickr_URLs = [...]

threads = []
for url in flickr_URLs:
    thread = threading.Thread(target=download_image, args=[url])
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()

Note that we join every thread we created. Calling join() makes the main thread wait until that thread has finished, so any code after the loop runs only once all downloads are complete.
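To make the effect of join() concrete, here is a minimal sketch that swaps the real download for a hypothetical fake_download function, which only sleeps briefly to imitate network latency. The names fake_download and finished are assumptions made for illustration, not part of the program above.

```python
import threading
import time

finished = []  # filled in by the worker threads


def fake_download(name):
    # Hypothetical stand-in for download_image: sleep instead of doing real I/O
    time.sleep(0.1)
    finished.append(name)


threads = [threading.Thread(target=fake_download, args=[n]) for n in ("a", "b", "c")]
for thread in threads:
    thread.start()

# Without join(), the main thread may reach this point before any worker is done
for thread in threads:
    thread.join()

# After join() has returned for every thread, all workers have completed
all_done = not any(thread.is_alive() for thread in threads)
print(sorted(finished), all_done)
```

If you comment out the join() loop, the final print may run while some workers are still sleeping.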

In this program we've created 10 threads, one per URL (len(flickr_URLs)). But can we create more if we have more images, as in my case with 5000 images? How many threads are too many?

Before answering this question, it's important to know that these threads are not running simultaneously. It might sound counterintuitive, since we're talking about finishing operations without waiting for other operations to finish.

Think of a juggler. He never handles more than one ball at the same instant; he only creates the illusion of doing so. In reality, he handles one ball at a time while the others wait.

That's exactly how our concurrent program works. At any given time, only one thread is running, while the others are waiting for a response from the Flickr servers or for a downloaded file to be written to disk.
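A rough way to see this overlap without touching the network is to imitate the waiting with time.sleep. The sketch below is an illustration under that assumption: wait_for_server is a hypothetical stand-in for a slow request, and exact timings will vary by machine.

```python
import threading
import time


def wait_for_server(delay):
    # Hypothetical stand-in for a slow request: the thread just waits
    time.sleep(delay)


DELAYS = [0.2] * 5  # five simulated requests of 0.2 s each

# Sequential: each wait starts only after the previous one has ended
start = time.perf_counter()
for delay in DELAYS:
    wait_for_server(delay)
sequential_time = time.perf_counter() - start

# Threaded: all five waits are in progress at the same time
start = time.perf_counter()
threads = [threading.Thread(target=wait_for_server, args=[d]) for d in DELAYS]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
threaded_time = time.perf_counter() - start

# The waits overlap, so the threaded run takes roughly one delay, not five
print(f"sequential: {sequential_time:.2f}s, threaded: {threaded_time:.2f}s")
```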

Note

When I say Python can only run one thread at a time, I'm talking about the standard implementation, CPython. Some other implementations, such as Jython and IronPython, can run multiple threads simultaneously.

This continuous switching between threads (deciding which thread runs when) is handled by the OS. Hence, while increasing the thread count makes things faster up to a point, beyond that the threads become expensive for the OS to manage.

With a simple if statement, we can cap the number of threads running at once while still downloading all the images.

MAX_THREADS = 10
threads = []
for url in flickr_URLs:
    # Once a full batch is running, wait for it to finish before starting more
    if len(threads) >= MAX_THREADS:
        for thread in threads:
            thread.join()

        threads = []

    thread = threading.Thread(target=download_image, args=[url])
    thread.start()
    threads.append(thread)

# Wait for the final (possibly partial) batch
for thread in threads:
    thread.join()

However, there's a modern and simpler way of implementing this limit, which we will see later in this article.

I/O-bound vs CPU-bound tasks

We need to understand that when running the sequential version (a simple for loop calling the function n times), the delay was caused by I/O latency (waiting for the server's response or writing data to disk). It was NOT bottlenecked by the processing power of your CPU. A faster CPU wouldn't change anything.

This is an example of an I/O-bound task, where we're waiting for something to finish and trying to fill that wait time with something else. It's very similar to how we cook. You need a hot oven to bake raw cookies, so while the oven is preheating, you can use the time to cut cookies out of the dough.

That's concurrency.

But what if the performance is bottlenecked by the CPU (processing power) instead of networking or I/O?

To picture this, suppose you're doing the dishes. Can you apply the concept of concurrency here? First pick up the chopping board and start washing it, then quickly switch to the bowl, pour some water, and switch back to the chopping board. There's also a knife, so leave the board and start washing the knife. Quite inefficient, right?

At best, it won't make any difference to the execution time of this task. In most cases, it will make the process slower, because you spend time switching back and forth between utensils. Yes, multithreading can actually slow down your program when applied in the wrong situation.
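The dish-washing analogy can be sketched in code. The example below runs a hypothetical CPU-heavy function, count_up, first sequentially and then in threads. On standard CPython the threaded run is usually no faster and can be slower, but timings depend on the machine and interpreter build, so treat the output as indicative only.

```python
import threading
import time


def count_up(n, results, index):
    # Hypothetical CPU-bound task: pure computation, no waiting on I/O
    total = 0
    for i in range(n):
        total += i
    results[index] = total


N = 300_000
JOBS = 4

# Sequential run: one job after another on the main thread
sequential_results = [None] * JOBS
start = time.perf_counter()
for k in range(JOBS):
    count_up(N, sequential_results, k)
sequential_time = time.perf_counter() - start

# Threaded run: one thread per job
threaded_results = [None] * JOBS
start = time.perf_counter()
threads = [
    threading.Thread(target=count_up, args=[N, threaded_results, k])
    for k in range(JOBS)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
threaded_time = time.perf_counter() - start

# Same answers either way; only the elapsed time differs
print(f"sequential: {sequential_time:.3f}s, threaded: {threaded_time:.3f}s")
```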

That brings us to parallelism.

Parallelism

If you're looking to speed up this task, you need manpower: a friend who washes the bowl while you clean the knives and forks. The more friends, the better.

This is what a CPU-bound task looks like. There is no waiting time to make use of; you just need more CPU cores (more processing power) working at once.

Let's implement this in Python.

Consider the function productOfPrimes, which returns the product of all primes below n, so productOfPrimes(10) = 210 (the product of 2, 3, 5, 7). It is pretty heavy on the CPU.

import time

def productOfPrimes(n):

    ALL_PRIMES_UPTO_N = []

    for i in range(2, n):

        PRIME = True
        for j in range(2, int(i/2)+1):
            if i%j==0:
                PRIME = False
                break

        if PRIME:
            ALL_PRIMES_UPTO_N.append(i)


    print(f"{len(ALL_PRIMES_UPTO_N)} PRIME NUMBERS FOUND")

    product = 1
    for prime in ALL_PRIMES_UPTO_N:
        product = product*prime

    return product

init_time = time.time()
LIMITS = [50328, 22756, 39371, 44832]

for lim in LIMITS: 
    productOfPrimes(lim)

end_time = time.time()
print(f"TIME TAKEN: {end_time-init_time}")

The execution took ~10 seconds on a 1.1 GHz quad-core Intel i5 processor. However, of the 4 cores, we used just one.
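As a quick sanity check of the claim productOfPrimes(10) = 210, here is a compact re-implementation. It is a sketch, not the code above: the names is_prime and product_of_primes_below are hypothetical, and it stops trial division at the square root of each candidate instead of at i/2, which finds the same primes with less work.

```python
import math


def is_prime(i):
    # Trial division up to the integer square root of i
    if i < 2:
        return False
    for j in range(2, math.isqrt(i) + 1):
        if i % j == 0:
            return False
    return True


def product_of_primes_below(n):
    # Multiply together every prime p with 2 <= p < n
    return math.prod(p for p in range(2, n) if is_prime(p))


print(product_of_primes_below(10))  # 2 * 3 * 5 * 7
```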

To manage multiple cores and processes, we'll use the built-in multiprocessing module. Syntactically, it's quite similar to how we started threads:

import multiprocessing

def productOfPrimes(n):
    ...

if __name__ == "__main__":  
    processes = []
    LIMITS = [50328,22756,39371,44832] 

    for lim in LIMITS:
        process = multiprocessing.Process(target=productOfPrimes, args=[lim])
        process.start()
        processes.append(process)

    for process in processes:
        process.join()

The above program does the same thing in a little over 4 seconds, because it leverages the multiple cores available on the machine.

Note

We've intentionally put the process-spawning code under if __name__ == "__main__" to avoid a RuntimeError that occurs on many non-Linux systems. Unlike threads, processes created by the multiprocessing module may import your main module when they start. If the main module spawned new processes at import time, every new process would try to do the same, endlessly, so Python raises a RuntimeError instead.
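Whether you can hit this error depends on how your platform starts child processes. As a small sketch, the standard library can report the start method in use: "spawn" and "forkserver" import the main module in the child, while "fork" copies the parent process instead.

```python
import multiprocessing

# Returns "fork", "spawn" or "forkserver" depending on platform and Python version
start_method = multiprocessing.get_start_method()
print(f"start method: {start_method}")
```

Keeping the guard in place is the safer habit either way, since the same script may later run on a platform with a different default.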

Now let's talk a bit about the maximum number of processes we can create. Can we spawn more than 4 processes on a quad-core CPU? Shouldn't the limit be 4? Only 4 can run at the same instant, but that doesn't mean the OS can't hold more than 4 processes in memory. Hence, if you spawn 20 processes, you won't get an error like "NoMoreCoresLeftError". The OS will just schedule these 20 processes over whatever cores are available.

Run this bash command to see the number of processes running on your system:

ps -e | wc -l
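To check how many cores the OS reports, and therefore roughly how many processes can run at the same moment, the standard library offers os.cpu_count(). A minimal sketch; note that it may return None when the count cannot be determined, and the fallback of 1 here is an assumption.

```python
import os

# os.cpu_count() may return None, so fall back to 1 (an assumption for this sketch)
cores = os.cpu_count() or 1
print(f"logical cores reported by the OS: {cores}")
```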

Pool of threads and processes

We've seen how to implement concurrency and parallelism using the threading and multiprocessing modules. However, there's a neater, more Pythonic way of doing this: the concurrent.futures module, which ships with Python.

import concurrent.futures

def download_image(URL):
    ...

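flickr_URLs = [...]

with concurrent.futures.ThreadPoolExecutor() as executor:
    # map() schedules download_image for every URL and yields results in input order
    results = executor.map(download_image, flickr_URLs)

# download_image returns nothing, so each result is None
for result in results:
    print(result)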
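ThreadPoolExecutor also covers the thread limit from earlier: its max_workers argument caps how many threads run at once. The sketch below assumes a hypothetical fake_download that sleeps instead of hitting the network, and made-up file names, so it can run anywhere.

```python
import concurrent.futures
import time

MAX_THREADS = 3


def fake_download(name):
    # Hypothetical stand-in for download_image: wait briefly, then report back
    time.sleep(0.05)
    return f"saved {name}"


names = [f"image_{i}.jpg" for i in range(8)]  # made-up file names

# At most MAX_THREADS downloads are in flight at any moment
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    results = list(executor.map(fake_download, names))

# map() returns results in the same order as the inputs
for result in results:
    print(result)
```

Compared with the manual batching loop, the pool starts the next task as soon as any worker is free, rather than waiting for a whole batch to finish.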

We can apply the same syntactical sugar with ProcessPoolExecutor:

import concurrent.futures

def productOfPrimes(n):
    ...

LIMITS = [...]

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = executor.map(productOfPrimes, LIMITS)

    # Keep this inside the guard: child processes that import this module never define results
    for result in results:
        print(result)

Threads, processes and the OS

Technically, threads run inside a process. When we create 4 threads, they share the same process, and thus the same memory and a lot of other OS-level state (process control block, address space, etc.). The same is not true for processes: each process has a memory space of its own and runs independently.

On the other hand, processes can run simultaneously, unlike in multithreading, where the OS keeps switching between threads inside the same process to hide latency.
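One way to observe that threads live inside a single process is to compare process IDs. In this sketch (the names record_pid and seen are assumptions for illustration), every thread reports the same PID as the main thread and writes into the same list, which is possible only because they share memory.

```python
import os
import threading

seen = []  # shared by all threads because they live in one process


def record_pid():
    # Each thread appends the ID of the process it is running in
    seen.append(os.getpid())


threads = [threading.Thread(target=record_pid) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# All four entries equal the main thread's PID
print(seen, os.getpid())
```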

Conclusion

We've seen how to implement concurrency and parallelism in Python. The two are fundamentally different and have use cases of their own. As a general rule, use multithreading for I/O-bound tasks and multiprocessing for CPU-bound tasks. When not sure, use multiprocessing. There is more to talk about, such as the problems with threading, the GIL, and asynchronous programming, but those are outside the scope of this article.

Permanent link to this article
pygs.me/003
