首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在apache中加入后从PCollection<Row>中提取信息?

如何在apache中加入后从PCollection<Row>中提取信息?
EN

Stack Overflow用户
提问于 2022-03-12 01:52:26
回答 1查看 626关注 0票数 1

我有两个执行innerJoin的示例数据流。我想扩展这段示例连接代码,并在连接发生后添加一些逻辑。

代码语言:javascript
复制
public class JoinExample {

  public static void main(String[] args) {
    final Pipeline pipeline = Pipeline.create(pipelineOpts);

    PCollection<Row> adStream =
        pipeline
            .apply(From.source("kafka.adStream"))
            .apply(Select.fieldNames("ad.id", "ad.name"))
            .apply(Window.into(FixedWindows.of(Duration.standardSeconds(5))));

    PCollection<Row> clickStream =
        pipeline
            .apply(From.source("kafka.clickStream"))
            .apply(Select.fieldNames("ad.id", "numClicks"))
            .apply(Window.into(FixedWindows.of(Duration.standardSeconds(5))));

    adStream
        .apply(Join.<Row, Row>innerJoin(clickStream).using("id"))
        .apply(ConsoleOutput.of(Row::toString)); // Instead of this output, I would like to just print the ad name and num clicks after the join

    pipeline.run();
  }

我只想打印广告名称,并在联接之后单击num,使用如下DoFcn:

代码语言:javascript
复制
 adStream
    .apply(Join.<Row, Row>innerJoin(clickStream).using("id"))
    .apply(ParDo.of(new DoFcn(PCollection<Row>, int>() {

      public void processElement(ProcessContext c) {
        // Since there are two rows after the join, how can I get info from each row?
        // Example in:
        //    ad.id = 1, ad.name = test
        //    ad.id = 1, numClicks = 1000
        
        // After join
        // Row: [Row:[1, test], Row:[1, 1000]]
        
        // I tried this statement but it is incorrect
        Row one = c.element.getRow(0);  // This API is not available
      }
     } 

对于如何从连接的数据中提取这些信息,有什么想法吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-03-14 23:08:17

正如您所了解的,Schema方法模拟SQL,其中连接的结果是连接PCollections中的行。为了查看哪些行进入内部联接,您必须使用CoGroup实用程序来加入PCollections。这将返回一个Row对象,其中包含与键匹配的Rows,每个PCollections都具有单独的可迭代性。示例:

代码语言:javascript
复制
import org.apache.beam.sdk.schemas.transforms.CoGroup;
import org.apache.beam.sdk.values.PCollectionTuple;

public class JoinExample {

  public static void main(String[] args) {
    final Pipeline pipeline = Pipeline.create(pipelineOpts);

    PCollection<Row> adStream =
        pipeline
            .apply(From.source("kafka.adStream"))
            .apply(Select.fieldNames("ad.id", "ad.name"))
            .apply(Window.into(FixedWindows.of(Duration.standardSeconds(5))));

    PCollection<Row> clickStream =
        pipeline
            .apply(From.source("kafka.clickStream"))
            .apply(Select.fieldNames("ad.id", "numClicks"))          
            .apply(Window.into(FixedWindows.of(Duration.standardSeconds(5))));

    // The names given here for the PCollections can be used to retrieve the
    // the rows in the consuming PTransform. See below:
    PCollectionTuple.of("adStream", adStream, "clickStream", clickStream)
      // This selects the common field name in both adStream and clickStream 
      // to join on. See the documentation for ways of joining on
      // different keys.
      .apply(CoGroup.join(By.fieldNames("id")))
      .apply(ParDo.of(new DoFn<Row, int>() {
        public void processElement(ProcessContext c) 

          // Get key.
          String id = c.element.getValue("key").id;

          // Get rows from the adStream and clickStream PCollections that 
          // share the same id.
          Iterable<Row> adStream = c.element.getValue("adStream");
          Iterable<Row> clickStream = c.element.getValue("clickStream");

          return 0;
        }
      }));

     pipeline.run();
  }
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71446398

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档