Is it possible to use nested queries in Beam? I tried to run such a query in Beam but got an error.
The query I am running is:
PCollection<BeamRecord> Query_Output = Query.apply(
    BeamSql.queryMulti("Select Orders.OrderID From Orders Where Orders.CustomerID IN (Select Customers.CustomerID From Customers WHERE Customers.CustomerID = 2)"));
The error I get is:
org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner validateAndConvert
INFO: SQL:
SELECT `Orders`.`OrderID`
FROM `Orders` AS `Orders`
WHERE `Orders`.`CustomerID` IN (SELECT `Customers`.`CustomerID`
FROM `Customers` AS `Customers`
WHERE `Customers`.`CustomerID` = 2)
Jan 19, 2018 11:56:36 AM org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner convertToBeamRel
INFO: SQLPlan>
LogicalProject(OrderID=[$0])
LogicalJoin(condition=[=($1, $3)], joinType=[inner])
LogicalTableScan(table=[[Orders]])
LogicalAggregate(group=[{0}])
LogicalProject(CustomerID=[$0])
LogicalFilter(condition=[=($0, 2)])
LogicalTableScan(table=[[Customers]])
Exception in thread "main" java.lang.IllegalStateException: java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers: DefaultTrigger, Repeatedly.forever(AfterWatermark.pastEndOfWindow())
at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.expand(BeamSql.java:165)
at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.expand(BeamSql.java:116)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:533)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:465)
at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:160)
at com.bitwise.cloud.ExampleOfJoins.main(ExampleOfJoins.java:91)
Caused by: java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers: DefaultTrigger, Repeatedly.forever(AfterWatermark.pastEndOfWindow())
at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:123)
at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:101)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:533)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:465)
at org.apache.beam.sdk.values.PCollectionList.apply(PCollectionList.java:182)
at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:124)
at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:74)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:533)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:465)
at org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple.apply(KeyedPCollectionTuple.java:107)
at org.apache.beam.sdk.extensions.joinlibrary.Join.innerJoin(Join.java:59)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.standardJoin(BeamJoinRel.java:217)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.buildBeamPipeline(BeamJoinRel.java:161)
at org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel.buildBeamPipeline(BeamProjectRel.java:68)
at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.expand(BeamSql.java:163)
... 5 more
Can anyone help me?
Thanks.
Posted on 2018-01-26 23:00:15
Can you try setting the trigger Repeatedly.forever(AfterWatermark.pastEndOfWindow()) on the input PCollection? (See the windowing and triggers sections of the Beam programming guide.)
What is happening:
As you can see from the query plan, it contains a join (LogicalJoin) and an aggregation (LogicalAggregate). The aggregation likely corresponds to the IN operator in your query.
The current Beam implementation requires both inputs of a LogicalJoin to have the same windowing strategy, including the trigger. For aggregations, however, Beam applies its own default windowing strategy and overrides whatever is configured on the input.
As a result, the second part of the query ends up with a windowing strategy different from the input's, which causes validation of the join to fail.
To avoid problems like this without such workarounds, Beam's behavior will eventually need to change so that it does not override the windowing configuration.
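The suggested workaround can be sketched roughly as follows. This is a hedged example, not code from the original post: the variable name ordersRecords is illustrative, and the allowed-lateness and pane-accumulation settings are assumptions you may need to adjust to match what Beam SQL configures for its aggregation.

```java
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Apply to the raw input the same trigger that Beam SQL uses for its
// aggregation, so both inputs of the generated join end up with
// matching windowing strategies. Window.configure() keeps the existing
// WindowFn and only updates the trigger-related settings.
PCollection<BeamRecord> triggeredOrders = ordersRecords.apply(
    Window.<BeamRecord>configure()
        .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
        .withAllowedLateness(Duration.ZERO)   // assumption: no late data
        .discardingFiredPanes());             // assumption: discard fired panes
```

The triggered collection (and the Customers input, prepared the same way) would then be passed to BeamSql.queryMulti in place of the original inputs.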
Filed your issue as a bug: https://issues.apache.org/jira/browse/BEAM-3547
https://stackoverflow.com/questions/48335383