我正在尝试使用spring-data-r2dbc。我正在Postgresql上尝试这个。我以前尝试过spring-data-mongodb-reactive。我情不自禁地将两者进行比较。
我发现还不支持查询派生。但我想知道是否有@Tailable的等价物。这样,数据库的变化就会实时地通知我。有没有人分享过关于这方面的代码样本。
我理解底层数据库应该支持这一点。我相信Postgresql确实支持这种使用逻辑解码的东西(如果我错了,请纠正我)。
在@Tailable -data-r2dbc中是否有spring等效项?
发布于 2020-05-16 01:03:29
我在同样的问题上,不确定您是否找到了解决方案,但我能够通过执行以下操作来完成类似的事情。首先,我将触发器添加到我的表中
CREATE TRIGGER trigger_name
AFTER INSERT OR DELETE OR UPDATE
ON table_name
FOR EACH ROW
EXECUTE PROCEDURE trigger_function_name;这将在每次更新、删除或插入行时在表上设置触发器。然后,它将调用我设置的触发器函数,如下所示:
CREATE FUNCTION trigger_function_name
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS
$BODY$
DECLARE
payload JSON;
BEGIN
payload = row_to_json(NEW);
PERFORM pg_notify('notification_name', payload::text);
RETURN NULL;
END;
$BODY$;这将允许我‘监听’来自我的spring boot项目的这些更新中的任何一个,并且它将作为有效负载发送整个行。接下来,在我的spring boot项目中,我配置了一个到我的数据库的连接。
@Configuration
@EnableR2dbcRepositories("com.(point to wherever repository is)")
public class R2DBCConfig extends AbstractR2dbcConfiguration {
@Override
@Bean
public ConnectionFactory connectionFactory() {
return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
.host("host")
.database("db")
.port(port)
.username("username")
.password("password")
.schema("schema")
.connectTimeout(Duration.ofMinutes(2))
.build());
}
}这样,我将其自动绑定(依赖注入)到我的服务类中的构造函数中,并将其转换为一个r2dbc PostgressqlConnection类,如下所示:
this.postgresqlConnection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();现在我们想‘监听’我们的表,并在对我们的表执行一些更新时得到通知。为此,我们设置了一个初始化方法,该方法在依赖项注入之后使用@PostContruct注解执行
@PostConstruct
private void postConstruct() {
postgresqlConnection.createStatement("LISTEN notification_name").execute()
.flatMap(PostgresqlResult::getRowsUpdated).subscribe();
}请注意,我们监听放在pg_notify方法中的任何名称。我们还希望设置一个方法,以便在bean即将被丢弃时关闭连接,如下所示:
@PreDestroy
private void preDestroy() {
postgresqlConnection.close().subscribe();
}现在,我简单地创建了一个方法,该方法返回表中当前存在的任何内容的Flux,并且还将其与通知合并,正如我在通知作为json传入之前所说的那样,因此我必须对其进行反序列化,因此我决定使用ObjectMapper。因此,它看起来像这样:
private Flux<YourClass> getUpdatedRows() {
return postgresqlConnection.getNotifications().map(notification -> {
try {
//deserialize json
return objectMapper.readValue(notification.getParameter(), YourClass.class);
} catch (IOException e) {
//handle exception
}
});
}
public Flux<YourClass> getDocuments() {
return documentRepository.findAll().share().concatWith(getUpdatedRows());
}希望这能有所帮助。干杯!
https://stackoverflow.com/questions/60819966
复制相似问题