当发送数据足够快时,就会抛出InvalidOperationException :对于这个WebSocket实例,已经有一个尚未执行的'SendAsync‘调用。ClientWebSocket.ReceiveAsync和ClientWebSocket.SendAsync可以同时调用,但最多允许它们同时执行一个未完成的操作。
第一个示例使用SemaphoreSlim,它一次只允许一条消息,以防止该问题发生。
问题是,我是否需要在第二个示例中执行相同的SemaphoreSlim解决方案,因为SingleReader = true是在通道选项中指定的?基本上应该是一样的,但我希望有人来确认,所以没有意外。
无System.Threading.Channels
private readonly SemaphoreSlim sendAsyncSemaphore = new SemaphoreSlim(1);
public async Task<bool> SendMessageAsync(ReadOnlyMemory<byte> message)
{
if (_webSocket.State != WebSocketState.Open)
{
return false;
}
// Only one message can be sent at a time. Wait until a message can be sent.
await sendAsyncSemaphore.WaitAsync().ConfigureAwait(false);
// We can now send a message. Our thread has entered sendAsyncSemaphore, so we should be the one to release it.
try
{
await _Client.SendAsync(message, WebSocketMessageType.Text, true, connectionCancellation.Token).ConfigureAwait(true);
}
catch (TaskCanceledException)
{
sendAsyncSemaphore.Release();
return false;
}
// Our thread can now release the sendAsyncSemaphore so another message can be sent.
sendAsyncSemaphore.Release();
return true;
}
public void Dispose()
{
sendAsyncSemaphore.Dispose();
}用System.Threading.Channels
public class WebSocketClient
{
private readonly WebSocket _webSocket;
private readonly Channel<string> _input;
private readonly Channel<string> _output;
public WebSocketClient(WebSocket webSocket)
{
_webSocket = webSocket;
_input = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
{
SingleWriter = true
});
_output = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
{
SingleReader = true
});
}
private async Task SendLoopAsync(CancellationToken cancellationToken)
{
await foreach (var message in _output.Reader.ReadAllAsync(cancellationToken))
{
if (_webSocket.State != WebSocketState.Open)
{
return;
}
var bytesToSend = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));
await _webSocket.SendAsync(bytesToSend, WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false);
}
}
}发布于 2022-04-07 13:24:09
简短的回答是的!当SingleReader = true时,您仍然需要一些东西来限制未执行的读取调用的数量。文档清楚地指出,您将该属性设置为true,“如果通道的读取器保证一次最多只能进行一次读取操作”。因此,该财产不提供对发送者的限制。
在我看来,使用信号量或任何其他同步原语并不是一项工作。如果您查看代码,您将看到设置SingleReader选项将为您提供一个SingleConsumerUnboundedChannel。如果你自己不限制读者,你最终会无意中取消阅读操作。
发布于 2022-04-07 13:05:51
如果设置了SingleReader = true,就不必要地使用SemaphoreSlim,因为这只允许通道中的true读取器保证一次最多只能进行一次读取操作。
通道的
true读取器保证一次最多只能进行一次读取操作;如果没有这样的约束,则为guaranteed.false。
顺便说一下,您只是定义了initialCount,但没有限制maxCount,这意味着您将无限地计数信号,这是SemaphoreSlim源代码,NO_MAXIMUM是一个const值int.MaxValue。
// No maximum constant
private const int NO_MAXIMUM = int.MaxValue;
public SemaphoreSlim(int initialCount)
: this(initialCount, NO_MAXIMUM)
{
}您可以尝试使用另一个构造方法,它提供两个参数,第一个是initialCount,第二个是设置maxCount。
private readonly SemaphoreSlim sendAsyncSemaphore = new SemaphoreSlim(1,1);https://stackoverflow.com/questions/71782076
复制相似问题