首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何修复Apache Storm Trident拓扑中的错误“component:[x] subscribes from non existent component[y]”

如何修复Apache Storm Trident拓扑中的错误“component:[x] subscribes from non existent component[y]”
EN

Stack Overflow用户
提问于 2019-06-12 22:33:23
回答 1查看 122关注 0票数 0

我刚刚实现了一个三叉树DRPC函数来处理传入的消息,并且我正在尝试将拓扑的最后阶段处理的元组的计数持久化为三叉树状态。下面是我的拓扑结构:

代码语言:javascript
复制
topology.newDRPCStream("portfolio")
    .map(parseMapFunction,
        new Fields("portfolioUrn", "portfolioSourceSystem", "portfolioRegion",
            "portfolioTimestamp", "portfolioPayload"))
    .filter(new FilterNull())
    .flatMap(splitMapFunction,
        new Fields("portfolioUrn", "portfolioSourceSystem", "portfolioRegion",
            "portfolioTimestamp", "strategyCode"))
    .parallelismHint(1)
    .shuffle()
    .each(new Fields("strategyCode"), findMongoTradesFunction,
        new Fields("uitid", "id", "sourceSystem", "sourceTransactionTime", "publicationTime",
            "tradeVersion", "urn", "riskViewFrom", "riskViewTo", "authorized"))
    .parallelismHint(10)
    .shuffle()
    .filter(tradeFilterFunction)
    .parallelismHint(150)
    .groupBy(new Fields("uitid"))
    .aggregate(
        new Fields("portfolioUrn", "portfolioTimestamp", "strategyCode", "id", "sourceSystem",
            "sourceTransactionTime", "publicationTime", "tradeVersion", "urn", "riskViewFrom",
            "riskViewTo", "uitid"), reduceAggregateFunction,
        new Fields("portfolioUrn", "portfolioTimestamp", "strategyCode", "id", "sourceSystem",
            "sourceTransactionTime", "publicationTime", "tradeVersion", "urn", "riskViewFrom",
            "riskViewTo"))
    .parallelismHint(200)
    .groupBy(new Fields("portfolioUrn"))
    .persistentAggregate(stateFactory, new Count(), new Fields("count"));

当我试图将这个拓扑提交给Storm时,我遇到了这个错误:

代码语言:javascript
复制
Exception in thread "main" java.lang.RuntimeException: InvalidTopologyException(msg:Component: [b-4] subscribes from non-existent component [$mastercoord-bg0])
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:273)
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:387)
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:159)
at com.citi.tm.portfolio.tps.PortfolioLauncher.main(PortfolioLauncher.java:34)
Caused by: InvalidTopologyException(msg:Component: [b-4] subscribes from non-existent component [$mastercoord-bg0])
at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8070)
at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8047)
at org.apache.storm.generated.Nimbus$submitTopology_result.read(Nimbus.java:7981)
at org.apache.storm.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
at org.apache.storm.generated.Nimbus$Client.recv_submitTopology(Nimbus.java:306)
at org.apache.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:290)
at org.apache.storm.StormSubmitter.submitTopologyInDistributeMode(StormSubmitter.java:326)
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:260)
... 3 more

如果我从拓扑中删除最后两个函数,则可以成功提交拓扑,即:

代码语言:javascript
复制
.groupBy(new Fields("portfolioUrn"))
.persistentAggregate(stateFactory, new Count(), new Fields("count"));

在运行聚合函数(aggregate())之后,我想使用'portfolioUrn‘字段对元组进行分组,并将计数持久化到mongoDB中。我不明白为什么最后的groupBy().persistentAggregate()部分会导致这个错误。你能帮我找出原因吗?

EN

回答 1

Stack Overflow用户

发布于 2019-09-02 16:13:57

经过一番研究后,我发现了this页面,这个页面对我来说似乎是一个相似的案例。Nathan Marz表示DRPC拓扑不支持分区持久化(从2013年开始),我相信我的情况也是如此。我认为(未完全验证) Storm 1.2.1 DRPC拓扑可能根本不支持状态持久性。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56564675

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档