因为我找不到任何不使用循环获取流内容的实现,所以我开始实现一个,但是我面临着一些问题,也许你们中的一些人可以指出正确的地方。
该实现使用Pub/Sub和流:* log ->流通道* log:notification ->发布/sub* log:lastReadMessage ->的组合,其中包含流中的最后一个读取键。
出版商
static async Task Main(string[] args)
{
var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
var redisDb = connectionMultiplexer.GetDatabase(1);
while(true)
{
var value = new NameValueEntry[]
{
new NameValueEntry("id", Guid.NewGuid().ToString()),
new NameValueEntry("timestamp", DateTime.UtcNow.ToString())
};
redisDb.StreamAdd("log", value);
var publisher = connectionMultiplexer.GetSubscriber();
publisher.Publish("log:notify", string.Empty, CommandFlags.None);
await Task.Delay(TimeSpan.FromSeconds(1));
}
}订阅者
static async Task Main(string[] args)
{
var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
var redisDb = connectionMultiplexer.GetDatabase(1);
var observableStream = CreateTaskFromStream(connectionMultiplexer, redisDb, "log")
.Subscribe(x => {
Console.WriteLine(x);
});
Console.ReadLine();
} private static SemaphoreSlim taskFromStreamBlocker = new SemaphoreSlim(1);
private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel)
{
var lastReadMessage = "0-0";
var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
if (string.IsNullOrEmpty(lastReadMessageData))
{
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
}
else
{
lastReadMessage = lastReadMessageData;
}
return Observable.Create<string>(obs =>
{
var subscriber = connection.GetSubscriber();
subscriber.Subscribe($"{channel}:notify", async (ch, msg) =>
{
var locker = await taskFromStreamBlocker
.WaitAsync(0)
.ConfigureAwait(false);
if (!locker)
{
return;
}
var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);
foreach(var message in messages)
{
obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
lastReadMessage = message.Id;
}
redisDb.KeyDelete($"{channel}:lastReadMessage");
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
taskFromStreamBlocker.Release();
});
return Disposable.Create(() => subscriber.Unsubscribe(channel));
});
}为什么是信号量?
因为我可以将很多消息添加到流中,所以我不希望o将相同的消息处理两次。
问题
你将如何实现这一点?是否只有Rx实现了Redis流?该解决方案不应该使用某种循环,并且具有内存效率。这个是可能的吗?
谨致问候
保罗·阿博伊姆·平托
发布于 2020-05-26 17:56:13
这是另一种使用200 is时间的计时器的解决方案。
private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel, CancellationToken cancellationToken)
{
var lastReadMessage = "0-0";
var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
if (string.IsNullOrEmpty(lastReadMessageData))
{
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
}
else
{
lastReadMessage = lastReadMessageData;
}
var instance = ThreadPoolScheduler.Instance;
return Observable.Create<string>(obs =>
{
var disposable = Observable
.Interval(TimeSpan.FromMilliseconds(200), instance)
.Subscribe(async _ =>
{
var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);
foreach(var message in messages)
{
obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
lastReadMessage = message.Id;
}
redisDb.KeyDelete($"{channel}:lastReadMessage");
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
});
cancellationToken.Register(() => disposable.Dispose());
return Disposable.Empty;
});
}发布于 2020-05-29 02:27:03
我用一个紧的循环,只需做一个XRange,保存一个位置-吻。但是,如果没有工作,它就会退却,所以当有很多事情发生的时候,它就会很快地停下来。
但是,如果您需要更高的性能(如在处理过程中阅读),那么在大多数情况下,我会提醒您不要这样做。
我不再使用分布式锁/信号量了。
如果您处理的命令(如剂量,而不是xyz )已经发生,这些操作可能会失败。同样,消费者应该处理已经发生的情况,而不是红/流读取部分。
一些具有神奇回调功能的libs无法解决这些问题,当超时在任何节点上运行时,回调将重新尝试。复杂性/问题仍然存在,它们只是转移到其他地方。
对于消费者来说,您可能在上面有一个可以观察到的,但是这基本上是表面的,它不能解决问题,如果您在许多实现下看到相同的循环。我不会使用这个,而是让使用者注册一个操作。
例如
public interface IStreamSubscriber
{
void RegisterEventCallBack(Func<object, IReadOnlyDictionary<string, string>, Task> callback);
void RegisterBatchEventCallBack(Func<IEnumerable<(object msg, IReadOnlyDictionary<string, string> metaData)>, Task> batchCallback);
void Start();
} 在您的例子中,回调可能有可观察的,而不是使用循环,但是下面有一个低级别的循环,它还可以为使用者执行消息到对象的转换。
发布于 2020-05-26 16:31:56
这是我想要避免的解决方案
private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel, CancellationToken cancellationToken)
{
var lastReadMessage = "0-0";
var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
if (string.IsNullOrEmpty(lastReadMessageData))
{
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
}
else
{
lastReadMessage = lastReadMessageData;
}
return Observable.Create<string>(async obs =>
{
while(!cancellationToken.IsCancellationRequested)
{
var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);
foreach(var message in messages)
{
obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
lastReadMessage = message.Id;
}
redisDb.KeyDelete($"{channel}:lastReadMessage");
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
await Task.Delay(TimeSpan.FromMilliseconds(500));
}
return Disposable.Empty;
});
}https://stackoverflow.com/questions/62012495
复制相似问题