首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >无法在Apache中使用Pulsar共享订阅

无法在Apache中使用Pulsar共享订阅
EN

Stack Overflow用户
提问于 2022-05-28 08:04:14
回答 1查看 111关注 0票数 0

Flink数据管道从Apache分区主题中读取。我已经将PulsarSource订阅设置为SubscriptionType.Exclusive。当它被更改为SubscriptionType.Shared时,它期望为命名空间启用事务策略。然后在代理中启用事务管理器并启动它。但还是得到了这个例外

代码语言:javascript
复制
Caused by: java.io.IOException: org.apache.flink.util.FlinkRuntimeException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException$CoordinatorNotFoundException: Transaction manager is not started or not enabled
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.fetch(PulsarPartitionSplitReaderBase.java:140)
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.fetch(PulsarUnorderedPartitionSplitReader.java:55)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
    ... 6 more
Caused by: org.apache.flink.util.FlinkRuntimeException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException$CoordinatorNotFoundException: Transaction manager is not started or not enabled
    at org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.createTransaction(PulsarTransactionUtils.java:56)
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.newTransaction(PulsarUnorderedPartitionSplitReader.java:165)
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.pollMessage(PulsarUnorderedPartitionSplitReader.java:91)
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.fetch(PulsarPartitionSplitReaderBase.java:115)
    ... 9 more
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException$CoordinatorNotFoundException: Transaction manager is not started or not enabled
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.createTransaction(PulsarTransactionUtils.java:51)
    ... 12 more
Caused by: org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException$CoordinatorNotFoundException: Transaction manager is not started or not enabled
    at org.apache.pulsar.client.impl.TransactionMetaStoreHandler.getExceptionByServerError(TransactionMetaStoreHandler.java:419)
    at org.apache.pulsar.client.impl.TransactionMetaStoreHandler.handleTransactionFailOp(TransactionMetaStoreHandler.java:352)
    at org.apache.pulsar.client.impl.TransactionMetaStoreHandler.handleNewTxnResponse(TransactionMetaStoreHandler.java:210)
    at org.apache.pulsar.client.impl.ClientCnx.handleNewTxnResponse(ClientCnx.java:945)
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:382)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    ... 1 more

我看不到在Flink的PulsarSource中启用事务的方法(方法)。

EN

回答 1

Stack Overflow用户

发布于 2022-05-28 12:27:04

独立脉冲星使用的是standalone.conf,而不是broker.conf。我错误地启用了broker.conf中的事务协调器。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72413964

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档