I have a question about integrating Alpakka Kafka with Alpakka S3. When I use an Alpakka Kafka source, the Alpakka S3 multipartUpload does not seem to upload any file:
kafkaSource ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore

However, as soon as I add .take(100) after kafkaSource, everything works fine:
kafkaSource.take(100) ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore

Any help would be greatly appreciated. Thanks in advance!
Below is the full code snippet:
// Source
val kafkaSource: Source[(CommittableOffset, Array[Byte]), Consumer.Control] = {
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(prefixedTopics))
    .map(committableMessage => (committableMessage.committableOffset, committableMessage.record.value))
    .watchTermination() { (mat, f: Future[Done]) =>
      f.foreach { _ =>
        log.debug("consumer source shutdown, consumerId={}, group={}, topics={}", consumerId, group, prefixedTopics.mkString(", "))
      }
      mat
    }
}
// Flow
val commitFlow: Flow[CommittableOffset, Done, NotUsed] = {
  Flow[CommittableOffset]
    .groupedWithin(batchingSize, batchingInterval)
    .map(group => group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) => batch.updated(elem) })
    .mapAsync(parallelism = 3) { msg =>
      log.debug("committing offset, msg={}", msg)
      msg.commitScaladsl().map { result =>
        log.debug("committed offset, msg={}", msg)
        result
      }
    }
}
private val kafkaMsgToByteStringFlow = Flow[KafkaMessage[Any]].map(x => ByteString(x.msg + "\n"))
private val kafkaMsgToOffsetFlow = {
  implicit val askTimeout: Timeout = Timeout(5.seconds)
  Flow[KafkaMessage[Any]].mapAsync(parallelism = 5) { elem =>
    Future(elem.offset)
  }
}
// Sink
val s3Sink = {
  val BUCKET = "test-data"
  s3Client.multipartUpload(BUCKET, s"tmp/data.txt")
}

// Doesn't work... (no files show up on S3)
// (wired inside a GraphDSL block where bcast = builder.add(Broadcast[KafkaMessage[Any]](2)))
kafkaSource ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore

// This one works...
kafkaSource.take(100) ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore

Posted on 2018-09-25 22:58:00
Actually, it does upload. The problem is that a completion request has to be sent to S3 to finalize the multipart upload; only then does the file become available in the bucket. I'd bet that because a Kafka source without take(n) never stops producing data downstream, the sink never sends that completion request to S3: the stream never actually completes, so the sink keeps waiting for more data to upload instead of finishing the request.
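To make that concrete, here is a minimal sketch, reusing the asker's kafkaSource, deserializeFlow, kafkaMsgToByteStringFlow, s3Client and BUCKET, and assuming the pre-1.0 Alpakka S3 API in which multipartUpload materializes a Future[MultipartUploadResult]. Once the stream is bounded it can complete, and only then is the multipart upload finalized and the object visible in the bucket.

// Hedged sketch, not the poster's exact code: the same pipeline, bounded so it can complete.
// An implicit Materializer and ExecutionContext are assumed to be in scope, as in the poster's code.
val uploadDone =                                         // Future[MultipartUploadResult]
  kafkaSource
    .take(100)                                           // bound the stream so it can finish
    .via(kafkaSubscriber.serializer.deserializeFlow)
    .via(kafkaMsgToByteStringFlow)
    .runWith(s3Client.multipartUpload(BUCKET, "tmp/data.txt"))

uploadDone.foreach(result => log.debug("upload finalized, result={}", result))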
Uploading everything into a single file this way won't work, so my recommendation is: group the kafkaSource messages and send each compressed Array[Byte] batch to its own sink. The tricky part is that you have to create one sink per file instead of reusing a single sink. A sketch of that idea follows.
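The sketch below is a hedged illustration of the "one sink per file" idea, assuming the asker's existing vals (kafkaSource, kafkaSubscriber, s3Client, BUCKET) plus illustrative BATCH_SIZE and BATCH_DURATION values; the offset-commit branch of the original broadcast is left out for brevity. Each batch is wrapped in its own single-element Source, so every multipartUpload sees a completed stream and finalizes its object.

// Hedged sketch: one S3 object (one sink) per batch, rather than one sink for the whole stream.
// BATCH_SIZE, BATCH_DURATION and the key pattern are illustrative assumptions.
import java.util.UUID

kafkaSource
  .via(kafkaSubscriber.serializer.deserializeFlow)
  .groupedWithin(BATCH_SIZE, BATCH_DURATION)
  .mapAsync(parallelism = 1) { group =>
    val data = ByteString(group.map(_.msg).mkString("", "\n", "\n"))   // same .msg field used by kafkaMsgToByteStringFlow
    val key  = s"tmp/${UUID.randomUUID()}.txt"                         // a fresh key, hence a fresh upload, per batch
    Source.single(data).runWith(s3Client.multipartUpload(BUCKET, key)) // completes because the inner source is bounded
  }
  .runWith(Sink.ignore)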
Posted on 2018-11-13 01:35:27
private def running: Receive = {
  case Subscribe(subscriberId) =>
    val kafkaSubscriber = new KafkaSubscriber(
      serviceName = "akka_kafka_subscriber",
      group = kafkaConfig.group,
      topics = kafkaConfig.subscriberTopics,
      system = system,
      configurationProperties = Seq(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest")
    )
    RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val bcast = builder.add(Broadcast[KafkaMessage[Any]](2))
      kafkaSource ~> kafkaSubscriber.serializer.deserializeFlow ~> kafkaSubscriber.filterTypeFlow[Any] ~> bcast.in
      bcast.out(0) ~> kafkaMsgToStringFlow
        .groupedWithin(BATCH_SIZE, BATCH_DURATION)
        .map(group => group.foldLeft(new StringBuilder()) { (batch, elem) => batch.append(elem) })
        .mapAsync(parallelism = 3) { data =>
          self ? ReadyForUpload(ByteString(data.toString()), UUID.randomUUID().toString, subscriberId)
        } ~> Sink.ignore
      bcast.out(1) ~> kafkaMsgToOffsetFlow ~> kafkaSubscriber.commitFlow ~> Sink.ignore
      ClosedShape
    }).withAttributes(ActorAttributes.supervisionStrategy(decider)).run()
    sender ! "subscription started"

  case ready: ReadyForUpload =>
    println("==========================Got ReadyForUpload: " + ready.fileName)
    val BUCKET = "S3_BUCKET"
    Source.single(ready.data).runWith(s3Client.multipartUpload(BUCKET, s"tmp/${ready.fileName}_${ready.subscriberId}.txt"))
    sender() ! "Done"
}

https://stackoverflow.com/questions/52434787