首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >ZeroMQ订阅服务器在单元测试中不会接收任何数据。为什么?

ZeroMQ订阅服务器在单元测试中不会接收任何数据。为什么?
EN

Stack Overflow用户
提问于 2017-08-02 18:16:09
回答 1查看 957关注 0票数 2

我不知道我的数据在哪里结束的。

我编写了一个测试,以确保我的Publisher类成功地发送数据,并且如果绑定到它,就会接收到这些数据。

类本身是从Thread继承的,并公开了一个publish()方法,我可以调用该方法来传递数据,以便通过Queue()向订阅者广播。

然而,在我的测试中,数据从未到达。我确保使用相同的端口,我想不出这里还有什么问题。

我是一个ZeroMQ新手,但我已经设法让PubSub模式开始工作了。

测试代码:

代码语言:javascript
复制
# Import Built-ins
import time
import json
import queue
from queue import Queue
from threading import Thread

# Import Third-Party
import zmq


def test_publisher_sends_data(self):
    port = 667
    name, topic, data = 'TestNode', 'testing', ['this', 'is', 'data']
    encoded_name = json.dumps(name).encode('utf-8')
    encoded_topic = json.dumps(topic).encode('utf-8')
    encoded_data = json.dumps(data).encode('utf-8')
    expected_result = (encoded_name, encoded_topic, encoded_data)

    publisher = Publisher(port)
    print("starting publisher")
    publisher.start()

    q = Queue()

    def recv(q):
        ctx = zmq.Context()
        zmq_sock = ctx.socket(zmq.SUB)
        print("Connecting to publisher")
        zmq_sock.connect('tcp://127.0.0.1:%s' % port)
        while True:
            print("waiting for data..")
            q.put(zmq_sock.recv_multipart())
            print("data received!")
    t = Thread(target=recv, args=(q,))
    t.start()

    print("sending data via publisher")
    for i in range(5):
        self.assertTrue(publisher.publish(name, topic, data))
        time.sleep(0.1)
    print("checking q for received data..")
    try:
        result = q.get(block=False)
    except queue.Empty:
        self.fail("Queue was empty, no data received!")
    self.assertEqual(expected_result, result)

Publisher

代码语言:javascript
复制
# Import Built-Ins
import logging
import json
from queue import Queue
from threading import Thread, Event

# Import Third-Party
import zmq


class Publisher(Thread):
    """Publisher Class which allows publishing data to subscribers.

    The publishing is realized with ZMQ Publisher sockets, and supports publishing
    to multiple subscribers.

    The run() method continuosly checks for data on the internal q, which is fed
    by the publish() method.

    """
    def __init__(self, port, *args, **kwargs):
        """Initialize Instance.
        :param port:
        """
        self.port = port
        self._running = Event()
        self.sock = None
        self.q = Queue()
        super(Publisher, self).__init__(*args, **kwargs)

    def publish(self, node_name, topic, data):
        """Publish the given data to all current subscribers.

        All parameters must be json-serializable objects
        :param data:
        :return:
        """
        message_parts = [json.dumps(param).encode('utf-8')
                         for param in (node_name, topic, data)]
        if self.sock:
            self.q.put(message_parts)
            return True
        else:
            return False

    def join(self, timeout=None):
        self._running.clear()
        try:
            self.sock.close()
        except Exception:
            pass
        super(Publisher, self).join(timeout)

    def run(self):
        self._running.set()
        ctx = zmq.Context()
        self.sock = ctx.socket(zmq.PUB)
        self.sock.bind("tcp://*:%s" % self.port)
        while self._running.is_set():
            if not self.q.empty():
                msg_parts = self.q.get(block=False)
                print("Sending data:", msg_parts)
                self.sock.send_multipart(msg_parts)
            else:
                continue
        ctx.destroy()
        self.sock = None
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-08-02 21:13:54

添加.setsockopt( zmq.SUBSCRIBE, someNonZeroLengthSTRING )

作为

文档中的默认SUB-socket实例没有订阅。

(自然)如果任何传入消息都无法匹配任何字符串,则将订阅SUB-side,本地.recv()自然不会接收该字符串。

如果您的代码没有明确的订阅,就没有这样的消息可以满足主题过滤处理条件Q.E.D。

下一步最好的办法是:

另一个问题--“迟加入”的麻烦--可能会在接下来发生,如果单一的设计是快速的,那么我建议你进一步(不仅仅是ZeroMQ )分布式系统设计的下一个最好的步骤,就是花一段时间去看“ZeroMQ连接”( Connected )一书,第一卷“”。任何认真从事异构分布式系统信令/消息传递的人都会喜欢分享技术和非技术观点和意见。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45468217

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档