首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在不清除bigquery表的情况下更新应用程序引擎中运行的google云数据流

如何在不清除bigquery表的情况下更新应用程序引擎中运行的google云数据流
EN

Stack Overflow用户
提问于 2016-11-03 17:13:41
回答 1查看 256关注 0票数 0

我在google-cloud-dataflow上运行了一个App-engine进程。它监听通过pubsub发送的消息和流向big-query的流。

我更新了我的代码,我试图重新运行应用程序。

但我收到了这个错误:

代码语言:javascript
复制
Exception in thread "main" java.lang.IllegalArgumentException: BigQuery table is not empty

是否在不删除表的情况下更新数据流?因为我的代码可能经常更改,所以我不想删除表中的数据。

这是我的代码:

代码语言:javascript
复制
public class MyPipline {
    private static final Logger LOG = LoggerFactory.getLogger(BotPipline.class);
    private static String name;

    public static void main(String[] args) {

        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("a").setType("string"));
        fields.add(new TableFieldSchema().setName("b").setType("string"));
        fields.add(new TableFieldSchema().setName("c").setType("string"));
        TableSchema tableSchema = new TableSchema().setFields(fields);

        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setRunner(BlockingDataflowPipelineRunner.class);
        options.setProject("my-data-analysis");
        options.setStagingLocation("gs://my-bucket/dataflow-jars");
        options.setStreaming(true);

        Pipeline pipeline = Pipeline.create(options);

        PCollection<String> input = pipeline
                .apply(PubsubIO.Read.subscription(
                        "projects/my-data-analysis/subscriptions/myDataflowSub"));

        input.apply(ParDo.of(new DoFn<String, Void>() {

            @Override
            public void processElement(DoFn<String, Void>.ProcessContext c) throws Exception {
                LOG.info("json" + c.element());
            }

        }));
        String fileName = UUID.randomUUID().toString().replaceAll("-", "");


        input.apply(ParDo.of(new DoFn<String, String>() {
            @Override
            public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
                JSONObject firstJSONObject = new JSONObject(c.element());
                firstJSONObject.put("a", firstJSONObject.get("a").toString()+ "1000");
                c.output(firstJSONObject.toString());

            }

        }).named("update json")).apply(ParDo.of(new DoFn<String, TableRow>() {

            @Override
            public void processElement(DoFn<String, TableRow>.ProcessContext c) throws Exception {
                JSONObject json = new JSONObject(c.element());
                TableRow row = new TableRow().set("a", json.get("a")).set("b", json.get("b")).set("c", json.get("c"));
                c.output(row);
            }

        }).named("convert json to table row"))
                .apply(BigQueryIO.Write.to("my-data-analysis:mydataset.mytable").withSchema(tableSchema)
        );

        pipeline.run();
    }
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-11-03 22:44:02

您需要在您的withWriteDisposition上指定BigQueryIO.Write --参见文档该方法的它的论点。根据您的需求,您需要WRITE_TRUNCATEWRITE_APPEND

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40407527

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档