首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >我们可以在SpringBoot中使用Logstash从关系型数据库同步数据吗

我们可以在SpringBoot中使用Logstash从关系型数据库同步数据吗
EN

Stack Overflow用户
提问于 2021-05-19 18:14:27
回答 1查看 74关注 0票数 0

我们可以使用Logstash-Jdbc插件将数据从Oracle/任何数据库同步到elastic。但是,在这个jdbc插件中,我找不到任何方法来操作来自DB的数据。我想在我的spring boot应用程序中使用Logstash/any插件来做同样的事情,在保存到elastic之前,我想通过它来操作数据和列名。

EN

回答 1

Stack Overflow用户

发布于 2021-05-22 22:43:15

有很多Logstash输入插件,你可以在Logstash中使用grok filter进行基本的流处理,不管我什么时候提出使用Kafka输入插件来做你的流处理,将你的数据发送到Logstash。

使用您的Kafka代理创建消费者,并在spring项目中使用publisher类发布您的文档,然后使用Logstash配置输入将您的数据摄取到索引中。在此路线图中,借助Apache Kafka,您拥有强大的消费者-发布者管道。

找到一个示例,如下所示:

代码语言:javascript
复制
        <!--Kafka-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.5.8.RELEASE</version>
        </dependency>

处理完文档和数据后,创建一个publisher类来发布此文档

代码语言:javascript
复制
public class DriverProducer {

    @Autowired
    KafkaTemplate<Integer, String > kafkaTemplate;

    @Autowired
    ObjectMapper objectMapper;


    public void messenger(Object convey) throws JsonProcessingException {

        String message=objectMapper.writeValueAsString(convey);

        ListenableFuture<SendResult<Integer,String>> listenableFuture=kafkaTemplate.sendDefault(null, message);

        listenableFuture.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
            failHandler(null, message, throwable);
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
            successHandler(result);
            }
        });
    }
    private void failHandler(Integer key, String message, Throwable throwable){
        //log.error("Unable to send the message for following Error :"+throwable.getMessage());

        try {
            throw throwable;
        }
        catch (Throwable anotherThrowable){
            //log.error("**Supreme Error on throwing the throwable**"+anotherThrowable.getMessage());
        }
    }

    private void successHandler (SendResult<Integer,String> result){

        //log.info("Message sent successfully :"+ result);
    }

    @AllArgsConstructor
    @NoArgsConstructor
    @Getter
    @Setter
    public static class Convey{

        private SagaSequence sequence;
        private Integer key;
        private Date date;
  }

Logstash配置文件可能如下所示

代码语言:javascript
复制
input {
    kafka{
        group_id => "35834"
        topics => ["Second-Topic"]
        bootstrap_servers => "localhost:9092"
        codec => json
    }
}

filter {

}

output {
    file {
        path => "/SOMEPATH"
    }
    elasticsearch {
        hosts => ["localhost:9200"]
        document_type => "_doc"
        index => "logger"
    }
    stdout { codec => rubydebug
    }
}

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67601275

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档