
Bad multi-threaded parallel execution efficiency question. #1501

Closed
GoodManWEN opened this issue Mar 16, 2021 · 22 comments

Comments

@GoodManWEN

GoodManWEN commented Mar 16, 2021

🌍 Environment

  • Your operating system and version: Windows 10 20H2
  • Your python version: python 3.8.7
  • How did you install python (e.g. apt or pyenv)? Did you use a virtualenv?: from exe file / no.
  • Your Rust version (rustc --version): 1.51.0 (beta)
  • Your PyO3 version: 0.13.2
  • Have you tried using latest PyO3 master (replace version = "0.x.y" with git = "https://github.com/PyO3/pyo3")?: no

💥 Description

Hi everyone, I recently migrated some of my algorithms from Python to Rust; the code is around 5,000 lines in total. The bad news is that despite excellent single-threaded execution efficiency, the PyO3 extension does not perform well in multi-threaded parallel mode. After releasing the GIL and running on my 8-core CPU, I was expecting a 4-8x speedup, but the actual speedup was only about 2x.

I'm cautiously assuming this is because the type conversions between Python and Rust (converting Python lists into Rust vectors and back) are all executed while holding the GIL; it may also be related to the fact that the data I pass in consists of relatively long two-dimensional Python lists. I would like to ask whether this situation can be improved (the low efficiency may be caused by the way I call the function), or whether my requirements make it impossible to improve at all.

Minimal implementation

lib.rs:
It accepts an M×N matrix and returns it with 1.0 added to each element. The real production algorithm is much more complex.

use pyo3::prelude::*;
use pyo3::wrap_pyfunction;

fn multithread_logic<const N: usize>(matrix: Vec<[f64; N]>) -> Vec<Vec<f64>> {
    let height = matrix.len();
    let width = N;
    let mut result = Vec::new();
    for i in 0..height {
        let mut row: Vec<f64> = Vec::new();
        for j in 0..width {
            row.push(matrix[i][j] + 1.0);
        }
        result.push(row);
    }
    result
}

#[pyfunction]
fn multithread(
    py: Python,
    matrix: Vec<[f64;32]>,
) -> Vec<Vec<f64>> {
    py.allow_threads(|| multithread_logic(matrix))
}

#[pymodule]
fn testlib(_py: Python, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(multithread, m)?)?;
    Ok(())
}

call.py:
A simple comparison of single-threaded and multi-threaded execution speed. In practice the total time increases roughly linearly with the number of threads; I'd like to know whether this is due to my mismanagement of the GIL.

import testlib
import time
import threading

matrix = [list(range(32)) for _ in range(2000)]

def single_thread(matrix):
    for i in range(1000):
        testlib.multithread(matrix)

st_time = time.time()
single_thread(matrix)
print(f"Single thread time: {time.time() - st_time} s")

st_time = time.time()
threads = []
for _ in range(8):
    threads.append(threading.Thread(target=single_thread, args=(matrix,)))
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Multi-threaded time: {time.time() - st_time} s")
GoodManWEN changed the title from "Bad multi-threaded parallel execution efficiency problem." to "Bad multi-threaded parallel execution efficiency question." on Mar 16, 2021
@messense
Member

    matrix_for_each_thread = deepcopy(matrix)

Why is this copy needed?

@GoodManWEN
Author

@messense I'm worried about contention for resources between threads, which may be unnecessary; I'm not sure.

@messense
Member

Python objects are reference-counted, and since you are not modifying the input on the Rust side, it should be fine to just pass matrix.

@messense
Member

Otherwise you are also measuring the deepcopy cost in the multi-threaded case.

@messense
Member

def single_thread(num , matrix):
    for i in range(1000):
        testlib.multithread(matrix)

I don't think it's using the num input? Did you mean

def single_thread(num , matrix):
    for i in range(num):
        testlib.multithread(matrix)

@GoodManWEN
Author

GoodManWEN commented Mar 16, 2021

That's reasonable to assume, but the copy overhead in this example should not be the main cost. Even with deepcopy, it only executes 8 times, and Python can deepcopy a 2000x32 matrix quickly.

@GoodManWEN
Author

num is used to track the execution of each thread (to distinguish between threads). I removed the relevant code in the minimal implementation, which created the ambiguity; I have modified the code above.

@GoodManWEN
Author

The results of my execution are:

Single thread time: 4.926007509231567 s
Multi-threaded time: 30.748129844665527 s

@messense
Member

messense commented Mar 16, 2021

I can reproduce locally.

Note that it could be much faster if you put the loop in Rust code and call it from Python, for example

#[pyfunction]
fn multithread_loop(
    py: Python,
    matrix: Vec<[f64;32]>,
    iterations: usize,
) -> () {
    py.allow_threads(|| {
        for _ in 0..iterations {
            multithread_logic(matrix.clone());
        }
    })
}
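For reference, a variant that converts the input once and then only borrows it on each iteration (avoiding the per-iteration clone) might look like the sketch below; multithread_logic_ref is a hypothetical borrowed version of the logic above, not existing code:

// Hypothetical borrowed variant of multithread_logic: it takes a slice
// instead of an owned Vec, so the loop below can reuse the converted data.
fn multithread_logic_ref<const N: usize>(matrix: &[[f64; N]]) -> Vec<Vec<f64>> {
    matrix
        .iter()
        .map(|row| row.iter().map(|x| x + 1.0).collect())
        .collect()
}

#[pyfunction]
fn multithread_loop_ref(py: Python, matrix: Vec<[f64; 32]>, iterations: usize) {
    // The Python list is converted to Rust data exactly once; the loop then
    // runs entirely on Rust-owned data with the GIL released.
    py.allow_threads(|| {
        for _ in 0..iterations {
            multithread_logic_ref(&matrix);
        }
    });
}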

@messense
Member

messense commented Mar 16, 2021

(profiler screenshot)

Looks like most of the time is spent in type conversions.

Meanwhile, take a look at this comment: #1480 (comment)

@GoodManWEN
Author

GoodManWEN commented Mar 16, 2021

@messense Your analysis tools are cool. Indeed it's better to run the loop in Rust; the reason for the testing approach is to simulate my production environment. In my actual business I have several filters that operate on 2D matrices and run some complex logic that is difficult to optimize with numpy, so I tend to implement them in embedded DLLs.

In the original Python implementation, the execution time of a single filter is around 200 seconds, which makes serial execution very slow. After switching to Rust I can expect a 70x-90x acceleration in the purely algorithmic part; individual filter execution times can be reduced to milliseconds including conversion overhead.

In my testing of the actual production code (which is too long to discuss in a GitHub issue), the overall execution time for each filter call is 2.3 seconds (the overall Python call time for 100 loop iterations is 230 s). On the Rust side, I used some additional timing code to find that about 1.51 seconds elapse between releasing the GIL and returning.

Rust sample code as follows:

use std::time::Instant;

fn multithread_logic<const N: usize>(matrix: Vec<[f64; N]>) -> Vec<Vec<f64>> {
    let st_time = Instant::now();
    // ... logic
    println!("elapsed: {} us", st_time.elapsed().as_micros());
    // ... return the result
}

#[pyfunction]
fn multithread(
    py: Python,
    matrix: Vec<[f64;32]>,
) -> Vec<Vec<f64>> {
    py.allow_threads(|| multithread_logic(matrix))
}

To be honest, this test result does not meet my expectations. In my production code test, a workload that takes 230 s on a single thread takes 1210 s when 8 threads each run it (I'm sure this result is not limited by the hardware, which means it's much worse than the best achievable efficiency). Under ideal conditions, if the Rust part achieved perfect 100% parallel efficiency, only about 0.7 s of each 2.3 s call would have to execute serially, so the parallel time should not be as high as 1210 seconds.


Edit after #1480
Anyway, thanks for the reply. If I understand correctly, there is nothing wrong with the way I release the GIL in the example, so unless I use numpy, this is not a problem that can be optimized at the user level at this stage.

I'm not sure if you noticed in the reproduction: even though conversion overhead takes up most of the time, it happens in Rust code, and if that process did not run under the GIL, CPU usage would be pretty high with multiple threads executing. But in my tests the actual utilization is relatively low, not more than 50% in total.

@messense
Member

messense commented Mar 16, 2021

even though conversion overhead takes up most of the time, it happens in Rust code, and if that process did not run under the GIL

It has to run under the GIL because it uses PyIter_Next to iterate over the input matrix to build a Vec<[f64;32]>.

Your analysis tools are cool.

It's Instruments.app from Xcode on macOS.

@messense
Member

Since you are using the nightly Rust compiler, try enabling the nightly feature on pyo3 to see if it makes a difference.

See https://pyo3.rs/v0.13.2/features.html#nightly

@messense
Member

Since you are using the nightly Rust compiler, try enabling the nightly feature on pyo3 to see if it makes a difference.

See https://pyo3.rs/v0.13.2/features.html#nightly

It doesn't seem to make it faster; it actually slows it down a little. Optimized this a little in #1502.

@davidhewitt
Member

davidhewitt commented Mar 16, 2021

unless using numpy, this is not a problem that can be optimized at the user level at this stage.

This is exactly the right statement. On every function execution there's a huge amount of copying that is being done here, making lots of new list and int objects. And the conversion code can only ever run in a single thread at a time because it is making Python objects (and so must hold the GIL).

There are performance improvements that can be made here (e.g. #1308), however this will always be quite inefficient.

A simple example showing use of numpy, which avoids all the copying / Python objects, is significantly faster:

use numpy::PyArray2;
use pyo3::prelude::*;
use pyo3::{wrap_pyfunction, PyNativeType};

use ndarray::{ArrayView2, Array2};

fn multithread_logic(matrix: ArrayView2<f64>) -> Array2<f64> {
    let mut result = matrix.into_owned();
    result.iter_mut().for_each(|x| *x += 1.0);
    result
}

#[pyfunction]
fn multithread(
    matrix: &PyArray2<f64>,
) -> &PyArray2<f64> {
    let py = matrix.py();
    let readonly = matrix.readonly();
    let arr = readonly.as_array();
    PyArray2::from_owned_array(py, py.allow_threads(|| multithread_logic(arr)))
}

#[pymodule]
fn testlib(_py: Python, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(multithread, m)?)?;
    Ok(())
}

Result of execution on my machine:

Single thread time: 0.03507590293884277 s
Multi-threaded time: 0.2570936679840088 s

If you can't use numpy, then I suggest you make #[pyclass] structs which contain the Vec data you wish to work with. This way the data can always be owned by Rust and you won't have to pay the huge copying cost.
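As a rough, untested illustration of that idea (the Matrix type and method names here are made up for the sketch), the rows could live inside a #[pyclass] so they are converted from Python once and then stay owned by Rust:

#[pyclass]
struct Matrix {
    rows: Vec<[f64; 32]>,
}

#[pymethods]
impl Matrix {
    #[new]
    fn new(rows: Vec<[f64; 32]>) -> Self {
        // Conversion from the Python lists happens once, here.
        Matrix { rows }
    }

    // Add 1.0 to every element in place, with the GIL released.
    fn add_one(&mut self, py: Python) {
        let rows = &mut self.rows;
        py.allow_threads(|| {
            for row in rows.iter_mut() {
                for x in row.iter_mut() {
                    *x += 1.0;
                }
            }
        });
    }
}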

@GoodManWEN
Author

GoodManWEN commented Mar 16, 2021

@davidhewitt Thanks for the example; with your sample code I can reproduce the result locally, and it has been a good learning experience.

Unfortunately, my data comes from a relational database and will eventually be presented on some kind of web page; neither end can use numpy data directly. That means if I want to use numpy I need to pay the overhead of converting into numpy data structures and back again, which according to my testing takes longer than the Rust solution.

Could you please provide some more details about the #[pyclass] approach? There seem to be only a few explanatory paragraphs in the documentation, not enough for me to figure it out. To be specific, my input is a matrix of fixed width but variable length: it will always have 32 columns, and the number of rows will be roughly between 10k and 100k. It comes from a relational database as (Python) list<tuple<float;32>> or list<list<float;32>>, which can be simulated with the following code:

with pymysql.acquire() as conn:
    with conn.cursor() as cursor:
        # data = cursor.fetchdata()
        import random
        data = [tuple(range(32)) for _ in range(random.randint(10000,99999))]

Thanks.

@messense
Member

messense commented Mar 16, 2021

Since your input is a matrix of fixed width (assuming 32), converting your code to use a flat Vec<f64> and iterating over it in chunks of 32, instead of using Vec<Vec<f64>>, could be faster due to fewer memory allocations during type conversion. A rough sketch is below.
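A sketch of what that might look like (illustrative only, assuming the Python caller passes a flat list of length rows * 32):

#[pyfunction]
fn multithread_flat(py: Python, matrix: Vec<f64>) -> Vec<f64> {
    // Conversion allocates a single Vec<f64> for the whole matrix instead of
    // one Vec (or array) per row.
    py.allow_threads(|| {
        let mut result = matrix;
        for row in result.chunks_exact_mut(32) {
            for x in row {
                *x += 1.0;
            }
        }
        result
    })
}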

@GoodManWEN
Author

GoodManWEN commented Mar 16, 2021

@messense I'm actually using Vec<[f64;32]> in production code; I guess that makes no difference compared with converting it into one dimension?

@GoodManWEN
Author

GoodManWEN commented Mar 16, 2021

By the way, another weird issue I'd like to mention: since multithreading cannot get rid of the GIL problem in type conversion, I tried a multiprocess solution. In theory, in multi-process mode different processes hold different GILs, which should allow CPU resources to be used efficiently. However, the test results surprisingly show that multiprocessing does not make a significant difference either. That confuses me.

The Rust code is the same as in the original post at the top of this discussion.


Python benchmark code

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time
import testlib

def pre_activate(times):
    time.sleep(times)

def execution(matrix):
    for i in range(1000):
        testlib.multithread(matrix)

if __name__ == "__main__":

    matrix = [list(range(32)) for _ in range(1500)]

    core_num = 8
    st_time = time.time()
    execution(matrix)
    single_execute_time = time.time() - st_time
    print(f"Single thread execute time: {round(single_execute_time,4)} s")

    with ThreadPoolExecutor(max_workers=core_num) as executor:
        # pre-activate {core_num} threads in the thread pool
        pre_task = [executor.submit(pre_activate, times) for times in [0.5 for _ in range(core_num)]]
        for future in as_completed(pre_task):
            future.result()

        st_time = time.time()
        tasks = [executor.submit(execution, matrix) for _ in range(core_num)]
        for future in as_completed(tasks):
            future.result()
        print(f"Multi thread execute time: {round(time.time() - st_time,4)} s",
              f", speedup: {round(core_num * single_execute_time / (time.time() - st_time),2)} x")

    with ProcessPoolExecutor(max_workers=core_num) as executor:
        # pre-activate {core_num} worker processes in the process pool
        pre_task = [executor.submit(pre_activate, times) for times in [0.5 for _ in range(core_num)]]
        for future in as_completed(pre_task):
            future.result()

        st_time = time.time()
        tasks = [executor.submit(execution, matrix) for _ in range(core_num)]
        for future in as_completed(tasks):
            future.result()
        print(f"Multi Process execute time: {round(time.time() - st_time,4)} s",
              f", speedup: {round(core_num * single_execute_time / (time.time() - st_time),2)} x")

Results:

Single thread execute time: 3.3889 s
Multi thread execute time: 23.1617 s , speedup: 1.17 x
Multi Process execute time: 13.3324 s , speedup: 2.03 x

This means that with eight processes running on eight hardware threads, computing power does not increase eight-fold but only roughly doubles.


Edit: it doesn't seem to be a problem with the DLLs.
Even using a pure-Python recursive Fibonacci implementation to simulate a CPU-intensive task gives similar results:

Single thread execute time: 3.0169 s
Multi thread execute time: 24.1445 s , speedup: 1.01 x
Multi Process execute time: 9.0138 s , speedup: 2.68 x

@davidhewitt
Member

davidhewitt commented Mar 17, 2021

Yes, multiprocessing has a lot of overhead because it must copy memory between processes to communicate.

Regarding the #[pyclass] solution, if you keep your rows in #[pyclass] structs then you won't need to allocate new memory all the time. This can save you needing to do lots of expensive copying.

However, #[pyclass] structs can only be edited with the GIL held, so it may complicate your ability to do multithreading.

You might be able to do something like this, for example:

#[pyclass]
struct MyValues {
    x: i32
}

#[pyfunction]
fn modify_myvalues(py: Python, values: Vec<PyRefMut<MyValues>>) {
    let mut values = values;
    // Pull plain &mut references out of the PyRefMut guards so the data
    // can be mutated with the GIL released.
    let mut values_mut: Vec<&mut MyValues> = values.iter_mut().map(|pyref| &mut **pyref).collect();
    py.allow_threads(|| values_mut.iter_mut().for_each(|value| value.x += 1));
}

@GoodManWEN
Author

@messense @davidhewitt Thanks for all your replies; I understand you cannot have both maximum efficiency and safety at the same time. Given the trade-off I'd prefer to take advantage of multi-threading, so raw Python data structures may not help.

With regard to the test above, I realized that part of the performance drop might be related to the difference in CPU boost frequency between single-core and multi-core load, which is usually hard to notice in daily use. The actual multi-core utilization may be better than it looks.
