线程安全-尝试找出 Counter 例子中重复计算的数字
in 代码 with 0 comment

线程安全-尝试找出 Counter 例子中重复计算的数字

in 代码 with 0 comment

线程安全-找出不安全的数据

什么才是线程安全?

《Java Concurrency in Practice》 有一个比较恰当的定义 :“当多个线程访问一个对象时,如果不用考虑这些线程在运行时环境下的调度和交替执行,也不需要进行额外的同步或者在调用方法进行任何其他的协调操作,调用这个对象的行为都可以获得正确的结果,那这个对象是线程安全的。”

经典 Counter 计数例子🌰

/**
 * MultiErrorDemo 多线程环境下的常见的计数错误
 *
 * @author suremotoo
 * @date 2022/11/07 12:24
 */
public class MultiErrorDemoCounter implements Runnable {

    static int index = 0;
    static MultiErrorDemoCounter errorDemo = new MultiErrorDemoCounter();

    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            index++;
        }
    }

    public static void main(String[] args) throws InterruptedException {

        Thread t1 = new Thread(errorDemo);
        Thread t2 = new Thread(errorDemo);

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println(index);
    }
}

没错,上述代码,我们启动了 2 个线程:t1、t2 ,分别对 index 进行 10 万次的计数,也就是分别执行 index++

结果,index 最终我们 预期值为:200000,为实际运行结果,往往都是 小于 200000

这就是典型的线程不安全问题操作,因为 两个线程执行同一个 index 对象的时候,总有可能是针对同一个数值计算,这样重复计算才导致真是数字往往小于预期!

小解决

当然,我们给 index++ 使用一个 synchronized 同步锁即可解决,run 方法调整后代码如下:

public void run() {
    for (int i = 0; i < 10000; i++) {
      // 需要获得 errorDemo 对象的锁才能进行 index++, 从而保证准确计算
        synchronized(errorDemo) {
            index++;
        }
    }
}

当然,更多具体过程可以参考文档 synchronized 分析文章(我还没出呢😁)

尝试找出 Counter 例子中重复计算的数字!

鉴于上面的问题,大部分想法都是去解决这个问题,既然针对某些值进行了重复计算,那么能不能尝试着能不能找出到底是哪些值呢?本着研究学习的心态,去试一试!💪💪

第一步:记录

既然是重复计算,那么我们就每次计算的都记录一下,判断一下是不是已经计算过了,计算过了就打印出来。

public class FindErrorNumsCounter implements Runnable {

    static FindErrorNumsCounter instance = new FindErrorNumsCounter();

    int index = 0;

    /**
     * 真正运行的次数,该值正好和 index 理论上计算的值是一致
     */
    static AtomicInteger realCount = new AtomicInteger();

    /**
     * 错误的次数
     */
    static AtomicInteger errorCount = new AtomicInteger();

    /**
     * 记录标记计算的数字,容量比理论计算的数值大一些,以便都能装进去
     */
    static boolean[] marked = new boolean[1000000];


    @Override
    public void run() {
        for (int i = 0; i < 100000; i++) {
            index++;
            realCount.incrementAndGet();
            // 判断 index 是否已经计算过
            if (marked[index]) {
                System.out.println("出错了: " + index);
                errorCount.incrementAndGet();
            }
            marked[index] = true;
        }
    }


    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(instance);
        Thread t2 = new Thread(instance);
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        System.out.println("真正计算的次数: " + realCount.get());
        System.out.println("错误计算的次数: " + errorCount.get());
        System.out.println("实际结果: " + instance.index);
        System.out.println("----------------");
    }
}

为了方便,我们先定义了 3 个变量:

boolean[] marked:用于记录标记已经计算过的数值;
AtomicInteger realCount:用于记录理论预期的正确值;
AtomicInteger errorCount: 记录重复计算的次数;

重点就分析一下 run() 方法

