Sunday, September 8, 2024
pandas multi-process concurrency and python acceleration

1. Background

Operations on large-scale data can take a long time. To make full use of software and hardware resources, two mainstream optimization approaches have emerged: “vectorization” and “parallelization”.

2. swifter

swifter is a package that speeds up applying functions to a pandas DataFrame or Series. It combines “vectorization” and “parallelization”.
Install:

pip install -U pandas # upgrade pandas
pip install swifter # first time installation
pip install -U swifter # upgrade to latest version if already installed
Or install with conda:
conda install -c conda-forge swifter

2.1 A demo

import pandas as pd
import swifter

df = pd.DataFrame({'x': [1, 2, 3, 4], 'y': [5, 6, 7, 8]})
# plain pandas apply: runs on a single core
df['x2'] = df['x'].apply(lambda x: x**2)
# swifter apply: vectorized or run on multiple cores, whichever is faster
df['x2'] = df['x'].swifter.apply(lambda x: x**2)
# use swifter apply on the whole dataframe, one row at a time
df['agg'] = df.swifter.apply(lambda x: x.sum() - x.min(), axis=1)

# general pattern for specific columns; col1..col3, my_func,
# positional_arg and keyword_argval are placeholders to define yourself
# df['outCol'] = df[['col1', 'col2']].swifter.apply(my_func)
# df['outCol'] = df[['col1', 'col2', 'col3']].swifter.apply(my_func,
#              positional_arg, keyword_arg=keyword_argval)

2.2 swifter efficiency improvement principle

  1. swifter first checks whether the function passed to apply can be vectorized. If it can, the vectorized version is applied automatically (this gives the best speedup);

  2. If the function cannot be vectorized, swifter automatically picks the faster of dask **parallel processing** and a plain pandas apply;

  3. In the groupby apply scenario, swifter can also achieve better results.

Note: Parallelization may not achieve the expected results on small data sets, so use it only where the application scenario calls for it, while vectorization can bring some performance improvement regardless of the size of the data set.

This gives swifter its characteristic behavior: regardless of the size of the data, vectorization is almost always the fastest option; when the function cannot be vectorized, a plain pandas apply is fastest while the data is small, and once the data exceeds a certain threshold, parallel processing becomes faster.
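The "try vectorized first, otherwise fall back" idea can be illustrated with plain pandas. This is only a minimal sketch of the principle, not swifter's actual implementation; the helper name try_vectorized_apply and its sample_size parameter are hypothetical.

```python
import numpy as np
import pandas as pd


def try_vectorized_apply(series, func, sample_size=100):
    """Hypothetical helper: call func on the whole Series if that gives the
    same result as element-wise apply on a small sample, else fall back."""
    sample = series.iloc[:sample_size]
    try:
        vectorized = func(sample)          # func applied to the Series itself
        elementwise = sample.apply(func)   # func applied value by value
        if isinstance(vectorized, pd.Series) and np.allclose(vectorized, elementwise):
            return func(series), "vectorized"
    except Exception:
        # func does not accept a whole Series, so it is not vectorizable
        pass
    return series.apply(func), "apply"


s = pd.Series(np.arange(1, 1001, dtype=float))

# x ** 2 works on a whole Series, so the vectorized path is taken
squared, path_a = try_vectorized_apply(s, lambda x: x ** 2)

# int() only accepts scalars, so this falls back to element-wise apply
remainder, path_b = try_vectorized_apply(s, lambda x: int(x) % 7)

print(path_a, path_b)
```

A real library would additionally time the fallback candidates (plain apply versus a parallel backend) on a sample before choosing one.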

3. Multi-process pandarallel

pandarallel integrates seamlessly with pandas and is a very friendly tool for running pandas operations in multiple processes.
Installation: pip3 install pandarallel

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pandas as pd
import numpy as np
from pandarallel import pandarallel
# shm_size_mb: size of the shared memory space to allocate (MB)
# nb_workers: number of worker processes (cores) to use
pandarallel.initialize(nb_workers=10, use_memory_fs=False, progress_bar=True)

def func(x):
    return x**3
df = pd.DataFrame(np.random.rand(1000,1000))

Usage:

# Apply func to each row
df.parallel_apply(func, axis=1)
# Apply func to a single column (the columns of df are labeled 0..999)
df[0].parallel_apply(func)

Several pandas methods have corresponding parallel implementations in pandarallel, for example apply (parallel_apply), applymap (parallel_applymap) and Series.map (parallel_map).

Reference: pandas apply several methods of parallel processing_parallel_apply-CSDN blog

4. joblib

import time
from math import sqrt
from joblib import Parallel, delayed

def test():
    start = time.time()
    # run sqrt on 8 worker processes
    result = Parallel(n_jobs=8)(delayed(sqrt)(i**2) for i in range(10000))
    # the same pattern works per group, e.g.:
    # results = Parallel(n_jobs=8)(delayed(key_func)(group) for name, group in tqdm(data_grouped))
    end = time.time()
    print(end - start)  # elapsed seconds
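The commented-out line above hints at a per-group pattern. A minimal sketch of it, assuming a small made-up DataFrame and a hypothetical summarize function in place of key_func, might look like this:

```python
import pandas as pd
from joblib import Parallel, delayed


def summarize(name, group):
    """Hypothetical per-group function: return the group key and its total."""
    return name, int(group["value"].sum())


df = pd.DataFrame({
    "key": ["a", "b", "a", "c", "b", "a"],
    "value": [1, 2, 3, 4, 5, 6],
})

# Each (name, group) pair from groupby becomes one task for a worker.
results = Parallel(n_jobs=2)(
    delayed(summarize)(name, group) for name, group in df.groupby("key")
)
totals = dict(results)
print(totals)
```

Results come back in the order the tasks were submitted, so they can be paired with the group keys directly.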


5. multiprocessing

import multiprocessing as mp

# fun must be a picklable, module-level function and df an existing DataFrame
with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(fun, df['col'])

multiprocessing.cpu_count() returns the number of CPUs in the system.

This number is not necessarily the number of CPUs that the current process can use. The number of usable CPUs can be obtained with len(os.sched_getaffinity(0)), which is available on some Unix platforms.
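A small sketch of how the two counts can be combined when sizing a pool. The helper name available_cpus and the square function are illustrative, not part of any library:

```python
import multiprocessing as mp
import os


def available_cpus():
    """CPUs usable by this process; falls back to the system-wide count
    on platforms without os.sched_getaffinity."""
    if hasattr(os, "sched_getaffinity"):
        return len(os.sched_getaffinity(0))
    return mp.cpu_count()


def square(x):
    # Module-level function so that worker processes can import it.
    return x * x


if __name__ == "__main__":
    # The guard keeps worker processes from re-running the pool setup.
    with mp.Pool(available_cpus()) as pool:
        print(pool.map(square, range(8)))
```

Sizing the pool by the usable count avoids oversubscribing when the process is restricted to a subset of cores, for example inside a container or under taskset.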

6. modin

Modin is a pandas-like library that supports distributed operation. Its backend uses Dask (a library with a pandas-like API that can read and process data in parallel) or Ray. You can speed up pandas code by changing a single line, the import: import modin.pandas as pd. Commonly used functions such as read_csv, concat, and apply are accelerated well.

Note: The overhead of parallel processing can slow down processing of small data sets.

# pip install modin
import numpy as np
import modin.pandas as pd

my_dict = {'a': np.random.randn(10000000),
           'b': np.random.randn(10000000),
           'N': np.random.randint(100, 10000, (10000000)),
           'x':  np.random.randint(1, 1000, (10000000))}
df = pd.DataFrame(my_dict)
# concatenate 25 copies of the frame
df_new = pd.concat([df for _ in range(25)])

With Modin the concat takes 0.6s, while ordinary pandas takes 3s, so Modin is 5 times faster!

7. python acceleration

7.1 numexpr

numexpr optimizes the performance of NumPy calculations and is simple to use: pass the original NumPy expression as a string to the evaluate function. From experience, numexpr pays off once the data holds tens of thousands of elements or more; for simple operations, using numexpr may be slower.

import numpy as np
import numexpr as ne
a = np.linspace(0,1000,1000)
# the expression is passed as a string and evaluated by numexpr
ne.evaluate('a**10')

Compared with numpy, it can be 5 times faster.

7.2 numba

numba uses the industry-standard LLVM compiler library to translate Python functions into optimized machine code at runtime. Numerical algorithms compiled with numba can approach the speed of C or FORTRAN. numba is very simple to use: its compilation functions are decorators, so you only need to add one, such as @nb.jit, in front of the function you define. It is simple and fast!

The numba engine performs well when processing millions of data points or more.

# pip install numba
import numba as nb
import numpy as np

# summation function accelerated with numba
@nb.jit()
def nb_sum(a):
    Sum = 0
    for i in range(len(a)):
        Sum += a[i]
    return Sum

a = np.linspace(0, 1000, 1000) # create an array of length 1000
nb_sum(a)

In this example numba, which is said to come closest to C speed, is even 5 times faster than NumPy and hundreds of times faster than a pure Python sum.

In addition, Numba also supports GPU acceleration and vector acceleration methods, which can further achieve higher performance.

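from numba import cuda
cuda.select_device(1)  # GPU with index 1; use 0 on a single-GPU machine

@cuda.jit
def CudaSquare(x):
    i, j = cuda.grid(2)
    # skip threads that fall outside the array bounds
    if i < x.shape[0] and j < x.shape[1]:
        x[i, j] *= x[i, j]

# vectorized acceleration with numba
from math import sin
@nb.vectorize()
def nb_vec_sin(a):
    return sin(a)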

7.3 cupy

CuPy is a library that implements NumPy-compatible arrays on NVIDIA GPUs using the CUDA libraries. Because it mirrors the NumPy array interface, array code can take advantage of the many CUDA cores of the GPU for better parallel acceleration.

# pip install cupy
import numpy as np
import cupy as cp
x_gpu = cp.ones((1000,1000,1000)) 

In this test CuPy achieved a 10.5-fold speedup. As the amount of data grows, CuPy's performance advantage becomes more pronounced.

7.4 Cython optimization

Cython is a compiler that translates Python code, optionally annotated with C types, into C. In some computationally intensive programs, Cython can achieve considerable acceleration. After loading the Cython extension in IPython with the magic command %load_ext Cython, the following example runs about twice as fast. With more advanced Cython statements, such as static type declarations, code can be dozens or hundreds of times faster than Python.

%%cython
def f_plain(x):
    return x * (x - 1)

def integrate_f_plain(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx

8. Other tips for using pandas

Reference: https://pandas.pydata.org/pandas-docs/stable/user_guide/enhancingperf.html

8.1 Row-by-row iterative optimization

In recent versions of pandas, iterating over a DataFrame row by row with itertuples instead of iterrows can be dozens of times faster.

# fast: itertuples
res = []
for row in df.itertuples():
    temp = getattr(row, 'a')
    res.append(temp*temp)
df['a2'] = res

# slow: iterrows
res = []
for index,row in df.iterrows():
    temp = row['a']
    res.append(temp*temp)
df['a2'] = res
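To check the difference on your own machine, the two loops can be timed side by side. This is a rough sketch on made-up data; the helper names are hypothetical and the measured ratio depends on the pandas version and hardware.

```python
import time

import numpy as np
import pandas as pd

df = pd.DataFrame({"a": np.arange(20000, dtype=float)})


def square_with_itertuples(frame):
    # Each row is a lightweight namedtuple; columns are attributes.
    return [row.a * row.a for row in frame.itertuples()]


def square_with_iterrows(frame):
    # Each row is materialized as a Series, which is much more expensive.
    return [row["a"] * row["a"] for _, row in frame.iterrows()]


def timed(func, frame):
    """Return the function result and the elapsed wall-clock seconds."""
    start = time.perf_counter()
    result = func(frame)
    return result, time.perf_counter() - start


res_tuples, t_tuples = timed(square_with_itertuples, df)
res_rows, t_rows = timed(square_with_iterrows, df)

print(f"itertuples: {t_tuples:.4f}s, iterrows: {t_rows:.4f}s")
```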

8.2 apply, applymap optimization

When the same operation is performed on every row, looping over the rows is inefficient. In that case you can use apply or applymap with a function instead. apply can be used for row-by-row calculations, while applymap does finer-grained element-by-element calculations.

# Compute a function of columns a and b row by row
df['a3']=df.apply( lambda row: row['a']*row['b'],axis=1)
# Keep two decimal places element by element
df.applymap(lambda x: "%.2f" % x)
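When the row function is simple arithmetic, whole-column operations usually give the same result without a Python-level call per row. The sketch below uses made-up data to compare the two; it also picks DataFrame.map when available, since that method replaces applymap from pandas 2.1 onward.

```python
import pandas as pd

df = pd.DataFrame({"a": [1.0, 2.5, 3.0], "b": [4.0, 2.0, 0.5]})

# Row-wise apply, as in the snippet above.
by_apply = df.apply(lambda row: row["a"] * row["b"], axis=1)

# The same result from whole-column arithmetic (vectorized).
by_columns = df["a"] * df["b"]

# Element-wise formatting; fall back to applymap on older pandas.
elementwise = df.map if hasattr(df, "map") else df.applymap
formatted = elementwise(lambda x: "%.2f" % x)

print(by_columns.tolist())
print(formatted)
```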

8.3 Aggregation function agg optimization

When aggregating a column after a groupby, built-in functions are more efficient than custom functions. In the following example, the built-in versions are about 3 times faster.

# custom function: slower
df.groupby("x")['a'].agg(lambda x:x.sum())
# built-in functions: faster
df.groupby("x")['a'].agg(sum)
df.groupby("x")['a'].agg(np.sum)
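pandas also accepts the aggregation name as a string, which lets it use its own optimized groupby implementation. A minimal sketch on made-up data, checking that the three spellings agree:

```python
import pandas as pd

df = pd.DataFrame({
    "x": [1, 1, 2, 2, 2],
    "a": [1.0, 2.0, 3.0, 4.0, 5.0],
})

grouped = df.groupby("x")["a"]

custom = grouped.agg(lambda s: s.sum())  # Python function called once per group
by_name = grouped.agg("sum")             # string alias for the built-in sum
direct = grouped.sum()                   # the built-in method itself

print(by_name)
```

Recent pandas releases emit a warning when the Python builtin sum or np.sum is passed to agg, so the string form is the more future-proof spelling.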

8.4 File operations

When pandas reads files, pkl (pickle) format is the fastest, followed by hdf format and then csv, while xlsx is slower still. However, the advantage of CSV is that the format is more versatile and takes up less memory and hard disk resources. In addition, for large files, read_csv can read the file in chunks, select only certain columns, and specify the data types to read.
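The three read_csv options mentioned above correspond to the chunksize, usecols and dtype parameters. A small sketch, using an in-memory CSV with made-up column names in place of a large file:

```python
import io

import pandas as pd

# Stand-in for a large CSV file on disk (10 data rows, 4 columns).
csv_text = "id,price,qty,note\n" + "".join(
    f"{i},{i * 1.5},{i % 3},x\n" for i in range(10)
)

reader = pd.read_csv(
    io.StringIO(csv_text),
    usecols=["id", "price", "qty"],                            # skip unneeded columns
    dtype={"id": "int32", "price": "float32", "qty": "int8"},  # compact dtypes
    chunksize=4,                                               # rows per chunk
)

total_qty = 0
chunk_sizes = []
for chunk in reader:
    # Only one chunk is held in memory at a time.
    chunk_sizes.append(len(chunk))
    total_qty += int(chunk["qty"].sum())

print(chunk_sizes, total_qty)
```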
 

8.5 pandas.eval

pandas.eval is based on numexpr. From experience, eval expressions show an obvious speedup once the data exceeds 10,000 rows.

import numpy as np
import pandas as pd
nrows, ncols = 20000, 100
df1, df2, df3, df4 = [pd.DataFrame(np.random.randn(nrows, ncols)) for _ in range(4)]
pd.eval("df1 + df2 + df3 + df4")