Parallelizing a for loop in python

The name of the pictureThe name of the pictureThe name of the pictureClash Royale CLAN TAG#URR8PPP



Parallelizing a for loop in python



I have a dictionary where each key (date) contains a table (multiple lists of the format[day1, val11, val21], [day2, va12, val22], [day3, val13, val23], .... I want to transform it into a DataFrame; this is done with the following code:


[day1, val11, val21], [day2, va12, val22], [day3, val13, val23], ...


df4 = pd.DataFrame(columns=sorted(set_days))

for date in dic.keys():
days = [day for day, val1, val2 in dic[date]]
val1 = [val1 for day, val1, val2 in dic[date]]
df4.loc[date, days] = val1



This code works fine, but it takes more than two hours to run.
After some research, I've realized I could parallelize it via the multiprocessing library; the following code is the intended parallel version


multiprocessing


import multiprocessing

def func(date):
global df4, dic
days = [day for day, val1, val2 in dic[date]]
val1 = [val1 for day, val1, val2 in dic[date]]
df4.loc[date, days] = val1

multiprocessing.Pool(processes=8).map(func, dic.keys())



The problem with this code is that, after executing multiprocessing.Pool(processes..., the df4 DataFrame is empty.


multiprocessing.Pool(processes...


df4



Any help would be much appreciated.



Example



Suppose the dictionary contains two days:


dic['20030812'][:4]
Out: [[1, 24.25, 0.0], [20, 23.54, 23.54], [30, 23.13, 24.36], [50, 22.85, 23.57]]

dic['20030813'][:4]
Out: [[1, 24.23, 0.0], [19, 23.4, 22.82], [30, 22.97, 24.19], [49, 22.74, 23.25]]



then the DataFrame should be of the form:


df4.loc[:, 1:50]
1 2 3 4 5 ... 46 47 48 49 50
20030812 24.25 NaN NaN NaN NaN ... NaN NaN NaN NaN 22.85
20030813 24.23 NaN NaN NaN NaN ... NaN NaN NaN 22.74 NaN



Also,


dic.keys()
Out[36]: dict_keys(['20030812', '20030813'])

df1.head().to_dict()
Out:
{1: '20030812': 24.25, '20030813': 24.23,
2: '20030812': nan, '20030813': nan,
3: '20030812': nan, '20030813': nan,
4: '20030812': nan, '20030813': nan,
5: '20030812': nan, '20030813': nan,
6: '20030812': nan, '20030813': nan,
7: '20030812': nan, '20030813': nan,
8: '20030812': nan, '20030813': nan,
9: '20030812': nan, '20030813': nan,
10: '20030812': nan, '20030813': nan,
11: '20030812': nan, '20030813': nan,
12: '20030812': nan, '20030813': nan,
13: '20030812': nan, '20030813': nan,
14: '20030812': nan, '20030813': nan,
15: '20030812': nan, '20030813': nan,
16: '20030812': nan, '20030813': nan,
17: '20030812': nan, '20030813': nan,
18: '20030812': nan, '20030813': nan,
19: '20030812': nan, '20030813': 23.4,
20: '20030812': 23.54, '20030813': nan,
21: '20030812': nan, '20030813': nan,
22: '20030812': nan, '20030813': nan,
23: '20030812': nan, '20030813': nan,
24: '20030812': nan, '20030813': nan,
25: '20030812': nan, '20030813': nan,
26: '20030812': nan, '20030813': nan,
27: '20030812': nan, '20030813': nan,
28: '20030812': nan, '20030813': nan,
29: '20030812': nan, '20030813': nan,
30: '20030812': 23.13, '20030813': 22.97,
31: '20030812': nan, '20030813': nan,
32: '20030812': nan, '20030813': nan,
...





I think this might be an XY problem. Maybe you dont need the loop at all, and neither a multiprocessing pool. What is your df.head().to_dict() and what is your dic.keys? (post a sample)
– RafaelC
Aug 12 at 13:47


df.head().to_dict()


dic.keys





Are you neglecting val2 on purpose? Also, is changing how you are getting the dict an option?
– kabanus
Aug 13 at 7:24



val2


dict




2 Answers
2



To answer your original question (roughly: "Why is the df4 DataFrame empty?"), the reason this doesn't work is that when the Pool workers are launched, each worker inherits a personal copy-on-write view of the parent's data (either directly if multiprocessing is running on a UNIX-like system with fork, or via a kludgy approach to simulate it when running on Windows).


df4


Pool


multiprocessing


fork



Thus, when each worker does:


df4.loc[date, days] = val1



it's mutating the worker's personal copy of df4; the parent process's copy remains untouched.


df4



In general, there are three ways to handle this:



Change your worker function to return something that can be used in the parent process. For example, instead of trying to perform in-place mutation with df4.loc[date, days] = val1, return what's necessary to do it in the parent, e.g. return date, days, val1, then change the parent to:


df4.loc[date, days] = val1


return date, days, val1


for date, days, val in multiprocessing.Pool(processes=8).map(func, dic.keys()):
df4.loc[date, days] = val



Downside to this approach is that it requires each of the return values to be pickled (Python's version of serialization), piped from child to parent, and unpickled; if the worker task doesn't do very much work, especially if the return values are large (and in this case, that seems to be the case), it can easily spend more time on serialization and IPC than it gains in parallelism.



Using shared object/memory (demonstrated in this answer to "Multiprocessing writing to pandas dataframe"). In practice, this usually doesn't gain you much, since stuff that isn't based on the more "raw" ctypes sharing using multiprocessing.sharedctypes is still ultimately going to end up needing to pipe data from one process to another; sharedctypes based stuff can get a meaningful speed boost though, since once mapped, shared raw C arrays are nearly as fast to access as local memory.


ctypes


multiprocessing.sharedctypes


sharedctypes



If the work being parallelized is I/O bound, or uses third party C extensions for CPU bound work (e.g. numpy), you may be able to get the required speed boosts from threads, despite GIL interference, and threads do share the same memory. Your case doesn't appear to be either I/O bound or meaningfully dependent on third party C extensions which might release the GIL, so it probably won't help here, but in general, the simple way to switch from process-based parallelism to thread-based parallelism (when you're already using multiprocessing) is to change the import from:


numpy


multiprocessing


import


import multiprocessing



to


import multiprocessing.dummy as multiprocessing



which imports the thread-backed version of multiprocessing under the expected name, so code seamlessly switches from using processes to threads.


multiprocessing





Obviously, if you don't really need the parallelism, as your answer indicates, the correct solution is to fix the algorithmic choices that caused the slowdown and avoid pointless parallelism, I just figured explaining the cause of your problem might be helpful should you need to parallelize in the future.
– ShadowRanger
Aug 13 at 20:00



As RafaelC hinted, It was an XY problem.
I've been able to reduce the execution time to 20 seconds without multiprocessing.



I created a lista list that replaces the dictionary, and, rather than adding to the df4 DataFrame a row for each date, once the lista is full, I transform the lista into a DataFrame.


# Returns the largest day from all the dates (each date has a different number of days)
def longest_series(dic):
largest_series = 0
for date in dic.keys():
# get the last day's table of a specific date
current_series = dic[date][-1][0]
if largest_series < current_series:
largest_series = current_series
return largest_series


ls = longest_series(dic)
l_total_days = list(range(1, ls+1))
s_total_days = set(l_total_days)

# creating lista list, lista is similar to dic
#The difference is that, in lista, every date has the same number of days
#i.e. from 1 to ls, and it does not contain the dates.

# It takes 15 seconds
lista = list()
for date in dic.keys():
present_days = list()
presen_values = list()
for day, val_252, _ in dic[date]:
present_days.append(day)
presen_values.append(val_252)

missing_days = list(s_total_days.difference(set(present_days))) # extra days added to date
missing_values = [None] * len(missing_days) # extra values added to date
all_days_index = list(np.argsort(present_days + missing_days)) # we need to preserve the order between days and values
all_day_values = presen_values + missing_values
lista.append(list(np.array(all_day_values)[all_days_index]))


# It takes 4 seconds
df = pd.DataFrame(lista, index= dic.keys(), columns=l_total_days)






By clicking "Post Your Answer", you acknowledge that you have read our updated terms of service, privacy policy and cookie policy, and that your continued use of the website is subject to these policies.

Popular posts from this blog

Firebase Auth - with Email and Password - Check user already registered

Dynamically update html content plain JS

How to determine optimal route across keyboard