我正在编写一个应用程序,将消息从Kafka读取到Spark使用结构化流。
传入消息是字符串格式的与交易相关的修复消息。它们被转换为Java POJO。
示例代码如下:
SparkSession spark = createSparkSession();
// Subscribe to 1 topic
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topics)
.load();
df.as(Encoders.STRING()).map(new MapFunction<String, OrderData>() {
@Override
public OrderData call(String arg0) throws Exception {
// TODO Auto-generated method stub
return OrderData(arg0);
}
}, Encoders.bean(OrderData.class));我的问题是,如何在DataSet中保存内存中的最后n条记录,以便在该DataSet之上支持Spark SQL?
发布于 2017-09-06 21:12:32
想出一些像这样的东西是可行的:
dataset.writeStream().format("memory").queryName("orderdataDS").start()https://stackoverflow.com/questions/45811364
复制相似问题