
Flink SQL does not write data to the Elasticsearch cluster?

Stack Overflow user
Asked on 2020-12-09 10:20:30
1 answer · 135 views · 0 followers · 0 votes

Problem description: I want to query data from MySQL with SQL and then write it into an Elasticsearch cluster with SQL. The program runs successfully, but no data appears in Elasticsearch.

Versions:

  • Flink: 1.11
  • Elasticsearch: 6.2.2
  • Hive: 1.2.1
  • MySQL: 5.7

Below is my code:


Code (Java):
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;

public class HiveExample {
    public static void main(String[] args) throws DatabaseNotExistException {
        // Blink planner in batch mode
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tabEnv = TableEnvironment.create(settings);

        // Unused variant of the INSERT that reads from a Hive table instead of the JDBC source
        String sql =
                "insert into user_action_es_sink " +
                        "select 100123,5,11,1,'a','b','111','bbb',cast(11111 as bigint),cast('2020-11-11' as date) from dragonfly.web_page limit 10" ;

        // JDBC source table backed by MySQL
        String sporeUserAuthCreateTableSQL = "CREATE TABLE users (\n" +
                "  `id` INT,\n" +
                "  `userid` INT,\n" +
                "  `type` INT,\n" +
                "   PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/spore',\n" +
                "  'table-name' = 'spore_user_auth',\n" +
                "  'driver' = 'com.mysql.jdbc.Driver',\n" +
                "  'username' = 'xxxx',\n" +
                "  'password'  = 'xxxx'\n" +
                ")";

        tabEnv.executeSql(sporeUserAuthCreateTableSQL);

        // Elasticsearch 6 sink table
        String esTable = "CREATE TABLE user_action_es_sink (\n" +
                "  uid INT,\n" +
                "  appid INT,\n" +
                "  prepage_id INT,\n" +
                "  page_id INT,\n" +
                "  action_id STRING,\n" +
                "  page_name STRING,\n" +
                "  action_name STRING,\n" +
                "  prepage_name STRING,\n" +
                "  stat_time BIGINT,\n" +
                "  dt DATE\n" +
//                "  PRIMARY KEY (uid,dt) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'elasticsearch-6',\n" +
                "  'hosts' = 'http://localhost:9200',\n" +
                "  'index' = 'mytest',\n" +
                "  'document-type' = 'user_action'\n" +
//                "  'sink.bulk-flush.max-size' = '0',\n" +
//                "  'sink.bulk-flush.max-actions' = '0',\n" +
//                "  'sink.bulk-flush.interval' = '0'\n"+
//                "  'format' = 'json',\n" +
//                "  'json.fail-on-missing-field' = 'false',\n"+
//                "  'json.ignore-parse-errors' = 'true'\n" +
                ")";

        tabEnv.executeSql(esTable);

        // Submit the INSERT from the JDBC source into the Elasticsearch sink
        tabEnv.executeSql("insert into user_action_es_sink select 100123,5,11,1,'a','b','111','bbb',cast(11111 as bigint),cast('2020-11-11' as date) from users limit 10").print();
    }
}

My pom file:

Code (XML):
<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-orc-nohive_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
            <version>1.6.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>1.11.0</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <!--<scope>test</scope>-->
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpcore -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.13</version>
        </dependency>
    </dependencies>

The code does not report any exception, but the data is not written, and it is unclear what is causing the problem.

(Thanks for your help :)


1 Answer

Stack Overflow user

Answered on 2020-12-18 04:11:34

The executeSql method submits the job asynchronously. If you test in an IDE, the main method exits right after executeSql is called, and the underlying mini-cluster is shut down as soon as main finishes. This only happens in local tests; a production cluster stays alive, and the submitted job runs normally.

If you want to wait for the job to finish when running in the IDE, you can use the following approach.

Code (Java):
tabEnv.executeSql("insert into user_action_es_sink select xxx ")
      .getJobClient().get()          
      .getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();

In Flink 1.12 there is a simpler way to do this:

Code (Java):
tabEnv.executeSql("insert into user_action_es_sink select xxx ")
      .await();
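
For reference, here is a minimal sketch of how the whole job could block until the INSERT finishes, assuming Flink 1.12 and TableResult.await(); the class name is illustrative, and the two CREATE TABLE statements are the same as in the question and are elided here.

Code (Java):

import java.util.concurrent.ExecutionException;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EsSinkAwaitExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tabEnv = TableEnvironment.create(settings);

        // Register the JDBC source and the Elasticsearch sink exactly as in the question:
        // tabEnv.executeSql(sporeUserAuthCreateTableSQL);
        // tabEnv.executeSql(esTable);

        // await() blocks until the INSERT job finishes, so the local mini-cluster
        // is not shut down before the Elasticsearch sink has flushed its buffered writes.
        tabEnv.executeSql(
                "insert into user_action_es_sink " +
                "select 100123,5,11,1,'a','b','111','bbb'," +
                "cast(11111 as bigint),cast('2020-11-11' as date) from users limit 10")
              .await();
    }
}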
Votes: 0
The original page content is provided by Stack Overflow; translation supported by Tencent Cloud's IT-domain engine.
Original link:

https://stackoverflow.com/questions/65214841
