首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >ChronicleQueue如何从队列中删除对象或如何更新对象

ChronicleQueue如何从队列中删除对象或如何更新对象
EN

Stack Overflow用户
提问于 2022-04-20 14:18:54
回答 2查看 61关注 0票数 0

嗨,我想要一个进程的简单持久队列。它将记录(添加到队列)如果任何sql失败,那么当sql服务器苏醒时,它将将所有剩余的对象从队列发送回sql服务器。如果队列中的任何记录在发送时再次失败,它将不会将对象从队列中移除。

我尝试过简单的应用程序测试,它将记录一些对象来排队并从队列中读取。

对于Test,我遇到了一些问题,比如,我需要将常量3( TestQueue#pushTest 3对象推入队列),但是在现实生活中,我需要所有的队列大小。

代码语言:javascript
复制
...
for (int i = 0; i < 3; i++) {
...

第二个问题是关于TestQueue#tryToSend的。当我第一次运行我的测试TestQueue#tryToSend时,它正确地转储了所有的对象3,但是在第二次运行相同的测试TestQueue#tryToSend时,我仍然得到了3个对象转储,我认为没有结果,因为我认为当我调用tailer.readingDocument()方法时会删除结果。当对象成功地完成sql操作时,我希望从队列中删除对象,或者不删除对象。

但目前我无法删除对象,我不知道哪种方法具有此功能,也不知道如何标记已完成或未完成的对象进程。

目前,当我发现我的问题从编年史队列中找不到解决方案时,我已经迁移到https://github.com/square/tape,它不是更新的长time.If,您可以找到解决我的问题的方法,我会迁移回编年史队列。

我的代码如下:

与依赖关系:

代码语言:javascript
复制
        <dependency>
            <groupId>net.openhft</groupId>
            <artifactId>chronicle-queue</artifactId>
            <version>5.22.18</version>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>RELEASE</version>
            <scope>test</scope>
        </dependency>
代码语言:javascript
复制
import model.LogData;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.queue.*;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.Wire;

import java.sql.Timestamp;

public class ExampleMain {

    private static final SingleChronicleQueue singleChronicleQueue;

    static {
        singleChronicleQueue = SingleChronicleQueueBuilder.single("logqueue").rollCycle(RollCycles.FAST_DAILY).build();
    }


    public static final void pushQueue(String param, String requestIp, Timestamp requestTime,
                                       Timestamp responseTime, String returnCode) {
        ExcerptAppender appender = singleChronicleQueue.acquireAppender();


        try (final DocumentContext document =
                     appender.acquireWritingDocument(false)) {
            document
                    .wire()
                    .bytes()
                    .writeObject(LogData.class,
                            new LogData(param, requestIp, requestTime, responseTime, returnCode) );

        }
        appender.close();
    }

    public static void processQueueAndRemoveProcessed() {
        final ExcerptTailer tailer = singleChronicleQueue.createTailer();
        final long lastReadIndex = tailer.lastReadIndex();
//        tailer.toStart().direction(TailerDirection.FORWARD);
//        tailer.moveToIndex(82038170320897L);

//        why do i need to put 3 ? how can i get all current queue size ?
        for (int i = 0; i < 3; i++) {
            try (DocumentContext document = tailer.readingDocument()) {
                if (document.isPresent()) {
                    long index = tailer.index();
                    final Wire wire = document.wire();
                    final Bytes<?> bytes = wire.bytes();
                    LogData logData = bytes.readObject(LogData.class);

                    System.out.println(index);
                    System.out.println(logData);

//                    how to remove current logData from queue ?
                }
            }
        }
        tailer.close();
    }

}

简单测试应用

代码语言:javascript
复制
import org.junit.jupiter.api.Test;

import java.sql.Timestamp;

public class TestQueue {
    @Test
    public void pushTest() {
        final Timestamp timestamp = new Timestamp(System.currentTimeMillis());
        ExampleMain.pushQueue("param1","127.0.0.1" , timestamp , timestamp , "200" );
        ExampleMain.pushQueue("param1","192.168.0.2" , timestamp , timestamp , "403" );
        ExampleMain.pushQueue("param1","127.0.0.1" , timestamp , timestamp , "423" );
    }

    @Test
    public void tryToSend() {
        ExampleMain.processQueueAndRemoveProcessed();
    }
}
EN

回答 2

Stack Overflow用户

发布于 2022-05-03 00:49:46

纪事队列不支持删除条目-它是一个不可变的、仅附加的事件存储。

当不再使用循环文件时,您实际上可以存档或删除旧数据--参见纪事队列文档中的滚动循环。

票数 0
EN

Stack Overflow用户

发布于 2022-05-28 12:11:13

在创建ExcerptTailer的新实例时,我必须:

final ExcerptTailer tailer = singleChronicleQueue.createTailer("a");

我的错误是:

最终的ExcerptTailer尾矿= singleChronicleQueue.createTailer( );

保留createTailer空白参数不会记录索引!因此,每次您重新启动应用程序时,它都从开始索引开始。

谢谢为我提供信息!我已经解决了这个问题

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71941333

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档