Problem description: I want to query data from MySQL via SQL, and write it into an ES cluster via SQL. The program runs successfully, but there is no data in ES.
version:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;

public class HiveExample {
    public static void main(String[] args) throws DatabaseNotExistException {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tabEnv = TableEnvironment.create(settings);
        String sql =
                "insert into user_action_es_sink " +
                "select 100123,5,11,1,'a','b','111','bbb',cast(11111 as bigint),cast('2020-11-11' as date) from dragonfly.web_page limit 10";
        String sporeUserAuthCreateTableSQL = "CREATE TABLE users (\n" +
                "  `id` INT,\n" +
                "  `userid` INT,\n" +
                "  `type` INT,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/spore',\n" +
                "  'table-name' = 'spore_user_auth',\n" +
                "  'driver' = 'com.mysql.jdbc.Driver',\n" +
                "  'username' = 'xxxx',\n" +
                "  'password' = 'xxxx'\n" +
                ")";
        tabEnv.executeSql(sporeUserAuthCreateTableSQL);
        String esTable = "CREATE TABLE user_action_es_sink (\n" +
                "  uid INT,\n" +
                "  appid INT,\n" +
                "  prepage_id INT,\n" +
                "  page_id INT,\n" +
                "  action_id STRING,\n" +
                "  page_name STRING,\n" +
                "  action_name STRING,\n" +
                "  prepage_name STRING,\n" +
                "  stat_time BIGINT,\n" +
                "  dt DATE\n" +
                // "  PRIMARY KEY (uid,dt) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'elasticsearch-6',\n" +
                "  'hosts' = 'http://localhost:9200',\n" +
                "  'index' = 'mytest',\n" +
                "  'document-type' = 'user_action'\n" +
                // "  'sink.bulk-flush.max-size' = '0',\n" +
                // "  'sink.bulk-flush.max-actions' = '0',\n" +
                // "  'sink.bulk-flush.interval' = '0'\n" +
                // "  'format' = 'json',\n" +
                // "  'json.fail-on-missing-field' = 'false',\n" +
                // "  'json.ignore-parse-errors' = 'true'\n" +
                ")";
        tabEnv.executeSql(esTable);
        tabEnv.executeSql("insert into user_action_es_sink select 100123,5,11,1,'a','b','111','bbb',cast(11111 as bigint),cast('2020-11-11' as date) from users limit 10").print();
    }
}

My pom file:
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
  </dependency>
  <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <!--<scope>provided</scope>-->
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>${junit.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-orc-nohive_2.12</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
    <version>1.6.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.11.0</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
    <!--<scope>test</scope>-->
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpcore -->
  <dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpcore</artifactId>
    <version>4.4.13</version>
  </dependency>
</dependencies>

The code does not report any exceptions, but the data is not written, and it is unclear what is causing the problem.
(Thanks for your help :)
Posted on 2020-12-18 04:11:34
The executeSql call submits the job asynchronously. If you are testing in an IDE, the main method exits right after executeSql returns, and the underlying mini-cluster is shut down once main finishes. This only happens in local testing; a production cluster stays alive, and submitted jobs execute normally there.
If you want to wait for job execution in the IDE, you can use the following approach.
tabEnv.executeSql("insert into user_action_es_sink select xxx ")
        .getJobClient().get()
        .getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();

In Flink 1.12, there is a simpler way to do this:

tabEnv.executeSql("insert into user_action_es_sink select xxx ")
        .await();

https://stackoverflow.com/questions/65214841
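The behavior described above can be reproduced with plain JDK classes, without Flink at all: executeSql hands back a handle to asynchronously running work, and unless main blocks on that handle, the JVM (and, in the IDE case, the mini-cluster) can shut down before the work completes. A minimal sketch with a hypothetical CompletableFuture standing in for the submitted job:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AsyncExitDemo {
    public static void main(String[] args) throws Exception {
        // Like executeSql(), submit work asynchronously and get a handle back.
        CompletableFuture<String> job = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(200); // simulate the sink writing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "rows written";
        });
        // Without this blocking get(), main returns immediately and the
        // process can exit before the "job" finishes -- which is exactly
        // why no data showed up in ES when testing in the IDE.
        System.out.println(job.get());
    }
}
```

Calling .await() (Flink 1.12) or .getJobExecutionResult(...).get() on the TableResult plays the same role as job.get() here: it keeps main alive until the insert has actually finished.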