假设我有一个使用唯一名称(由我设置)的线程,现在我想创建一个已经存在的具有相同名称的线程。如何终止/停止较旧的线程?下面是我用来识别是否存在同名线程的代码:
Set<Thread> processes = Thread.getAllStackTraces().keySet();
processes.forEach(process -> {
if (process.getName().equals(eventConfiguration.getId().toString())) {
System.out.println("Yes Does Exit");
process.interrupt();
}
});
Thread t = new Thread(eventConfigurationThreads);
t.setName(eventConfiguration.getId().toString());
t.start();在这个run方法中,我终止/不启动最新的线程,但我想启动新的线程并终止同名的旧线程。
@Override
public void run() {
while (!Thread.interrupted()) {
try {
ConsumerRecords<String, String> records = anomalyConsumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
EventProcessHandler eventProcessHandler = new EventProcessHandler(delegateExecution,
runtimeService);
eventProcessHandler
.recordArrived(records, Thread.currentThread().getId(), configurationRecord);
anomalyConsumer.commitAsync();
}
} catch (Exception e) {
try {
anomalyConsumer.close();
} catch (Exception exception) {
}
}
}当我试图执行上面的代码,并调用中断方法时,我得到了中断的异常,我该如何处理这个问题。
发布于 2021-02-12 22:14:41
像往常一样,有许多可能的解决方案来满足您的需求。我会尝试以下方法(这只是一个草稿,请随意尝试和更改)……
public interface TopicEventListener {
void onEvent(byte[] key, byte[] event) throws Exception;
}
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class Topic implements AutoCloseable {
private final TopicConfig topicConfig;
private final Consumer consumer;
public Topic(final TopicConfig topicConfig) {
this.topicConfig = topicConfig;
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, topicConfig.getBootstrapServers());
// this is a common topic consumer, so we just pull byte arrays and pass them
// to a listener, we don't do any decoding in here
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
// ... all other required properties are here ...
consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
try {
consumer.partitionsFor(topicConfig.getTopic()); // just to check connectivity immediately
} catch (final Exception e) {
try {
consumer.close();
} catch (final Exception ignore) {
}
throw e;
}
}
/**
* We can exit from this method only by an exception. Most important cases:
* 1. org.apache.kafka.common.errors.WakeupException - if we call close() method
* 2. org.apache.kafka.common.errors.InterruptException - if the current thread has
* been interrupted
* @see KafkaConsumer#poll(Duration)
*
* @param listener
*/
public void consumeUntilError(final TopicEventListener listener) {
consumer.subscribe(Arrays.asList(topicConfig.getTopic()));
while (true) {
final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<byte[], byte[]> record : records) {
try {
listener.onEvent(record.key(), record.value());
} catch (final InterruptedException e) { // can be thrown by a blocking operation inside onEvent()
throw new org.apache.kafka.common.errors.InterruptException(e);
} catch (final Exception e) {
// just log ("Unexpected error while listener.onEvent() notification", e)
// don't corrupt the consuming loop because of
// an error in a listener
}
}
}
}
@Override
public void close() {
try {
consumer.wakeup();
} finally {
consumer.close();
}
}
} public interface TopicConsumerListener extends TopicEventListener {
void onStarted();
void onStoppedByErrorAndReconnecting(Exception error);
void onStopped();
}
public class TopicConsumer {
private final TopicConfig topicConfig;
private ConsumingThread consumingThread; // guarded by this
private boolean closed; // guarded by this
public TopicConsumer(final TopicConfig topicConfig) {
this.topicConfig = topicConfig;
}
public synchronized void start(final TopicConsumerListener listener) {
if (closed) {
throw new IllegalStateException("Closed");
}
if (consumingThread != null) {
throw new IllegalStateException("Already started");
}
final ConsumingThread ct = new ConsumingThread(listener);
ct.start();
consumingThread = ct;
}
public void stop() throws InterruptedException {
final ConsumingThread ct;
synchronized (this) {
ct = consumingThread;
if (ct == null) {
return;
}
consumingThread = null;
}
ct.close();
if (Thread.currentThread() != ct) {
ct.join();
}
}
public void close() throws InterruptedException {
synchronized (this) {
if (closed) {
return;
}
closed = true;
}
stop();
}
private class ConsumingThread extends Thread {
private final TopicConsumerListener listener;
private volatile boolean closed;
ConsumingThread(final TopicConsumerListener listener) {
super("Consumer#" + topicConfig.getBootstrapServers() + "/" + topicConfig.getTopic());
this.listener = listener;
}
@Override
public void run() {
Exception error = null;
while (true) {
Topic tc = null;
try {
tc = new Topic(topicConfig);
try {
listener.onStarted();
} catch (final Exception e) {
// log ("Unexpected error while onStarted() notification", e);
}
// we consume the events from the topic until
// this thread is interrupted by close()
tc.consumeUntilError(listener);
} catch (final Exception e) {
if (closed) {
break;
}
error = e;
} finally {
if (tc != null) {
try {
tc.close();
} catch (final Exception ignore) {
}
}
}
try {
listener.onStoppedByErrorAndReconnecting(error);
} catch (final Exception e) {
//log ("Unexpected error while onStoppedByErrorAndReconnecting() notification", e)
}
try {
Thread.sleep(5000); // TODO: make the timeout configurable and use backoff with jitter
} catch (final InterruptedException e) {
break; // interrupted by close()
// we don't restore the flag interrupted, since we still need
// to do some additional work like
// to notify listener.onStopped()
}
}
try {
listener.onStopped();
} catch (final Exception e) {
//log ("Unexpected error while onStoppedByErrorAndReconnecting() notification", e);
}
}
void close() {
if (closed) { // no atomicity/membars required
return; // since can be called only by one single thread
}
closed = true;
// We stop the consuming with org.apache.kafka.common.errors.InterruptException
// In here it isn't convenient to call Topic.close() directly to initiate
// org.apache.kafka.common.errors.WakeupException, since we recreate
// the instance of Topic and it takes additional efforts to share the
// changeable reference to a Topic to close it from other thread.
interrupt();
}
}
} import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class TopicConsumerManager {
private final CommandExecutor commandExecutor;
private volatile BlockingQueue<Object> commands = new LinkedBlockingQueue<>(10_000); // some reasonable limit
// just to prevent possible OOM
public TopicConsumerManager() {
commandExecutor = new CommandExecutor(commands);
commandExecutor.start();
}
public void execute(final Object command) throws InterruptedException {
final BlockingQueue<Object> cms = commands;
if (cms == null) { // closed
return;
}
cms.put(command);
}
public void close() throws InterruptedException {
synchronized (this) {
if (commands == null) {
return;
}
commands = null;
}
commandExecutor.interrupt();
if (Thread.currentThread() != commandExecutor) {
commandExecutor.join();
}
}
private class CommandExecutor extends Thread {
private final Map<Integer, TopicConsumer> consumers = new HashMap<>();
private final BlockingQueue<Object> commands;
public CommandExecutor(final BlockingQueue<Object> commands) {
super("TopicConsumerManager");
this.commands = commands;
}
@Override
public void run() {
try {
while (true) {
final Object command = commands.take();
try {
if (command instanceof MakeTopicConsumer) {
final MakeTopicConsumer makeTc = (MakeTopicConsumer) command;
final TopicConsumer oldTc = consumers.get(makeTc.getConfig().getId());
if (oldTc != null) {
oldTc.close();
}
final TopicConsumer newTc = new TopicConsumer(makeTc.getConfig());
consumers.put(makeTc.getConfig().getId(), newTc);
newTc.start(makeTc.getListener());
}
} catch (final InterruptedException e) {
throw e; // push the exception up to break the loop
} catch (final Exception e) {
// log ("An error while executing: " + command, e)
}
}
} catch (final InterruptedException ignore) {
// we don't need to restore interrupted() flag, since
// we need to do additional job - to close
// all started consumers
} finally {
// close all existing consumers
for (TopicConsumer tc : consumers.values()) {
try {
tc.close();
} catch (final Exception ignore) {
}
}
}
}
}
} public class TopicConfig {
private String bootstrapServers;
private String topic;
public TopicConfig() {
}
public TopicConfig(final String bootstrapServers, final String topic) {
this.bootstrapServers = bootstrapServers;
this.topic = topic;
}
public String getBootstrapServers() {
return bootstrapServers;
}
public void setBootstrapServers(final String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
public String getTopic() {
return topic;
}
public void setTopic(final String topic) {
this.topic = topic;
}
@Override
public String toString() {
return getClass().getSimpleName() +
" bootstrapServers='" + bootstrapServers + '\'' +
", topic='" + topic + '\'';
}
}
public class IdentifiableTopicConfig extends TopicConfig {
private int id;
public IdentifiableTopicConfig() {
}
public IdentifiableTopicConfig(final int id, final String bootstrapServers, final String topic) {
super(bootstrapServers, topic);
this.id = id;
}
public int getId() {
return id;
}
public void setId(final int id) {
this.id = id;
}
@Override
public String toString() {
return super.toString() +
", id=" + id;
}
}
/**
* A command
*/
public class MakeTopicConsumer {
private final IdentifiableTopicConfig config;
private final TopicConsumerListener listener;
public MakeTopicConsumer(final IdentifiableTopicConfig config, final TopicConsumerListener listener) {
this.config = config;
this.listener = listener;
}
public IdentifiableTopicConfig getConfig() {
return config;
}
public TopicConsumerListener getListener() {
return listener;
}
} public class Application {
public static void main(String[] args) throws Exception {
// initial parameters to start listening to the configs
final String configBootstrapServers = args[0];
final String configTopic = args[1];
final TopicConsumerManager topicManager = new TopicConsumerManager();
final TopicConsumer configConsumer = new TopicConsumer(
new TopicConfig(configBootstrapServers,configTopic));
configConsumer.start(new TopicConsumerListener() {
@Override
public void onStarted() {
System.out.println("Config consuming started.");
}
@Override
public void onStoppedByErrorAndReconnecting(final Exception error) {
System.out.println("Config consuming stopped by error: " +
error.getLocalizedMessage() + " and reconnecting...");
}
@Override
public void onStopped() {
System.out.println("Config consuming stopped.");
}
@Override
public void onEvent(final byte[] key, final byte[] event) throws Exception {
final IdentifiableTopicConfig config = new IdentifiableTopicConfig();
// decode and fill-in IdentifiableTopicConfig from event bytes
System.out.println("Configuration consumed: " + config);
topicManager.execute(new MakeTopicConsumer(config, new TopicConsumerListener() {
@Override
public void onStarted() {
System.out.println("Consuming for " + config + " started.");
}
@Override
public void onStoppedByErrorAndReconnecting(final Exception error) {
System.out.println("Consuming for " + config + " stopped by error: " + error.getLocalizedMessage() + " and reconnecting...");
}
@Override
public void onStopped() {
System.out.println("Consuming for " + config + " stopped.");
}
@Override
public void onEvent(final byte[] key, final byte[] event) {
System.out.println("An event consumed for " + config + '.');
// ... process your event ...
}
}));
}
});
configConsumer.close();
topicManager.close();
}
}https://stackoverflow.com/questions/66103052
复制相似问题