我想知道为什么我真的需要创建我自己的RichSinkFunction或使用JDBCOutputFormat在数据库上连接,而不是仅仅创建我的连接、执行查询和使用JDBCOutputFormat内部的传统PostgreSQL驱动程序关闭连接?
我发现很多文章告诉我这样做,但没有解释为什么?有什么关系?
使用JDBCOutputFormat编写代码示例,
JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("org.postgresql.Driver")
.setDBUrl("jdbc:postgresql://localhost:1234/test?user=xxx&password=xxx")
.setQuery(query)
.setSqlTypes(new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR }) //set the types
.finish();实现自己的RichSinkFunction的代码示例,
public class RichCaseSink extends RichSinkFunction<Case> {
private static final String UPSERT_CASE = "INSERT INTO public.cases (caseid, tracehash) "
+ "VALUES (?, ?) "
+ "ON CONFLICT (caseid) DO UPDATE SET "
+ " tracehash=?";
private PreparedStatement statement;
@Override
public void invoke(Case aCase) throws Exception {
statement.setString(1, aCase.getId());
statement.setString(2, aCase.getTraceHash());
statement.setString(3, aCase.getTraceHash());
statement.addBatch();
statement.executeBatch();
}
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("org.postgresql.Driver");
Connection connection =
DriverManager.getConnection("jdbc:postgresql://localhost:5432/casedb?user=signavio&password=signavio");
statement = connection.prepareStatement(UPSERT_CASE);
}
}为什么我不能只使用PostgreSQL驱动程序?
public class Storable implements SinkFunction<Activity>{
@Override
public void invoke(Activity activity) throws Exception {
Class.forName("org.postgresql.Driver");
try(Connection connection =
DriverManager.getConnection("jdbc:postgresql://localhost:5432/casedb?user=signavio&password=signavio")){
statement = connection.prepareStatement(UPSERT_CASE);
//Perform the query
//close connection...
}
}
}有人知道Flink最佳实践的技术答案吗?RichSinkFunction的实现或JDBCOutputFormat的使用做了什么特别的事情吗?
提前谢谢你。
发布于 2019-05-21 23:21:20
您可以使用您自己的SinkFunction,它将简单地使用invoke()方法来打开连接和写入数据,而且它一般都能工作。但在大多数情况下,它的性能会非常非常差。
第一个示例和第二个示例之间的实际区别在于,在RichSinkFunction中,您使用open()方法打开连接并准备语句。当函数初始化时,只调用一次此open()方法。在第二个示例中,您将打开到数据库的连接,并在invoke()方法中准备语句,该方法将为输入DataStream的每个元素调用该语句。实际上将为流中的每个元素打开一个新连接。
创建数据库连接是一件昂贵的事情,而且它肯定会有可怕的性能缺陷。
https://stackoverflow.com/questions/56245901
复制相似问题