corn_bag Help

py多进程

python 因为全局锁的问题, 想要利用多核只能多进程. 但又和其它语言不太一样. 里面封装了不少东西, 不过未来估计会去掉这些限制. 所以这块理解到够用就可以.

通过 Manager 实现在多个进程中共享一个队列.

import multiprocessing as mp import pandas as pd from datetime import datetime import time def worker(queue): print(f"Worker {mp.current_process().name} started.") while True: try: # 从队列中获取数据 data = queue.get(timeout=1) # 等待1秒,防止死循环 except mp.queues.Empty: print(f"Worker {mp.current_process().name} found the queue empty.") break # 队列为空,退出循环 if data is None: print(f"Worker {mp.current_process().name} received termination signal.") break # 收到终止信号,退出循环 # 处理数据(字典) df = data['key2'] print(f"Process {mp.current_process().name} processing data: {data}") # 模拟处理时间 time.sleep(1) print(f"Worker {mp.current_process().name} finished.") def main(): # 创建管理器 manager = mp.Manager() queue = manager.Queue() # 当前时间 current_time = datetime.now() # 放入一些示例数据 data1 = { 'key1': 1.23, 'key2': pd.DataFrame({ 'A': [1, 2], 'B': [3, 4], 'C': ['string1', 'string2'], 'D': [current_time, current_time] }), 'key3': 'value1' } data2 = { 'key1': 4.56, 'key2': pd.DataFrame({ 'A': [5, 6], 'B': [7, 8], 'C': ['string3', 'string4'], 'D': [current_time, current_time] }), 'key3': 'value2' } data3 = { 'key1': 7.89, 'key2': pd.DataFrame({ 'A': [9, 10], 'B': [11, 12], 'C': ['string5', 'string6'], 'D': [current_time, current_time] }), 'key3': 'value3' } for data in [data1, data2, data3]: queue.put(data) # 创建进程池 pool = mp.Pool(processes=4) # 启动工作进程 for i in range(4): print(f"Start worker {i}") pool.apply_async(worker, args=(queue,)) # 关闭队列(没有更多数据放入队列) queue.put(None) # 发送终止信号 queue.put(None) # 发送终止信号 queue.put(None) # 发送终止信号 queue.put(None) # 发送终止信号 # 等待所有进程完成 pool.close() pool.join() print("Main process finished.") if __name__ == '__main__': main()

注意 worker, args=(queue,) 中queue 死manager.queue() 返回的对象, 不能用. mp.Queue() 具体可参考 ref

多进程共享队列(全局变量)

import multiprocessing as mp import pandas as pd from datetime import datetime import time # 定义全局队列变量 queue = None def worker(): print(f"Worker {mp.current_process().name} started.") while True: try: # 从队列中获取数据 data = queue.get(timeout=1) # 等待1秒,防止死循环 except mp.queues.Empty: print(f"Worker {mp.current_process().name} found the queue empty.") break # 队列为空,退出循环 if data is None: print(f"Worker {mp.current_process().name} received termination signal.") break # 收到终止信号,退出循环 # 处理数据(字典) df = data['key2'] print(f"Process {mp.current_process().name} processing data: {data}") # 模拟处理时间 time.sleep(1) print(f"Worker {mp.current_process().name} finished.") def main(): global queue # 创建队列 queue = mp.Queue() # 当前时间 current_time = datetime.now() # 放入一些示例数据 data1 = { 'key1': 1.23, 'key2': pd.DataFrame({ 'A': [1, 2], 'B': [3, 4], 'C': ['string1', 'string2'], 'D': [current_time, current_time] }), 'key3': 'value1' } data2 = { 'key1': 4.56, 'key2': pd.DataFrame({ 'A': [5, 6], 'B': [7, 8], 'C': ['string3', 'string4'], 'D': [current_time, current_time] }), 'key3': 'value2' } data3 = { 'key1': 7.89, 'key2': pd.DataFrame({ 'A': [9, 10], 'B': [11, 12], 'C': ['string5', 'string6'], 'D': [current_time, current_time] }), 'key3': 'value3' } for data in [data1, data2, data3]: queue.put(data) # 创建进程池 pool = mp.Pool(processes=4) # 启动工作进程 for i in range(4): print(f"Start worker {i}") pool.apply_async(worker) # 关闭队列(没有更多数据放入队列) queue.close() queue.join_thread() # 等待所有进程完成 pool.close() pool.join() print("Main process finished.") if __name__ == '__main__': mp.set_start_method('fork') # 确保使用 'fork' 启动方法 main()

这里说的全局变量是

# 定义全局队列变量 queue = None

是代码上的全局, 实际情况,各个进程地址空间完全独立. queue应该是c底层做了特殊处理. 否者 普通 queue 就ok了. 不需要mp.Queue().

Last modified: 22 七月 2024