Parallelizing a for loop in python
I have a dictionary where each key (a date) maps to a table: multiple lists of the form [day1, val11, val21], [day2, val12, val22], [day3, val13, val23], ... I want to transform it into a DataFrame; this is done with the following code:
df4 = pd.DataFrame(columns=sorted(set_days))
for date in dic.keys():
    days = [day for day, val1, val2 in dic[date]]
    val1 = [val1 for day, val1, val2 in dic[date]]
    df4.loc[date, days] = val1
This code works fine, but it takes more than two hours to run.
After some research, I realized I could parallelize it via the multiprocessing library; the following code is the intended parallel version:
import multiprocessing

def func(date):
    global df4, dic
    days = [day for day, val1, val2 in dic[date]]
    val1 = [val1 for day, val1, val2 in dic[date]]
    df4.loc[date, days] = val1

multiprocessing.Pool(processes=8).map(func, dic.keys())
The problem with this code is that, after executing multiprocessing.Pool(processes..., the df4 DataFrame is empty.
Any help would be much appreciated.
Example
Suppose the dictionary contains two dates:
dic['20030812'][:4]
Out: [[1, 24.25, 0.0], [20, 23.54, 23.54], [30, 23.13, 24.36], [50, 22.85, 23.57]]
dic['20030813'][:4]
Out: [[1, 24.23, 0.0], [19, 23.4, 22.82], [30, 22.97, 24.19], [49, 22.74, 23.25]]
then the DataFrame should be of the form:
df4.loc[:, 1:50]
              1    2    3    4    5  ...   46   47   48     49     50
20030812  24.25  NaN  NaN  NaN  NaN  ...  NaN  NaN  NaN    NaN  22.85
20030813  24.23  NaN  NaN  NaN  NaN  ...  NaN  NaN  NaN  22.74    NaN
Also,
dic.keys()
Out[36]: dict_keys(['20030812', '20030813'])
df1.head().to_dict()
Out:
{1: {'20030812': 24.25, '20030813': 24.23},
 2: {'20030812': nan, '20030813': nan},
 3: {'20030812': nan, '20030813': nan},
 4: {'20030812': nan, '20030813': nan},
 5: {'20030812': nan, '20030813': nan},
 6: {'20030812': nan, '20030813': nan},
 7: {'20030812': nan, '20030813': nan},
 8: {'20030812': nan, '20030813': nan},
 9: {'20030812': nan, '20030813': nan},
 10: {'20030812': nan, '20030813': nan},
 11: {'20030812': nan, '20030813': nan},
 12: {'20030812': nan, '20030813': nan},
 13: {'20030812': nan, '20030813': nan},
 14: {'20030812': nan, '20030813': nan},
 15: {'20030812': nan, '20030813': nan},
 16: {'20030812': nan, '20030813': nan},
 17: {'20030812': nan, '20030813': nan},
 18: {'20030812': nan, '20030813': nan},
 19: {'20030812': nan, '20030813': 23.4},
 20: {'20030812': 23.54, '20030813': nan},
 21: {'20030812': nan, '20030813': nan},
 22: {'20030812': nan, '20030813': nan},
 23: {'20030812': nan, '20030813': nan},
 24: {'20030812': nan, '20030813': nan},
 25: {'20030812': nan, '20030813': nan},
 26: {'20030812': nan, '20030813': nan},
 27: {'20030812': nan, '20030813': nan},
 28: {'20030812': nan, '20030813': nan},
 29: {'20030812': nan, '20030813': nan},
 30: {'20030812': 23.13, '20030813': 22.97},
 31: {'20030812': nan, '20030813': nan},
 32: {'20030812': nan, '20030813': nan},
...
Are you neglecting val2 on purpose? Also, is changing how you are getting the dict an option? – kabanus
Aug 13 at 7:24
2 Answers
To answer your original question (roughly: "Why is the df4 DataFrame empty?"): this doesn't work because, when the Pool workers are launched, each worker inherits a personal copy-on-write view of the parent's data (either directly, if multiprocessing is running on a UNIX-like system with fork, or via a kludgy approach that simulates it when running on Windows).
Thus, when each worker does:
df4.loc[date, days] = val1
it's mutating the worker's personal copy of df4; the parent process's copy remains untouched.
In general, there are three ways to handle this:
Change your worker function to return something that can be used in the parent process. For example, instead of trying to perform in-place mutation with df4.loc[date, days] = val1, return what's necessary to do it in the parent, e.g. return date, days, val1, then change the parent to:
for date, days, val in multiprocessing.Pool(processes=8).map(func, dic.keys()):
    df4.loc[date, days] = val
The downside of this approach is that each return value must be pickled (Python's version of serialization), piped from child to parent, and unpickled. If the worker does little work per task, and especially if the return values are large (which seems to be the case here), the program can easily spend more time on serialization and IPC than it gains from parallelism.
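A slightly fuller sketch of this first approach, in case it helps. The names extract_row and parallel_rows are made up for illustration, and the worker receives its (date, table) pair as an argument rather than reading a global dic, which is a choice of this sketch and not something the question's code does:

```python
from multiprocessing import Pool


def extract_row(item):
    """Worker: turn one (date, table) pair into plain, picklable data."""
    date, rows = item
    days = [day for day, val1, val2 in rows]
    vals = [val1 for day, val1, val2 in rows]
    return date, days, vals


def parallel_rows(dic, processes=8):
    """Hypothetical driver: run extract_row over every date in a process pool."""
    with Pool(processes=processes) as pool:
        # Each (date, days, vals) tuple is pickled in the worker and
        # unpickled here in the parent; nothing is mutated in the workers.
        return pool.map(extract_row, dic.items())
```

Call parallel_rows(dic) from under an if __name__ == "__main__": guard (start methods that launch a fresh interpreter, such as the one used on Windows, need it), then do the df4.loc[date, days] = vals assignments in the parent for each returned tuple.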
Using shared object/memory (demonstrated in this answer to "Multiprocessing writing to pandas dataframe"). In practice, this usually doesn't gain you much, since anything that isn't based on the more "raw" ctypes sharing from multiprocessing.sharedctypes still ultimately needs to pipe data from one process to another. sharedctypes-based code can get a meaningful speed boost, though, since once mapped, shared raw C arrays are nearly as fast to access as local memory.
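For the raw sharedctypes route, here is a rough sketch under some assumptions of mine: the result is treated as a flat buffer of doubles with one row per date, and the helper names (init_worker, write_row, fill_shared) are invented for this example. It is not the code from the linked answer.

```python
import math
from multiprocessing import Pool
from multiprocessing.sharedctypes import RawArray

_shared = None  # set in each worker by init_worker
_n_cols = 0


def init_worker(shared, n_cols):
    """Runs once per worker: remember the shared buffer and the row width."""
    global _shared, _n_cols
    _shared, _n_cols = shared, n_cols


def write_row(task):
    """Write one date's values into that date's own row of the buffer."""
    row, pairs = task
    for day, val in pairs:
        _shared[row * _n_cols + (day - 1)] = val  # days are 1-based


def fill_shared(dic, n_cols, processes=2):
    """Hypothetical driver: returns (dates, flat buffer of len(dates) * n_cols)."""
    dates = sorted(dic)
    shared = RawArray("d", [math.nan] * (len(dates) * n_cols))
    tasks = [(row, [(day, val1) for day, val1, _ in dic[date]])
             for row, date in enumerate(dates)]
    # The buffer is handed over when the workers are created, not per task.
    with Pool(processes, initializer=init_worker,
              initargs=(shared, n_cols)) as pool:
        pool.map(write_row, tasks)
    return dates, shared
```

RawArray has no lock, which is acceptable here only because each task writes to a different row. The inputs are still pickled on their way to the workers; what this avoids is piping the results back. As with any process pool, call fill_shared from under an if __name__ == "__main__": guard; row i of the result is shared[i * n_cols:(i + 1) * n_cols].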
If the work being parallelized is I/O bound, or uses third-party C extensions for CPU-bound work (e.g. numpy), you may be able to get the required speed boost from threads, despite GIL interference, and threads do share the same memory. Your case doesn't appear to be either I/O bound or meaningfully dependent on third-party C extensions that might release the GIL, so it probably won't help here. In general, though, the simple way to switch from process-based parallelism to thread-based parallelism (when you're already using multiprocessing) is to change the import from:
import multiprocessing
to:
import multiprocessing.dummy as multiprocessing
which imports the thread-backed version of multiprocessing under the expected name, so the code seamlessly switches from using processes to threads.
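To illustrate the shared-memory point about threads, here is a small sketch using the two sample dates from the question. The table dict and the choice of 4 threads are mine; a plain dict stands in for the DataFrame so the example has no third-party dependencies.

```python
from multiprocessing.dummy import Pool  # thread-backed Pool, same API

dic = {
    "20030812": [[1, 24.25, 0.0], [20, 23.54, 23.54],
                 [30, 23.13, 24.36], [50, 22.85, 23.57]],
    "20030813": [[1, 24.23, 0.0], [19, 23.4, 22.82],
                 [30, 22.97, 24.19], [49, 22.74, 23.25]],
}
table = {}  # lives in the one process that all threads share


def func(date):
    # Each call writes a distinct key, so the threads never touch the
    # same entry; the write is visible to the caller afterwards.
    table[date] = {day: val1 for day, val1, val2 in dic[date]}


with Pool(4) as pool:
    pool.map(func, dic.keys())
```

After the map call, table holds an entry for every date, because the worker threads mutated the same object the main thread sees. Whether this is any faster than the plain loop is a separate question; for pure-Python work like this it usually isn't.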
Obviously, if you don't really need the parallelism, as your answer indicates, the correct solution is to fix the algorithmic choices that caused the slowdown and avoid pointless parallelism. I just figured explaining the cause of your problem might be helpful should you need to parallelize in the future.
– ShadowRanger
Aug 13 at 20:00
As RafaelC hinted, it was an XY problem.
I've been able to reduce the execution time to 20 seconds without multiprocessing.
I created a list, lista, that replaces the dictionary. Rather than adding a row to the df4 DataFrame for each date, I fill lista first and then convert it into a DataFrame in one step.
import numpy as np
import pandas as pd

# Returns the largest day across all dates (each date has a different number of days)
def longest_series(dic):
    largest_series = 0
    for date in dic.keys():
        # day number in the last row of this date's table
        current_series = dic[date][-1][0]
        if largest_series < current_series:
            largest_series = current_series
    return largest_series

ls = longest_series(dic)
l_total_days = list(range(1, ls + 1))
s_total_days = set(l_total_days)

# Creating the lista list; lista is similar to dic.
# The difference is that, in lista, every date has the same number of days,
# i.e. from 1 to ls, and it does not contain the dates.
# It takes 15 seconds
lista = list()
for date in dic.keys():
    present_days = list()
    present_values = list()
    for day, val_252, _ in dic[date]:
        present_days.append(day)
        present_values.append(val_252)
    missing_days = list(s_total_days.difference(set(present_days)))  # extra days added to date
    missing_values = [None] * len(missing_days)  # extra values added to date
    all_days_index = list(np.argsort(present_days + missing_days))  # we need to preserve the order between days and values
    all_day_values = present_values + missing_values
    lista.append(list(np.array(all_day_values)[all_days_index]))

# It takes 4 seconds
df = pd.DataFrame(lista, index=list(dic.keys()), columns=l_total_days)
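The same dense rows can probably be built with a per-date dictionary lookup instead of the argsort step. This is only a sketch of that idea: dense_rows is a made-up name, and it takes the maximum over all days instead of assuming each table is sorted by day.

```python
def dense_rows(dic):
    """Return (columns, table) with one equally long row per date.

    Days missing for a date are filled with None.
    """
    max_day = max(day for rows in dic.values() for day, _, _ in rows)
    columns = list(range(1, max_day + 1))
    table = []
    for rows in dic.values():
        by_day = {day: val1 for day, val1, _ in rows}  # day -> first value
        table.append([by_day.get(day) for day in columns])
    return columns, table


sample = {
    "20030812": [[1, 24.25, 0.0], [20, 23.54, 23.54],
                 [30, 23.13, 24.36], [50, 22.85, 23.57]],
    "20030813": [[1, 24.23, 0.0], [19, 23.4, 22.82],
                 [30, 22.97, 24.19], [49, 22.74, 23.25]],
}
columns, table = dense_rows(sample)
```

The result plugs into the same final step as above, e.g. pd.DataFrame(table, index=list(sample), columns=columns). I haven't timed it against the version above.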
I think this might be an XY problem. Maybe you don't need the loop at all, nor a multiprocessing pool. What are your df.head().to_dict() and your dic.keys? (Post a sample.) – RafaelC
Aug 12 at 13:47