@Override
public void run() {
    for (int i = 0; i < 100000; i++) {
        index++;
        // 计算次数加 1,统计真正计算的次数,也就是理论上 index 的值
        realCount.incrementAndGet();
        // 判断 index 是否已经计算过,如果已经计算过,说明重复计算,则打印出来该值
        if (marked[index]) {
            System.out.println("出错了: " + index);
            // 同时重复计算的次数加 1 
            errorCount.incrementAndGet();
        }
        // 没有重复计算则添加到数组中,标记已经计算过
        marked[index] = true;
    }
}

运行一下:

真正计算的次数: 200000
错误计算的次数: 255
实际结果: 199604

会发现数字很离谱,理论上 实际结果+错误计算的次数=真正计算的次数 才对!

这里我们犯了一个错误,就是我们统计重复计算的逻辑,也是线程不安全的!就是这里:

// 判断 index 是否已经计算过,如果已经计算过,说明重复计算,则打印出来该值
if (marked[index]) {
    System.out.println("出错了: " + index);
    // 同时重复计算的次数加 1 
    errorCount.incrementAndGet();
}
// 没有重复计算则添加到数组中,标记已经计算过
marked[index] = true;

错误分析

错误分析

前提条件: 假如两个线程发生了重复计算,index 从 0 开始,都执行完 index++index 的值都为 1;

  1. 第 1 个线程判断 if (marked[index]) 不符合,那么要标记该 index 已经计算过 ,也就是要执行 marked[index] = true;

  2. 结果第 1 线程还没执行 marked[index] = true;,偏偏 CPU 调度切换执行第 2 个线程;

  3. 第 2 个线程判断 if (marked[index]) 也不符合,然后第 2 个线程就执行了 marked[index] = true;

  4. 第 2 个线程执行完成后,CPU 调度又切回第 1 个线程去执行 marked[index] = true;

那么最终两个线程对同一个 index 进行了标记!就跟 index++ 重复计算一样,这样就是两个线程冲突,却没有统计到出错的数字。

示例图

可以配合该动图,理解上面的话

find-counter-error-marked-unsafe

没解决问题反而还新造出了新问题🤦🤦

第二步:记录调整-增加 synchronized

上面记录重复次数的代码不安全,那么我们用 synchronized 来同步这段代码试试呀!😏😏

@Override
public void run() {
    for (int i = 0; i < 100000; i++) {
        index++;
        realCount.incrementAndGet();
        // 使用 synchronized 保护
        synchronized (instance) {
            if (marked[index]) {
                System.out.println("出错了: " + index);
                errorCount.incrementAndGet();
            }
            marked[index] = true;
        }
    }
}

这样我们再看看~

... ... 
出错了: 183544
出错了: 190630
真正计算的次数: 200000
错误计算的次数: 1781
实际结果: 199999

多运行几次,实际结果 199999 都已经逼近 真正计算的次数 200000 了,可是这个 错误计算的次数 竟然高的离谱!应该是 1 的呀!😦😦


错误分析

错误分析
前提条件: 假如两个线程没有发生冲突,正常计算,index 从 0 开始,第 1 个线程执行完 index++index 的值为 1

提示 2 : 这个又要提到一个概念,就是 synchronized 拥有一个特性:线程可见

  1. 第 1 个线程正常执行 index++ ,index 值变为 1,紧接着进入 synchronized 中,第 2 个线程是无法进入 synchronized 代码块的。

  2. 第 1 个线程此时即将要执行 if (marked[index]) 代码,却又没执行的时候, CPU 调度又回去让第 2 个线程继续执行;

  3. 这时候第 2 个线程又执行 index++index 变为 2,执行完后 CPU 调度又回去让第 1 个线程执行,这时候第 1 个线程要执行: if (marked[index]) 代码

