首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何从Kafka Topic中获取记录总数并保存到HDFS中?

如何从Kafka Topic中获取记录总数并保存到HDFS中?
EN

Stack Overflow用户
提问于 2020-05-22 07:14:37
回答 1查看 257关注 0票数 1

全,

我正在使用转储到HDFS上的Kafka数据。我能够使用数据,并希望从Kafka获得记录的总数,并将其另存为HDFS文件,以便我可以使用该文件进行验证。我可以在控制台中打印记录,但我不确定如何创建总计数文件?

从Kafka拉取记录的查询:

代码语言:javascript
复制
Dataset ds1=ds.filter(args[5]);
 StreamingQuery query = ds1
                   .coalesce(10)
                   .writeStream()
                   .format("parquet")
                   .option("path", path.toString())
                   .option("checkpointLocation", args[6] + "/checkpoints" + args[2])
                   .trigger(Trigger.Once())
                   .start();

          try {
                query.awaitTermination();
            } catch (StreamingQueryException e) {
                e.printStackTrace();
                System.exit(1);
            }   

以及我为获取记录并在控制台中打印而编写的代码:

Dataset stream=ds1.groupBy("<column_name>").count(); //实际上,我想在不使用GroupBy的情况下获取计数,我尝试过long stream=ds1.count(),但遇到了错误。

代码语言:javascript
复制
 StreamingQuery query1=stream.coalesce(1)
                        .writeStream()
                        .format("csv")
                       .option("path", path + "/record")
                       .start();

               try {
                    query1.awaitTermination();
                 } catch (StreamingQueryException e) {
                     e.printStackTrace();
                    System.exit(1);
                } 

这不起作用,你能帮我解决这个问题吗?

EN

回答 1

Stack Overflow用户

发布于 2020-05-23 06:39:36

主题中任何时候的记录数量都是一个移动目标。

您将需要使用旧的Spark Streaming来查找每个Spark partiton批处理的记录数,然后使用Accumulator来计算所有处理的记录,但这将是您所能获得的最接近的结果。

Spark + Kafka声称只有一次处理语义,所以我建议你把重点放在错误捕获和监控上,而不仅仅是计数验证。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61945286

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档