我想知道用BlockingQueue代替(PipedOutputStream和PipedInputStream)的优势
import java.io.*;
import java.util.concurrent.*;
public class PipedStreamVsBlocking {
public static void main(String... args) {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(2);
ExecutorService executor = Executors.newFixedThreadPool(4);
Runnable producerTask = () -> {
try {
while (true) {
int value = ThreadLocalRandom.current().nextInt(0, 1000);
blockingQueue.put(value);
System.out.println("BlockingQueue.Produced " + value);
int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
Thread.sleep(timeSleeping);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Runnable consumerTask = () -> {
try {
while (true) {
int value = blockingQueue.take();
System.out.println("BlockingQueue.Consume " + value);
int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
Thread.sleep(timeSleeping);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
};
PipedOutputStream pipedSrc = new PipedOutputStream();
PipedInputStream pipedSnk = new PipedInputStream();
try {
pipedSnk.connect(pipedSrc);
} catch (IOException e) {
e.printStackTrace();
}
Runnable runnablePut2 = () -> {
try {
ObjectOutputStream oos = new ObjectOutputStream(pipedSrc);
while (true) {
int value = ThreadLocalRandom.current().nextInt(0, 1000);
oos.writeInt(value);
oos.flush();
System.out.println("PipedStream.Produced " + value);
int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
Thread.sleep(timeSleeping);
}
} catch (Exception e) {
e.printStackTrace();
}
};
Runnable runnableGet2 = () -> {
try {
ObjectInputStream ois = new ObjectInputStream(pipedSnk);
while (true) {
int value = ois.readInt();
System.out.println("PipedStream.Consume " + value);
int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
Thread.sleep(timeSleeping);
}
} catch (Exception e) {
e.printStackTrace();
}
};
executor.execute(producerTask);
executor.execute(consumerTask);
executor.execute(runnablePut2);
executor.execute(runnableGet2);
executor.shutdown();
}
}此代码的输出是:
BlockingQueue.Consume 298
BlockingQueue.Produced 298
PipedStream.Produced 510
PipedStream.Consume 510
BlockingQueue.Produced 536
BlockingQueue.Consume 536
PipedStream.Produced 751
PipedStream.Consume 751
PipedStream.Produced 619
BlockingQueue.Produced 584
BlockingQueue.Consume 584
PipedStream.Consume 619
BlockingQueue.Produced 327
PipedStream.Produced 72
BlockingQueue.Consume 327
PipedStream.Consume 72
BlockingQueue.Produced 823
BlockingQueue.Consume 823
PipedStream.Produced 544
PipedStream.Consume 544
BlockingQueue.Produced 352
BlockingQueue.Consume 352
PipedStream.Produced 134
PipedStream.Consume 134我认为使用PipedStream (PipedOutputStream和PipedInputStream)具有优势,我知道数据是什么时候直接产生/处理的。
可能是我错了,而这个recommendation用BlockingQueue代替管道。
但是,您的评论/建议在文档中找不到。因此,我需要知道我错过了什么。
为什么我要使用BlockingQueue而不是管道?
发布于 2019-07-17 12:09:20
与任何Collection一样,BlockingQueue存储对对象的引用,因此从它检索对象的线程接收的运行时对象完全相同,生成线程被放入其中。
相反,序列化将持久化表单存储到字节流中,这只适用于Serializable对象,并将导致在接收端创建副本。在某些情况下,对象之后可能会被规范对象替换,但是,整个过程比仅仅传输引用要昂贵得多。
在您的示例中,在传输int值时,对象标识并不重要,但是装箱、序列化、反序列化和取消装箱的Integer实例的开销更令人怀疑。
如果您没有使用序列化,而是直接将int值作为四个byte值传输,那么使用PipedOutputStream和PipedInputStream就有意义了,因为它是一个很好的工具,可以传输大量的原始数据。它还具有通过关闭管道来标记数据结束的内在支持。
对于软件来说,这些管道也是正确的工具,对于进程甚至是运行生产者或消费者的计算机都应该是不可知论的,也就是说,当管道实际上在进程之间,甚至在网络连接之间时,您希望能够使用相同的软件。这也将证明使用序列化(就像JMX连接一样)是合理的。
但是,除非您真正地传输单个字节,而这些字节在被撕开时保留了它们的意义,否则就有一个固有的限制:只有一个生产者可以写入管道,而只有一个使用者可以读取数据。
https://stackoverflow.com/questions/57049559
复制相似问题