Django.fun

map() против submit() с ProcessPoolExecutor в Python

Use map() when converting a for-loop to use processes and use submit() when you need more control over asynchronous tasks when using the ProcessPoolExecutor in Python.

In this tutorial you will discover the difference between map() and submit() when executing tasks with the ProcessPoolExecutor in Python.

Let’s get started.

Table of Contents

Use map() to Execute Tasks with the ProcessPoolExecutor

Use map() to convert a for-loop to use processes.

Perhaps the most common pattern when using the ProcessPoolExecutor is to convert a for-loop that executes a function on each item in a collection to use processes.

It assumes that the function has no side effects, meaning it does not access any data outside of the function and does not change the data provided to it. It takes data and produces a result.

These types of for loops can be written explicitly in Python, for example:

...

# apply a function to each element in a collection

for item in mylist:

result = task(item)

A better practice is to use the built-in map() function that applies the function to each item in the iterable for you.

...

# apply the function to each element in the collection

results = map(task, mylist)

The built-in map() function does not perform the task() function to each item until we iterate the results, so-called lazy evaluation:

...

# iterate the results from map

for result in results:

print(result)

Therefore, it is common to see this operation in a for-loop idiom as follows:

...

# iterate the results from map

for result in map(task, mylist):

print(result)

We can perform this same operation using the process pool, except each call of the function with an item in the iterable is a task that is executed asynchronously using processes.

For example:

...

# iterate the results from map

for result in executor.map(task, mylist):

print(result)

Like the built-in map() function, the ProcessPoolExecutor map() function returns an iterable over the results returned by the target function applied to the provided iterable of items.

Although the tasks are executed asynchronously, the results are iterated in the order of the iterable provided to the map() function, the same as the built-in map() function.

In this way, we can think of the ProcessPoolExecutor version of the map() function as an asynchronous version of the built-in map() function and is ideal if you are looking to update your for loop to use processes.

The example below demonstrates using the map() function with a task that will sleep a random amount of time less than one second and return the provided value.

# SuperFastPython.com

# example of the map and wait pattern for the ProcessPoolExecutor

from time import sleep

from random import random

from concurrent.futures import ProcessPoolExecutor

 

# custom task that will sleep for a variable amount of time

def task(name):

    # sleep for less than a second

    sleep(random())

    return name

 

# entry point

if __name__ == '__main__':

    # start the process pool

    with ProcessPoolExecutor(10) as executor:

        # execute tasks concurrently and process results in order

        for result in executor.map(task, range(10)):

            # retrieve the result

            print(result)

Running the example, we can see that the results are reported in the order that the tasks were created and sent into the process pool.

0

1

2

3

4

5

6

7

8

9

Like the built-in map() function, the ProcessPoolExecutor map() function can take more than one iterable. This means your function can take more than one argument.

...

# example of calling map with more than one iterable

for result in executor.map(task, mylist1, mylist2):

print(result)

Unlike the built-in map() function, the tasks are sent into the process pool immediately after calling map() instead of being executed in a lazy manner as results are requested.

Put another way, the tasks will execute and complete in their own time regardless of whether we execute the iterable of results returned by calling map().

...

# example of calling map and not iterating the results

_ = executor.map(task, mylist)

Now that we are familiar with the map() function, let’s take a look at the submit() function.

Use submit() to Execute Tasks with the ProcessPoolExecutor

Use submit() when you want more control over asynchronous tasks.

The submit() function will take the name of the target task function you wish to execute asynchronously as well as any arguments to the function. It will then return a Future object.

...

# submit a task to the process pool and get a future object

future = executor.submit(task, arg1, arg2)

The Future object can be kept and used to query the status of the asynchronous task, such as whether it is running(), done(), or has been cancelled().

...

# check if a task is running

if future.running():

# do something...

It can also be used to get the result() from the task when it is completed or the exception() if one was raised during the execution of the task.

...

# get the result from a task via it's future object

result = future.result()

The Future object can also be used to cancel() the task before it has started running and to add a callback function via add_done_callback() that will be executed once the task has completed.

...

# cancel the task if has not yet started running

if future.cancel():

print('Task was cancelled')

It is a common pattern to submit many tasks to a process pool and store the Future objects in a collection.

For example, it is common to use a list comprehension.

...

# create many tasks and store the future objects in a list

