How do I parallelize a simple Python loop?




This is probably a trivial question, but how do I parallelize the following loop in Python?


# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)



I know how to start single threads in Python but I don't know how to "collect" the results.



Multiple processes would be fine too - whatever is easiest for this case. I'm currently using Linux, but the code should run on Windows and Mac as well.



What's the easiest way to parallelize this code?




9 Answers



Using multiple threads on CPython won't give you better performance for pure-Python code due to the global interpreter lock (GIL). I suggest using the multiprocessing module instead:




import multiprocessing

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))



Note that this won't work in the interactive interpreter.



To avoid the usual FUD around the GIL: There wouldn't be any advantage to using threads for this example anyway. You want to use processes here, not threads, because they avoid a whole bunch of problems.
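A slightly fuller sketch of the same approach, with a dummy calc_stuff and offset standing in for the ones the question doesn't show, so you can see how zip(*...) splits the results back into three sequences:


import multiprocessing

offset = 2  # placeholder value; the question doesn't define it

def calc_stuff(parameter):
    # dummy stand-in for the real calculation
    return parameter / 2, parameter % 3, parameter ** 2

if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        # each worker gets one parameter value; zip(*...) regroups the
        # per-call 3-tuples into three result sequences
        out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
    print(out1, out2, out3)


The if __name__ == '__main__' guard matters on Windows, where worker processes are spawned rather than forked.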





Since this is the chosen answer, is it possible to have a more comprehensive example? What are the arguments of calc_stuff?
– Eduardo Pignatelli
Apr 11 at 15:28







@EduardoPignatelli Please just read the documentation of the multiprocessing module for more comprehensive examples. Pool.map() basically works like map(), but in parallel.
– Sven Marnach
Apr 11 at 16:30







Is there a way to simply add in a tqdm loading bar to this structure of code? I've used tqdm(pool.imap(calc_stuff, range(0, 10 * offset, offset))) but I don't get a full loading bar graphic.
– user8188120
Jul 5 at 13:35






@user8188120 I've never heard of tqdm before, so sorry, I can't help with that.
– Sven Marnach
Jul 6 at 14:15
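On the tqdm question above: pool.imap() returns an iterator with no length, so tqdm can't size the bar unless you pass total= yourself. A rough sketch, assuming tqdm is installed and using a dummy calc_stuff:


import multiprocessing
from tqdm import tqdm  # assumes tqdm is installed (pip install tqdm)

offset = 2  # placeholder value, as in the question

def calc_stuff(parameter):
    return parameter / 2, parameter % 3, parameter ** 2  # dummy stand-in

if __name__ == '__main__':
    inputs = range(0, 10 * offset, offset)
    with multiprocessing.Pool(4) as pool:
        # imap() yields results lazily; total= lets tqdm draw a full bar
        results = list(tqdm(pool.imap(calc_stuff, inputs), total=len(inputs)))
    out1, out2, out3 = zip(*results)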



To parallelize a simple for loop, joblib brings a lot of value over raw use of multiprocessing: not only the short syntax, but also things like transparent batching of iterations when they are very fast (to remove the overhead) and capturing the traceback of the child process, for better error reporting.



Disclaimer: I am the original author of joblib.
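A minimal joblib sketch for the loop in the question (calc_stuff and offset are dummy stand-ins, since the question doesn't define them):


from joblib import Parallel, delayed

offset = 2  # placeholder value

def calc_stuff(parameter):
    return parameter / 2, parameter % 3, parameter ** 2  # dummy stand-in

if __name__ == '__main__':
    # n_jobs=-1 uses all available cores
    results = Parallel(n_jobs=-1)(delayed(calc_stuff)(j * offset) for j in range(10))
    output1, output2, output3 = zip(*results)
    print(output1, output2, output3)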





While this link may answer the question, it is better to include the essential parts of the answer here and provide the link for reference. Link-only answers can become invalid if the linked page changes. - From Review
– Liam
Feb 9 at 14:38





And as a library developer, I respectfully disagree: our documentation is kept up to date as the library and the ecosystem evolve, while we cannot control the amount of outdated information there is on Stack Overflow.
– Gael Varoquaux
Feb 11 at 13:45





I tried joblib with Jupyter and it is not working: after the Parallel/delayed call, the page stopped working.
– Jie
May 23 at 18:13





Hi, I have a problem using joblib (stackoverflow.com/questions/52166572/…), do you have any clue what may be the cause? Thanks very much.
– Ting Sun
Sep 5 at 3:08



What's the easiest way to parallelize this code?



I really like concurrent.futures for this, available in Python 3 since version 3.2, and via a backport to 2.6 and 2.7 on PyPI.





You can use threads or processes and use the exact same interface.



Put this in a file - futuretest.py:


import concurrent.futures
import time, random               # add some random sleep time

offset = 2                        # you don't supply these so
def calc_stuff(parameter=None):   # these are examples.
    sleep_time = random.choice([0, 1, 2, 3, 4, 5])
    time.sleep(sleep_time)
    return parameter / 2, sleep_time, parameter * parameter

def procedure(j):                 # just factoring out the
    parameter = j * offset        # procedure
    # call the calculation
    return calc_stuff(parameter=parameter)

def main():
    output1 = list()
    output2 = list()
    output3 = list()
    start = time.time()           # let's see how long this takes

    # we can swap out ProcessPoolExecutor for ThreadPoolExecutor
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for out1, out2, out3 in executor.map(procedure, range(0, 10)):
            # put results into correct output list
            output1.append(out1)
            output2.append(out2)
            output3.append(out3)
    finish = time.time()
    # these kinds of format strings are only available on Python 3.6:
    # time to upgrade!
    print(f'original inputs: {repr(output1)}')
    print(f'total time to execute {sum(output2)} = sum({repr(output2)})')
    print(f'time saved by parallelizing: {sum(output2) - (finish - start)}')
    print(f'returned in order given: {repr(output3)}')

if __name__ == '__main__':
    main()



And here's the output:


$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4])
time saved by parallelizing: 27.68999981880188
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]



Now change ProcessPoolExecutor to ThreadPoolExecutor, and run the module again:




$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1])
time saved by parallelizing: 13.992000102996826
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]



Now you have done both multithreading and multiprocessing!



The sample is far too small to compare the results reliably.



However, I suspect that multithreading will be faster than multiprocessing in general, especially on Windows, since Windows doesn't support forking and each new process takes time to launch. On Linux or Mac the two will probably be closer.



You can nest multiple threads inside multiple processes, but it's recommended to not use multiple threads to spin off multiple processes.
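For illustration, a rough sketch of that nesting (a thread pool inside each worker process; the chunks and the io_bound_piece helper are made up for the example):


from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def io_bound_piece(x):
    return x * 2  # stand-in for an I/O-bound call

def worker(chunk):
    # each process fans its chunk out across its own threads
    with ThreadPoolExecutor(max_workers=4) as threads:
        return list(threads.map(io_bound_piece, chunk))

if __name__ == '__main__':
    chunks = [range(0, 5), range(5, 10)]
    with ProcessPoolExecutor() as processes:
        results = list(processes.map(worker, chunks))
    print(results)  # [[0, 2, 4, 6, 8], [10, 12, 14, 16, 18]]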





Does ThreadPoolExecutor bypass the limitations imposed by the GIL? Also, wouldn't you need to join() in order to wait for the executors to finish, or is this taken care of implicitly inside the context manager?
– PirateApp
Apr 19 at 5:05






No and no, yes to "handled implicitly"
– Aaron Hall
Apr 19 at 14:59



Why don't you use threads, and one mutex to protect one global list?


import thread  # Python 2 module (renamed to _thread in Python 3)

from threading import Thread

class thread_it(Thread):
    def __init__(self, param):
        Thread.__init__(self)
        self.param = param
    def run(self):
        mutex.acquire()
        output.append(calc_stuff(self.param))
        mutex.release()


threads = []
output = []
mutex = thread.allocate_lock()

for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()

for t in threads:
    t.join()

#here you have output list filled with data



Keep in mind, you will only be as fast as your slowest thread.
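A rough Python 3 sketch of the same idea that holds the lock only while appending, so threads can overlap while they wait (calc_stuff is a dummy stand-in here; because of the GIL this only helps if the real calc_stuff spends its time waiting on I/O rather than computing):


import threading
import time

offset = 2  # placeholder value

def calc_stuff(parameter):
    time.sleep(1)          # pretend this is an I/O wait
    return parameter * 2   # dummy stand-in

output = []
lock = threading.Lock()

def worker(param):
    result = calc_stuff(param)   # runs outside the lock, so threads can overlap
    with lock:                   # the lock only protects the shared list
        output.append(result)

threads = [threading.Thread(target=worker, args=(j * offset,)) for j in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# output now holds all ten results (in completion order)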





I know this is a very old answer, so it's a bummer to get a random downvote out of nowhere. I only downvoted because threads won't parallelize anything. Threads in Python are bound to only one thread executing on the interpreter at a time because of the global interpreter lock, so they support concurrent programming, but not parallel as OP is requesting.
– skrrgwasme
Mar 3 '17 at 7:12






@skrrgwasme I know you know this, but when you use the words "they won't parallelize anything", that might mislead readers. If the operations take a long time because they are IO bound, or sleeping while they wait for an event, then the interpreter is freed up to run the other threads, so this will result in the speed increase people are hoping for in those cases. Only CPU bound threads are really affected by what skrrgwasme says.
– Jonathan Hartley
Sep 11 '17 at 16:23





Probably the threaded version will run slower in that case.
– user1767754
Jan 18 at 9:00



This could be useful when implementing multiprocessing and parallel/distributed computing in Python.



YouTube tutorial on using techila package



Techila is a distributed computing middleware, which integrates directly with Python using the techila package. The peach function in the package can be useful for parallelizing loop structures. (The following code snippet is from the Techila Community Forums.)


techila.peach(funcname = 'theheavyalgorithm',   # Function that will be called on the compute nodes / Workers
              files = 'theheavyalgorithm.py',   # Python file that will be sourced on the Workers
              jobs = jobcount                   # Number of Jobs in the Project
              )





While this link may answer the question, it is better to include the essential parts of the answer here and provide the link for reference. Link-only answers can become invalid if the linked page changes.
– S.L. Barth
Oct 22 '15 at 9:29





@S.L.Barth thank you for the feedback. I added a small sample code to the answer.
– TEe
Oct 22 '15 at 12:26



from joblib import Parallel, delayed
import multiprocessing

inputs = range(10)
def processInput(i):
    return i * i

num_cores = multiprocessing.cpu_count()

results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
print(results)



The above works beautifully on my machine (Ubuntu, package joblib was pre-installed, but can be installed via pip install joblib).





Taken from https://blog.dominodatalab.com/simple-parallelization/



There are a number of advantages to using Ray.



In your case, you could start Ray and define a remote function


import ray

ray.init()

@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3



and then invoke it in parallel


output1, output2, output3 = [], [], []

# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)

# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)



To run the same example on a cluster, the only line that would change would be the call to ray.init(). The relevant documentation can be found here.
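As a rough sketch (the exact keyword argument has varied across Ray versions), connecting to an already-running cluster can look something like this:


import ray

# connect to an existing Ray cluster instead of starting a local one
ray.init(address="auto")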



Note that I'm helping to develop Ray.



A very simple example of parallel processing is:


from multiprocessing import Process

output1 = list()
output2 = list()
output3 = list()

def yourfunction():
    for j in range(0, 10):
        # calc individual parameter value
        parameter = j * offset
        # call the calculation
        out1, out2, out3 = calc_stuff(parameter=parameter)

        # put results into correct output list
        output1.append(out1)
        output2.append(out2)
        output3.append(out3)

if __name__ == '__main__':
    p = Process(target=yourfunction)
    p.start()
    p.join()
    # note: yourfunction runs in a separate process, so it fills that
    # process's own copies of the lists, not the parent's



Have a look at this:

http://docs.python.org/library/queue.html

This might not be the right way to do it, but I'd do something like this.

Actual code:


from multiprocessing import Process, JoinableQueue as Queue
from queue import Empty  # exception raised by a queue's get() when it is empty

class CustomWorker(Process):
    def __init__(self, workQueue, out1, out2, out3):
        Process.__init__(self)
        self.input = workQueue
        self.out1 = out1
        self.out2 = out2
        self.out3 = out3
    def run(self):
        while True:
            try:
                value = self.input.get()
                #value modifier
                temp1, temp2, temp3 = self.calc_stuff(value)
                self.out1.put(temp1)
                self.out2.put(temp2)
                self.out3.put(temp3)
                self.input.task_done()
            except Empty:
                return
                #Catch things better here
    def calc_stuff(self, param):
        out1 = param * 2
        out2 = param * 4
        out3 = param * 8
        return out1, out2, out3

def Main():
    inputQueue = Queue()
    for i in range(10):
        inputQueue.put(i)
    out1 = Queue()
    out2 = Queue()
    out3 = Queue()
    processes = []
    for x in range(2):
        p = CustomWorker(inputQueue, out1, out2, out3)
        p.daemon = True
        p.start()
        processes.append(p)
    inputQueue.join()
    while not out1.empty():
        print(out1.get())
        print(out2.get())
        print(out3.get())

if __name__ == '__main__':
    Main()



Hope that helps.






