下面的多线程示例从两个不同的线程写入TopicProcessor,并在两个不同的线程中从TopicProcessor读取。然而,在某些地方存在竞争条件,使得不是所有事件都被传递给订阅者,从而导致应用程序在processed.await()中永远挂起。有人知道为什么吗?
import reactor.core.publisher.Flux;
import reactor.extra.processor.TopicProcessor;
import reactor.extra.processor.WaitStrategy;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.util.Arrays.asList;
public class ReactorTopicProcessorSample {
/**
 * Task that pushes a fixed list of strings into a {@link TopicProcessor}.
 *
 * <p>Each task first counts down and then waits on a latch shared with the other producers,
 * so that no producer starts emitting before all of them have been picked up by a worker
 * thread. It then subscribes the given processor to a Flux built from its own data.
 */
public static class Producer implements Callable<Void> {
    final String name;
    final List<String> data;
    final CountDownLatch producerCount;
    final TopicProcessor<String> topicProcessor;

    /**
     * @param name           label used only in the log line printed by {@link #call()}
     * @param data           elements to emit, in iteration order
     * @param submitted      latch shared by all producers; counted down once per producer
     * @param topicProcessor processor that is subscribed to this producer's Flux
     */
    public Producer(String name, List<String> data, CountDownLatch submitted, TopicProcessor<String> topicProcessor) {
        this.topicProcessor = topicProcessor;
        this.producerCount = submitted;
        this.data = data;
        this.name = name;
    }

    /**
     * Waits for the other producers, then subscribes the processor to this producer's data.
     *
     * @return always {@code null}
     * @throws Exception if the wait on the shared latch is interrupted
     */
    @Override
    public Void call() throws Exception {
        // Rendezvous: wait until the other producer has been submitted as well, to be sure
        // that the producers run in different threads.
        producerCount.countDown();
        producerCount.await();

        // Tag every element with the name of the thread that executes the map step.
        Flux<String> tagged = Flux.fromIterable(data)
                .map(s -> "\"" + s + "\"" + " from thread " + Thread.currentThread().getName());

        // Delay each element by 10 ms before handing it to the processor.
        Flux<String> paced = tagged.delayElements(Duration.ofMillis(10));
        paced.subscribe(topicProcessor);

        String submittingThread = Thread.currentThread().getName();
        System.out.println("Submitted " + name + " producer in thread " + submittingThread + ".");
        return null;
    }
}
/**
 * Entry point: runs {@link #realMain(String[])} 100 times in a row.
 *
 * @param args command-line arguments, passed through to every run
 * @throws InterruptedException if a run is interrupted while waiting
 */
public static void main(String[] args) throws InterruptedException {
    // The sample doesn't hang every time, so repeat it a number of times to make the
    // problem reproducible.
    int runsLeft = 100;
    while (runsLeft > 0) {
        realMain(args);
        System.out.println("\n--- the previous run was successful. running again ---\n");
        runsLeft--;
    }
}
/**
 * Runs one round of the reproduction: attaches two subscribers to a TopicProcessor, submits
 * two Producer tasks to a two-thread pool, and blocks until every subscriber has counted
 * down once per produced element.
 *
 * NOTE(review): both Producer tasks call subscribe(topicProcessor) on their own Flux, so the
 * same processor instance is handed to two upstream publishers. This is presumably the
 * source of the intermittent hang reported for this sample -- confirm against the
 * TopicProcessor documentation.
 *
 * @param args command-line arguments; not read by this method
 * @throws InterruptedException if the calling thread is interrupted while waiting on a latch
 */
public static void realMain(String[] args) throws InterruptedException {
List<String> numbers = asList("1", "2", "3", "4", "5", "6", "7", "8");
List<String> characters = asList("a", "b", "c", "d", "e", "f", "g", "h");
CountDownLatch producerCount = new CountDownLatch(2);
CountDownLatch subscriberCount = new CountDownLatch(2);
// Expected deliveries: 2 subscribers x (8 numbers + 8 characters) = 32.
CountDownLatch processed = new CountDownLatch(
(int) subscriberCount.getCount() * (numbers.size() + characters.size()));
// One pool thread per producer (the latch count is still 2 at this point).
ExecutorService exec = Executors.newFixedThreadPool((int) producerCount.getCount());
TopicProcessor<String> topicProcessor = TopicProcessor.<String>builder()
.share(true)
.name("topic-processor")
.bufferSize(16)
.waitStrategy(WaitStrategy.liteBlocking())
.build();
// Every subscription made through this Flux counts subscriberCount down once.
Flux<String> flux = Flux.from(topicProcessor)
.doOnSubscribe(s -> subscriberCount.countDown());
// First subscriber: logs each element and counts one delivery.
flux.subscribe(out -> {
System.out.println("Subscriber in thread " + Thread.currentThread().getName() + " received " + out);
processed.countDown();
});
// Second subscriber: same handling as the first.
flux.subscribe(out -> {
System.out.println("Subscriber in thread " + Thread.currentThread().getName() + " received " + out);
processed.countDown();
});
// Do not start the producers before both subscriptions have been signalled.
subscriberCount.await();
exec.submit(new Producer("number", numbers, producerCount, topicProcessor));
exec.submit(new Producer("character", characters, producerCount, topicProcessor));
// Blocks until all 32 deliveries have been counted. If fewer arrive, this never returns
// and the two shutdown calls below are never reached.
processed.await();
exec.shutdown();
topicProcessor.shutdown();
}
}

依赖关系:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor.addons</groupId>
<artifactId>reactor-extra</artifactId>
<version>3.3.2.RELEASE</version>
</dependency>

示例行为:订阅者只收到字符或只收到数字,导致程序在processed.await()处永远等待。这种情况并非每次都会发生,有时程序也会按预期运行。
发布于 2020-11-30 15:50:48
如果我理解得没错,您希望有两个并行产生数据的生产者,以及两个并行消费数据的消费者。
首先,您需要了解Reactor或RxJava的工作方式,尤其是冷发布者(cold publisher)的概念:这种发布者只有在订阅者订阅之后才开始发布数据。
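现在回到您的代码。如果您查看TopicProcessor的弹珠图(marble diagram),就会发现这个类用于把来自单个生产者的数据并行分发给多个消费者。而在您的代码中,两个Producer各自调用了subscribe(topicProcessor),相当于让同一个TopicProcessor同时订阅了两个上游。您遇到的竞态条件正是由这种对TopicProcessor的不正确使用引起的。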
要解决这个问题,您需要先把两个生产者合并成一个,再让TopicProcessor订阅合并后的生产者。示例如下:
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.extra.processor.TopicProcessor;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import static java.util.Arrays.asList;
public class ReactorTopicProcessorSampleFixed {
/**
 * Entry point: runs {@link #realMain(String[])} 100 times in a row.
 *
 * @param args command-line arguments, passed through to every run
 * @throws InterruptedException if a run is interrupted while waiting
 */
public static void main(String[] args) throws InterruptedException {
    // Same repetition count as the original reproduction, whose hang was intermittent;
    // repeating the run gives a failure a chance to show up.
    int runsLeft = 100;
    while (runsLeft > 0) {
        realMain(args);
        System.out.println("\n--- the previous run was successful. running again ---\n");
        runsLeft--;
    }
}
/**
 * Runs one round of the corrected sample: two sources are merged into a single Flux, a
 * TopicProcessor is subscribed to that merged Flux, and two subscribers consume from the
 * processor. Blocks until every subscriber has counted down once per element.
 *
 * @param args command-line arguments; not read by this method
 * @throws InterruptedException if the calling thread is interrupted while waiting on a latch
 */
public static void realMain(String[] args) throws InterruptedException {
    final int subscriberTotal = 2;
    List<String> numbers = asList("1", "2", "3", "4", "5", "6", "7", "8");
    List<String> characters = asList("a", "b", "c", "d", "e", "f", "g", "h");

    CountDownLatch subscriberCount = new CountDownLatch(subscriberTotal);
    // Every subscriber is expected to see every element of both lists.
    int expectedDeliveries = subscriberTotal * (numbers.size() + characters.size());
    CountDownLatch processed = new CountDownLatch(expectedDeliveries);

    // Neither source produces anything until something subscribes to it.
    // subscribeOn is applied to each source so that the data is produced on different threads.
    Flux<String> numberSource = Flux.fromIterable(numbers)
            .map(s -> "\"" + s + "\"" + " from thread " + Thread.currentThread().getName())
            .subscribeOn(Schedulers.boundedElastic());
    Flux<String> characterSource = Flux.fromIterable(characters)
            .map(s -> "\"" + s + "\"" + " from thread " + Thread.currentThread().getName())
            .subscribeOn(Schedulers.boundedElastic());
    Flux<String> mergedFlux = numberSource.mergeWith(characterSource);

    TopicProcessor<String> topicProcessor = TopicProcessor.share("topic-processor", 16);
    Flux<String> flux = Flux.from(topicProcessor).doOnSubscribe(s -> subscriberCount.countDown());

    // Attach the subscribers; each one logs the element and counts one delivery.
    for (int i = 0; i < subscriberTotal; i++) {
        flux.subscribe(out -> {
            System.out.println("Subscriber in thread " + Thread.currentThread().getName() + " received " + out);
            processed.countDown();
        });
    }

    // Start producing only after both subscriptions have been signalled.
    subscriberCount.await();
    mergedFlux.subscribe(topicProcessor);

    processed.await();
    topicProcessor.shutdown();
}
}

执行结果如下所示:
Subscriber in thread topic-processor-200 received "1" from thread boundedElastic-2
Subscriber in thread topic-processor-200 received "2" from thread boundedElastic-2
Subscriber in thread topic-processor-199 received "1" from thread boundedElastic-2
Subscriber in thread topic-processor-199 received "2" from thread boundedElastic-2
Subscriber in thread topic-processor-200 received "3" from thread boundedElastic-2
Subscriber in thread topic-processor-200 received "4" from thread boundedElastic-2
Subscriber in thread topic-processor-200 received "5" from thread boundedElastic-2
Subscriber in thread topic-processor-200 received "6" from thread boundedElastic-2
Subscriber in thread topic-processor-200 received "7" from thread boundedElastic-2
Subscriber in thread topic-processor-199 received "3" from thread boundedElastic-2
Subscriber in thread topic-processor-199 received "4" from thread boundedElastic-2
Subscriber in thread topic-processor-199 received "5" from thread boundedElastic-2
Subscriber in thread topic-processor-200 received "8" from thread boundedElastic-2
Subscriber in thread topic-processor-200 received "a" from thread boundedElastic-1
Subscriber in thread topic-processor-200 received "b" from thread boundedElastic-1
Subscriber in thread topic-processor-199 received "6" from thread boundedElastic-2
Subscriber in thread topic-processor-199 received "7" from thread boundedElastic-2
Subscriber in thread topic-processor-199 received "8" from thread boundedElastic-2
Subscriber in thread topic-processor-199 received "a" from thread boundedElastic-1
Subscriber in thread topic-processor-199 received "b" from thread boundedElastic-1
Subscriber in thread topic-processor-199 received "c" from thread boundedElastic-1
Subscriber in thread topic-processor-199 received "d" from thread boundedElastic-1
Subscriber in thread topic-processor-199 received "e" from thread boundedElastic-1
Subscriber in thread topic-processor-199 received "f" from thread boundedElastic-1
Subscriber in thread topic-processor-200 received "c" from thread boundedElastic-1
Subscriber in thread topic-processor-200 received "d" from thread boundedElastic-1
Subscriber in thread topic-processor-200 received "e" from thread boundedElastic-1
Subscriber in thread topic-processor-200 received "f" from thread boundedElastic-1
Subscriber in thread topic-processor-200 received "g" from thread boundedElastic-1
Subscriber in thread topic-processor-200 received "h" from thread boundedElastic-1
Subscriber in thread topic-processor-199 received "g" from thread boundedElastic-1
Subscriber in thread topic-processor-199 received "h" from thread boundedElastic-1

如果这正是您想要的结果,请告诉我。
https://stackoverflow.com/questions/60091561
复制相似问题