首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >得到"pyflink.util.exceptions.TableException: findAndCreateTableSource失败“。运行PyFlink示例时

得到"pyflink.util.exceptions.TableException: findAndCreateTableSource失败“。运行PyFlink示例时
EN

Stack Overflow用户
提问于 2021-03-15 05:24:16
回答 1查看 805关注 0票数 2

我运行在PyFlink程序下面(从https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html复制)

代码语言:javascript
复制
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()

为了验证它是否有效,我执行了以下操作:

运行echo -e "flink\npyflink\nflink" > /tmp/input

  • Run python WordCount.py

  • Run cat /tmp/out并查找预期输出

然后,我更改了我的PyFlink程序,使之更喜欢SQL而不是Table,但我发现它不起作用。

代码语言:javascript
复制
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

my_source_ddl = """
    create table mySource (
        word VARCHAR
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/input'
    )
"""

my_sink_ddl = """
    create table mySink (
        word VARCHAR,
        `count` BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/output'
    )
"""

t_env.sql_update(my_source_ddl)
t_env.sql_update(my_sink_ddl)

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()

以下是错误:

代码语言:javascript
复制
Traceback (most recent call last):
  File "WordCount.py", line 38, in <module>
    .execute_insert('mySink').wait()
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/table.py", line 864, in execute_insert
    return TableResult(self._j_table.executeInsert(table_path, overwrite))
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 162, in deco
    raise java_exception
pyflink.util.exceptions.TableException: findAndCreateTableSink failed.
     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSink(TableFactoryUtil.java:87)
     at org.apache.flink.table.api.internal.TableEnvImpl.getTableSink(TableEnvImpl.scala:1097)
     at org.apache.flink.table.api.internal.TableEnvImpl.org$apache$flink$table$api$internal$TableEnvImpl$$writeToSinkAndTranslate(TableEnvImpl.scala:929)
     at org.apache.flink.table.api.internal.TableEnvImpl$$anonfun$1.apply(TableEnvImpl.scala:556)
     at org.apache.flink.table.api.internal.TableEnvImpl$$anonfun$1.apply(TableEnvImpl.scala:554)
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
     at org.apache.flink.table.api.internal.TableEnvImpl.executeInternal(TableEnvImpl.scala:554)
     at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:498)
     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
     at java.lang.Thread.run(Thread.java:748)

我想知道我的新程序怎么了?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-03-16 02:07:49

问题是,您使用的遗留DataSet不支持您声明的FileSystem连接器。您可以使用眨眼计划,以实现您的需要。

代码语言:javascript
复制
t_env = BatchTableEnvironment.create(
    environment_settings=EnvironmentSettings.new_instance()
    .in_batch_mode().use_blink_planner().build())
t_env._j_tenv.getPlanner().getExecEnv().setParallelism(1)

my_source_ddl = """
    create table mySource (
        word VARCHAR
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/input'
    )
"""

my_sink_ddl = """
    create table mySink (
        word VARCHAR,
        `count` BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/output'
    )
"""

t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
    .select(tab.word, lit(1).count) \
    .execute_insert('mySink').wait()
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66632765

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档