我有一个火花流应用程序,它使用来自topic1的数据并解析它,然后将相同的记录发布到两个过程中,一个是topic2,另一个是hive表。将数据发布到kafka topic2时,我看到重复项,但在配置单元表中看不到重复项
使用spark 2.2,Kafka 0.10.0
KafkaWriter.write(spark, storeSalesStreamingFinalDF, config)
writeToHIVE(spark, storeSalesStreamingFinalDF, config)
object KafkaWriter {
def write(spark: SparkSession, df: DataFrame, config: Config)
{
df.select(to_json(struct("*")) as 'value)
.write
.format("kafka")
.option("kafka.bootstrap.servers", config.getString("kafka.dev.bootstrap.servers"))
.option("topic",config.getString("kafka.topic"))
.option("kafka.compression.type",config.getString("kafka.compression.type"))
.option("kafka.session.timeout.ms",config.getString("kafka.session.timeout.ms"))
.option("kafka.request.timeout.ms",config.getString("kafka.request.timeout.ms"))
.save()
}
}有人能帮上忙吗,
期望在kafka topic2中没有重复项。
发布于 2019-05-21 13:17:01
为了处理重复的数据,我们应该设置.option("kafka.processing.guarantee","exactly_once")
https://stackoverflow.com/questions/56223543
复制相似问题