首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在spring-data-r2dbc中@Tailable(spring-data-reactive mongodb)等效项

在spring-data-r2dbc中@Tailable(spring-data-reactive mongodb)等效项
EN

Stack Overflow用户
提问于 2020-03-24 02:58:46
回答 1查看 432关注 0票数 1

我正在尝试使用spring-data-r2dbc。我正在Postgresql上尝试这个。我以前尝试过spring-data-mongodb-reactive。我情不自禁地将两者进行比较。

我发现还不支持查询派生。但我想知道是否有@Tailable的等价物。这样,数据库的变化就会实时地通知我。有没有人分享过关于这方面的代码样本。

我理解底层数据库应该支持这一点。我相信Postgresql确实支持这种使用逻辑解码的东西(如果我错了,请纠正我)。

@Tailable -data-r2dbc中是否有spring等效项?

EN

回答 1

Stack Overflow用户

发布于 2020-05-16 01:03:29

我在同样的问题上,不确定您是否找到了解决方案,但我能够通过执行以下操作来完成类似的事情。首先,我将触发器添加到我的表中

代码语言:javascript
复制
CREATE TRIGGER trigger_name
    AFTER INSERT OR DELETE OR UPDATE 
    ON table_name
    FOR EACH ROW
    EXECUTE PROCEDURE trigger_function_name;

这将在每次更新、删除或插入行时在表上设置触发器。然后,它将调用我设置的触发器函数,如下所示:

代码语言:javascript
复制
CREATE FUNCTION trigger_function_name
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS 
$BODY$
DECLARE
    payload JSON;
BEGIN
    payload = row_to_json(NEW);
    PERFORM pg_notify('notification_name', payload::text);
    RETURN NULL;
END;
$BODY$;

这将允许我‘监听’来自我的spring boot项目的这些更新中的任何一个,并且它将作为有效负载发送整个行。接下来,在我的spring boot项目中,我配置了一个到我的数据库的连接。

代码语言:javascript
复制
@Configuration
@EnableR2dbcRepositories("com.(point to wherever repository is)")
public class R2DBCConfig extends AbstractR2dbcConfiguration {
    @Override
    @Bean
    public ConnectionFactory connectionFactory() {
        return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
                .host("host")
                .database("db")
                .port(port)
                .username("username")
                .password("password")
                .schema("schema")
                .connectTimeout(Duration.ofMinutes(2))
                .build());
    }
}

这样,我将其自动绑定(依赖注入)到我的服务类中的构造函数中,并将其转换为一个r2dbc PostgressqlConnection类,如下所示:

代码语言:javascript
复制
this.postgresqlConnection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();

现在我们想‘监听’我们的表,并在对我们的表执行一些更新时得到通知。为此,我们设置了一个初始化方法,该方法在依赖项注入之后使用@PostContruct注解执行

代码语言:javascript
复制
@PostConstruct
private void postConstruct() {
    postgresqlConnection.createStatement("LISTEN notification_name").execute()
            .flatMap(PostgresqlResult::getRowsUpdated).subscribe();
}

请注意,我们监听放在pg_notify方法中的任何名称。我们还希望设置一个方法,以便在bean即将被丢弃时关闭连接,如下所示:

代码语言:javascript
复制
@PreDestroy
private void preDestroy() {
    postgresqlConnection.close().subscribe();
}

现在,我简单地创建了一个方法,该方法返回表中当前存在的任何内容的Flux,并且还将其与通知合并,正如我在通知作为json传入之前所说的那样,因此我必须对其进行反序列化,因此我决定使用ObjectMapper。因此,它看起来像这样:

代码语言:javascript
复制
private Flux<YourClass> getUpdatedRows() {
    return postgresqlConnection.getNotifications().map(notification -> {
        try {
            //deserialize json
            return objectMapper.readValue(notification.getParameter(), YourClass.class);
        } catch (IOException e) {
            //handle exception
        }
    });
}

public Flux<YourClass> getDocuments() {
    return documentRepository.findAll().share().concatWith(getUpdatedRows());
}

希望这能有所帮助。干杯!

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60819966

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档