I want to read streaming data from a Kafka topic and write it to S3 in Avro or Parquet format. The stream looks like JSON strings, but I cannot convert it and write it to S3 as Avro or Parquet.
I found some code snippets and tried:
val sink: StreamingFileSink[TextOut] = StreamingFileSink
  .forBulkFormat(new Path(outputS3Path), ParquetAvroWriters.forReflectRecord(classOf[TextOut]))
  .build()
but at addSink I get "Type mismatch, expected: SinkFunction[String], actual: StreamingFileSink[TextOut]":
env.addSource(myConsumerSource).addSink(sink)
Please help, thanks!
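The error says the stream carries String while the sink was built for TextOut. A minimal sketch of one way to reconcile the two types, where parseTextOut is a hypothetical JSON-to-TextOut parser (not from the original post):
// Map the JSON strings to TextOut before attaching the sink.
val jsonStream = env.addSource(myConsumerSource)          // DataStream[String] of JSON
val records = jsonStream.map(json => parseTextOut(json))  // DataStream[TextOut]
records.addSink(sink)  // now matches StreamingFileSink[TextOut]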
Posted on 2019-07-16 03:50:38
Workaround: after the basic ETL in Flink, you can use AWS Kinesis Firehose. Convert the result of your Flink Table/SQL query to strings, write them to a Kinesis stream, and configure Firehose from the AWS console to deliver them to S3 as Parquet.
Kafka example: https://github.com/kali786516/FlinkStreamAndSql/tree/master/src/main/scala/com/aws/examples/kafka
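A minimal sketch of the Kinesis leg of this workaround, assuming the flink-connector-kinesis dependency is on the classpath; the region and stream name below are placeholders, and stringStream stands for the DataStream[String] produced by the ETL step:
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants

val producerConfig = new Properties()
// Placeholder region; credentials are picked up via the default AWS provider chain.
producerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1")

val kinesisSink = new FlinkKinesisProducer[String](new SimpleStringSchema(), producerConfig)
kinesisSink.setDefaultStream("etl-output-stream")  // hypothetical stream that Firehose reads from
kinesisSink.setDefaultPartition("0")

// stringStream: DataStream[String] of serialized query results
stringStream.addSink(kinesisSink)
Firehose then handles the string-to-Parquet conversion and the S3 delivery, so the Flink job itself never touches S3.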

Posted on 2019-12-04 20:06:07
Here is my code; it works, storing Parquet files on the local filesystem.
import scala.collection.JavaConverters._
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

val env = StreamExecutionEnvironment.getExecutionEnvironment()
// Parquet is a bulk format: part files roll on checkpoint, so checkpointing must be enabled.
env.enableCheckpointing(100)

// Avro schema with a single required string field.
val schema: Schema = SchemaBuilder
  .record("record")
  .fields()
  .requiredString("message")
  .endRecord()

// Sample input; the original answer did not show how genericRecordList was built.
val genericRecordList = (1 to 3).map { i =>
  val record: GenericRecord = new GenericData.Record(schema)
  record.put("message", s"message-$i")
  record
}.asJava

val stream: DataStreamSource[GenericRecord] = env.fromCollection(genericRecordList)
val path = new Path(s"/tmp/flink-parquet-${System.currentTimeMillis()}")

// Bulk-format sink that writes each GenericRecord out as Parquet under `path`.
val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
  .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(schema))
  .build()

stream.addSink(sink)
env.execute()
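Since the original question targets S3, a minimal variation is to change only the path passed to forBulkFormat, assuming the flink-s3-fs-hadoop filesystem plugin is installed and AWS credentials are configured; the bucket name below is hypothetical:
val s3Path = new Path("s3a://my-bucket/flink-parquet")  // hypothetical bucket
val s3Sink: StreamingFileSink[GenericRecord] = StreamingFileSink
  .forBulkFormat(s3Path, ParquetAvroWriters.forGenericRecord(schema))
  .build()
stream.addSink(s3Sink)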