py多进程
python 因为全局锁的问题, 想要利用多核只能多进程. 但又和其它语言不太一样. 里面封装了不少东西, 不过未来估计会去掉这些限制. 所以这块理解到够用就可以.
通过 Manager 实现在多个进程中共享一个队列.
import multiprocessing as mp
import pandas as pd
from datetime import datetime
import time
def worker(queue):
print(f"Worker {mp.current_process().name} started.")
while True:
try:
# 从队列中获取数据
data = queue.get(timeout=1) # 等待1秒,防止死循环
except mp.queues.Empty:
print(f"Worker {mp.current_process().name} found the queue empty.")
break # 队列为空,退出循环
if data is None:
print(f"Worker {mp.current_process().name} received termination signal.")
break # 收到终止信号,退出循环
# 处理数据(字典)
df = data['key2']
print(f"Process {mp.current_process().name} processing data: {data}")
# 模拟处理时间
time.sleep(1)
print(f"Worker {mp.current_process().name} finished.")
def main():
# 创建管理器
manager = mp.Manager()
queue = manager.Queue()
# 当前时间
current_time = datetime.now()
# 放入一些示例数据
data1 = {
'key1': 1.23,
'key2': pd.DataFrame({
'A': [1, 2],
'B': [3, 4],
'C': ['string1', 'string2'],
'D': [current_time, current_time]
}),
'key3': 'value1'
}
data2 = {
'key1': 4.56,
'key2': pd.DataFrame({
'A': [5, 6],
'B': [7, 8],
'C': ['string3', 'string4'],
'D': [current_time, current_time]
}),
'key3': 'value2'
}
data3 = {
'key1': 7.89,
'key2': pd.DataFrame({
'A': [9, 10],
'B': [11, 12],
'C': ['string5', 'string6'],
'D': [current_time, current_time]
}),
'key3': 'value3'
}
for data in [data1, data2, data3]:
queue.put(data)
# 创建进程池
pool = mp.Pool(processes=4)
# 启动工作进程
for i in range(4):
print(f"Start worker {i}")
pool.apply_async(worker, args=(queue,))
# 关闭队列(没有更多数据放入队列)
queue.put(None) # 发送终止信号
queue.put(None) # 发送终止信号
queue.put(None) # 发送终止信号
queue.put(None) # 发送终止信号
# 等待所有进程完成
pool.close()
pool.join()
print("Main process finished.")
if __name__ == '__main__':
main()
注意 worker, args=(queue,) 中queue 死manager.queue() 返回的对象, 不能用. mp.Queue() 具体可参考 ref
多进程共享队列(全局变量)
import multiprocessing as mp
import pandas as pd
from datetime import datetime
import time
# 定义全局队列变量
queue = None
def worker():
print(f"Worker {mp.current_process().name} started.")
while True:
try:
# 从队列中获取数据
data = queue.get(timeout=1) # 等待1秒,防止死循环
except mp.queues.Empty:
print(f"Worker {mp.current_process().name} found the queue empty.")
break # 队列为空,退出循环
if data is None:
print(f"Worker {mp.current_process().name} received termination signal.")
break # 收到终止信号,退出循环
# 处理数据(字典)
df = data['key2']
print(f"Process {mp.current_process().name} processing data: {data}")
# 模拟处理时间
time.sleep(1)
print(f"Worker {mp.current_process().name} finished.")
def main():
global queue
# 创建队列
queue = mp.Queue()
# 当前时间
current_time = datetime.now()
# 放入一些示例数据
data1 = {
'key1': 1.23,
'key2': pd.DataFrame({
'A': [1, 2],
'B': [3, 4],
'C': ['string1', 'string2'],
'D': [current_time, current_time]
}),
'key3': 'value1'
}
data2 = {
'key1': 4.56,
'key2': pd.DataFrame({
'A': [5, 6],
'B': [7, 8],
'C': ['string3', 'string4'],
'D': [current_time, current_time]
}),
'key3': 'value2'
}
data3 = {
'key1': 7.89,
'key2': pd.DataFrame({
'A': [9, 10],
'B': [11, 12],
'C': ['string5', 'string6'],
'D': [current_time, current_time]
}),
'key3': 'value3'
}
for data in [data1, data2, data3]:
queue.put(data)
# 创建进程池
pool = mp.Pool(processes=4)
# 启动工作进程
for i in range(4):
print(f"Start worker {i}")
pool.apply_async(worker)
# 关闭队列(没有更多数据放入队列)
queue.close()
queue.join_thread()
# 等待所有进程完成
pool.close()
pool.join()
print("Main process finished.")
if __name__ == '__main__':
mp.set_start_method('fork') # 确保使用 'fork' 启动方法
main()
这里说的全局变量是
# 定义全局队列变量
queue = None
是代码上的全局, 实际情况,各个进程地址空间完全独立. queue应该是c底层做了特殊处理. 否者 普通 queue 就ok了. 不需要mp.Queue().
Last modified: 22 七月 2024