首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Hazelcast-Jet drainTo语法问题

Hazelcast-Jet drainTo语法问题
EN

Stack Overflow用户
提问于 2019-01-30 21:50:57
回答 1查看 77关注 0票数 0

我正在尝试使用Jet聚合,源和宿都是Kafka主题,要求是从源中获取GPB (google proto buf)消息并发布GPB消息。问题是我可以发布Double,但不能发布GPB消息,它会给我编译错误。

这可以很好地工作:

代码语言:javascript
复制
    Pipeline p = Pipeline.create();
    p.drawFrom(KafkaSources.<String, Balance> kafka(<properties>, <topic>)) 
    .map(s->s.getValue() ).groupingKey(x->x.account)
    .rollingAggregator(AggregateOperations.summingDouble(Balance::amount))
    .drainTo(KafkaSinks.kafka(<prop>,<sinktopic>));

尽管上面的代码运行良好,但它发布了接收主题的double,而我的要求是发布一个具有double属性的GPB来接收主题。当我试图通过将map放在drainTo之前来做这件事时,它给出了语法错误。下面是我尝试过的:

代码语言:javascript
复制
    .rollingAggregator(AggregateOperation.summingDouble(Balance::amount))
    .map(k->Amount.newBuilder().setAmount(k.getValue()).build())
    .drainTo(KafkaSinks.kafka(<prop>,<sinktopic>));

Amount是具有double属性的GPB消息。这给我带来了我不理解的语法错误。你能帮我把这个拿过去吗?

你能不能也分享一些文档或链接,因为不同的场景有不同的聚合?我看了Hazelcast的样本,演示,不是所有的,但很少,但没有找到我的用例。非常感谢。

EN

回答 1

Stack Overflow用户

发布于 2019-02-08 21:31:32

我猜语法错误是这样的:

不兼容的类型。所需的Sink<?超级Amount>但'kafka‘被推断为Sink>:不存在变量K,V类型的实例,因此字符串符合Entry

(下次,请分享这个异常,您的代码依赖于非共享类,我无法编译它。)

这意味着Kafka接收器在输入时需要java.util.Map.Entry,但是你给了它Amount。你需要这样map它:

代码语言:javascript
复制
.map(entry-> Util.entry(entry.getKey(), Amount.newBuilder().setAmount(entry.getValue()).build()))
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54442130

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档