我在代码中使用kombu,有时我从kombu的生产者发布方法中得到以下异常。我认为它更多地是在压力条件下复制的,所以可能是多线程问题,而异常描述似乎也指出了这一点。
使用python 2.7.18,kombu 4.6.11,amqp 2.6.1,gevent 20.6.2
我很感谢你的帮助。谢谢!!
我也看到了这些可能相关的页面,尽管我无法从它们中推断出在我的情况下应该做什么:
例外情况:
Traceback (most recent call last):
File "C:\Code\A\home_common\rabbitmq_common.py", line 168, in send_data_message
self.producer.publish(data_message, routing_key=destination, exchange=self.dataDirectExchange, headers={'source': source}, content_encoding='binary', content_type='application/octet-stream', retry=True)
File "C:\Python27\lib\site-packages\kombu\messaging.py", line 181, in publish
exchange_name, declare,
File "C:\Python27\lib\site-packages\kombu\connection.py", line 533, in _ensured
return fun(*args, **kwargs)
File "C:\Python27\lib\site-packages\kombu\messaging.py", line 203, in _publish
mandatory=mandatory, immediate=immediate,
File "C:\Python27\lib\site-packages\amqp\channel.py", line 1766, in _basic_publish
(0, exchange, routing_key, mandatory, immediate), msg
File "C:\Python27\lib\site-packages\amqp\abstract_channel.py", line 59, in send_method
conn.frame_writer(1, self.channel_id, sig, args, content)
File "C:\Python27\lib\site-packages\amqp\method_framing.py", line 189, in write_frame
write(view[:offset])
File "C:\Python27\lib\site-packages\amqp\transport.py", line 305, in write
self._write(s)
File "C:\Python27\lib\site-packages\gevent_socket2.py", line 383, in sendall
return _socketcommon._sendall(self, data_memory, flags)
File "C:\Python27\lib\site-packages\gevent_socketcommon.py", line 392, in _sendall
timeleft = __send_chunk(socket, chunk, flags, timeleft, end)
File "C:\Python27\lib\site-packages\gevent_socketcommon.py", line 321, in __send_chunk
data_sent += socket.send(chunk, flags)
File "C:\Python27\lib\site-packages\gevent_socket2.py", line 369, in send
self._wait(self._write_event)
File "src\\gevent
_hub_primitives.py", line 317, in gevent._gevent_c_hub_primitives.wait_on_socket
File "src\\gevent
_hub_primitives.py", line 322, in gevent._gevent_c_hub_primitives.wait_on_socket
File "src\\gevent
_hub_primitives.py", line 297, in gevent._gevent_c_hub_primitives._primitive_wait
ConcurrentObjectUseError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent._gevent_c_waiter.Waiter object at 0x0598E810>>发布于 2020-10-01 22:10:03
作为今后的参考-这个问题似乎是通过锁定所有可能与兔子服务器联系的kombu调用来解决的,例如:
with self._kombu_lock:
self.producer.publish(data_message, routing_key=destination, exchange=self.dataDirectExchange, headers={'source': source}, content_encoding='binary', content_type='application/octet-stream', retry=True)https://stackoverflow.com/questions/63762504
复制相似问题