首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何用kafka标头编写拼花文件

如何用kafka标头编写拼花文件
EN

Stack Overflow用户
提问于 2022-06-22 20:02:20
回答 1查看 133关注 0票数 0

这是我的第一个星火项目。我想用卡夫卡的信息。这些消息包含字节arr、一些kafka头和键。所需的输出是带有列(kafkaKey、kafkaHeader1、kafkaHeader2、byteArr)的拼图文件。我试着用Spark实现它--知道我是如何添加模式的,模式正确吗?目前我无法控制输出结果是什么样子?

代码语言:javascript
复制
...
 SparkSession spark = SparkSession
                .builder()
                .appName("Spark Kafka")
                .master("local")
                .getOrCreate();
...

这是创建模式的方式吗?

代码语言:javascript
复制
        StructType rfSchema = new StructType(new StructField[]{
                new StructField("kafkaHeader1", DataTypes.StringType, false, Metadata.empty()),
                new StructField("kafkaHeader2", DataTypes.StringType, false, Metadata.empty()),
                new StructField("key", DataTypes.LongType, false, Metadata.empty()),
            }
        );


        Dataset<Row> ds = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "10.0.0.0:30526")
                .option("subscribe", "test.topic")
                .option("includeHeaders", "true")
                .option("max.poll.records", "4000")
                .option("group.id", "testSpark")
                .option("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer")
                .option("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
                .option("startingOffsets", "earliest")
                .option("failOnDataLoss", "false")
                .load();

。。//我在许多例子中看到了这一行,为什么我需要它?Ds.selectExpr(“强制转换(键作为字符串)”、“强制转换(值作为字符串)”、“标头”);

代码语言:javascript
复制
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        String currentDate= format.format(new Date());

        ds.printSchema();
        ds.writeStream()
                .option("checkpointLocation", "/home/xxx/spark3/streamingCheckpoint")
                .format("parquet")
                .outputMode(OutputMode.Append())
                .partitionBy("partition")
                .start("home/xxx/spark3/"+currentDate);
        try {
            Thread.sleep(400000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

    }
...
Thanks
EN

回答 1

Stack Overflow用户

发布于 2022-06-23 18:48:13

在很多例子中看到了这一行,为什么我需要它?

因为在默认情况下,Spark不会反序列化您的Kafka数据。

您需要使用UDF函数(如CAST)来解析value (以及可选的keyheaders)。

例如,只有在将数据转换/解析为星火StructType之后,才能将数据写入结构化格式,例如Parquet。

顺便说一句,拼花应该能够容纳数组,所以如果你想要的是所有的标题,而不是两个,可以使用ArrayType模式。

话虽如此,从这个开始。

代码语言:javascript
复制
ds.selectExpr("CAST(key AS LONG)", "headers")
    .writeStream

设置deserializer选项无效。

从医生那里得到价值..。

值是,总是将反序列化为带有ByteArrayDeserializer的字节数组。使用DataFrame操作显式反序列化值

请参阅“源中的每一行都有以下架构”的部分,以查看其余的数据类型。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72721364

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档