首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >librdkafka线程在失败时未进行清理

librdkafka线程在失败时未进行清理
EN

Stack Overflow用户
提问于 2018-05-12 02:26:48
回答 1查看 199关注 0票数 0

在我的应用程序中,我创建了一个生产者和一个消费者。

代码语言:javascript
复制
conf->set("dr_cb", delivery_cb, errstr);
conf->set("event_cb", event_cb, errstr);

RdKafka::Producer::create(conf.get(), errstr)

conf->set("log_level", "0", errstr);
conf->set("group.id", group_id, errstr);
conf->set("client.id", m_kafka_client_id, errstr);
conf->set("auto.offset.reset", "earliest", errstr);
conf->set("rebalance_cb", rebalance_cb, errstr);
conf->set("statistics.interval.ms", "3000", errstr);
conf->set("event_cb", event_cb, errstr);

RdKafka::KafkaConsumer::create(conf.get(), errstr)

然后尝试获取元数据,如下所示

代码语言:javascript
复制
err = _consumer->metadata(false, nullptr, &metadata, METADATA_TIMEOUT); 
std::unique_ptr<RdKafka::Metadata> metadata_uptr(metadata); // Handover the raw pointer to the unique_ptr now

如果由于某种原因,broker通信不能正常工作,我会收到错误消息,然后我会通过unique_ptr ->析构函数删除生产者和消费者的get。然后循环进行,直到应用程序成功连接到代理程序。

我观察到的是,我看到许多线程被创建并保持在那里。在某一时刻,该计数达到2000个线程。

清理Kafka的正确方法是什么?

线在这里卡住了

代码语言:javascript
复制
#0  0x00007f6acc031cf2 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00000000004b8e65 in cnd_timedwait_ms (cnd=cnd@entry=0xab41b8, mtx=mtx@entry=0xab4190, timeout_ms=<optimized out>) at tinycthread.c:501
#2  0x00000000004853aa in rd_kafka_q_serve (rkq=0xab4190, timeout_ms=<optimized out>, max_cnt=max_cnt@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_CALLBACK, 
callback=callback@entry=0x0, opaque=opaque@entry=0x0) at rdkafka_queue.c:440
#3  0x000000000045a43c in rd_kafka_thread_main (arg=arg@entry=0xabf760) at rdkafka.c:1227
#4  0x00000000004b8c07 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:624
#5  0x00007f6acc02de25 in start_thread () from /lib64/libpthread.so.0
#6  0x00007f6acaa7834d in clone () from /lib64/libc.so.6

我也试着在失败时调用下面的例程,但没有帮助...

代码语言:javascript
复制
RdKafka::wait_destroyed(5000);
EN

回答 1

Stack Overflow用户

发布于 2018-05-19 01:44:28

RdKafka::KafkaConsumer被包装在另一个容器中,"myobject_wrapper->close()“没有被重定向到"RdKafka::KafkaConsumer::close”

在修复了我的应用程序中的this问题之后,现在一切正常。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50298202

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档