首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用云数据流通过窗口从PubSub写入谷歌云存储

使用云数据流通过窗口从PubSub写入谷歌云存储
EN

Stack Overflow用户
提问于 2016-11-06 19:31:12
回答 1查看 569关注 0票数 2

我在流模式下通过pubsub接收到数据流的消息(这是我需要的)。在GCS中,每条消息都应存储在其自己的文件中。因为不支持TextIO.Write中的无界集合,所以我尝试将PCollection划分为多个窗口,每个窗口包含一个元素。并将每个窗口写入google-cloud-storage。

下面是我的代码:

代码语言:javascript
复制
public static void main(String[] args) {    

          DataflowPipelineOptions options = PipelineOptionsFactory.create()
                  .as(DataflowPipelineOptions.class);
                options.setRunner(BlockingDataflowPipelineRunner.class);                
                options.setProject(PROJECT_ID);             
                options.setStagingLocation(STAGING_LOCATION);
                options.setStreaming(true);
                Pipeline pipeline = Pipeline.create(options);

                PubsubIO.Read.Bound<String> readFromPubsub = PubsubIO.Read.named("ReadFromPubsub")
                        .subscription(SUBSCRIPTION);

                PCollection<String> streamData = pipeline.apply(readFromPubsub);        



                PCollection<String> windowedMessage = streamData.apply(Window.<String>triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))).discardingFiredPanes());
            e


                windowedMessage.apply(TextIO.Write.to("gs://pubsub-outputs/1"));

                pipeline.run();
        }

我仍然收到窗口之前得到的相同错误。

代码语言:javascript
复制
The DataflowPipelineRunner in streaming mode does not support TextIO.Write.

执行上述代码的代码是什么。

EN

回答 1

Stack Overflow用户

发布于 2016-11-08 23:05:57

TextIO与绑定的PCollection一起工作,您可以使用API Storage写入GCS。

你可以这样做:

代码语言:javascript
复制
    PipeOptions options = data.getPipeline().getOptions().as(PipeOptions.class);
    data.apply(WithKeys.of(new SerializableFunction<String, String>() {
             public String apply(String s) { return "mykey"; } }))          

    .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardMinutes(options.getTimeWrite()))))
    .apply(GroupByKey.create())
    .apply(Values.<Iterable<String>>create())
    .apply(ParDo.of(new StorageWrite(options)));

您可以使用groupBy操作创建一个窗口,然后使用iterable将其写入存储。StorageWrite的processElement:

代码语言:javascript
复制
        PipeOptions options = c.getPipelineOptions().as(PipeOptions.class);
        String date = ISODateTimeFormat.date().print(c.window().maxTimestamp());
        String isoDate = ISODateTimeFormat.dateTime().print(c.window().maxTimestamp());
        String blobName = String.format("%s/%s/%s", options.getBucketRepository(), date, options.getFileOutName() + isoDate);

        BlobId blobId = BlobId.of(options.getGCSBucket(), blobName);

        WriteChannel writer = storage.writer(BlobInfo.builder(blobId).contentType("text/plain").build());

        for (Iterator<String> it = c.element().iterator(); it.hasNext();) {
            writer.write(ByteBuffer.wrap(it.next().getBytes()));
        }
        writer.close();  
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40448829

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档