counter-error-img-1

  1. 由于 synchronized 的线程可见性, 1 个线程可以看到之前的线程干了什么事情,这样第 1 个线程本来要 if (marked[index]) 判断的是 marked[1],由于第 2 个线程的结果导致变成了 marked[2]

  2. 如此的话,第 1 个线程将 index 2 就被标记为 true,然后退出 synchronzied 代码块。

  3. 轮到第 2 个线程继续执行的时候,第 2 个线程执行 if (marked[index]) 判断的也是 marked[2],因为第 1 个线程已经标记过了,所以会满足 ``if (marked[index])的条件, 从而打印出出错了`。

可实际上两个线程并没有冲突,1 个线程将 0+1=1,另 1 个线程将 1+1=2。

这样下来,本来正确的计算,却打印出了 1 次 出错了,就会导致上述 错误计算的次数 统计过多的问题了。

示例图

可以配合该动图,理解上面的话

find-counter-error-marked-synchronized

第三步:记录调整-保持每两个线程一组

上面分析增加 synchronized 还不行,因为 CPU 调度,可能会让前面的线程一、线程二中的某一个线程步骤加快,进入下一次 index++ 的计算,那么我们就控制一下,每次 index++ 前确保是 2 个线程一起来!

这时候我们引入一个新的工具类:CyclicBarrier,先不用理解它到底是什么,只需要知道它到底有什么作用即可。

CyclicBarrier 其实就个栅栏,假如你有个牧场,养了一窝的阿拉斯加,它们也总是兴致勃勃,为了不让他们乱跑,你为了个栅栏把他们圈起来~,哪天你想放出来遛遛它们,打开栅栏,那叫一个:斯如涌泉 🤪,一下子全冲出来了~

CyclicBarrier 其实就跟这个差不多,我们可以设置一个条件,比如有 2 个线程都等待就绪后,然后才允许放行!我们看代码:

/**
 * FindErrorNumsCounter
 *
 * @author suremotoo
 * @date 2022/11/07 19:57
 */
public class FindErrorNumsCounter implements Runnable {

    static FindErrorNumsCounter instance = new FindErrorNumsCounter();

    int index = 0;

    /**
     * 真正运行的次数,该值正好和 index 理论上计算的值是一致
     */
    static AtomicInteger realCount = new AtomicInteger();

    /**
     * 错误的次数
     */
    static AtomicInteger errorCount = new AtomicInteger();

    /**
     * 记录标记计算的数字,容量比理论计算的数值大一些
     */
    static boolean[] marked = new boolean[1000000];

    static volatile CyclicBarrier cyclicBarrier1 = new CyclicBarrier(2);

    @Override
    public void run() {
        for (int i = 0; i < 100000; i++) {
            try {
                cyclicBarrier1.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
            index++;
            realCount.incrementAndGet();
            synchronized (instance) {
                if (marked[index]) {
                    System.out.println("出错了: " + index);
                    errorCount.incrementAndGet();
                }
                marked[index] = true;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(instance);
        Thread t2 = new Thread(instance);
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        System.out.println("真正计算的次数: " + realCount.get());
        System.out.println("错误计算的次数: " + errorCount.get());
        System.out.println("实际结果: " + instance.index);
        System.out.println("----------------");
    }
}

重点看这里:

counter-error-img-cyclicBarrier1

我们定义了一个变量 cyclicBarrier1 并且设置了栅栏开启的线程数量是 2 个,这里 cyclicBarrier1.await(); 就是 2 个线程就绪了才会执行下一行!

这下我们再运行,看看结果:

真正计算的次数: 200000
错误计算的次数: 4
实际结果: 200000

🎵眼睛瞪得像铜铃🔔 ~~~~

我... ... 怎么还是有问题❓❓❓❓❓

错误分析

错误分析经过 cyclicBarrier1.await(); 代码后,我们有 2 个线程过来。

前提条件(很重要): 我们 假设 index 为 0, 第 2 个线程进来就一直没有执行,就卡在 index++;注意:是没执行 index++;

  1. 而第 1 个线程执行 index++, i 变成 1,并且进入 synchronized 代码块,if (marked[index]) marks[1] 也不满足条件(不记录错误),当即将执行 marked[index]=true 的时候却被 CPU 调度走了,让第 2 个线程继续执行了~

  2. 第 2 个线程执行 index++; index 是从 1 开始计算的,因为第 1 个线程已经 index++ 了!

  3. 这时候 第 2 个线程 index++ 完 index 就变成了 2 ,刚执行完又被 CPU 调度回去,继续让第 1 个线程执行,而这时候第 1 个线程继续执行:marked[index]=true,可此时 index 已经变成 2 了,本来是要执行 marked[1]=true,结果变成了 marked[2]=true

  4. 最后第 1 个线程执行完后退出 synchronized,第 2 个线程回来继续执行 进入 synchronized

  5. 这样第 2 个线程本来要 if (marked[index]) 判断的是 marked[1],结果也变成了 marked[2]

如此的话,第 2 个线程 if (marked[index]) 判断的也是 marked[2],这样 2 就已经重复了,就会打印出来 "出错了:" ,可实际并没有重复呢!和之前分析导致 错误计算的次数 统计过多的问题一样。

其实你会发现,这和 第二步 的场景是完全一样的嘛!😁

第四步:记录调整-保持每两个线程一组-升级!

第三步,我们添加了 1 个 CyclicBarrier 栅栏来解决 第二步两轮计算 index++ 的问题,发现还是不行,依然存在一个在 synchronized 代码块里,因为线程切换执行 index++ 导致 index 值变的问题!

既然有可能线程半路才 index++,这样,那么我再加 1 个 CyclicBarrier 栅栏,放在 index++ 后面 synchronized 前面,这样就避免了其中 1 个线程在 synchronized 里执行代码的时候突然让别的线程 index++

没问题,上代码:

/**
 * FindErrorNumsCounter
 *
 * @author suremotoo
 * @date 2022/11/07 19:57
 */
public class FindErrorNumsCounter implements Runnable {

    static FindErrorNumsCounter instance = new FindErrorNumsCounter();

    int index = 0;

    /**
     * 真正运行的次数,该值正好和 index 理论上计算的值是一致
     */
    static AtomicInteger realCount = new AtomicInteger();

    /**
     * 错误的次数
     */
    static AtomicInteger errorCount = new AtomicInteger();

    /**
     * 记录标记计算的数字,容量比理论计算的数值大一些
     */
    static boolean[] marked = new boolean[1000000];

    static volatile CyclicBarrier cyclicBarrier1 = new CyclicBarrier(2);
    static volatile CyclicBarrier cyclicBarrier2 = new CyclicBarrier(2);

    @Override
    public void run() {
        for (int i = 0; i < 100000; i++) {
            try {
                cyclicBarrier2.reset();
                cyclicBarrier1.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
            index++;
            try {
                cyclicBarrier1.reset();
                cyclicBarrier2.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
            realCount.incrementAndGet();
            synchronized (instance) {
                if (marked[index]) {
                    System.out.println("出错了: " + index);
                    errorCount.incrementAndGet();
                }
                marked[index] = true;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(instance);
        Thread t2 = new Thread(instance);
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        System.out.println("真正计算的次数: " + realCount.get());
        System.out.println("错误计算的次数: " + errorCount.get());
        System.out.println("实际结果: " + instance.index);
        System.out.println("----------------");
    }
}

我们在 index++ 前后都添加了 CyclicBarrier 栅栏!这样可以确保让两个线程都会执行 index++

ok,我们运行一下代码。

出错了: 199920
出错了: 199922
出错了: 199924
出错了: 199926
出错了: 199928
出错了: 199930
出错了: 199932
出错了: 199934
出错了: 199936
出错了: 199938
出错了: 199940
出错了: 199942
出错了: 199944
出错了: 199946
出错了: 199948
出错了: 199950
出错了: 199952
出错了: 199954
出错了: 199956
出错了: 199958
出错了: 199960
出错了: 199962
出错了: 199964
出错了: 199966
出错了: 199968
出错了: 199970
出错了: 199972
出错了: 199974
出错了: 199976
出错了: 199978
出错了: 199980
出错了: 199982
出错了: 199984
出错了: 199986
出错了: 199988
出错了: 199990
出错了: 199992
出错了: 199994
出错了: 199996
出错了: 199998
真正计算的次数: 200000
错误计算的次数: 100000
实际结果: 199998

俏丽马!怎么还越来越离谱了❓❓❓❓❓❓而且出错的还都是偶数。

counter-error-img-cyclicBarrier2

注意:

注意: 由于在 index++ 前后都加入了栅栏,所以 2 个线程都会执行 index++ 的。

错误分析

前提条件:index 为 0

  1. 假如现在 2 个线程是正常按照逻辑执行, 第 1 个线程 index++ index 变为 1

  2. 因为第 1 个线程的结果,第 2 个线程 index++ index 就变为 2

  3. 然后放开栅栏!两个都去执行 synchronized 代码,要进行锁竞争,由于 synchronized 的线程可见特性, 1 个线程可以看到之前的线程干了什么事情所以无论哪个线程抢到锁去执行,它们用的 index 值都是 2

  4. 第 1 个线程 if (marked[index]) 判断 marked[2] 不满足条件,则执行 marked[2]=true;

  5. 第 2 个线程 if (marked[index]) 判断 marked[2] 满足条件,则打印 出错了:

这样的情况对吗?肯定不对啊,我们的目标、宗旨是要找出重复计算的,现在的 index 有重复计算吗?并没有! 1 个线程将 index 从 0 变为 1,另 1 个线程从 1 变为 2,没问题呀,但我们现在这种代码就会多打印出来 出错了:

🤨🤨你会发现,这还是和之前 第三步、第四步 类似呀,都是正常的逻辑情况下,因为其中 1 个线程将 index 变更后,导致另 1 个线程使用变更后的 index ,从而导致重复打印的问题!

第五步:记录调整-调整重复计算的判断处理!

基于上面 第四步,我们分析了正常情况多统计了,现在想办法要去掉,那么,同时我们也要会想一下错误的情况,应该是什么样的。

提示

提示:

因为 index 从 0 开始,index++ 对于 0 是不会漏算的,我们就把设置 marked[0]=true;

正常的情况,0→1→2:

线程index 值marked 标记结果备注
线程 11false并没有执行 marked[1] = true; 标记,
因为 线程 2 的将 index 变更为 2 了,所以 线程 1 实际执行的是 marked[2] = true;
所以 index 为 1 在 marked 里默认的值为 false
线程22true实际执行的是 marked[2] = true;

或者

线程index 值marked 标记结果备注
线程 21false并没有执行 marked[1] = true; 标记,
因为 线程 1 的将 index 变更为 2 了,所以 线程 2 实际执行的是 marked[2] = true;
所以 index 为 1 在 marked 里默认的值为 false
线程12true实际执行的是 marked[2] = true;

⚠️错误的情况,0→1→1:

线程index 值marked 标记结果备注
第 1 个线程1true实际执行的是 marked[1] = true;
第 2 个线程1true实际执行的是 marked[1] = true;

从上面的表格,我们可以看到,因为 2 个线程去执行 index++, 所以 正常的情况,总会像 0→1→2 这样的规律,而且 0→1→2 中间的那个 1 是会略过 sychronized 的代码处理的,2 是正确的,但我们却多打印出来来了,不应该打印它,所以 marked[0] 总是 true,marked[1] 是 false,marked[2] 是 true以此类推,后面都是 true、false 交替的结果。

错误的情况总会像 0→1→1 这样的规律,所以 marked[0] 总是 true,marked[1] 也是 true,这样呢,我就可以得出一个规律,只要出现了 marked[index] 和 marked[index-1] 都为 true 的情况,index 才是真正的重复计算,这种情况下才是需要将信息打印出来!

这样我们就知道调整哪里的代码了,就是 synchronized 代码块中 if (marked[index]) 这句代码,我们调整为 if (marked[index] && marked[index - 1])

run() 完整代码如下:

@Override
public void run() {
   // 0 的时候永远不会重复计算,手动标记为 true
    marked[0]=true;
    for (int i = 0; i < 100000; i++) {
        try {
            cyclicBarrier2.reset();
            cyclicBarrier1.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
        index++;
        try {
            cyclicBarrier1.reset();
            cyclicBarrier2.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
        realCount.incrementAndGet();
        synchronized (instance) {
          // 判断条件,如果上一个和当前这个都为 true,则说明 index 漏算
            if (marked[index] && marked[index - 1]) {
                System.out.println("出错了: " + index);
                errorCount.incrementAndGet();
            }
            marked[index] = true;
        }
    }
}

这样我们再次运行

真正计算的次数: 200000
错误计算的次数: 0
实际结果: 200000

ok,没问题了!emm,运行好多次,只是一直没有失败的,这是因为我们调整多次代码后,线程碰撞的概率变小了,没关系!我们微调一下再测试就可以!

第六步:调整 main 方法,运行代码打印!

只要出现错误,errorCount 肯定能统计到值,我们就来个循环,直到遇到漏算的错误情况。

完整代码:

/**
 * FindErrorNumsCounter 找出并发情况下哪些计数被漏算
 *
 * @author suremotoo
 * @date 2022/11/07 15:57
 */
public class FindErrorNumsCounter implements Runnable {

    static FindErrorNumsCounter instance = new FindErrorNumsCounter();

    int index = 0;

    /**
     * 真正运行的次数,该值正好和 index 理论上计算的值是一致
     */
    static AtomicInteger realCount = new AtomicInteger();

    /**
     * 错误的次数
     */
    static AtomicInteger errorCount = new AtomicInteger();

    /**
     * 记录标记计算的数字,容量比理论计算的数值大一些
     */
    static boolean[] marked = new boolean[1000000];

    static volatile CyclicBarrier cyclicBarrier1 = new CyclicBarrier(2);
    static volatile CyclicBarrier cyclicBarrier2 = new CyclicBarrier(2);

    @Override
    public void run() {
        marked[0] = true;
        for (int i = 0; i < 100000; i++) {
            try {
                cyclicBarrier2.reset();
                cyclicBarrier1.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
            index++;
            try {
                cyclicBarrier1.reset();
                cyclicBarrier2.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
            realCount.incrementAndGet();
            synchronized (instance) {
                if (marked[index] && marked[index - 1]) {
                    System.out.println("出错了: " + index);
                    errorCount.incrementAndGet();
                }
                marked[index] = true;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        while (errorCount.get() == 0) {
            realCount.set(0);
            errorCount.set(0);
            instance.index = 0;
            marked = new boolean[1000000];
            Thread t1 = new Thread(instance);
            Thread t2 = new Thread(instance);

            t1.start();
            t2.start();
            t1.join();
            t2.join();

            System.out.println("真正计算的次数: " + realCount.get());
            System.out.println("错误计算的次数: " + errorCount.get());
            System.out.println("实际结果: " + instance.index);

            System.out.println("----------------");

        }
    }
}

结果:

真正计算的次数: 200000
错误计算的次数: 0
实际结果: 200000
----------------
真正计算的次数: 200000
错误计算的次数: 0
实际结果: 200000
----------------
出错了: 77953
真正计算的次数: 200000
错误计算的次数: 1
实际结果: 199999
----------------

77953 重复计算了,错误 1 次,实际结果 199999,真正的计算为:200000,这次统计对咯~

可以多次运行验证,发现没问题~🎉🎉