首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为什么我要创建我自己的RichSinkFunction而不是打开和关闭我的PostgreSql连接?

为什么我要创建我自己的RichSinkFunction而不是打开和关闭我的PostgreSql连接?
EN

Stack Overflow用户
提问于 2019-05-21 20:27:47
回答 1查看 2.8K关注 0票数 1

我想知道为什么我真的需要创建我自己的RichSinkFunction或使用JDBCOutputFormat在数据库上连接,而不是仅仅创建我的连接、执行查询和使用JDBCOutputFormat内部的传统PostgreSQL驱动程序关闭连接?

我发现很多文章告诉我这样做,但没有解释为什么?有什么关系?

使用JDBCOutputFormat编写代码示例,

代码语言:javascript
复制
JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
     .setDrivername("org.postgresql.Driver")
     .setDBUrl("jdbc:postgresql://localhost:1234/test?user=xxx&password=xxx")
     .setQuery(query)
     .setSqlTypes(new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR }) //set the types
     .finish();

实现自己的RichSinkFunction的代码示例,

代码语言:javascript
复制
public class RichCaseSink extends RichSinkFunction<Case> {

  private static final String UPSERT_CASE = "INSERT INTO public.cases (caseid, tracehash) "
      + "VALUES (?, ?) "
      + "ON CONFLICT (caseid) DO UPDATE SET "
      + "  tracehash=?";

  private PreparedStatement statement;


  @Override
  public void invoke(Case aCase) throws Exception {

    statement.setString(1, aCase.getId());
    statement.setString(2, aCase.getTraceHash());
    statement.setString(3, aCase.getTraceHash());
    statement.addBatch();
    statement.executeBatch();
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    Class.forName("org.postgresql.Driver");
    Connection connection =
        DriverManager.getConnection("jdbc:postgresql://localhost:5432/casedb?user=signavio&password=signavio");

    statement = connection.prepareStatement(UPSERT_CASE);
  }

}

为什么我不能只使用PostgreSQL驱动程序?

代码语言:javascript
复制
public class Storable implements SinkFunction<Activity>{

    @Override
    public void invoke(Activity activity) throws Exception {
        Class.forName("org.postgresql.Driver");
        try(Connection connection =
            DriverManager.getConnection("jdbc:postgresql://localhost:5432/casedb?user=signavio&password=signavio")){

        statement = connection.prepareStatement(UPSERT_CASE);

        //Perform the query

        //close connection...
        }
    }

}

有人知道Flink最佳实践的技术答案吗?RichSinkFunction的实现或JDBCOutputFormat的使用做了什么特别的事情吗?

提前谢谢你。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-05-21 23:21:20

您可以使用您自己的SinkFunction,它将简单地使用invoke()方法来打开连接和写入数据,而且它一般都能工作。但在大多数情况下,它的性能会非常非常差。

第一个示例和第二个示例之间的实际区别在于,在RichSinkFunction中,您使用open()方法打开连接并准备语句。当函数初始化时,只调用一次此open()方法。在第二个示例中,您将打开到数据库的连接,并在invoke()方法中准备语句,该方法将为输入DataStream的每个元素调用该语句。实际上将为流中的每个元素打开一个新连接。

创建数据库连接是一件昂贵的事情,而且它肯定会有可怕的性能缺陷。

票数 8
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56245901

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档