首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >我看不到Disruptor的性能改进

我看不到Disruptor的性能改进
EN

Stack Overflow用户
提问于 2014-01-26 11:49:56
回答 2查看 921关注 0票数 0

我知道我的问题违背了Disruptor API的基本主张。但随着我对它的了解,我写了一个程序来替换我使用ArrayLinkedBlockingQueue的1P-1C用例。但是当我运行这个程序时,我发现使用disruptor的总时间比ArrayLinkedBlockingQueue还要长。我一定是做错了什么或测量错了,但我不确定我的程序中是什么。有谁有意见吗?

(这是一个测试程序,所以很明显我的EventHandler什么也没做)

代码语言:javascript
复制
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class SPSCDisruptorTest {
    private static final int UNIT_SIZE = 1024;
    private static final int BUFFER_SIZE = UNIT_SIZE * 1024 * 16;
    private static final int ITERATIONS = BUFFER_SIZE;
    private static final Logger logger = LoggerFactory
            .getLogger(SPSCDisruptorTest.class);

    private static class Data {
        private String data;

        public String getData() {
            return data;
        }

        public void setData(String data) {
            this.data = data;
        }

        @Override
        public String toString() {
            return "Data [data=" + data + "]";
        }

        public final static EventFactory<Data> DATA_FACTORY = new EventFactory<Data>() {

            @Override
            public Data newInstance() {
                return new Data();
            }

        };
    }

    private static class DataEventTranslator implements EventTranslator<Data> {
        private String payload;

        public DataEventTranslator(String payload) {
            this.payload = payload;
        }

        @Override
        public void translateTo(Data d, long sequence) {
            d.setData(payload);
        }

    };

    public static void main(String[] args) throws InterruptedException {
        new SPSCDisruptorTest().testDisruptor();
        new SPSCDisruptorTest().testExecutor();
    }

    @SuppressWarnings("unchecked")
    public void testDisruptor() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        Disruptor<Data> disruptor = new Disruptor<Data>(
                SPSCDisruptorTest.Data.DATA_FACTORY, BUFFER_SIZE, exec,
                ProducerType.SINGLE, new BusySpinWaitStrategy());
        disruptor.handleEventsWith(new EventHandler<Data>() {

            @Override
            public void onEvent(Data data, long sequence, boolean endOfBatch)
                    throws Exception {
            }

        });
        long t1 = System.nanoTime();
        RingBuffer<Data> buffer = disruptor.start();
        for (int i = 1; i <= ITERATIONS; i++) {
            buffer.publishEvent(new DataEventTranslator("data" + i));
        }
        logger.info("waiting for shutdown");
        disruptor.shutdown();
        logger.info("Disruptor Time (ms): " + (System.nanoTime() - t1 * 1.0)
                / 1000);
        logger.info("Disruptor is shutdown");
        exec.shutdown();
    }

    public void testExecutor() throws InterruptedException {
        ExecutorService executor = new ThreadPoolExecutor(1, 1, 0L,
                TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(
                        BUFFER_SIZE));
        long t1 = System.nanoTime();
        for (int i = 1; i <= ITERATIONS; i++) {
            executor.submit(new DataRunner("data" + i));
        }
        executor.shutdown();
        executor.awaitTermination(5000, TimeUnit.SECONDS);
        logger.info("Executor Time (ms): " + (System.nanoTime() - t1 * 1.0)
                / 1000);
    }

    private static class DataRunner implements Runnable {
        private String data;

        public DataRunner(String data) {
            this.data = data;
        }

        @Override
        public void run() {
        }

    }
}
EN

回答 2

Stack Overflow用户

发布于 2014-02-06 20:43:00

你实际上是在错误地衡量它。您应该在启动中断程序后开始测量,因为预热需要时间(分配环形缓冲区)。因为你的缓冲区很大,所以在热身时需要相当长的时间。尝试下面的示例代码。这应该会给你更好的时间。

代码语言:javascript
复制
    RingBuffer<Data> buffer = disruptor.start();
    long t1 = System.nanoTime();
    for (int i = 1; i <= ITERATIONS; i++) {
        buffer.publishEvent(new DataEventTranslator("data" + i));
    }
    logger.info("waiting for shutdown");
    disruptor.shutdown();
    logger.info("Disruptor Time (ms): " + (System.nanoTime() - t1 * 1.0)
            / 1000);
票数 0
EN

Stack Overflow用户

发布于 2018-04-18 07:14:46

您几乎没有足够的争论点来展示无锁中断程序是如何提供帮助的。特别是,你的executor队列和迭代一样大!所有数据都可以放入executor队列中,因此它基本上不会在notempty/notfull条件下运行。

Executor服务也很糟糕,因为如果队列更小,您就会拒绝执行。您需要比较的是具有有限队列(可能有1000个长)并阻塞.put()/.take()调用的两个线程。

更糟糕的是,您需要成批的数据(而不是1乘1)和许多读取器,甚至可能需要许多写入器。使用executor测试中的争用队列访问,中断器应该可以毫无问题地显示其性能。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/21359445

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档