我正在尝试运行一个Flink客户端应用程序,在这里我从一个文件中读取数据。应该使用ProcessFunction操作符根据数据库中的记录验证文件中的每条记录。我已经配置了一个单吨设计模式的数据源。我有以下问题
public void open(Configuration parameters) throws Exception {
//1. If i open a Connection Here
//2. Shall i declare a Data Source COnnection here with only 1 as Maximum No of Connections
super.open(parameters);
}发布于 2021-06-09 19:21:03
如果我正确地理解了您的用例,这将更容易地实现为具有外部数据库中的表的查找连接。
如果您必须自己实现这一点,那么如果您使用Flink的异步i/o操作符而不是向ProcessFunction中的数据库发出同步请求,它会更好地工作。(在Flink的用户函数中执行阻塞i/o会导致问题,应该避免。)
但要回答你最初的问题:
给定任务管理器中的所有任务槽都位于同一个JVM中。但是每个任务管理器都在一个单独的JVM中。每个任务槽都有自己的进程函数实例;每个实例将在不同的线程中运行。
不可能有一个在任务管理器之间共享连接的全局连接池。可以使用静态类在多时隙任务管理器中建立跨时隙共享的连接池,但使用静态类则被认为是Flink中的反模式。它可能导致死锁,还需要小心类加载(静态意味着每个类加载器都有一个实例,因此必须确保类由父类加载器加载,方法是将类放置在/lib中,或者配置classloader.parent-first-patterns.additional (文档)来获取这个特定类)。
关于为什么不应该这样做的更多信息,请看https://youtu.be/F7HQd3KX2TQ?t=1407。
连接池可以与Flink的异步i/o操作符相结合是一个好主意。在那里,每个操作符实例都在管理对外部数据库或服务的并发请求池,并且使用连接池可以提高性能。但是很多异步客户端库已经这样做了,在这种情况下,不需要自己去做。
https://stackoverflow.com/questions/67902674
复制相似问题