How to stop a previous thread that is listening to a Kafka topic
Stack Overflow user
Asked 2021-02-08 21:56:38
1 answer, 272 views, 0 followers, 0 votes

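Suppose I have a thread with a unique name (set by me), and now I want to create a thread with the same name as one that already exists. How do I kill/stop the older thread? Here is the code I use to detect whether a thread with the same name exists: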

    Set<Thread> processes = Thread.getAllStackTraces().keySet();
    processes.forEach(process -> {
        if (process.getName().equals(eventConfiguration.getId().toString())) {
            System.out.println("Yes Does Exit");

            process.interrupt();
        }
    });

    Thread t = new Thread(eventConfigurationThreads);
    t.setName(eventConfiguration.getId().toString());
    t.start();

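With this run method, I end up killing (or not starting) the newest thread, but I want to start the new thread and kill the old one with the same name.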

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                ConsumerRecords<String, String> records = anomalyConsumer.poll(Duration.ofMillis(100));
                if (!records.isEmpty()) {

                    EventProcessHandler eventProcessHandler = new EventProcessHandler(delegateExecution,
                            runtimeService);

                    eventProcessHandler
                            .recordArrived(records, Thread.currentThread().getId(), configurationRecord);
                    anomalyConsumer.commitAsync();
                }
            } catch (Exception e) {
                try {
                    anomalyConsumer.close();
                } catch (Exception exception) {
                }

            }
        }
    }

When I execute the code above and call the interrupt method, I get an interrupted exception. How should I handle this?

1 Answer

Stack Overflow user

Answered 2021-02-12 22:14:41

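As usual, there are many possible solutions that meet your requirements. I would try the following approach (it's just a draft, feel free to play with it and change it)...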

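  1. We are going to consume both configs and anomalies, so let's introduce a common topic consumer with a mechanism to stop (close) it, together with its listener: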

    public interface TopicEventListener {
        void onEvent(byte[] key, byte[] event) throws Exception;
    }
   

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class Topic implements AutoCloseable {
        private final TopicConfig topicConfig;
    
        private final Consumer<byte[], byte[]> consumer;
    
        public Topic(final TopicConfig topicConfig) {
            this.topicConfig = topicConfig;
    
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, topicConfig.getBootstrapServers());
            // this is a common topic consumer, so we just pull byte arrays and pass them
            // to a listener, we don't do any decoding in here
            // (a consumer needs deserializers, not the producer's serializers)
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            // ... all other required properties are here (e.g. group.id, which subscribe() needs) ...
    
            consumer = new KafkaConsumer<>(props);
            try {
                consumer.partitionsFor(topicConfig.getTopic()); // just to check connectivity immediately
            } catch (final Exception e) {
                try {
                    consumer.close();
                } catch (final Exception ignore) {
                }
                throw e;
            }
        }
    
        /**
         * We can exit from this method only by an exception. Most important cases:
         * 1. org.apache.kafka.common.errors.WakeupException - if we call close() method
         * 2. org.apache.kafka.common.errors.InterruptException - if the current thread has
         * been interrupted
         * @see KafkaConsumer#poll(Duration)
         *
         * @param listener
         */
        public void consumeUntilError(final TopicEventListener listener) {
            consumer.subscribe(Arrays.asList(topicConfig.getTopic()));
    
            while (true) {
                final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(10));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    try {
                        listener.onEvent(record.key(), record.value());
                    } catch (final InterruptedException e) { // can be thrown by a blocking operation inside onEvent()
                        throw new org.apache.kafka.common.errors.InterruptException(e);
                    } catch (final Exception e) {
                        // just log ("Unexpected error while listener.onEvent() notification", e)
                        // don't corrupt the consuming loop because of
                        // an error in a listener
                    }
                }
            }
        }
    
        @Override
        public void close() {
            try {
                consumer.wakeup();
            } finally {
                consumer.close();
            }
        }
    }
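
The interrupt-based exit described in the Javadoc above can be tried without a broker. The following is only a minimal sketch, not part of the answer's code: `InterruptDemo`, `startWorker` and `stopAndWait` are hypothetical names, and a `BlockingQueue.poll` with a timeout stands in for `KafkaConsumer.poll`. It shows that interrupting a thread blocked in a poll surfaces as an exception, which the loop can treat as the normal stop signal rather than as an error.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class InterruptDemo {
    // Hypothetical stand-in for a consuming loop: blocks in poll() until
    // a record arrives or the thread is interrupted.
    static Thread startWorker(BlockingQueue<String> source, AtomicInteger processed) {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    String record = source.poll(10, TimeUnit.SECONDS);
                    if (record != null) {
                        processed.incrementAndGet();
                    }
                }
            } catch (InterruptedException e) {
                // Expected stop signal: leave the loop and let the thread end.
            }
        }, "worker");
        t.start();
        return t;
    }

    // Interrupts the worker and waits (up to 5 s) until it has really finished.
    static boolean stopAndWait(Thread worker) throws InterruptedException {
        worker.interrupt();
        worker.join(5_000);
        return !worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> source = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();
        Thread worker = startWorker(source, processed);
        source.put("a");
        source.put("b");
        Thread.sleep(200); // give the worker time to drain the queue
        System.out.println("stopped: " + stopAndWait(worker));
        System.out.println("processed: " + processed.get());
    }
}
```

Waiting with `join` after `interrupt` is what guarantees the old loop is gone before a replacement is started.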

  2. Now let's create a thread that uses Topic to consume the data and implements reconnection (failover), together with its listener:

    public interface TopicConsumerListener extends TopicEventListener {
        void onStarted();
    
        void onStoppedByErrorAndReconnecting(Exception error);
    
        void onStopped();
    }

    public class TopicConsumer {
        private final TopicConfig topicConfig;
    
        private ConsumingThread consumingThread; // guarded by this
        private boolean closed; // guarded by this
    
        public TopicConsumer(final TopicConfig topicConfig) {
            this.topicConfig = topicConfig;
        }
    
        public synchronized void start(final TopicConsumerListener listener) {
            if (closed) {
                throw new IllegalStateException("Closed");
            }
    
            if (consumingThread != null) {
                throw new IllegalStateException("Already started");
            }
    
            final ConsumingThread ct = new ConsumingThread(listener);
            ct.start();
            consumingThread = ct;
        }
    
        public void stop() throws InterruptedException {
            final ConsumingThread ct;
            synchronized (this) {
                ct = consumingThread;
    
                if (ct == null) {
                    return;
                }
    
                consumingThread = null;
            }
    
            ct.close();
    
            if (Thread.currentThread() != ct) {
                ct.join();
            }
        }
    
        public void close() throws InterruptedException {
            synchronized (this) {
                if (closed) {
                    return;
                }
                closed = true;
            }
    
            stop();
        }
    
        private class ConsumingThread extends Thread {
            private final TopicConsumerListener listener;
            private volatile boolean closed;
    
            ConsumingThread(final TopicConsumerListener listener) {
                super("Consumer#" + topicConfig.getBootstrapServers() + "/" + topicConfig.getTopic());
                this.listener = listener;
            }
    
            @Override
            public void run() {
                Exception error = null;
    
                while (true) {
                    Topic tc = null;
                    try {
                        tc = new Topic(topicConfig);
    
                        try {
                            listener.onStarted();
                        } catch (final Exception e) {
                            // log ("Unexpected error while onStarted() notification", e);
                        }
    
                        // we consume the events from the topic until
                        // this thread is interrupted by close()
                        tc.consumeUntilError(listener);
                    } catch (final Exception e) {
                        if (closed) {
                            break;
                        }
                        error = e;
                    } finally {
                        if (tc != null) {
                            try {
                                tc.close();
                            } catch (final Exception ignore) {
                            }
                        }
                    }
    
                    try {
                        listener.onStoppedByErrorAndReconnecting(error);
                    } catch (final Exception e) {
                        //log ("Unexpected error while onStoppedByErrorAndReconnecting() notification", e)
                    }
    
                    try {
                        Thread.sleep(5000); // TODO: make the timeout configurable and use backoff with jitter
                    } catch (final InterruptedException e) {
                        break; // interrupted by close()
                        // we don't restore the flag interrupted, since we still need
                        // to do some additional work like
                        // to notify listener.onStopped()
                    }
                }
    
                try {
                    listener.onStopped();
                } catch (final Exception e) {
                    //log ("Unexpected error while onStopped() notification", e);
                }
            }
    
            void close() {
                if (closed) { // no atomicity/membars required
                    return; // since can be called only by one single thread
                }
                closed = true;
    
                // We stop the consuming with org.apache.kafka.common.errors.InterruptException
                // In here it isn't convenient to call Topic.close() directly to initiate
                // org.apache.kafka.common.errors.WakeupException, since we recreate
                // the instance of Topic and it takes additional efforts to share the
                // changeable reference to a Topic to close it from other thread.
                interrupt();
            }
        }
    }
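
The `TODO` next to `Thread.sleep(5000)` mentions backoff with jitter. One possible shape is sketched below, under stated assumptions: `ReconnectBackoff`, its parameters, and the choice of exponential growth with a uniformly random delay up to the cap ("full jitter") are illustrative and not part of the answer.

```java
import java.util.Random;

final class ReconnectBackoff {
    private final long baseMillis;
    private final long maxMillis;
    private final Random random;

    ReconnectBackoff(long baseMillis, long maxMillis, Random random) {
        if (baseMillis <= 0 || maxMillis < baseMillis) {
            throw new IllegalArgumentException("need 0 < baseMillis <= maxMillis");
        }
        this.baseMillis = baseMillis;
        this.maxMillis = maxMillis;
        this.random = random;
    }

    // Upper bound for the given attempt: base * 2^attempt, capped at maxMillis.
    long capMillis(int attempt) {
        long cap = baseMillis;
        for (int i = 0; i < attempt && cap < maxMillis; i++) {
            cap *= 2;
        }
        return Math.min(cap, maxMillis);
    }

    // Random delay in [0, capMillis(attempt)], so that many consumers
    // reconnecting at once do not all retry at the same instant.
    long nextDelayMillis(int attempt) {
        long cap = capMillis(attempt);
        return (long) (random.nextDouble() * (cap + 1));
    }

    public static void main(String[] args) {
        ReconnectBackoff backoff = new ReconnectBackoff(1_000, 30_000, new Random());
        for (int attempt = 0; attempt < 7; attempt++) {
            System.out.println("attempt " + attempt
                    + ": cap=" + backoff.capMillis(attempt)
                    + " ms, delay=" + backoff.nextDelayMillis(attempt) + " ms");
        }
    }
}
```

In the reconnect loop, the attempt counter would be reset to zero after a successful `onStarted()`, and `Thread.sleep(backoff.nextDelayMillis(attempt))` would replace the fixed five-second sleep.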

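  3. So far we can consume byte arrays from a Kafka topic with failover. It's time to manage our configurable consumers using the work queue pattern. The class has a single thread that receives the commands to execute (containing the configs) through a blocking queue. Note that this is important: a single thread that manages the instances and their state prevents any possible race conditions. If a consumer with the same ID has already been started, we close it, and only then start the new one: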

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class TopicConsumerManager {
        private final CommandExecutor commandExecutor;

        private volatile BlockingQueue<Object> commands = new LinkedBlockingQueue<>(10_000); // some reasonable limit
        // just to prevent possible OOM

        public TopicConsumerManager() {
            commandExecutor = new CommandExecutor(commands);
            commandExecutor.start();
        }

        public void execute(final Object command) throws InterruptedException {
            final BlockingQueue<Object> cms = commands;
            if (cms == null) { // closed
                return;
            }
            cms.put(command);
        }

        public void close() throws InterruptedException {
            synchronized (this) {
                if (commands == null) {
                    return;
                }
                commands = null;
            }

            commandExecutor.interrupt();

            if (Thread.currentThread() != commandExecutor) {
                commandExecutor.join();
            }
        }

        private class CommandExecutor extends Thread {
            private final Map<Integer, TopicConsumer> consumers = new HashMap<>();
            private final BlockingQueue<Object> commands;

            public CommandExecutor(final BlockingQueue<Object> commands) {
                super("TopicConsumerManager");
                this.commands = commands;
            }

            @Override
            public void run() {
                try {
                    while (true) {
                        final Object command = commands.take();
                        try {
                            if (command instanceof MakeTopicConsumer) {
                                final MakeTopicConsumer makeTc = (MakeTopicConsumer) command;

                                final TopicConsumer oldTc = consumers.get(makeTc.getConfig().getId());
                                if (oldTc != null) {
                                    oldTc.close();
                                }

                                final TopicConsumer newTc = new TopicConsumer(makeTc.getConfig());
                                consumers.put(makeTc.getConfig().getId(), newTc);

                                newTc.start(makeTc.getListener());
                            }
                        } catch (final InterruptedException e) {
                            throw e; // push the exception up to break the loop
                        } catch (final Exception e) {
                            // log ("An error while executing: " + command, e)
                        }
                    }
                } catch (final InterruptedException ignore) {
                    // we don't need to restore interrupted() flag, since
                    // we need to do additional job - to close
                    // all started consumers
                } finally {
                    // close all existing consumers
                    for (TopicConsumer tc : consumers.values()) {
                        try {
                            tc.close();
                        } catch (final Exception ignore) {
                        }
                    }
                }
            }
        }
    }
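
Stripped of Kafka, the replace-by-ID idea can be sketched with the standard library alone. Everything below (`WorkerRegistry`, `Start`, `submit`, `shutdown`) is hypothetical naming used for illustration, and a long `Thread.sleep` stands in for a blocking poll. The point is only that one manager thread owns the map, so "stop the old one, then start the new one" never races with another caller.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class WorkerRegistry {
    // Command: (re)start the worker registered under this id.
    private static final class Start {
        final int id;
        final Runnable body;

        Start(int id, Runnable body) {
            this.id = id;
            this.body = body;
        }
    }

    private final BlockingQueue<Start> commands = new LinkedBlockingQueue<>();
    // Read and written only by the manager thread, so no locking is needed.
    private final Map<Integer, Thread> workers = new HashMap<>();
    private final Thread manager = new Thread(this::runManager, "manager");

    void start() {
        manager.start();
    }

    void submit(int id, Runnable body) throws InterruptedException {
        commands.put(new Start(id, body));
    }

    void shutdown() throws InterruptedException {
        manager.interrupt();
        manager.join();
    }

    private void runManager() {
        try {
            while (true) {
                Start cmd = commands.take();
                Thread old = workers.remove(cmd.id);
                if (old != null) {
                    old.interrupt(); // ask the previous worker to stop...
                    old.join();      // ...and wait until it is really gone
                }
                Thread fresh = new Thread(cmd.body, "worker-" + cmd.id);
                workers.put(cmd.id, fresh);
                fresh.start();
            }
        } catch (InterruptedException e) {
            // shutdown() was called: fall through and stop every worker
        } finally {
            for (Thread t : workers.values()) {
                t.interrupt();
            }
            for (Thread t : workers.values()) {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Runnable body = () -> {
            String name = Thread.currentThread().getName();
            System.out.println(name + " started");
            try {
                Thread.sleep(60_000); // stands in for a blocking poll
            } catch (InterruptedException e) {
                System.out.println(name + " stopped");
            }
        };
        WorkerRegistry registry = new WorkerRegistry();
        registry.start();
        registry.submit(1, body);
        registry.submit(1, body); // same id: the first worker is stopped first
        Thread.sleep(200);
        registry.shutdown();
    }
}
```

Compared with scanning `Thread.getAllStackTraces()` for a matching name, the map gives a direct handle to the previous worker and makes the stop-then-start order explicit.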

  4. Our data structures are:

    public class TopicConfig {
        private String bootstrapServers;
        private String topic;
    
        public TopicConfig() {
        }
    
        public TopicConfig(final String bootstrapServers, final String topic) {
            this.bootstrapServers = bootstrapServers;
            this.topic = topic;
        }
    
        public String getBootstrapServers() {
            return bootstrapServers;
        }
    
        public void setBootstrapServers(final String bootstrapServers) {
            this.bootstrapServers = bootstrapServers;
        }
    
        public String getTopic() {
            return topic;
        }
    
        public void setTopic(final String topic) {
            this.topic = topic;
        }
    
        @Override
        public String toString() {
            return getClass().getSimpleName() +
                    " bootstrapServers='" + bootstrapServers + '\'' +
                    ", topic='" + topic + '\'';
        }
    }
    
    public class IdentifiableTopicConfig extends TopicConfig {
        private int id;
    
        public IdentifiableTopicConfig() {
        }
    
        public IdentifiableTopicConfig(final int id, final String bootstrapServers, final String topic) {
            super(bootstrapServers, topic);
            this.id = id;
        }
    
        public int getId() {
            return id;
        }
    
        public void setId(final int id) {
            this.id = id;
        }
    
        @Override
        public String toString() {
            return super.toString() +
                    ", id=" + id;
        }
    }
    
    /**
     * A command
     */
    public class MakeTopicConsumer {
        private final IdentifiableTopicConfig config;
        private final TopicConsumerListener listener;
    
        public MakeTopicConsumer(final IdentifiableTopicConfig config, final TopicConsumerListener listener) {
            this.config = config;
            this.listener = listener;
        }
    
        public IdentifiableTopicConfig getConfig() {
            return config;
        }
    
        public TopicConsumerListener getListener() {
            return listener;
        }
    }

  5. Now let's glue it all together...

    public class Application {
        public static void main(String[] args) throws Exception {
            // initial parameters to start listening to the configs
            final String configBootstrapServers = args[0];
            final String configTopic = args[1];
    
            final TopicConsumerManager topicManager = new TopicConsumerManager();
            final TopicConsumer configConsumer = new TopicConsumer(
                    new TopicConfig(configBootstrapServers,configTopic));
    
            configConsumer.start(new TopicConsumerListener() {
                @Override
                public void onStarted() {
                    System.out.println("Config consuming started.");
                }
    
                @Override
                public void onStoppedByErrorAndReconnecting(final Exception error) {
                    System.out.println("Config consuming stopped by error: " +
                                error.getLocalizedMessage() + " and reconnecting...");
                }
    
                @Override
                public void onStopped() {
                    System.out.println("Config consuming stopped.");
                }
    
                @Override
                public void onEvent(final byte[] key, final byte[] event) throws Exception {
                    final IdentifiableTopicConfig config = new IdentifiableTopicConfig();
                    // decode and fill-in IdentifiableTopicConfig from event bytes
    
                    System.out.println("Configuration consumed: " + config);
    
                    topicManager.execute(new MakeTopicConsumer(config, new TopicConsumerListener() {
                        @Override
                        public void onStarted() {
                            System.out.println("Consuming for " + config + " started.");
                        }
    
                        @Override
                        public void onStoppedByErrorAndReconnecting(final Exception error) {
                            System.out.println("Consuming for " + config + " stopped by error: " + error.getLocalizedMessage() + " and reconnecting...");
                        }
    
                        @Override
                        public void onStopped() {
                            System.out.println("Consuming for " + config + " stopped.");
                        }
    
                        @Override
                        public void onEvent(final byte[] key, final byte[] event) {
                            System.out.println("An event consumed for " + config + '.');
                            // ... process your event ...
                        }
                    }));
                }
            });

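            // NOTE: a real application would wait here (e.g. for a shutdown signal)
            // before closing; as written, everything is closed right after start
            configConsumer.close();
            topicManager.close();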
        }
    }
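
The outer `onEvent` callback above leaves the decoding step as a comment. Purely as an illustration, assuming the config arrives as a UTF-8 string of the form `id;bootstrapServers;topic` (a made-up wire format; the answer does not specify one), the decoding could look like the sketch below. `ConfigCodec` and `Decoded` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;

final class ConfigCodec {
    // Plain holder for the three decoded fields.
    static final class Decoded {
        final int id;
        final String bootstrapServers;
        final String topic;

        Decoded(int id, String bootstrapServers, String topic) {
            this.id = id;
            this.bootstrapServers = bootstrapServers;
            this.topic = topic;
        }
    }

    // Parses "id;bootstrapServers;topic" (assumed format) from raw event bytes.
    // Throws IllegalArgumentException if the shape or the id is invalid.
    static Decoded decode(byte[] event) {
        String text = new String(event, StandardCharsets.UTF_8);
        String[] parts = text.split(";", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                    "Expected id;bootstrapServers;topic but got: " + text);
        }
        int id = Integer.parseInt(parts[0].trim());
        return new Decoded(id, parts[1].trim(), parts[2].trim());
    }

    public static void main(String[] args) {
        byte[] event = "42;localhost:9092;anomalies".getBytes(StandardCharsets.UTF_8);
        Decoded d = decode(event);
        System.out.println(d.id + " " + d.bootstrapServers + " " + d.topic);
    }
}
```

Inside `onEvent`, the decoded values would then be copied into the `IdentifiableTopicConfig` through its setters before the `MakeTopicConsumer` command is submitted.
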
Votes: 1
The original content of this page is provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/66103052
