首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在Flink中将avro文件写入S3?

如何在Flink中将avro文件写入S3?
EN

Stack Overflow用户
提问于 2019-07-11 06:06:43
回答 2查看 908关注 0票数 1

我想从卡夫卡主题读取流数据,并写入S3的avro,或拼图,格式。数据流看起来像json字符串,但我不能转换和写入avro或拼图,格式的S3。

我找到了一些代码片段并尝试

val .forBulkFormat=新路径(OutputS3Path),val .build()

但是我在addSink得到了“类型不匹配,预期的SinkFunctionString,实际的: StreamingFileSinkTextOut”

val .addSource=环境信宿(MyConsumerSource).addSink(信宿)

请帮帮忙,谢谢!

EN

回答 2

Stack Overflow用户

发布于 2019-07-16 03:50:38

Stack Overflow用户

发布于 2019-12-04 20:06:07

这是我的代码,它是工作存储到本地系统的拼图文件。

代码语言:javascript
复制
import org.apache.avro.generic.GenericRecord
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(100)
val schema = SchemaBuilder
  .record("record")
  .fields()
  .requiredString("message")
  .endRecord()

val stream: DataStreamSource[GenericRecord] = env.fromCollection(genericRecordList)
val path = new Path(s"/tmp/flink-parquet-${System.currentTimeMillis()}")
val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
  .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(schema))
  .build()

stream.addSink(sink)
env.execute()
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56979250

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档