关于ZAKER 融媒体解决方案 合作 加入

python- 使用 multiprocessing.starmap ( ) 在进程之间共 .

CocoaChina 10-23

我想使用 multiprocessing.Value multiprocessing.Lock 在单独的进程之间共享一个计数器 . 例如:

import itertools as itimport multiprocessingdef func ( x, val, lock ) : for i in range ( x ) : i ** 2 with lock: val.value += 1 print ( 'counter incremented to:', val.value ) if __name__ == '__main__': v = multiprocessing.Value ( 'i', 0 ) lock = multiprocessing.Lock ( ) with multiprocessing.Pool ( ) as pool: pool.starmap ( func, ( ( i, v, lock ) for i in range ( 25 ) ) ) print ( counter.value ( ) )

这将引发以下异常:

RuntimeError: Synchronized objects should only be shared between

processes through inheritance

我最困惑的是 , 一个相关的 ( 虽然不是完全相似的 ) 模式可以与 multiprocessing.Process ( ) 一起使用:

if __name__ == '__main__': v = multiprocessing.Value ( 'i', 0 ) lock = multiprocessing.Lock ( ) procs = [ multiprocessing.Process ( target=func, args= ( i, v, lock ) ) for i in range ( 25 ) ] for p in procs: p.start ( ) for p in procs: p.join ( )

现在 , 我认识到这是两个明显不同的事物:

> 第一个示例使用了多个等于 cpu_count ( ) 的工作进程 , 并在它们之间分割了一个可迭代范围 ( 25 )

> 第二个示例创建 25 个工作流程和任务 , 每个工作流程和任务只有一个输入

就是说:如何以这种方式与 pool.starmap ( ) ( 或 pool.map ( ) ) 共享实例?

我已经看到过类似的问题hereherehere, 但是这些方法似乎不适用于 .map ( ) /.starmap ( ) , 因为 Value 是否使用 ctypes.c_int 都是如此 .

我意识到这种方法在技术上是可行的:

def func ( x ) : for i in range ( x ) : i ** 2 with lock: v.value += 1 print ( 'counter incremented to:', v.value ) v = Nonelock = Nonedef set_global_counter_and_lock ( ) : """Egh ... """ global v, lock if not any ( ( v, lock ) ) : v = multiprocessing.Value ( 'i', 0 ) lock = multiprocessing.Lock ( ) if __name__ == '__main__': # Each worker process will call `initializer ( ) ` when it starts. with multiprocessing.Pool ( initializer=set_global_counter_and_lock ) as pool: pool.map ( func, range ( 25 ) )

这真的是最佳做法吗?

最佳答案

使用 Pool 时遇到的 RuntimeError 是因为在通过 ( 内部池 ) 队列发送到工作进程之前 , 已对池方法的参数进行了腌制 .

您要使用哪种池方法与此处无关 . 当您仅使用 Process 时 , 不会发生这种情况 , 因为不涉及队列 . 您可以仅使用 pickle.dumps ( multiprocessing.Value ( ‘ i ’ ,0 ) ) 重现该错误 .

您上一个代码段不符合您的想法 . 您没有共享价值 , 而是为每个子进程重新创建了独立的计数器 .

如果您在 Unix 上并使用默认的启动方法 " fork", 则只需不将共享对象作为参数传递给池方法即可 .

您的子进程将通过派生继承全局变量 . 使用 process-start-methods" spawn" ( 默认 Windows ) 或 " forkserver" 时 , 您必须在 Pool 期间使用初始化程序

实例化 , 以使子进程继承共享对象 .

请注意 , 您不需要额外的 multiprocessing.Lock 锁定在这里 , 因为 multiprocessing.Value 默认情况下带有内部可使用的值 .

import osfrom multiprocessing import Pool, Value #, set_start_methoddef func ( x ) : for i in range ( x ) : assert i == i with cnt.get_lock ( ) : cnt.value += 1 print ( f'{os.getpid ( ) } | counter incremented to: {cnt.value}' ) def init_globals ( counter ) : global cnt cnt = counterif __name__ == '__main__': # set_start_method ( 'spawn' ) cnt = Value ( 'i', 0 ) iterable = [ 10000 for _ in range ( 10 ) ] with Pool ( initializer=init_globals, initargs= ( cnt, ) ) as pool: pool.map ( func, iterable ) assert cnt.value == 100000

以上内容由"CocoaChina"上传发布 查看原文
相关标签 进程之间windows

觉得文章不错,微信扫描分享好友

扫码分享