首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将flink与mysql数据库连接时所需的上下文属性不匹配

将flink与mysql数据库连接时所需的上下文属性不匹配
EN

Stack Overflow用户
提问于 2020-10-08 02:38:30
回答 2查看 588关注 0票数 1

我正在使用flink最新版本(1.11.2)来处理一个mysql数据库示例,该数据库运行良好。

此外,我还将FLINK-连接器-jdbc_2.11-1.11.2、mysql-连接器-java-8.0.21.jar、PostgreSQL42.2.17.jar添加到{FLINK}/lib中。

这是我的密码

代码语言:javascript
复制
T_CONFIG = TableConfig()
B_EXEC_ENV = ExecutionEnvironment.get_execution_environment()
B_EXEC_ENV.set_parallelism(1)
BT_ENV = BatchTableEnvironment.create(B_EXEC_ENV, T_CONFIG)

ddl = """
            CREATE TABLE nba_player4 (
                 first_name STRING ,
                 last_name STRING,
                 email STRING,
                 id INT
            ) WITH (
                'connector' = 'jdbc',
                'url' = 'jdbc:mysql://localhost:3306/inventory',
                'username' = 'root',
                'password' = 'debezium',
                'table-name' = 'customers'
            )
      """;
BT_ENV.sql_update(ddl)

sinkddl = """
        CREATE TABLE print_table (
         f0 INT,
         f1 INT,
         f2 STRING,
         f3 DOUBLE
        ) WITH (
         'connector' = 'print'
        )
      """;
BT_ENV.sql_update(sinkddl)


sqlquery("SELECT first_name, last_name  FROM nba_player4 ");
BT_ENV.execute("table_job")

但是,在运行代码时,它会出错

代码语言:javascript
复制
py4j.protocol.Py4JJavaError: An error occurred while calling o23.sqlQuery.
: org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed.

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector=jdbc
password=debezium
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=first_name
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=last_name
schema.2.data-type=VARCHAR(2147483647)
schema.2.name=email
schema.3.data-type=INT
schema.3.name=id
table-name=customers
url=jdbc:mysql://localhost:3306/inventory
username=root

The following factories have been considered:
org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.filesystem.FileSystemTableFactory

最近:

这是我的码头文件。

代码语言:javascript
复制
version: '2.1'
services:
  jobmanager:
    build: .
    image: flink:latest
    hostname: "jobmanager"
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink:latest
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - jobmanager:jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  mysql:
    image: debezium/example-mysql
    ports:
     - "3306:3306"
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw 

docker ps命令显示出来

代码语言:javascript
复制
CONTAINER ID        IMAGE                       COMMAND                  CREATED             STATUS              PORTS                                                            NAMES
cf84c84f7821        flink      "/docker-entrypoint.…"   2 minutes ago       Up 2 minutes        6121-6123/tcp, 8081/tcp                                          _taskmanager_1
09b19142d70a        flink      "/docker-entrypoint.…"   9 minutes ago       Up 9 minutes        6123/tcp, 0.0.0.0:8081->8081/tcp                                 _jobmanager_1
4ac01eb11bf7        debezium/example-mysql      "docker-entrypoint.s…"   3 days ago          Up 9 minutes        0.0.0.0:3306->3306/tcp, 33060/tcp                                keras-flask-dep

更多信息:

我当前的flink环境是flink:Scala2.12-java8

代码语言:javascript
复制
docker pull flink:scala_2.12-java8

pyflink jdbc连接器是flink 1.11版本的flink连接器-jdbc_2.11-1.11.2.jar。

代码语言:javascript
复制
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html

为了使用jdbc库,我尝试了两种方法

将flink连接器-jdbc_2.11-1.11.2.jar保存到/usr/local/lib/python3.7/site-packages/flink/lib中的

  1. 在python应用程序中配置类路径

f"file://{base_dir}flink-connector-jdbc_2.11-1.11.2.jar“=”/Users/huhu/ base_dir /project/webapp/libs/“flink_jdbc_jar = BT_ENV.get_config().get_configuration().set_string("pipeline.jars",jars)

但还是会犯同样的错误

EN

回答 2

Stack Overflow用户

发布于 2020-10-10 12:18:55

这可能不能完全回答这个问题,但是:从MySQL的角度来看,CREATE TABLE语句是无效的,并且会引发语法错误。原因是VARCHAR数据类型需要一个长度(即列可以容纳的最大字符数)。

例如:

代码语言:javascript
复制
CREATE TABLE nba_player4 (
    first_name VARCHAR(20),
    last_name  VARCHAR(20),
    email      VARCHAR(50),
    id         VARCHAR(10)
);

现在,这是有效的MySQL代码。不过,我还建议在表中定义主键。这是数据库设计中的一个很好的实践,原因有很多,其中之一是能够唯一地标识每个记录:这样就可以使用WHERE子句准确地选择给定的记录,或者构建引用表的外键约束。一个名为id的列可能是一个很好的候选列,并且可能会更好地定义为一个自动递增的整数。

那么,马比:

代码语言:javascript
复制
CREATE TABLE nba_player4 (
    first_name VARCHAR(20),
    last_name  VARCHAR(20),
    email      VARCHAR(50),
    id         INT PRIMARY KEY AUTO_INCREMENT
);
票数 0
EN

Stack Overflow用户

发布于 2020-10-13 07:39:51

能否验证所使用的所有组件版本。很可能您没有使用1.9版本的Flink,正如我所看到的,它产生了一种新的数据类型属性格式,这是在以后的版本中引入的。

在Flink 1.9中(至少在我检查过的1.9.3中是这样),属性应该是格式:schema.#.type,而在您的示例中是schema.#.data-type

我建议要么升级到最新的Flink版本,要么至少确保使用相同版本的所有组件。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64254996

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档