首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink:找不到在类路径中实现'org.apache.flink.table.factories.CatalogFactory‘的标识符“kafka”的任何工厂

Flink:找不到在类路径中实现'org.apache.flink.table.factories.CatalogFactory‘的标识符“kafka”的任何工厂
EN

Stack Overflow用户
提问于 2021-11-30 10:35:02
回答 1查看 2.5K关注 0票数 0

我试图连接卡夫卡与Flink和运行通过sql-client.sh。但是,无论我如何处理.yaml和库,我都会得到错误:

代码语言:javascript
复制
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
        at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create catalog 'myKafka'.

Catalog options are:
'type'='kafka'
        at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:270)
        at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.createCatalog(LegacyTableEnvironmentInitializer.java:217)
        at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.lambda$initializeCatalogs$1(LegacyTableEnvironmentInitializer.java:120)
        at java.util.HashMap.forEach(HashMap.java:1289)
        at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.initializeCatalogs(LegacyTableEnvironmentInitializer.java:117)
        at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.initializeSessionState(LegacyTableEnvironmentInitializer.java:105)
        at org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:233)
        at org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:100)
        at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:91)
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88)
        at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
        ... 1 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.CatalogFactory' in the classpath.

Available factory identifiers are:

generic_in_memory
        at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
        at org.apache.flink.table.factories.FactoryUtil.getCatalogFactory(FactoryUtil.java:455)
        at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:251)
        ... 11 more

我的sql-conf非常简单(我没有包含诸如引导服务器之类的敏感信息):

代码语言:javascript
复制
catalogs:
 - name: myKafka
   type: kafka

此外,library文件夹包括以下jars:

  • flink-avro-confluent-registry-1.13.2.jar
  • flink-connector-kafka_2.12-1.13.2.jar
  • flink-sql-connector-kafka_2.12-1.13.2.jar
  • kafka-clients-2.0.0-cdh6.1.1.jar

Flink版本:1.13.2。卡夫卡版本:2.0.0-cdh6.1.1

解决方案(感谢@Niko为我指出了正确的方向):我修改了sql-conf.yaml以使用hive目录,并在SQL中创建了Kafka表。所以,我的sql-conf.yaml看起来是:

代码语言:javascript
复制
execution:
  type: streaming
  result-mode: table
  planner: blink
  current-database: default
  current-catalog: myhive

catalogs:
  - name: myhive
    type: hive
    hive-version: 2.1.1-cdh6.0.1
    hive-conf-dir: /etc/hive/conf
  
deployment:
  m: yarn-cluster
  yqu: ABC_XYZ

运行它并在SQLclient.sh内部使用必要的连接创建Kafka表。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-11-30 13:51:24

使用YAML定义的所有目录必须提供指定目录类型的类型属性。支持下列类型:

  • generic_in_memory
  • 蜂巢

您可以在官方医生中阅读更多有关信息。

您可以创建您所谓的https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/#initialize-session-using-sql-files,如:

代码语言:javascript
复制
CREATE CATALOG MyCatalog WITH (
    'type' = 'hive',
    'default-database' = 'my_database',
    'hive-conf-dir' = '/etc/hive/conf'
  );

USE CATALOG MyCatalog;

CREATE TABLE MyTable(
  MyField1 INT,
  MyField2 STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'YOUR_TOPIC',
  'properties.bootstrap.servers' = 'localhost',
  'properties.group.id' = 'some_id',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
)
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70167873

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档