Source code for UCTB.utils.multi_threads

import os

from multiprocessing import Pool, Manager
from functools import reduce


# multiple_process(distribute_list, partition_func, task_func, n_jobs, reduce_func, parameters)
# task_func is invoked as task_func(share_queue, locker, data_partition, [my_rank] + parameters)
def multiple_process(distribute_list, partition_func, task_func, n_jobs, reduce_func, parameters):
    if callable(partition_func) and callable(task_func) and callable(reduce_func):
        print('Parent process %s.' % os.getpid())
        manager = Manager()
        share_queue = manager.Queue()
        locker = manager.Lock()
        p = Pool()
        for i in range(n_jobs):
            # Each worker receives the shared queue, the lock, its own partition of
            # distribute_list, and [worker_rank] + parameters.
            p.apply_async(task_func, args=(share_queue, locker,
                                           partition_func(distribute_list, i, n_jobs),
                                           [i] + parameters,))
        print('Waiting for all sub_processes done...')
        p.close()
        p.join()
        print('All sub_processes done.')
        # Drain the per-worker results from the shared queue and fold them together.
        result_list = []
        while not share_queue.empty():
            result_list.append(share_queue.get_nowait())
        return reduce(reduce_func, result_list)
    else:
        print('Parameter error')
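# Note: task_func is expected to put exactly one result on share_queue (under
# locker); results are collected in whatever order the workers finish, so
# reduce_func should not depend on ordering (e.g., sum or max are safe choices).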
# Example
def task(share_queue, locker, data, parameters):
    print('Child process %s with pid %s' % (parameters[0], os.getpid()))
    # Sum this worker's partition and push the partial result to the shared queue.
    result = sum(data)
    locker.acquire()
    share_queue.put(result)
    locker.release()
if __name__ == "__main__":
    data = [e for e in range(1000000)]
    n_job = 4
    sum_result = multiple_process(distribute_list=data,
                                  partition_func=lambda data, i, n_job: [data[e] for e in range(len(data))
                                                                         if e % n_job == i],
                                  task_func=task,
                                  n_jobs=n_job,
                                  reduce_func=lambda x, y: x + y,
                                  parameters=[])
    print('Result', sum_result)
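# Additional usage sketch (illustrative only; `max_task` and the commented call
# below are hypothetical, not part of UCTB): the same helper can compute a global
# maximum by letting each worker report its partition maximum and reducing with max.
def max_task(share_queue, locker, data, parameters):
    # Report the largest value in this worker's partition.
    result = max(data)
    locker.acquire()
    share_queue.put(result)
    locker.release()

# Sketch of the corresponding call:
# global_max = multiple_process(distribute_list=data,
#                               partition_func=lambda d, i, n: d[i::n],
#                               task_func=max_task,
#                               n_jobs=n_job,
#                               reduce_func=max,
#                               parameters=[])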