线程同步与数据一致性
线程同步的实现方式
线程同步是指让多个线程按照顺序访问同一个共享资源,避免因并发冲突导致的问题。主要有以下几种实现方式:
synchronized关键字
synchronized是Java中最基本的线程同步机制,可以修饰代码块或方法,保证同一时间只有一个线程访问该代码块或方法,其他线程需要等待锁的释放。
public class SynchronizedExample {
private int counter = 0;
// 修饰方法
public synchronized void incrementMethod() {
counter++;
}
// 修饰代码块
public void incrementBlock() {
synchronized (this) {
counter++;
}
}
// 静态同步方法
public static synchronized void staticMethod() {
System.out.println("静态同步方法");
}
}
synchronized的特点:
- 互斥性: 同一时间只有一个线程能获得锁
- 可见性: 锁释放时会将修改刷新到主内存
- 可重入: 同一线程可以多次获取同一个锁
- 自动释放: 代码块执行完自动释放锁
ReentrantLock
ReentrantLock是一个可重入的互斥锁,与synchronized功能类似,但更灵活,支持公平锁、可中断锁、多个条件变量等高级功能。
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockExample {
private final ReentrantLock lock = new ReentrantLock();
private int sharedData = 0;
public void updateData() {
lock.lock();
try {
sharedData++;
System.out.println("数据更新: " + sharedData);
} finally {
lock.unlock(); // 必须在finally中释放锁
}
}
// 尝试获取锁
public boolean tryUpdate() {
if (lock.tryLock()) {
try {
sharedData++;
return true;
} finally {
lock.unlock();
}
}
return false;
}
}
ReentrantLock的优势:
- 可以设置公平/非公平策略
- 支持可中断的锁获取
- 支持尝试非阻塞获取锁
- 可以绑定多个条件变量(Condition)
Semaphore信号量
Semaphore允许多个线程同时访问共享资源,但限制访问的线程数量。可以用于控制并发访问的线程数,避免系统资源被过度占用。
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
// 允许3个线程同时访问
private final Semaphore semaphore = new Semaphore(3);
public void accessResource(String threadName) throws InterruptedException {
semaphore.acquire(); // 获取许可
try {
System.out.println(threadName + " 开始访问资源");
Thread.sleep(2000); // 模拟资源访问
System.out.println(threadName + " 访问完成");
} finally {
semaphore.release(); // 释放许可
}
}
public static void main(String[] args) {
SemaphoreExample example = new SemaphoreExample();
for (int i = 1; i <= 10; i++) {
final int taskId = i;
new Thread(() -> {
try {
example.accessResource("任务-" + taskId);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}
应用场景:
- 限流: 控制同时访问某个资源的线程数
- 连接池: 限制数据库连接数
- 资源管理: 管理有限的系统资源
CountDownLatch倒计时门闩
CountDownLatch允许一个或多个线程等待其他线程执行完毕后再执行,用于线程之间的协调和通信。
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
final int workerCount = 5;
CountDownLatch latch = new CountDownLatch(workerCount);
// 创建5个工作线程
for (int i = 1; i <= workerCount; i++) {
final int workerId = i;
new Thread(() -> {
try {
System.out.println("工作线程" + workerId + " 开始执行");
Thread.sleep(1000 * workerId);
System.out.println("工作线程" + workerId + " 执行完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown(); // 完成后计数减1
}
}).start();
}
System.out.println("主线程等待所有工作线程完成");
latch.await(); // 等待计数归零
System.out.println("所有工作线程已完成,主线程继续执行");
}
}
特点:
- 计数器只能使用一次,不能重置
- 适合一次性的同步场景
- 常用于主线程等待多个子线程完成
CyclicBarrier循环屏障
CyclicBarrier允许多个线程在一个栅栏处等待,直到所有线程都到达栅栏位置后,才会继续执行。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
final int threadCount = 4;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("所有线程已到达屏障点,开始下一阶段");
});
for (int i = 1; i <= threadCount; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("线程" + threadId + " 准备中");
Thread.sleep(1000 * threadId);
System.out.println("线程" + threadId + " 到达屏障点");
barrier.await(); // 等待其他线程
System.out.println("线程" + threadId + " 继续执行");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
与CountDownLatch的区别:
- CyclicBarrier可以重复使用,CountDownLatch不能
- CyclicBarrier是线程之间相互等待,CountDownLatch是一个或多个线程等待其他线程
- CyclicBarrier可以设置屏障点动作
Phaser分阶段协调器
Phaser是一种更灵活的多线程同步工具,支持动态注册和注销参与者,可以控制各个参与者的到达和离开。
import java.util.concurrent.Phaser;
public class PhaserExample {
public static void main(String[] args) {
Phaser phaser = new Phaser(3); // 3个参与者
for (int i = 1; i <= 3; i++) {
final int threadId = i;
new Thread(() -> {
System.out.println("线程" + threadId + " 第1阶段开始");
phaser.arriveAndAwaitAdvance(); // 等待第1阶段完成
System.out.println("线程" + threadId + " 第2阶段开始");
phaser.arriveAndAwaitAdvance(); // 等待第2阶段完成
System.out.println("线程" + threadId + " 第3阶段开始");
phaser.arriveAndAwaitAdvance(); // 等待第3阶段完成
System.out.println("线程" + threadId + " 全部完成");
}).start();
}
}
}
Phaser的优势:
- 支持动态调整参与者数量
- 可以监控每个阶段的完成情况
- 比CyclicBarrier更灵活
保证i++操作的正确性
i++的线程安全问题
在多线程环境下,i++操作不是原子的,它包含三个步骤:
- 读取i的值
- 将i的值加1
- 将新值写回i
要保证多线程下i++的正确性,需要考虑:
- 可见性: 一个线程修改后,其他线程能看到
- 原子性: 操作不可被中断
- 有序性: 操作按预期顺序执行
解决方案
使用AtomicInteger
AtomicInteger通过CAS操作保证原子性:
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicIntegerExample {
private static AtomicInteger counter = new AtomicInteger(0);
public static void increment() {
counter.incrementAndGet(); // 原子操作
}
public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
increment();
}
});
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
System.out.println("最终结果: " + counter.get()); // 输出: 10000
}
}
使用synchronized
synchronized关键字可以保证原子性、可见性和有序性:
public class SynchronizedCounter {
private static int counter = 0;
public static synchronized void increment() {
counter++;
}
// 或者使用同步代码块
public static void incrementBlock() {
synchronized (SynchronizedCounter.class) {
counter++;
}
}
public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
increment();
}
});
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
System.out.println("最终结果: " + counter); // 输出: 10000
}
}
使用ReentrantLock
ReentrantLock提供了显式的锁控制:
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockCounter {
private int counter = 0;
private Lock lock = new ReentrantLock();
public void increment() {
lock.lock();
try {
counter++;
} finally {
lock.unlock();
}
}
public int getCounter() {
return counter;
}
public static void main(String[] args) throws InterruptedException {
ReentrantLockCounter counter = new ReentrantLockCounter();
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
counter.increment();
}
});
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
System.out.println("最终结果: " + counter.getCounter()); // 输出: 10000
}
}
为什么volatile不能保证i++的正确性
volatile关键字只能保证可见性和有序性,不能保证原子性。
public class VolatileCounter {
private volatile static int counter = 0;
public static void increment() {
counter++; // 不是原子操作
}
public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
increment();
}
});
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
System.out.println("最终结果: " + counter); // 结果小于10000
}
}
虽然volatile保证了可见性,但counter++包含读-改-写三个步骤,不是原子操作,多个线程可能读到相同的值,导致最终结果小于期望值。
最佳实践
- 简单计数场景: 优先使用AtomicInteger,性能好且代码简洁
- 需要更复杂的同步逻辑: 使用synchronized或ReentrantLock
- 只需要可见性保证: 使用volatile
- 大量读少量写: 考虑使用ReadWriteLock