
目录
ZeroMQ (也写作 ØMQ, 0MQ 或 ZMQ) 是一个高性能的异步消息传递库,旨在用于分布式或并发应用程序。它提供了一个消息队列,无需一个专门的消息代理服务器。
与 RabbitMQ、Kafka 等传统消息队列系统不同,ZeroMQ:
ZeroMQ 提供了多种通信模式,每种模式都适用于不同的应用场景。了解这些模式是有效使用 ZeroMQ 的关键。
这是最简单也是最常见的通信模式,类似于客户端-服务器模型。客户端发送请求,服务器处理请求并回复响应。
特点:
适用场景:
在这种模式中,发布者发送消息,不关心谁接收;多个订阅者可以订阅感兴趣的消息类型。
特点:
适用场景:
这种模式创建了一个管道,数据从推送端流向拉取端。多个节点可以参与推送或拉取过程,实现负载均衡。
特点:
适用场景:
这是一种特殊模式,用于两个套接字之间的独占通信。
特点:
适用场景:
sudo apt-get update
sudo apt-get install libzmq3-dev
pip install pyzmq
创建一个简单的 Python 脚本测试安装:
import zmq
print(f"Current ZMQ version: {zmq.zmq_version()}")
print(f"Current PyZMQ version: {zmq.__version__}")
如果一切顺利,你应该能看到安装的 ZMQ 和 PyZMQ 版本号。
Current ZMQ version: 4.3.5
Current PyZMQ version: 26.2.0
现在我们已经了解了 ZeroMQ 的基础知识和不同的通信模式,接下来让我们通过实际的代码示例来深入了解每种模式。
请求-响应是最基本的通信模式,类似于传统的客户端-服务器模型。一个请求对应一个响应,通信是同步的。
重要的是,REQ 和 REP 套接字都遵循严格的收发顺序:
如果不遵循这种模式,会导致错误。
下面是一个简单的请求-响应模式示例,包含服务器和客户端:
# 服务器代码
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
while True:
# 等待客户端请求
message = socket.recv_string()
print(f"收到请求: {message}")
# 模拟处理请求的时间
time.sleep(1)
# 发送响应
socket.send_string(f"对 '{message}' 的响应")
# 客户端代码
import zmq
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
for request_num in range(5):
request = f"请求 #{request_num}"
print(f"发送请求: {request}")
socket.send_string(request)
# 等待响应
response = socket.recv_string()
print(f"收到响应: {response}")
发布-订阅模式是一种单向数据分发模式,发布者不需要知道谁在接收消息,订阅者只获取自己感兴趣的消息。
下面是一个发布-订阅模式的示例:
# 发布者代码
import zmq
import time
import random
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")
# 消息类别
categories = ['天气', '交通', '新闻']
while True:
category = random.choice(categories)
update = f"{category} 更新 #{random.randint(1, 100)}"
socket.send_string(update)
print(f"发布: {update}")
time.sleep(1)
# 订阅者代码
import zmq
import sys
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5556")
# 设置订阅过滤器(只接收"天气"相关消息)
# 留空表示接收所有消息
socket.setsockopt_string(zmq.SUBSCRIBE, "天气")
while True:
message = socket.recv_string()
print(f"收到: {message}")
socket.setsockopt_string(zmq.SUBSCRIBE, "") 订阅所有消息socket.setsockopt_string(zmq.SUBSCRIBE, "天") 也能收到 天气 主题推拉模式创建了一个管道,数据从一端流向另一端,类似于流水线。多个节点可以参与推送或拉取过程,实现负载均衡。
以下是一个推拉模式的示例,实现了一个简单的任务分配系统:
# 任务生成器代码
import zmq
import time
import random
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5557")
print("开始发送任务...")
for task_id in range(100):
workload = random.randint(1, 10) # 随机工作负载
socket.send_json({"id": task_id, "workload": workload})
print(f"发送任务 #{task_id}")
time.sleep(0.1)
# 工作者代码
import zmq
import time
context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558") # 连接到结果收集器
while True:
task = receiver.recv_json()
print(f"处理任务 #{task['id']}")
# 模拟工作
time.sleep(task['workload'] * 0.1)
# 发送结果
sender.send_json({"task_id": task['id'], "status": "completed"})
# 结果收集器代码
import zmq
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("tcp://*:5558")
while True:
result = socket.recv_json()
print(f"任务 #{result['task_id']} 完成")
独家对模式是一种特殊模式,用于两个套接字之间的独占双向通信。
下面是一个使用独家对模式的简单示例:
# 节点1代码
import zmq
import time
import threading
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.bind("tcp://*:5563")
def receive_messages():
while True:
message = socket.recv_string()
print(f"节点1收到: {message}")
# 启动接收线程
thread = threading.Thread(target=receive_messages, daemon=True)
thread.start()
# 发送消息
for i in range(5):
socket.send_string(f"从节点1的消息 {i}")
time.sleep(1)
# 节点2代码
import zmq
import time
import threading
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:5563")
def receive_messages():
while True:
message = socket.recv_string()
print(f"节点2收到: {message}")
# 启动接收线程
thread = threading.Thread(target=receive_messages, daemon=True)
thread.start()
# 发送消息
for i in range(5):
socket.send_string(f"从节点2的消息 {i}")
time.sleep(1)
让我们看一些 ZeroMQ 在实际项目中的应用案例。
这个例子展示了如何构建一个简单的分布式计算系统,用于并行处理大数据集:
# 主节点代码 (任务分配器)
import zmq
def master():
context = zmq.Context()
# 任务分发套接字
task_sender = context.socket(zmq.PUSH)
task_sender.bind("tcp://*:5557")
# 结果收集套接字
results_receiver = context.socket(zmq.PULL)
results_receiver.bind("tcp://*:5558")
# 控制套接字 (PUB-SUB 模式)
controller = context.socket(zmq.PUB)
controller.bind("tcp://*:5559")
# 分发数据块任务
data_chunks = 100
for i in range(data_chunks):
task_sender.send_json({"chunk_id": i, "data": f"数据块 {i}"})
# 收集和处理结果
processed_chunks = 0
results = []
while processed_chunks < data_chunks:
result = results_receiver.recv_json()
results.append(result)
processed_chunks += 1
print(f"进度: {processed_chunks}/{data_chunks}")
# 通知工作者完成并退出
controller.send_string("FINISHED")
# 处理最终结果
print(f"处理完成,共 {len(results)} 个结果")
return results
if __name__ == "__main__":
results = master()
print("主节点处理结果:", results)
# 工作者节点代码
import zmq
import time
import random
import uuid
import multiprocessing
import signal
import sys
def worker(worker_id):
context = zmq.Context()
# 连接到任务分发器
task_receiver = context.socket(zmq.PULL)
task_receiver.connect("tcp://localhost:5557")
# 连接到结果收集器
results_sender = context.socket(zmq.PUSH)
results_sender.connect("tcp://localhost:5558")
# 连接到控制通道
controller = context.socket(zmq.SUB)
controller.connect("tcp://localhost:5559")
controller.setsockopt_string(zmq.SUBSCRIBE, "")
# 设置轮询器监听多个套接字
poller = zmq.Poller()
poller.register(task_receiver, zmq.POLLIN)
poller.register(controller, zmq.POLLIN)
print(f"工作者 {worker_id} 已启动")
# 工作循环
while True:
try:
socks = dict(poller.poll(timeout=1000)) # 1秒超时
# 检查控制消息
if controller in socks and socks[controller] == zmq.POLLIN:
message = controller.recv_string(zmq.NOBLOCK)
if message == "FINISHED":
break # 退出循环
# 处理任务
if task_receiver in socks and socks[task_receiver] == zmq.POLLIN:
try:
task = task_receiver.recv_json(zmq.NOBLOCK)
# 处理数据块
result = process_data_chunk(task, worker_id)
# 发送结果
results_sender.send_json(result)
except zmq.Again:
# 没有可用任务,继续循环
continue
except KeyboardInterrupt:
break
except Exception as e:
print(f"工作者 {worker_id} 发生错误: {e}")
break
# 清理资源
task_receiver.close()
results_sender.close()
controller.close()
context.term()
print(f"工作者 {worker_id} 关闭")
def process_data_chunk(task, worker_id):
# 实际数据处理逻辑
chunk_id = task["chunk_id"]
data = task["data"]
# 模拟处理
time.sleep(random.random() * 0.1)
return {
"chunk_id": chunk_id,
"result": f"已处理 {data}",
"worker_id": f"{worker_id}-{uuid.uuid4().hex[:8]}" # 包含进程ID的工作者ID
}
def signal_handler(signum, frame):
print("接收到终止信号,正在关闭所有工作进程...")
sys.exit(0)
def main():
# 设置信号处理
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# 获取CPU核心数,创建对应数量的工作进程
num_processes = multiprocessing.cpu_count()
print(f"启动 {num_processes} 个工作进程")
processes = []
try:
# 创建并启动工作进程
for i in range(num_processes):
p = multiprocessing.Process(target=worker, args=(f"Worker-{i}",))
p.start()
processes.append(p)
# 等待所有进程完成
for p in processes:
p.join()
except KeyboardInterrupt:
print("正在终止所有工作进程...")
for p in processes:
if p.is_alive():
p.terminate()
p.join(timeout=5)
if p.is_alive():
p.kill()
print("所有工作进程已关闭")
if __name__ == "__main__":
multiprocessing.set_start_method('spawn', force=True) # 确保跨平台兼容性
main()