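I'm receiving messages from Pub/Sub in Dataflow in streaming mode (which is what I need). Each message must be stored in its own file in GCS. Because TextIO.Write does not support unbounded collections, I tried dividing the PCollection into windows that each contain one element, and writing each window to Google Cloud Storage.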
Here is my code:
public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.create()
            .as(DataflowPipelineOptions.class);
    options.setRunner(BlockingDataflowPipelineRunner.class);
    options.setProject(PROJECT_ID);
    options.setStagingLocation(STAGING_LOCATION);
    options.setStreaming(true);
    Pipeline pipeline = Pipeline.create(options);

    // Read an unbounded stream of messages from the Pub/Sub subscription.
    PubsubIO.Read.Bound<String> readFromPubsub = PubsubIO.Read.named("ReadFromPubsub")
            .subscription(SUBSCRIPTION);
    PCollection<String> streamData = pipeline.apply(readFromPubsub);

    // Fire a pane after every element. Note that this only sets a trigger;
    // no Window.into(...) is applied, so elements stay in the existing (global) window.
    PCollection<String> windowedMessage = streamData.apply(
            Window.<String>triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                    .discardingFiredPanes());

    // This is the step that is rejected when the pipeline runs in streaming mode.
    windowedMessage.apply(TextIO.Write.to("gs://pubsub-outputs/1"));
    pipeline.run();
}
I still get the same error I was getting before I added the windowing:
The DataflowPipelineRunner in streaming mode does not support TextIO.Write.
What code would accomplish this?
Posted on 2016-11-08 23:05:57
TextIO.Write works only with bounded PCollections here; to write to GCS from a streaming pipeline you can call the Cloud Storage API directly.
You can do it like this:
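PipeOptions options = data.getPipeline().getOptions().as(PipeOptions.class);
data.apply(WithKeys.of(new SerializableFunction<String, String>() {
            // Give every message the same key so that GroupByKey puts
            // all elements of a window into a single group.
            @Override
            public String apply(String s) { return "mykey"; }
        }))
    // Assign elements to fixed windows; the length in minutes comes from the options.
    .apply(Window.<KV<String, String>>into(
            FixedWindows.of(Duration.standardMinutes(options.getTimeWrite()))))
    // Produces one KV<String, Iterable<String>> per key and window.
    .apply(GroupByKey.<String, String>create())
    .apply(Values.<Iterable<String>>create())
    // Write each window's messages to one GCS object.
    .apply(ParDo.of(new StorageWrite(options)));
Window.into defines the windows, and GroupByKey collects the elements of each window into one Iterable, which StorageWrite then writes to Cloud Storage. The processElement method of StorageWrite: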
@Override
public void processElement(ProcessContext c) throws IOException {
    // In Dataflow SDK 1.x, c.window() requires StorageWrite to implement DoFn.RequiresWindowAccess.
    PipeOptions options = c.getPipelineOptions().as(PipeOptions.class);
    String date = ISODateTimeFormat.date().print(c.window().maxTimestamp());
    String isoDate = ISODateTimeFormat.dateTime().print(c.window().maxTimestamp());
    String blobName = String.format("%s/%s/%s",
            options.getBucketRepository(), date, options.getFileOutName() + isoDate);
    BlobId blobId = BlobId.of(options.getGCSBucket(), blobName);
    // `storage` is assumed to be a com.google.cloud.storage.Storage client field of StorageWrite.
    try (WriteChannel writer = storage.writer(
            BlobInfo.builder(blobId).contentType("text/plain").build())) {
        for (String message : c.element()) {
            // One message per line, encoded explicitly as UTF-8.
            writer.write(ByteBuffer.wrap((message + "\n").getBytes(StandardCharsets.UTF_8)));
        }
    }
}
Source: https://stackoverflow.com/questions/40448829
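The naming and writing logic of processElement can be tried out without GCS. The sketch below is a stand-in under stated assumptions: it uses java.time instead of Joda-Time, formats timestamps in UTC explicitly, and writes to an in-memory WritableByteChannel instead of a Cloud Storage WriteChannel; the values "outputs" and "messages-" are hypothetical option values.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;

public class WindowFileSketch {
    private static final DateTimeFormatter DATE =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
    // Pattern letter X prints "Z" for a zero offset.
    private static final DateTimeFormatter DATE_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").withZone(ZoneOffset.UTC);

    /** repository/date/fileOutName+dateTime, mirroring the answer's String.format call. */
    static String blobName(String repository, String fileOutName, Instant windowMaxTimestamp) {
        return String.format("%s/%s/%s", repository, DATE.format(windowMaxTimestamp),
                fileOutName + DATE_TIME.format(windowMaxTimestamp));
    }

    /** Writes each message followed by a newline to the channel as UTF-8. */
    static void writeLines(Iterable<String> messages, WritableByteChannel channel) throws IOException {
        for (String message : messages) {
            ByteBuffer buffer = ByteBuffer.wrap((message + "\n").getBytes(StandardCharsets.UTF_8));
            // A channel may accept only part of the buffer per call, so loop until it is drained.
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // A window ending at 23:05 UTC has maxTimestamp = end minus 1 ms.
        Instant maxTimestamp = Instant.parse("2016-11-08T23:05:00Z").minusMillis(1);
        System.out.println(blobName("outputs", "messages-", maxTimestamp));

        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (WritableByteChannel channel = Channels.newChannel(sink)) {
            writeLines(Arrays.asList("first", "second"), channel);
        }
        System.out.print(new String(sink.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

The first line printed is `outputs/2016-11-08/messages-2016-11-08T23:04:59.999Z`. Because the Cloud Storage client's WriteChannel is a WritableByteChannel, a helper like writeLines should be usable with the writer returned by storage.writer(...) as well.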