这是我的第一个星火项目。我想用卡夫卡的信息。这些消息包含字节arr、一些kafka头和键。所需的输出是带有列(kafkaKey、kafkaHeader1、kafkaHeader2、byteArr)的拼图文件。我试着用Spark实现它--知道我是如何添加模式的,模式正确吗?目前我无法控制输出结果是什么样子?
...
SparkSession spark = SparkSession
.builder()
.appName("Spark Kafka")
.master("local")
.getOrCreate();
...这是创建模式的方式吗?
StructType rfSchema = new StructType(new StructField[]{
new StructField("kafkaHeader1", DataTypes.StringType, false, Metadata.empty()),
new StructField("kafkaHeader2", DataTypes.StringType, false, Metadata.empty()),
new StructField("key", DataTypes.LongType, false, Metadata.empty()),
}
);
Dataset<Row> ds = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "10.0.0.0:30526")
.option("subscribe", "test.topic")
.option("includeHeaders", "true")
.option("max.poll.records", "4000")
.option("group.id", "testSpark")
.option("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer")
.option("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
.load();。。//我在许多例子中看到了这一行,为什么我需要它?Ds.selectExpr(“强制转换(键作为字符串)”、“强制转换(值作为字符串)”、“标头”);
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
String currentDate= format.format(new Date());
ds.printSchema();
ds.writeStream()
.option("checkpointLocation", "/home/xxx/spark3/streamingCheckpoint")
.format("parquet")
.outputMode(OutputMode.Append())
.partitionBy("partition")
.start("home/xxx/spark3/"+currentDate);
try {
Thread.sleep(400000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
...
Thanks发布于 2022-06-23 18:48:13
在很多例子中看到了这一行,为什么我需要它?
因为在默认情况下,Spark不会反序列化您的Kafka数据。
您需要使用UDF函数(如CAST)来解析value (以及可选的key和headers)。
例如,只有在将数据转换/解析为星火StructType之后,才能将数据写入结构化格式,例如Parquet。
顺便说一句,拼花应该能够容纳数组,所以如果你想要的是所有的标题,而不是两个,可以使用ArrayType模式。
话虽如此,从这个开始。
ds.selectExpr("CAST(key AS LONG)", "headers")
.writeStream设置deserializer选项无效。
从医生那里得到价值..。
值是,总是将反序列化为带有ByteArrayDeserializer的字节数组。使用DataFrame操作显式反序列化值
请参阅“源中的每一行都有以下架构”的部分,以查看其余的数据类型。
https://stackoverflow.com/questions/72721364
复制相似问题