首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Storm Trident只能在批处理中进行连接吗?

Storm Trident只能在批处理中进行连接吗?
EN

Stack Overflow用户
提问于 2013-03-29 11:43:25
回答 1查看 1.2K关注 0票数 1

我想实现连接语义,并尝试在三叉树拓扑中使用连接方法。我发现连接是在批处理之间进行的。如果两个流之间的连接有数百万个元组,是否必须在一个批次内?

在genderSpout中,每个批次有3个元组,因此ageSpout将发出2个批次,每个批次有5个元组,因此Spout将只发出一个批次

我用JoinType做了一个左外连接

测试代码的输出是:

代码语言:javascript
复制
1 man 15
2 woman 18
1 man 19
3 woman NULL
4 man NULL
1 woman NULL

从输出中,我发现前四个结果连接在来自genderSpout的第一批和来自ageSpout的第一批结果之间。最后两个结果是来自genderSpout的第二个批处理与来自ageSpout的空批处理之间的连接。因此,连接语义的结果是不正确的,因为我希望genderSpout左连接ageSpout的结果是:

代码语言:javascript
复制
1 man 15
1 man 19
2 woman 18
3 woman NULL
4 man 20
1 woman 15
1 woman 19

所以我的问题是:如果JOIN的两端(Spout)中有数百万个元组,我是否应该将它们放在一批中以获得正确的结果?

或者我走的方式是错的,你能告诉我应该怎么做才能得到正确的外连接语义结果吗?

测试代码如下:

代码语言:javascript
复制
public static void main(String[] args) throws Exception{
    Fields genderField = new Field("id", "gender");
    FixedBatchSpout genderSpout = new FixedBatchSpout(genderField, 3,
        new Values("1", "man"),
        new Values("2", "woman"),
        new Values("3", "woman"),
        new Values("4", "man"),
        new Values("1", "woman"));
    genderSpout.setCycle(false);

    Fields ageField = new Field("id2", "age");
    FiexedBatchSpout ageSpout = new FixedBatchSpout(new Fields("id2", "age"), 5,
        new Values("1", "15"),
        new Values("4", "20"),
        new Values("2", "18"),
        new Values("1", "19"));
    ageSpout.setCycle(false);

    List<Stream> allStreams = new ArrayList<Stream>();
    List<Fields> allFields = new ArrayList<Fields>();
    List<Fields> joinFileds = new ArrayList<Fields>();
    List<JoinType> joinTypes = new ArrayList<JoinType>();    

    TridentTopology topology = new TridentTopology();

    Stream genderStream = topology.newStream("genderIn", genderSpout);
    Stream ageStream = topology.newStream("ageIn", ageSpout);

    allStreams.add(genderStream);
    allStreams.add(ageStream);

    allFields.add(genderFields);
    allFields.add(ageFields);

    joinFields.add(new Field("id")));
    joinFields.add(new Field("id2"));

    joinTypes.add(JoinType.INNER);
    joinTypes.add(JoinType.OUTER);

    topology.join(allStreams, joinFields, new Filds("id", "gender", "age"), joinTypes)

    LocalCluster cluster = new LocalCluster();

    Config config = new Config()
    config.setDebug(false);
    config.setMaxSpoutPending(3);

    cluster.submitTopology("trident-join-test", config, topology.build());

    Thread.sleep(3000);
    cluster.shutdown();
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2013-03-31 23:53:41

我在https://groups.google.com/forum/?fromgroups=#!forum/storm-user中也问过同样的问题:https://groups.google.com/forum/?fromgroups=#!topic/storm-user/7fxAVgF2_0M

Jason Jackson给出的答案是: TridentTopology.join不会跨批加入。您可以使用stateQuery和其中一个partitionPersist跨批处理执行流连接。

希望有用

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/15696970

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档