我刚刚实现了一个三叉树DRPC函数来处理传入的消息,并且我正在尝试将拓扑的最后阶段处理的元组的计数持久化为三叉树状态。下面是我的拓扑结构:
topology.newDRPCStream("portfolio")
.map(parseMapFunction,
new Fields("portfolioUrn", "portfolioSourceSystem", "portfolioRegion",
"portfolioTimestamp", "portfolioPayload"))
.filter(new FilterNull())
.flatMap(splitMapFunction,
new Fields("portfolioUrn", "portfolioSourceSystem", "portfolioRegion",
"portfolioTimestamp", "strategyCode"))
.parallelismHint(1)
.shuffle()
.each(new Fields("strategyCode"), findMongoTradesFunction,
new Fields("uitid", "id", "sourceSystem", "sourceTransactionTime", "publicationTime",
"tradeVersion", "urn", "riskViewFrom", "riskViewTo", "authorized"))
.parallelismHint(10)
.shuffle()
.filter(tradeFilterFunction)
.parallelismHint(150)
.groupBy(new Fields("uitid"))
.aggregate(
new Fields("portfolioUrn", "portfolioTimestamp", "strategyCode", "id", "sourceSystem",
"sourceTransactionTime", "publicationTime", "tradeVersion", "urn", "riskViewFrom",
"riskViewTo", "uitid"), reduceAggregateFunction,
new Fields("portfolioUrn", "portfolioTimestamp", "strategyCode", "id", "sourceSystem",
"sourceTransactionTime", "publicationTime", "tradeVersion", "urn", "riskViewFrom",
"riskViewTo"))
.parallelismHint(200)
.groupBy(new Fields("portfolioUrn"))
.persistentAggregate(stateFactory, new Count(), new Fields("count"));当我试图将这个拓扑提交给Storm时,我遇到了这个错误:
Exception in thread "main" java.lang.RuntimeException: InvalidTopologyException(msg:Component: [b-4] subscribes from non-existent component [$mastercoord-bg0])
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:273)
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:387)
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:159)
at com.citi.tm.portfolio.tps.PortfolioLauncher.main(PortfolioLauncher.java:34)
Caused by: InvalidTopologyException(msg:Component: [b-4] subscribes from non-existent component [$mastercoord-bg0])
at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8070)
at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8047)
at org.apache.storm.generated.Nimbus$submitTopology_result.read(Nimbus.java:7981)
at org.apache.storm.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
at org.apache.storm.generated.Nimbus$Client.recv_submitTopology(Nimbus.java:306)
at org.apache.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:290)
at org.apache.storm.StormSubmitter.submitTopologyInDistributeMode(StormSubmitter.java:326)
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:260)
... 3 more如果我从拓扑中删除最后两个函数,则可以成功提交拓扑,即:
.groupBy(new Fields("portfolioUrn"))
.persistentAggregate(stateFactory, new Count(), new Fields("count"));在运行聚合函数(aggregate())之后,我想使用'portfolioUrn‘字段对元组进行分组,并将计数持久化到mongoDB中。我不明白为什么最后的groupBy().persistentAggregate()部分会导致这个错误。你能帮我找出原因吗?
发布于 2019-09-02 16:13:57
经过一番研究后,我发现了this页面,这个页面对我来说似乎是一个相似的案例。Nathan Marz表示DRPC拓扑不支持分区持久化(从2013年开始),我相信我的情况也是如此。我认为(未完全验证) Storm 1.2.1 DRPC拓扑可能根本不支持状态持久性。
https://stackoverflow.com/questions/56564675
复制相似问题