博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java并发(十三):并发工具类——同步屏障CyclicBarrier
阅读量:5843 次
发布时间:2019-06-18

本文共 5043 字,大约阅读时间需要 16 分钟。

先做总结

1、CyclicBarrier 是什么?

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到要求的线程到达都屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。

2、CyclicBarrier 实现原理:

  private static class Generation { // 内部类,当有parties个线程到达barrier,就会更新换代

    boolean broken = false; // 是否损坏
  }
  private final ReentrantLock lock = new ReentrantLock(); // 重入锁
  private final Condition trip = lock.newCondition();提供一个条件等待队列
  private final int parties; // 等待线程总数量
  private final Runnable barrierCommand; // 达到等待线程数量后执行的线程
  private Generation generation = new Generation(); // 当有parties个线程到达barrier,就会更新换代
  private int count; // 记录当前线程数量

(1)构造CyclicBarrier 时设置等待线程总数量parties 和 达到等待线程数量后执行的线程barrierCommand;

(2)线程中调用CyclicBarrier 的await() 方法时,将线程park() 并放入trip 的条件队列中,记录当前阻塞线程数 count;

(3)线程中调用CyclicBarrier 的await() 方法时,发现当前阻塞线程数 count达到了 构造时设置的总线程数,做如下操作:

    ① trip.signalAll()唤醒所有线程

    ② 重置count(从而实现同一个CyclicBarrier 对象可以循环使用)

    ③ 执行barrierCommand线程

一、应用举例

public class CyclicBarrierTest {    private static CyclicBarrier cyclicBarrier;    static class CyclicBarrierThread extends Thread {        public void run() {            System.out.println(Thread.currentThread().getName() + "到了");            try {                cyclicBarrier.await();            } catch (Exception e) {                e.printStackTrace();            }        }    }    public static void main(String[] args) {        cyclicBarrier = new CyclicBarrier(5, new Runnable() {            @Override            public void run() {                System.out.println("人到齐了,开会吧....");            }        });        for (int i = 0; i < 5; i++) {            new CyclicBarrierThread().start();        }    }}

二、类结构

public class CyclicBarrier {    private static class Generation { // 内部类,当有parties个线程到达barrier,就会更新换代        boolean broken = false; // 是否损坏    }    private final ReentrantLock lock = new ReentrantLock(); // 重入锁    private final Condition trip = lock.newCondition();    private final int parties; // 等待线程总数量    private final Runnable barrierCommand; // 达到等待线程数量后执行的线程    private Generation generation = new Generation(); // 当有parties个线程到达barrier,就会更新换代    private int count; // 记录当前线程数量         public CyclicBarrier(int parties, Runnable barrierAction) {        if (parties <= 0) throw new IllegalArgumentException();        this.parties = parties;        this.count = parties;        this.barrierCommand = barrierAction;    }    public CyclicBarrier(int parties) {        this(parties, null);    }}

三、原理解析

public int await() throws InterruptedException, BrokenBarrierException {        try {            return dowait(false, 0L);        } catch (TimeoutException toe) {            throw new Error(toe); // cannot happen        }    }        private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {        final ReentrantLock lock = this.lock;        lock.lock();        try {            final Generation g = generation;            if (g.broken)                throw new BrokenBarrierException();            if (Thread.interrupted()) {                breakBarrier(); // 代失效,唤醒所有线程                throw new InterruptedException();            }            int index = --count; // 计数            if (index == 0) { // 达到要求数量                boolean ranAction = false;                try {                    final Runnable command = barrierCommand;                    if (command != null)                        command.run(); // 达到等待线程数量后执行barrierCommand                    ranAction = true;                    nextGeneration(); // 唤醒本代所有线程,生成新一代,重置count                    return 0;                } finally {                    if (!ranAction)                        breakBarrier();                }            }            // 线程数量未达到要求数量,将线程挂起等待            for (;;) {                try {                    if (!timed)                        trip.await(); // 将线程加入condition队列挂起                    else if (nanos > 0L)                        nanos = trip.awaitNanos(nanos);                } catch (InterruptedException ie) {                    if (g == generation && !g.broken) {                        breakBarrier();                        throw ie;                    } else {                        Thread.currentThread().interrupt();                    }                }                // 特殊情况处理                if (g.broken)                    throw new BrokenBarrierException();                if (g != generation)                    return index;                if (timed && nanos <= 0L) {                    breakBarrier();                    throw new TimeoutException();                }            }        } finally {            lock.unlock();        }    }        // 代失效,唤醒所有线程    private void breakBarrier() {        generation.broken = true;        count = parties;        trip.signalAll();    }    // 唤醒本代所有线程,生成新一代,重置count    private void nextGeneration() {        trip.signalAll();        count = parties;        generation = new Generation();    }

 

 

转载于:https://www.cnblogs.com/hexinwei1/p/9982420.html

你可能感兴趣的文章
我的友情链接
查看>>
写Use Case的一种方式,从oracle的tutorial抄来的
查看>>
【C#】protected 变量类型
查看>>
爬虫去重(只是讲了去重的策略,没有具体讲实现过程,反正就是云里雾里)...
查看>>
react中将px转化为rem或者vw
查看>>
avcodec_open2()分析
查看>>
何如获取单选框中某一个选中的值
查看>>
QQ悬浮返回顶部
查看>>
MySQL建表语句的一些特殊字段
查看>>
腾讯前端二面题目详解
查看>>
mascara-1
查看>>
Jquery Form表单取值
查看>>
Android API level 与version对应关系
查看>>
Team Name
查看>>
String类
查看>>
西门子_TDC_数据耦合小经验
查看>>
[LeetCode] Copy List with Random Pointer
查看>>
openstack部署之nova
查看>>
JS组件系列——表格组件神器:bootstrap table
查看>>
存储过程Oracle(一)
查看>>