我使用的是Flink 1.13.0,下面是简单的代码片段
import org.apache.flink.table.api.bridge.scala.table2RowDataSet
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
object HelloFlinkBatchTable {
def main(args: Array[String]): Unit = {
val settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build()
val tenv = TableEnvironment.create(settings)
val words = tenv.fromValues("hello", "world", "hadoop", "spark", "world").as("word")
words.collect().foreach(println)
words.printSchema()
tenv.createTemporaryView("words", words)
//collect works on TableResult
val result = tenv.executeSql("select word from words")
result.collect()
//collect doesn't work on the Table
//ERROR:Table cannot be converted into a DataSet. It is not part of a batch table environment.
words.collect()
}
}我会问为什么TableResult.collect可以工作,而Table.collect却不能工作(错误是:Table cannot be converted into a DataSet. It is not part of a batch table environment.)。我认为我已经在代码中正确地指定了批处理环境。
发布于 2021-05-17 09:40:39
隐式转换table2RowDataSet实际上是不推荐的,但在一般情况下很难取消隐式转换。
DataSet API即将结束,并将在中期完全集成到TableEnvironment和StreamExecutionEnvironment中。
TableResult.collect是官方和稳定支持的检索结果的方式。其他关联将在Flink 1.14中更新,参见FLINK-22590。
https://stackoverflow.com/questions/67543236
复制相似问题