我在多线程方面遇到了一些难题。我目前正在使用SinglaR进行实时服务。这个想法是,连接的用户可以从另一个用户那里请求数据。下面是请求和响应功能的要点。
考虑以下代码:
private readonly ConcurrentBag _sharedObejcts= new ConcurrentBag();请求:
[...]
var sharedObject = new MyObject();
_sharedObejcts.Add(sharedObject);
ForwardRequestFireAndForget();
try
{
await Task.Delay(30000, sharedObject.myCancellationToken);
}
catch
{
return sharedObject.ResponseProperty;
}
_myConcurrentBag.TryTake(sharedObject);
[...]答复:
[...]
var result = DoSomePossiblyVeryLengthyTaskHere();
var sharedObject = ConcurrentBag
.Where(x)
.FirstOrDefault();
// The request has timed out so the object isn't there anymore.
if(sharedObject == null)
{
return someResponse;
}
sharedObject.ResponseProperty = result;
// triggers the cancellation source
sharedObject.Cancel();
return someOtherResponse;
[...]因此,基本上是向服务器发出请求,转发到另一个主机,然后函数等待取消或超时。
另一个主机调用响应函数,该函数添加repsonseObject并触发myCancellationToken。
然而,我不确定这是否代表了一种种族状况。从理论上讲,响应线程能否在另一个线程仍然位于finally块上时检索sharedObject?这意味着,请求已经超时,任务还没有从包中删除对象,这意味着数据不一致。
要确保在Task.Delay()调用之后调用的第一件事是TryTake()调用,有哪些保证的方法?
发布于 2020-12-23 00:53:49
你不想让制片人取消消费者的等待。这是太多的责任混淆了。
相反,您真正想要的是生产者发送异步信号。这是通过TaskCompletionSource<T>完成的。使用者可以使用不完整的TCS添加对象,然后使用者可以(异步)等待TCS完成(或超时)。然后生产者就把它的价值给了TCS。
就像这样:
class MyObject
{
public TaskCompletionSource<MyProperty> ResponseProperty { get; } = new TaskCompletionSource<MyProperty>();
}
// request (consumer):
var sharedObject = new MyObject();
_sharedObejcts.Add(sharedObject);
ForwardRequestFireAndForget();
var responseTask = sharedObject.ResponseProperty.Task;
if (await Task.WhenAny(Task.Delay(30000), responseTask) != responseTask)
return null;
_myConcurrentBag.TryTake(sharedObject);
return await responseTask;
// response (producer):
var result = DoSomePossiblyVeryLengthyTaskHere();
var sharedObject = ConcurrentBag
.Where(x)
.FirstOrDefault();
// The request has timed out so the object isn't there anymore.
if(sharedObject == null)
return someResponse;
sharedObject.ResponseProperty.TrySetResult(result);
return someOtherResponse;上面的代码可以稍微清理一下;具体来说,让生产者拥有共享对象的“生产者视图”,而使用者有一个“使用者视图”,这是一个不错的主意,这两个接口都由相同的类型实现。但是上面的代码应该会给你一个大致的想法。
https://stackoverflow.com/questions/65400537
复制相似问题