进程间通信的一种方式是通过进程之间的(命名)管道。
我希望在两个线程之间实现相同的“队列”-like通信。生产者应该编写一个基于文本的命令(使用TextWriter或输出流)。消费者应该从TextReader中读取。因为,仔细想想,OutputStream/-Writer是InputStream/-Reader的另一面。因此,从理论上讲,使用Writer向Reader填充数据应该很容易。
(这里的标准方法是在线程之间建立一个队列。然而,我希望使用TextReader和TextWriter,因为我已经有了前端和后端的现有代码。这样,通过将Console.In / Console.Out连接到生产者/消费者,将很容易进行调试。)
我以为把作者和读者联系起来真的很容易,但我找不到该怎么做。
我可以自己写一个这样的连接,但感觉它“应该”已经存在了。
有什么想法吗?
为Leif干杯
发布于 2009-08-21 09:26:14
我放弃了寻找一个“现成”的解决方案。我自己写的。一个新的类ThroughputStream,它在写入端接收数据,通过线程安全队列将它们发送到读取端,后者使用接收到的数据块进行读取。
namespace My.IO
{
public class ThrouputStream
{
private InputStreamClass inputStream;
private OutputStreamClass outputStream;
private Queue<byte[]> queue = new Queue<byte[]>();
private System.Threading.EventWaitHandle queueEvent = new System.Threading.EventWaitHandle(false, System.Threading.EventResetMode.AutoReset);
public ThrouputStream()
{
inputStream = new InputStreamClass(this);
outputStream = new OutputStreamClass(this);
}
public Stream InputStream
{
get { return inputStream; }
}
public Stream OutputStream
{
get { return outputStream; }
}
private class InputStreamClass : Stream
{
private readonly Queue<byte[]> queue;
private readonly ThrouputStream parent;
private byte[] currentBlock = null;
private int currentBlockPos = 0;
private Boolean closed = false;
private int readTimeoutMs = System.Threading.Timeout.Infinite;
public InputStreamClass(ThrouputStream parent)
{
this.parent = parent;
this.queue = parent.queue;
}
public override bool CanRead
{
get { return true; }
}
public override bool CanSeek
{
get { return false; }
}
public override bool CanWrite
{
get { return false; }
}
public override void Flush()
{
// Do nothing, always flushes.
}
public override long Length
{
get { throw new NotSupportedException(); }
}
public override long Position
{
get
{
throw new NotSupportedException();
}
set
{
throw new NotSupportedException();
}
}
public override bool CanTimeout
{
get
{
return true;
}
}
public override int ReadTimeout
{
get
{
return readTimeoutMs;
}
set
{
readTimeoutMs = value;
}
}
public override int Read(byte[] buffer, int offset, int count)
{
if (currentBlock == null)
{
int queueCount;
lock (queue)
{
queueCount = queue.Count;
if ( queueCount > 0 )
currentBlock = queue.Dequeue();
}
if (currentBlock == null && !parent.outputStream.IsClosed )
{
parent.queueEvent.WaitOne(readTimeoutMs);
lock (queue)
{
if (queue.Count == 0)
return 0;
currentBlock = queue.Dequeue();
}
}
currentBlockPos = 0;
}
if (currentBlock == null)
return 0;
int read = Math.Min(count, currentBlock.Length - currentBlockPos);
Array.Copy(currentBlock, currentBlockPos, buffer, offset, read);
currentBlockPos += read;
if (currentBlockPos == currentBlock.Length)
{
// did read whole block
currentBlockPos = 0;
currentBlock = null;
}
return read;
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotImplementedException();
}
public override void SetLength(long value)
{
throw new NotImplementedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotImplementedException();
}
public override void Close()
{
this.closed = true;
base.Close();
}
}
private class OutputStreamClass : Stream
{
private bool isClosed = false;
private readonly Queue<byte[]> queue;
private readonly ThrouputStream parent;
public OutputStreamClass(ThrouputStream parent)
{
this.parent = parent;
this.queue = parent.queue;
}
public override bool CanRead
{
get { return false; }
}
public override bool CanSeek
{
get { return false; }
}
public override bool CanWrite
{
get { return true; }
}
public override void Flush()
{
// always flush
}
public override long Length
{
get { throw new NotSupportedException(); }
}
public override long Position
{
get
{
throw new NotSupportedException();
}
set
{
throw new NotSupportedException();
}
}
public override int Read(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
byte[] copy = new byte[count];
Array.Copy(buffer, offset, copy, 0, count);
lock (queue)
{
queue.Enqueue(copy);
try
{
parent.queueEvent.Set();
}
catch (Exception)
{ }
}
}
public override void Close()
{
this.isClosed = true;
base.Close();
// Signal event, to stop waiting consumer
try
{
parent.queueEvent.Set();
}
catch (Exception)
{ }
}
public bool IsClosed
{
get { return isClosed; }
}
}
}
}发布于 2009-08-21 09:42:42
我不鼓励使用streams和TextWriter/TextReader作为线程间通信的有效方法。您需要为每个“队列”提供一个流,并且为了确保有效数据完全写入或读取,您需要为每个写入或读取操作锁定该流。更好的解决方案可能是这样的:
设置一个string类型的队列,以及一对ManualResetEvents。一般的想法是使用线程信号来允许两个线程在不需要锁的情况下通信。
public static class ThreadTest
{
public void Main()
{
long exit = 0;
Queue<string> messages = new Queue<string>();
ManualResetEvent signal1 = new ManualResetEvent();
ManualResetEvent signal2 = new ManualResetEvent();
signal2.Set();
Thread writer = new Thread(() =>
{
while (exit == 0)
{
string value = Console.ReadLine();
if (value == "exit")
{
Interlocked.Exchange(ref exit, 1);
}
else
{
messages.Enqueue(value);
Console.WriteLine("Written: " + value);
signal1.Set();
}
signal2.WaitOne();
}
});
Thread reader = new Thread(() =>
{
while (exit == 0)
{
signal1.WaitOne();
signal2.Reset();
value = messages.Dequeue();
Console.WriteLine("Read: " + value);
signal2.Set();
signal1.Reset();
}
});
reader.Start();
writer.Start();
}
}https://stackoverflow.com/questions/1306800
复制相似问题