嗨,我想要一个进程的简单持久队列。它将记录(添加到队列)如果任何sql失败,那么当sql服务器苏醒时,它将将所有剩余的对象从队列发送回sql服务器。如果队列中的任何记录在发送时再次失败,它将不会将对象从队列中移除。
我尝试过简单的应用程序测试,它将记录一些对象来排队并从队列中读取。
对于Test,我遇到了一些问题,比如,我需要将常量3( TestQueue#pushTest 3对象推入队列),但是在现实生活中,我需要所有的队列大小。
...
for (int i = 0; i < 3; i++) {
...第二个问题是关于TestQueue#tryToSend的。当我第一次运行我的测试TestQueue#tryToSend时,它正确地转储了所有的对象3,但是在第二次运行相同的测试TestQueue#tryToSend时,我仍然得到了3个对象转储,我认为没有结果,因为我认为当我调用tailer.readingDocument()方法时会删除结果。当对象成功地完成sql操作时,我希望从队列中删除对象,或者不删除对象。
但目前我无法删除对象,我不知道哪种方法具有此功能,也不知道如何标记已完成或未完成的对象进程。
目前,当我发现我的问题从编年史队列中找不到解决方案时,我已经迁移到https://github.com/square/tape,它不是更新的长time.If,您可以找到解决我的问题的方法,我会迁移回编年史队列。
我的代码如下:
与依赖关系:
<dependency>
<groupId>net.openhft</groupId>
<artifactId>chronicle-queue</artifactId>
<version>5.22.18</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>RELEASE</version>
<scope>test</scope>
</dependency>import model.LogData;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.queue.*;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.Wire;
import java.sql.Timestamp;
public class ExampleMain {
private static final SingleChronicleQueue singleChronicleQueue;
static {
singleChronicleQueue = SingleChronicleQueueBuilder.single("logqueue").rollCycle(RollCycles.FAST_DAILY).build();
}
public static final void pushQueue(String param, String requestIp, Timestamp requestTime,
Timestamp responseTime, String returnCode) {
ExcerptAppender appender = singleChronicleQueue.acquireAppender();
try (final DocumentContext document =
appender.acquireWritingDocument(false)) {
document
.wire()
.bytes()
.writeObject(LogData.class,
new LogData(param, requestIp, requestTime, responseTime, returnCode) );
}
appender.close();
}
public static void processQueueAndRemoveProcessed() {
final ExcerptTailer tailer = singleChronicleQueue.createTailer();
final long lastReadIndex = tailer.lastReadIndex();
// tailer.toStart().direction(TailerDirection.FORWARD);
// tailer.moveToIndex(82038170320897L);
// why do i need to put 3 ? how can i get all current queue size ?
for (int i = 0; i < 3; i++) {
try (DocumentContext document = tailer.readingDocument()) {
if (document.isPresent()) {
long index = tailer.index();
final Wire wire = document.wire();
final Bytes<?> bytes = wire.bytes();
LogData logData = bytes.readObject(LogData.class);
System.out.println(index);
System.out.println(logData);
// how to remove current logData from queue ?
}
}
}
tailer.close();
}
}简单测试应用
import org.junit.jupiter.api.Test;
import java.sql.Timestamp;
public class TestQueue {
@Test
public void pushTest() {
final Timestamp timestamp = new Timestamp(System.currentTimeMillis());
ExampleMain.pushQueue("param1","127.0.0.1" , timestamp , timestamp , "200" );
ExampleMain.pushQueue("param1","192.168.0.2" , timestamp , timestamp , "403" );
ExampleMain.pushQueue("param1","127.0.0.1" , timestamp , timestamp , "423" );
}
@Test
public void tryToSend() {
ExampleMain.processQueueAndRemoveProcessed();
}
}发布于 2022-05-03 00:49:46
纪事队列不支持删除条目-它是一个不可变的、仅附加的事件存储。
当不再使用循环文件时,您实际上可以存档或删除旧数据--参见纪事队列文档中的滚动循环。
发布于 2022-05-28 12:11:13
在创建ExcerptTailer的新实例时,我必须:
final ExcerptTailer tailer = singleChronicleQueue.createTailer("a");
我的错误是:
最终的ExcerptTailer尾矿= singleChronicleQueue.createTailer( );
保留createTailer空白参数不会记录索引!因此,每次您重新启动应用程序时,它都从开始索引开始。
谢谢为我提供信息!我已经解决了这个问题
https://stackoverflow.com/questions/71941333
复制相似问题