futures = [executor.submit(work) for _ in range(100)]

We can iterate the list of Future objects to get results in the order that the tasks were submitted, for example:

...

# get results from tasks in the order they were submitted

for future in futures:

# get the result

result = future.result()

Recall that the call to the result() function on the Future will not return until the task is done.

The collection of Future objects can then be handed off to utility functions provided by the concurrent.futures module, such as wait() and as_completed().

The wait() module function takes a collection of Future objects and by default will return all tasks that are done, although can be configured to return when any task raises an exception or is done.

...

# wait for all tasks to be done

wait(futures)

The as_completed() module function takes a collection of Future objects and will return the Future objects in the order that the tasks are completed as they are completed. This is instead of the order that they were submitted to the process pool, allowing your program to be more responsive.

...

# respond to tasks as they are completed

for future in as_completed(futures):

# get the result

result = future.result()

The processing of Future objects in the order they are completed may be the most common usage pattern of the submit() function with the ProcessPoolExecutor.

The example below demonstrates this pattern, submitting the tasks in order from 0 to 9 and showing results in the order that they were completed.

# SuperFastPython.com

# example of the submit and use as completed pattern for the ProcessPoolExecutor

from time import sleep

from random import random

from concurrent.futures import ProcessPoolExecutor

from concurrent.futures import as_completed

 

# custom task that will sleep for a variable amount of time

def task(name):

    # sleep for less than a second

    sleep(random())

    return name

 

# entry point

if __name__ == '__main__':

    # start the process pool

    with ProcessPoolExecutor(10) as executor:

        # submit tasks and collect futures

        futures = [executor.submit(task, i) for i in range(10)]

        # process task results as they are available

        for future in as_completed(futures):

            # retrieve the result

            print(future.result())

Running the example we can see that the results are retrieved and printed in the order that the tasks completed, not the order that the tasks were submitted to the process pool.

5

9

6

1

0

7

3

8

4

2

Now that we are familiar with how to use submit() to execute tasks in the ProcessPoolExecutor, let’s take a look at a comparison between map() and submit().

map() vs submit() with the ProcessPoolExecutor

Let’s compare the map() and submit() functions for the ProcessPoolExecutor.

Both the map() and submit() functions are similar in that they both allow you to execute tasks asynchronously using processes.

The map() function is simpler:

  • It is a multiprocess version of the built-in map() function.
  • It assumes you want to call the same function many times with different values.
  • It only takes iterables as arguments to the target function.
  • It only allows you to iterate results from the target function.

In an effort to keep your code simpler and easier to read, you should try to use map() first, before you try to use the submit() function.

The simplicity of the map() function means it is also limited:

  • It does not provide control over the order that task results are used.
  • It does not provide a way to check the status of tasks.
  • It does not allow you to cancel tasks before they start running.
  • It does not allow you control over how to handle an exception raised by a task function.

If the map() function is too restrictive, you may want to consider the submit() function instead.

The map() function gives more control:

  • It assumes you want to submit one task at a time.
  • It allows a different target function with variable number of arguments for each task.
  • It allows you to check on the status of each task.
  • It allows you to cancel a task before it has started running.
  • It allows callback functions to be called automatically when tasks are done.
  • It allows you to handle an exception raised by a target task function.
  • It allows you control over when you would like to get the result of the task, if at all.
  • It can be used with module functions like wait() and as_completed() to work with tasks in groups.

The added control provided when using submit() comes with added complexity:

  • It requires that you manage the Future object for each task.
  • It requires that you explicitly retrieve the result for each task.
  • It requires extra code if you need to apply the same function with different arguments.

Now that we have compared and contrasted the map() and submit() functions on the ProcessPoolExecutor, which one should you use?

Use map() if:

  • You are already using the built-in map() function.
  • You are calling a (near-)pure function in a for-loop for each item in an iterable.

Use submit() if:

  • You need to check the status of tasks while they are executing.
  • You need control over the order that you process results from tasks.
  • You need to conditionally cancel the execution of tasks.
  • You can simplify your code by using callback functions called when tasks are done.

So which are you going to use for your program?
Let me know in the comments below.

Takeaways

You now know when to use map() and submit() to execute tasks with the ProcessPoolExecutor.

https://superfastpython.com/processpoolexecutor-map-vs-submit/

Поделитесь с другими: