为什么在10个Java线程中增加一个数字不会产生10的值?
我不明白'a'的值是0,为什么'a'不是10,那段代码的运行过程是什么,有必要从Java Memory Model分析吗?这是我的测试代码
package com.study.concurrent.demo;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
//@SpringBootTest
@Slf4j
class DemoApplicationTests {
int a = 0;
int b = 0;
@Test
void contextLoads() {
ExecutorService executorService = Executors.newFixedThreadPool(1);
// final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
executorService.execute(() -> {
// try {
// semaphore.acquire();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
add();
bdd();
// log.info("a: {},concurrent_id: {}",a,Thread.currentThread().getName());
// semaphore.release();
});
}
executorService.shutdown();
log.info("The final value of a?{}",a);
log.info("The final value of b?{}",b);
}
public void add(){
a++;
}
public void bdd(){
b++;
}
}
回答
两个原因:
-
您不是在等待线程完成,您只是在关闭线程池(即:导致线程池拒绝新任务但继续处理现有任务)。
-
您没有在线程池中的写入和主线程中的读取之间建立发生之前的关系。
您可以通过(除其他方法外)执行此操作:
- 读取前获取信号量
a; - 通过使用
submit而不是为提交的每个任务execute获取一个Future<?>,并Future.get()在所有返回的期货上调用该方法。它在Javadoc 中记录ExecutorService,这建立了一个发生之前。
- 读取前获取信号量
第一点是a出现为零的“主要原因” :如果我在本地运行它,并等待线程池终止,则a结果为 10。
然而,仅仅因为它是 10 并不意味着代码可以正常工作而不注意第二点:您需要应用 Java 内存模型来保证正确运行。
回答
问题
-
可见性 - 多个线程正在访问同一个变量,并且代码没有任何可见性保证
-
volatile可以帮助保证可见性 -
原子性 - 多个线程通过
a++或b++操作进行更新。这些不是原子操作。这主要是一组操作1. fetch a. 2. increment a. 3. update a。上下文切换可能发生在这些状态中的任何一个,并导致不正确的值。 -
所以
volatile仅凭可见性不足以保证正确性 -
使用
AtomicInteger的增值业务的保证原子 -
AtomicXXX可以保证单个操作的原子性 -
如果有必要来增加两个
a和b在一起,则需要某种形式的同步 -
通信 - 这不是主线程和执行器任务线程之间的通信来传达完成事件
-
executorService.shutdown()不会确保这种沟通 -
Latch可用于此通信 -
或者像安迪提到的那样,
Future可以使用
带有AtomicInteger和的示例代码Latch
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class DemoApplicationTests {
final AtomicInteger a = new AtomicInteger(0);
final AtomicInteger b = new AtomicInteger(0);
void contextLoads() throws Exception {
CountDownLatch latch = new CountDownLatch(10);
ExecutorService executorService = Executors.newFixedThreadPool(1);
for (int i = 0; i < 10; i++) {
executorService.execute(() -> {
add();
bdd();
latch.countDown();
});
}
latch.await();
executorService.shutdown();
System.out.println("The final value of a?" + a);
System.out.println("The final value of b?" + b);
}
public void add() {
a.incrementAndGet();
}
public void bdd() {
b.incrementAndGet();
}
public static void main(String[] args) throws Exception {
new DemoApplicationTests().contextLoads();
}
}
用不正确的解决方案threadpool size > 1,并CompletableFuture由于在比赛条件a++,b++。
以下可以(我的知识有限,无法以任何方式确认)是线程池大小的完全合法代码1(从Eugene 的答案中复制)
但是当以线程池大小 > 执行相同的代码时1,将导致竞争条件。(同样的目的是按原样讨论多线程和数据可见性问题,而不是将 Eugene 的答案预测为不正确。Eugene 的答案是在线程池中的单线程上下文中,对于单线程线程池场景可能完全有效)
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class DemoApplicationTests {
int a = 0;
int b = 0;
void contextLoads() throws Exception {
final int count = 10000;
ExecutorService executorService = Executors.newFixedThreadPool(100);
List<Runnable> list = new ArrayList<>();
for (int i = 0; i < count; i++) {
Runnable r = () -> {
add();
bdd();
};
list.add(r);
}
CompletableFuture<?>[] futures = list.stream()
.map(task -> CompletableFuture.runAsync(task, executorService))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
executorService.shutdown();
System.out.println("The final value of a: " + a);
System.out.println("The final value of b?" + b);
}
public void add() {
a++;
}
public void bdd() {
b++;
}
public static void main(String[] args) throws Exception {
new DemoApplicationTests().contextLoads();
}
}
谢谢@Basil Bourque 修正语法错误