Python ThreadPoolExecutor: Use Cases for Parallel Processing

Abdul R. Wahab
6 min read · Apr 29

Background

In many applications, there are times we need to perform tasks that take a lot of time to execute.

These tasks might include something like: downloading data from an internet source, processing large sets of data, or performing complex computations.

To handle such tasks efficiently, we need a way to run them concurrently, so that they overlap instead of running one after another and the overall execution time is reduced. This is where Python’s ThreadPoolExecutor comes in.

ThreadPoolExecutor is a class in the standard library’s concurrent.futures module that allows us to create a pool of threads to execute tasks concurrently.

In this segment, we will explore ThreadPoolExecutor in detail, including its use cases, functionality, and examples.

What is the ThreadPoolExecutor?

The ThreadPoolExecutor is a class in Python’s concurrent.futures module that provides a high-level interface for managing a pool of worker threads.

It allows us to submit tasks to the pool, which are then executed by one of the worker threads in the pool. The size of the pool is capped by the max_workers argument given when the pool is created: worker threads are started on demand up to that limit, and the limit cannot be changed afterwards.

ThreadPoolExecutor provides several advantages over manually creating and managing threads:

  1. It abstracts away the details of thread creation and management, making it easier to write threaded code.
  2. It provides a way to limit the number of threads used for a particular task, which can help prevent resource exhaustion and improve performance.
  3. It provides a way to cleanly shut down the thread pool once all tasks have been completed.
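
To make these advantages concrete, here is a minimal sketch that runs the same work twice: once with hand-managed threads and once with a pool. The square() task and the inputs are made up for illustration and are not from any particular application.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def square(n):
    # Hypothetical task used only for illustration
    return n * n


# Manual approach: create, start, and join every thread ourselves,
# and collect results through a shared list guarded by a lock.
manual_results = []
lock = threading.Lock()


def worker(n):
    value = square(n)
    with lock:
        manual_results.append(value)


threads = [threading.Thread(target=worker, args=(n,)) for n in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Pool approach: the executor owns the threads, caps them at max_workers,
# and the with block shuts the pool down once the submitted work is done.
with ThreadPoolExecutor(max_workers=2) as executor:
    pool_results = list(executor.map(square, range(5)))

print(sorted(manual_results))
print(pool_results)
```

The manual version starts one thread per input, while the pool version never runs more than two threads at once.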

Use Cases

The ThreadPoolExecutor is useful in a wide range of applications where parallelism is required.

A few common use cases include:

Network I/O

When downloading data from the internet, the main bottleneck is often the time it takes to establish a connection and transfer the data.

By using ThreadPoolExecutor, we can download multiple files simultaneously, which can significantly reduce the overall download time.
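
The effect can be sketched without touching the network. In the snippet below, fake_download() is a hypothetical stand-in that simply sleeps for 0.2 seconds to imitate waiting on a server; the file names are placeholders.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_download(name):
    # Stand-in for a network request: the thread just waits,
    # much like it would while blocked on a socket.
    time.sleep(0.2)
    return f"contents of {name}"


names = [f"file{i}.txt" for i in range(1, 6)]

# One after another: the waits add up
start = time.perf_counter()
sequential = [fake_download(name) for name in names]
sequential_seconds = time.perf_counter() - start

# In a pool of 5 threads: the waits overlap
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=5) as executor:
    threaded = list(executor.map(fake_download, names))
threaded_seconds = time.perf_counter() - start

print(f"sequential: {sequential_seconds:.2f}s, threaded: {threaded_seconds:.2f}s")
```

Exact timings depend on the machine, but the threaded run should take roughly as long as a single simulated download rather than five.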

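CPU-intensive (CPU-bound) tasks

When performing complex computations, the CPU is often the bottleneck.

By using ThreadPoolExecutor, we can distribute the workload across multiple threads. Note, however, that in standard CPython builds the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, so threads improve performance for CPU-bound work mainly when the heavy lifting happens in code that releases the GIL (for example, inside some C extensions). For pure-Python computations, ProcessPoolExecutor, which offers the same interface, is usually the better fit.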
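
Since the executors in concurrent.futures share one interface, one way to keep the choice between threads and processes open is to pass the executor class in as a parameter. The sketch below assumes a made-up task, sum_of_squares(), and a hypothetical helper, run_in_pool(); neither comes from the standard library.

```python
from concurrent.futures import ThreadPoolExecutor


def sum_of_squares(n):
    # Pure-Python, CPU-bound work (hypothetical example task)
    return sum(i * i for i in range(n))


def run_in_pool(executor_cls, inputs, max_workers=4):
    # Works with any executor class that follows the concurrent.futures interface
    with executor_cls(max_workers=max_workers) as executor:
        return list(executor.map(sum_of_squares, inputs))


inputs = [10, 100, 1000]
print(run_in_pool(ThreadPoolExecutor, inputs))
```

Passing concurrent.futures.ProcessPoolExecutor instead of ThreadPoolExecutor runs the same tasks in separate processes. On platforms that start worker processes by spawning a fresh interpreter, that call should sit under an `if __name__ == "__main__":` guard.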

Front-end / UI applications

UI applications often need to perform time-consuming tasks such as loading data or generating reports. By using ThreadPoolExecutor, we can run these tasks in the background, without blocking the UI thread, which can improve responsiveness and user experience.
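
As a rough sketch of this pattern, the snippet below fakes a UI loop with a plain while loop. The names generate_report() and on_done() are hypothetical, and a real UI toolkit would normally need the result handed back to its own event loop instead of being touched from the callback.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def generate_report():
    # Stand-in for a slow job such as loading data or building a report
    time.sleep(0.3)
    return "report ready"


finished = []


def on_done(future):
    # Called once the task has finished; here we only record the result
    finished.append(future.result())


executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(generate_report)
future.add_done_callback(on_done)

# The pretend "UI loop" keeps running while the report is built in the background
ticks = 0
while not future.done():
    ticks += 1  # pretend to handle user events
    time.sleep(0.05)

# Joining the worker thread also guarantees the callback has run
executor.shutdown(wait=True)
print(finished, "after", ticks, "UI ticks")
```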

Functionality

The ThreadPoolExecutor provides several methods for submitting tasks to the thread pool and managing the pool itself.

The main methods are:

  1. submit(fn, *args, **kwargs): This method submits a task to the thread pool. The first argument is the function to be executed, followed by any positional and keyword arguments that the function requires. submit() returns a Future object, which represents the pending result of the computation. The Future can be used to check whether the computation has completed, retrieve its result, or handle any exception raised during the computation.
  2. map(fn, *iterables): This method submits a batch of tasks to the thread pool, one for each element of the input iterables. It returns an iterator that yields the results. Note: The results are yielded in the same order as the inputs, even though the tasks run concurrently and may finish in a different order. To process results as they finish, use submit() together with as_completed() instead.
  3. shutdown(wait=True): This method shuts down the thread pool; no new tasks can be submitted afterwards. By default, it blocks until all submitted tasks have completed. If wait=False, the call returns immediately, but tasks that were already submitted still run to completion. To drop tasks that have not started yet, pass cancel_futures=True (available since Python 3.9).
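
The three methods can be sketched together as follows. The helpers slow_double() and fail() are made up for this illustration; slow_double() sleeps longer for smaller inputs so that tasks finish in a different order than they were submitted.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_double(n):
    # Smaller inputs sleep longer, so completion order differs from input order
    time.sleep(0.05 * (3 - n))
    return n * 2


def fail():
    raise ValueError("boom")


executor = ThreadPoolExecutor(max_workers=3)

# submit() returns a Future immediately; result() blocks until the task finishes
future = executor.submit(slow_double, 2)
print(future.result())

# An exception raised inside the task is re-raised by result()
failed = executor.submit(fail)
try:
    failed.result()
except ValueError as exc:
    error_message = str(exc)

# map() yields results in input order, even though the task for 0 finishes last
mapped = list(executor.map(slow_double, [0, 1, 2]))
print(mapped)

# shutdown() waits for outstanding work by default; later submits are rejected
executor.shutdown(wait=True)
```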

Code Examples

Let’s look at some examples to see how ThreadPoolExecutor can be implemented in practice.

Example 1: Downloading files from the internet

In this sample, we will download multiple files from the internet using ThreadPoolExecutor.

We will create a pool of 5 worker threads and submit each download task to the pool using the submit() function.

import requests
import concurrent.futures

urls = [
    "https://www.example.com/file1.txt",
    "https://www.example.com/file2.txt",
    "https://www.example.com/file3.txt",
    "https://www.example.com/file4.txt",
    "https://www.example.com/file5.txt"
]

def download(url):
    response = requests.get(url)
    return response.content

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

    # Submit each download task to the thread pool
    futures = [executor.submit(download, url) for url in urls]

    # Wait for all tasks to complete and retrieve the results
    results = [future.result() for future in concurrent.futures.as_completed(futures)]

    # Print the contents of each file
    for result in results:
        print(result.decode())

In this example, we:

  1. Define a list of URLs that we want to download.
  2. Define a function download(url) that downloads the content of the specified URL using the requests library.
  3. Create a ThreadPoolExecutor with a maximum of 5 worker threads in a with statement, which shuts the pool down when the block exits.
  4. Submit each download task to the thread pool using the submit() method and store the resulting Future objects in a list called futures.
  5. Use the as_completed() function to wait for the tasks and retrieve the results. as_completed() returns an iterator that yields futures as they complete, so the order of results follows completion order and may differ from the order of urls.
  6. Use a list comprehension to iterate over that iterator and call result() on each future. result() re-raises any exception that occurred inside download().
  7. Finally, print the contents of each file to the console.
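
Because as_completed() yields futures in completion order, it is often useful to remember which URL each future belongs to and to handle failures per URL. The following is a sketch of that pattern under stated assumptions: fake_download() is a hypothetical offline stand-in for download(url), and missing.txt is a made-up URL that is set up to fail.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_download(url):
    # Hypothetical stand-in for download(url) so the sketch runs offline
    if url.endswith("missing.txt"):
        raise OSError(f"could not fetch {url}")
    return f"contents of {url}".encode()


urls = [
    "https://www.example.com/file1.txt",
    "https://www.example.com/missing.txt",
    "https://www.example.com/file3.txt",
]

contents = {}
errors = {}

with ThreadPoolExecutor(max_workers=5) as executor:
    # Remember which URL each future belongs to
    future_to_url = {executor.submit(fake_download, url): url for url in urls}

    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            contents[url] = future.result()
        except OSError as exc:
            # One failed download does not stop the others
            errors[url] = str(exc)

for url in urls:
    status = "ok" if url in contents else f"failed: {errors[url]}"
    print(url, status)
```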

Example 2: Computing Fibonacci numbers

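In this example, we will compute the first 20 Fibonacci numbers using ThreadPoolExecutor. Because this is pure-Python, CPU-bound work, the example illustrates the API rather than a guaranteed speedup.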

We will create a pool of 4 worker threads and submit each computation task to the pool using the submit() function.

import concurrent.futures

def fibonacci(n):
    if n <= 1:
        return n
    else:
        return fibonacci(n-1) + fibonacci(n-2)

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # Submit each computation task to the thread pool
    futures = [executor.submit(fibonacci, n) for n in range(20)]

    # Wait for all tasks to complete and retrieve the results
    results = [future.result() for future in concurrent.futures.as_completed(futures)]

    # Print the results
    print(results)

In this example, we:

  1. Define a function fibonacci(n) that computes the nth Fibonacci number recursively.
  2. Create a ThreadPoolExecutor with a maximum of 4 worker threads in a with statement.
  3. Submit each computation task to the thread pool using the submit() method and store the resulting Future objects in a list called futures.
  4. Use the as_completed() function to wait for the tasks and retrieve the results. Since futures are yielded as they complete, the results are not guaranteed to appear in index order.
  5. Use a list comprehension to iterate over that iterator and retrieve the result of each future.
  6. Finally, print the results to the console.
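
If the numbers should come back in sequence order, one option is to use map() instead of submit() with as_completed(). This is a small variation on the example, not a replacement for it:

```python
from concurrent.futures import ThreadPoolExecutor


def fibonacci(n):
    if n <= 1:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)


with ThreadPoolExecutor(max_workers=4) as executor:
    # map() yields results in the order of the inputs
    ordered = list(executor.map(fibonacci, range(20)))

print(ordered)
```

Iterating over the original futures list in submission order and calling result() on each future gives the same ordering.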

Unit Tests

It is important to test our code to ensure that it works as expected and to catch any bugs.

Let’s look at some unit tests for our examples to ensure that they work correctly.

import unittest
import concurrent.futures
import main # make sure to replace with the name of the file containing the examples
# Importing runs that file's top-level code, so keep the example pool code there
# behind an `if __name__ == "__main__":` guard.

class TestThreadPoolExecutor(unittest.TestCase):
    def test_download_files(self):
        # Placeholder URLs: this test needs network access and assumes that
        # each URL serves the content asserted below.
        urls = [
            "https://www.example.com/file1.txt",
            "https://www.example.com/file2.txt",
            "https://www.example.com/file3.txt",
            "https://www.example.com/file4.txt",
            "https://www.example.com/file5.txt"
        ]

        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

            # Submit each download task to the thread pool
            futures = [executor.submit(main.download, url) for url in urls]

            # Collect the results in submission order so results[i] matches urls[i]
            # (as_completed() would yield them in completion order instead)
            results = [future.result() for future in futures]

        # Verify that we have downloaded the correct content
        self.assertEqual(len(results), 5)
        self.assertIn(b"File 1 content", results[0])
        self.assertIn(b"File 2 content", results[1])
        self.assertIn(b"File 3 content", results[2])
        self.assertIn(b"File 4 content", results[3])
        self.assertIn(b"File 5 content", results[4])


    def test_compute_fibonacci(self):
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            # Submit each computation task to the thread pool
            futures = [executor.submit(main.fibonacci, n) for n in range(20)]

            # Collect the results in submission order so they match expected_results
            results = [future.result() for future in futures]

        # Verify that we have computed the correct Fibonacci numbers
        expected_results = [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181]
        self.assertEqual(results, expected_results)

if __name__ == "__main__":
    unittest.main()
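
The download test above depends on network access and on placeholder URLs. One way to test the threading logic on its own is to inject the fetching function, so that a test can pass in a fake. The sketch below assumes a hypothetical helper, download_all(urls, fetch), which is not part of the examples above.

```python
import unittest
from concurrent.futures import ThreadPoolExecutor


def download_all(urls, fetch, max_workers=5):
    # Hypothetical helper: `fetch` is injected so tests can replace the network call
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(fetch, urls))


class TestDownloadAll(unittest.TestCase):
    def test_results_follow_input_order(self):
        def fake_fetch(url):
            # Fake response body that embeds the URL it was asked for
            return f"content of {url}".encode()

        urls = [f"https://www.example.com/file{i}.txt" for i in range(1, 6)]
        results = download_all(urls, fake_fetch)

        self.assertEqual(len(results), 5)
        for url, body in zip(urls, results):
            self.assertIn(url.encode(), body)


# Run the test case programmatically so the sketch also works outside a test runner
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestDownloadAll)
outcome = unittest.TextTestRunner(verbosity=0).run(suite)
```

In production code, the real download function would be passed as fetch.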

Closing thoughts 👏

In short, ThreadPoolExecutor is a powerful tool for concurrent programming in Python.

It provides a simple and easy-to-use interface for creating a pool of worker threads and executing tasks concurrently, which pays off most for I/O-bound work.

I hope this segment has provided a useful introduction to ThreadPoolExecutor in Python and that you will find it helpful in your own programming projects.

Thanks for following along. Feel free to comment below with any questions / comments.

Abdul R. Wahab

Multi-domain Technical Lead specialized in building products users love. Today, I manage & secure big data in the AWS cloud. All views shared are my own.