我正在尝试使用Jet聚合,源和宿都是Kafka主题,要求是从源中获取GPB (google proto buf)消息并发布GPB消息。问题是我可以发布Double,但不能发布GPB消息,它会给我编译错误。
这可以很好地工作:
Pipeline p = Pipeline.create();
p.drawFrom(KafkaSources.<String, Balance> kafka(<properties>, <topic>))
.map(s->s.getValue() ).groupingKey(x->x.account)
.rollingAggregator(AggregateOperations.summingDouble(Balance::amount))
.drainTo(KafkaSinks.kafka(<prop>,<sinktopic>));尽管上面的代码运行良好,但它发布了接收主题的double,而我的要求是发布一个具有double属性的GPB来接收主题。当我试图通过将map放在drainTo之前来做这件事时,它给出了语法错误。下面是我尝试过的:
.rollingAggregator(AggregateOperation.summingDouble(Balance::amount))
.map(k->Amount.newBuilder().setAmount(k.getValue()).build())
.drainTo(KafkaSinks.kafka(<prop>,<sinktopic>));Amount是具有double属性的GPB消息。这给我带来了我不理解的语法错误。你能帮我把这个拿过去吗?
你能不能也分享一些文档或链接,因为不同的场景有不同的聚合?我看了Hazelcast的样本,演示,不是所有的,但很少,但没有找到我的用例。非常感谢。
发布于 2019-02-08 21:31:32
我猜语法错误是这样的:
不兼容的类型。所需的Sink<?超级Amount>但'kafka‘被推断为Sink>:不存在变量K,V类型的实例,因此字符串符合Entry
(下次,请分享这个异常,您的代码依赖于非共享类,我无法编译它。)
这意味着Kafka接收器在输入时需要java.util.Map.Entry,但是你给了它Amount。你需要这样map它:
.map(entry-> Util.entry(entry.getKey(), Amount.newBuilder().setAmount(entry.getValue()).build()))https://stackoverflow.com/questions/54442130
复制相似问题