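I have an Azure Event Hub with messages in it. I wrote the messages with a Python application, and the Event Hub GUI shows the correct message count. But I can't seem to read the messages with Python. My code is below. It runs without errors but doesn't return any results.
Strangely, after I run this code, the Event Hub GUI shows all of the messages (a few thousand) as outgoing, which suggests that my program did receive them. But the code never displays them.
Any help is appreciated!
The result is always: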
Msg offset: <azure.eventhub.common.Offset object at 0x102fc4e10>
Msg seq: 0
Msg body: 0
Received 1 messages in 0.11292386054992676 seconds

# pip install azure-eventhub
import logging
import time
from azure.eventhub import EventHubClient, Receiver, Offset

logger = logging.getLogger("azure")

# URL of the event hub, amqps://<mynamespace>.servicebus.windows.net/myeventhub
ADDRESS = "amqps://chc-eh-ns.servicebus.windows.net/chc-eh"

# Access tokens for event hub namespace, from Azure portal for namespace
USER = "RootManageSharedAccessKey"
KEY = "XXXXXXXXXXXXXXXXXXXXXXXXXX"

# Additional setup to receive events
CONSUMER_GROUP = "$default" # our view of the event hub, useful when there is more than one consumer at same time
PARTITION = "0" # which stream within event hub
OFFSET = Offset("-1") # get all msgs in event hub. msgs are never removed, they just expire per event hub settings
PREFETCH = 100 # not sure exactly what this does ??

# Initialize variables
total = 0
last_sn = -1
last_offset = -1

client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
try:
    receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=PREFETCH, offset=OFFSET)
    client.run()
    start_time = time.time()
    for event_data in receiver.receive(timeout=100):
        last_offset = event_data.offset
        last_sn = event_data.sequence_number
        print("Msg offset: " + str(last_offset))
        print("Msg seq: " + str(last_sn))
        print("Msg body: " + event_data.body_as_str())
        total += 1
    end_time = time.time()
    client.stop()
    run_time = end_time - start_time
    print("\nReceived {} messages in {} seconds".format(total, run_time))
except KeyboardInterrupt:
    pass
finally:
    client.stop()

Posted on 2018-11-29 05:19:13
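Got it! The code sample I copied was wrong. You have to fetch the messages in batches and then iterate over each batch. Here is the correct code.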
# pip install azure-eventhub
import time
from azure.eventhub import EventHubClient, Offset

# URL of the event hub, amqps://<mynamespace>.servicebus.windows.net/myeventhub
ADDRESS = "amqps://chc-eh-ns.servicebus.windows.net/chc-eh"

# Access tokens for event hub namespace, from Azure portal for namespace
USER = "RootManageSharedAccessKey"
KEY = "XXXXXXXXXXXX"

# Additional setup to receive events
CONSUMER_GROUP = "$default" # our view of the event hub, useful when there is more than one consumer at same time
PARTITION = "0" # which stream within event hub
OFFSET = Offset("-1") # get all msgs in event hub. msgs are never removed, they just expire per event hub settings
PREFETCH = 100 # batch size ??

# Initialize variables
total = 0
last_sn = -1
last_offset = -1

client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
try:
    receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=PREFETCH, offset=OFFSET)
    client.run()
    start_time = time.time()
    # Fetch the first batch, then keep fetching until a call returns nothing
    batch = receiver.receive(timeout=5000)
    while batch:
        for event_data in batch:
            last_offset = event_data.offset
            last_sn = event_data.sequence_number
            print("Msg offset: " + str(last_offset))
            print("Msg seq: " + str(last_sn))
            print("Msg body: " + event_data.body_as_str())
            total += 1
        batch = receiver.receive(timeout=5000)
    end_time = time.time()
    client.stop()
    run_time = end_time - start_time
    print("\nReceived {} messages in {} seconds".format(total, run_time))
except KeyboardInterrupt:
    pass
finally:
    client.stop()

https://stackoverflow.com/questions/53524135
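
The difference between the two loop shapes can be tried without an Event Hub connection. The following is only a rough sketch under stated assumptions: `FakeReceiver` and `drain` are hypothetical names that are not part of azure-eventhub, and the stub assumes that `receive()` hands back at most one batch per call and an empty list once nothing more arrives, which is how the corrected loop treats it.

```python
from collections import deque


class FakeReceiver:
    """Hypothetical stand-in for a receiver; NOT part of azure-eventhub."""

    def __init__(self, events, batch_size):
        self._pending = deque(events)
        self._batch_size = batch_size

    def receive(self, timeout=None):
        # Assumption: each call returns at most one batch, and an empty
        # list means nothing more arrived. The timeout is ignored here.
        batch = []
        while self._pending and len(batch) < self._batch_size:
            batch.append(self._pending.popleft())
        return batch


def drain(receiver, timeout=5000):
    """Call receive() repeatedly until it returns an empty batch."""
    events = []
    batch = receiver.receive(timeout=timeout)
    while batch:
        events.extend(batch)
        batch = receiver.receive(timeout=timeout)
    return events


# A single receive() call sees only the first batch of the 250 events.
single_call = FakeReceiver(range(250), batch_size=100).receive()

# Looping until an empty batch comes back collects all of them.
all_events = drain(FakeReceiver(range(250), batch_size=100))

print(len(single_call), len(all_events))
```

With the stub, one call yields only the first batch, while the loop collects every event. Whether a real receiver returns exactly `PREFETCH` events per call is not something this sketch establishes.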