我们可以使用Logstash-Jdbc插件将数据从Oracle/任何数据库同步到elastic。但是,在这个jdbc插件中,我找不到任何方法来操作来自DB的数据。我想在我的spring boot应用程序中使用Logstash/any插件来做同样的事情,在保存到elastic之前,我想通过它来操作数据和列名。
发布于 2021-05-22 22:43:15
有很多Logstash输入插件,你可以在Logstash中使用grok filter进行基本的流处理,不管我什么时候提出使用Kafka输入插件来做你的流处理,将你的数据发送到Logstash。
使用您的Kafka代理创建消费者,并在spring项目中使用publisher类发布您的文档,然后使用Logstash配置输入将您的数据摄取到索引中。在此路线图中,借助Apache Kafka,您拥有强大的消费者-发布者管道。
找到一个示例,如下所示:
<!--Kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.8.RELEASE</version>
</dependency>
处理完文档和数据后,创建一个publisher类来发布此文档
public class DriverProducer {
@Autowired
KafkaTemplate<Integer, String > kafkaTemplate;
@Autowired
ObjectMapper objectMapper;
public void messenger(Object convey) throws JsonProcessingException {
String message=objectMapper.writeValueAsString(convey);
ListenableFuture<SendResult<Integer,String>> listenableFuture=kafkaTemplate.sendDefault(null, message);
listenableFuture.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onFailure(Throwable throwable) {
failHandler(null, message, throwable);
}
@Override
public void onSuccess(SendResult<Integer, String> result) {
successHandler(result);
}
});
}
private void failHandler(Integer key, String message, Throwable throwable){
//log.error("Unable to send the message for following Error :"+throwable.getMessage());
try {
throw throwable;
}
catch (Throwable anotherThrowable){
//log.error("**Supreme Error on throwing the throwable**"+anotherThrowable.getMessage());
}
}
private void successHandler (SendResult<Integer,String> result){
//log.info("Message sent successfully :"+ result);
}
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
public static class Convey{
private SagaSequence sequence;
private Integer key;
private Date date;
}
Logstash配置文件可能如下所示
input {
kafka{
group_id => "35834"
topics => ["Second-Topic"]
bootstrap_servers => "localhost:9092"
codec => json
}
}
filter {
}
output {
file {
path => "/SOMEPATH"
}
elasticsearch {
hosts => ["localhost:9200"]
document_type => "_doc"
index => "logger"
}
stdout { codec => rubydebug
}
}
https://stackoverflow.com/questions/67601275
复制相似问题