首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在Java中使用线程安全信号来暂停线程

如何在Java中使用线程安全信号来暂停线程
EN

Stack Overflow用户
提问于 2013-01-13 21:44:28
回答 3查看 1.3K关注 0票数 3

我有一堆并发运行的线程。有时,一个线程需要通知其他线程等待它完成一个作业,并再次向它们发出恢复的信号。因为我是Java同步的新手,所以我想知道做这件事的正确方法是什么。我的代码是这样的:

代码语言:javascript
复制
private void Concurrent() {
    if (shouldRun()) {
        // notify threads to pause and wait for them
        DoJob();
        // resume threads
    }

    // Normal job...
}

更新:

请注意,我编写的代码位于一个类中,该类将由每个线程执行。我无法访问这些线程或它们是如何运行的。我只是在线程内部。

更新2:

我的代码来自一个爬虫类。crawler类(crawler4j)知道如何处理并发。我唯一需要做的就是在运行函数之前暂停其他爬虫,之后再恢复它们。这段代码是我的爬虫的基础:

代码语言:javascript
复制
   public class TestCrawler extends WebCrawler {
    private SingleThread()
    {
        //When this function is running, no other crawler should do anything
    }

    @Override
    public void visit(Page page) {
        if(SomeCriteria())
        {
            //make all other crawlers stop until I finish
            SingleThread();
            //let them resume
        }

        //Normal Stuff
    }
   }
EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2013-01-13 22:11:50

下面是一个简短的示例,说明如何使用很酷的java并发工具来实现这一点:

对于Pause类,截取旧代码不再重要。

编辑:

下面是新的Test类:

代码语言:javascript
复制
package de.hotware.test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {

    private Pause mPause;

    public Test() {
        this.mPause = new Pause();
    }

    public void concurrent() throws InterruptedException {
        while(true) {
            this.mPause.probe();
            System.out.println("concurrent");
            Thread.sleep(100);
        }
    }

    public void crucial() throws InterruptedException {
        int i = 0;
        while (true) {
            if (i++ % 2 == 0) {
                this.mPause.pause(true);
                System.out.println("crucial: exclusive execution");
                this.mPause.pause(false);
            } else {
                System.out.println("crucial: normal execution");
                Thread.sleep(1000);
            }
        }
    }

    public static void main(String[] args) {
        final Test test = new Test();
        Runnable run = new Runnable() {

            @Override
            public void run() {
                try {
                    test.concurrent();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

        };
        Runnable cruc = new Runnable() {

            @Override
            public void run() {
                try {
                    test.crucial();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

        };
        ExecutorService serv = Executors.newCachedThreadPool();
        serv.execute(run);
        serv.execute(run);
        serv.execute(cruc);
    }

}

和实用工具Pause类:

代码语言:javascript
复制
package de.hotware.test;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Utility class to pause and unpause threads
 * with Java Concurrency
 * @author Martin Braun
 */
public class Pause {

    private Lock mLock;
    private Condition mCondition;
    private AtomicBoolean mAwait;

    public Pause() {
        this.mLock = new ReentrantLock();
        this.mCondition = this.mLock.newCondition();
        this.mAwait = new AtomicBoolean(false);
    }

    /**
     * waits until the threads until this.mAwait is set to true
     * @throws InterruptedException
     */
    public void probe() throws InterruptedException {
        while(this.mAwait.get()) {
            this.mLock.lock();
            try {
                this.mCondition.await();
            } finally {
                this.mLock.unlock();
            }
        }
    }

    /**
     * pauses or unpauses
     */
    public void pause(boolean pValue) {
        if(!pValue){
            this.mLock.lock();
            try {
                this.mCondition.signalAll();
            } finally {
                this.mLock.unlock();
            }
        }
        this.mAwait.set(pValue);
    }

}

基本用法是在每次运行之前调用probe()。如果在调用pause(false)之前暂停,则会阻塞。

您的类将如下所示:

代码语言:javascript
复制
public class TestCrawler extends WebCrawler {

private Pause mPause;

public TestCrawler(Pause pPause) {
    this.mPause = pPause;
}

private SingleThread()
{
        //When this function is running, no other crawler should do anything
}

@Override
public void visit(Page page) {
    if(SomeCriteria())
    {
        //only enter the crucial part once if it has to be exclusive
        this.mPause.probe();
        //make all other crawlers stop until I finish
        this.mPause.pause(true);
        SingleThread();
        //let them resume
        this.mPause.pause(false);
    }
    this.mPause.probe();
    //Normal Stuff
}
}
票数 2
EN

Stack Overflow用户

发布于 2013-01-13 21:52:24

代码语言:javascript
复制
public class StockMonitor extends Thread {

    private boolean suspend = false; 
    private volatile Thread thread;

    public StockMonitor() {
        thread = this;
    }

    // Use name with underscore, in order to avoid naming crashing with
    // Thread's.
    private synchronized void _wait() throws InterruptedException {
        while (suspend) {
            wait();
        }
    }

    // Use name with underscore, in order to avoid naming crashing with
    // Thread's.
    public synchronized void _resume() {
        suspend = false;
        notify();
    }

    // Use name with underscore, in order to avoid naming crashing with
    // Thread's.
    public synchronized void _suspend() {
        suspend = true;
    }  

     public void _stop() { 
        thread = null;
        // Wake up from sleep.
        interrupt();     
     }

     @Override
     public void run() {
        final Thread thisThread = Thread.currentThread();
        while (thisThread == thread) {
            _wait();
            // Do whatever you want right here.
        }
     }
}

调用_resume_suspend将使您能够恢复和暂停Thread_stop将允许您优雅地停止线程。请注意,一旦停止Thread,就没有办法再次恢复它。Thread不再可用。

代码取自一个真实的开源项目:http://jstock.hg.sourceforge.net/hgweb/jstock/jstock/file/b17c0fbfe37c/src/org/yccheok/jstock/engine/RealTimeStockMonitor.java#l247

票数 1
EN

Stack Overflow用户

发布于 2013-01-13 21:49:29

可以使用wait()和notify()

线程正在等待:

代码语言:javascript
复制
  // define mutex as field
  Object mutex = new Object();

   // later:
   synchronized(mutex) {
        wait();
   }

通知线程继续

代码语言:javascript
复制
   synchronized (mutex) {
        notify();
   }
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/14304074

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档