Java中如何使用原子组件和同步组件,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
原子组件的实现原理CAS
应用场景
可用来实现变量、状态在多线程下的原子性操作
可用于实现同步锁(ReentrantLock)
原子组件
原子组件的原子性操作是靠使用cas来自旋操作volatile变量实现的
volatile的类型变量保证变量被修改时,其他线程都能看到最新的值
cas则保证value的修改操作是原子性的,不会被中断
基本类型原子类
AtomicBoolean //布尔类型 AtomicInteger //正整型数类型 AtomicLong //长整型类型
使用示例
public static void main(String[] args) throws Exception { AtomicBoolean atomicBoolean = new AtomicBoolean(false); //异步线程修改atomicBoolean CompletableFuture<Void> future = CompletableFuture.runAsync(() ->{ try { Thread.sleep(1000); //保证异步线程是在主线程之后修改atomicBoolean为false atomicBoolean.set(false); }catch (Exception e){ throw new RuntimeException(e); } }); atomicBoolean.set(true); future.join(); System.out.println("boolean value is:"+atomicBoolean.get()); } ---------------输出结果------------------ boolean value is:false引用类原子类
AtomicReference //加时间戳版本的引用类原子类 AtomicStampedReference //相当于AtomicStampedReference,AtomicMarkableReference关心的是 //变量是否还是原来变量,中间被修改过也无所谓 AtomicMarkableReference
AtomicReference的源码如下,它内部定义了一个volatile V value,并借助VarHandle(具体子类是FieldInstanceReadWrite)实现原子操作,MethodHandles会帮忙计算value在类的偏移位置,最后在VarHandle调用Unsafe.public final native boolean compareAndSetReference(Object o, long offset, Object expected, Object x)方法原子修改对象的属性
public class AtomicReference<V> implements java.io.Serializable { private static final long serialVersionUID = -1848883965231344442L; private static final VarHandle VALUE; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); VALUE = l.findVarHandle(AtomicReference.class, "value", Object.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } } private volatile V value; ....ABA问题
线程X准备将变量的值从A改为B,然而这期间线程Y将变量的值从A改为C,然后再改为A;最后线程X检测变量值是A,并置换为B。但实际上,A已经不再是原来的A了
解决方法,是把变量定为唯一类型。值可以加上版本号,或者时间戳。如加上版本号,线程Y的修改变为A1->B2->A3,此时线程X再更新则可以判断出A1不等于A3
AtomicStampedReference的实现和AtomicReference差不多,不过它原子修改的变量是volatile Pair
pair;,Pair是其内部类。AtomicStampedReference可以用来解决ABA问题
public class AtomicStampedReference<V> { private static class Pair<T> { final T reference; final int stamp; private Pair(T reference, int stamp) { this.reference = reference; this.stamp = stamp; } static <T> Pair<T> of(T reference, int stamp) { return new Pair<T>(reference, stamp); } } private volatile Pair<V> pair;public class Main { public static void main(String[] args) throws Exception { Test old = new Test("hello"), newTest = new Test("world"); AtomicStampedReference<Test> reference = new AtomicStampedReference<>(old, 1); reference.compareAndSet(old, newTest,1,2); System.out.println("对象:"+reference.getReference().name+";版本号:"+reference.getStamp()); } } class Test{ Test(String name){ this.name = name; } public String name; } ---------------输出结果------------------ 对象:world;版本号:2数组原子类
AtomicIntegerArray //整型数组 AtomicLongArray //长整型数组 AtomicReferenceArray //引用类型数组
//元素默认初始化为0 AtomicIntegerArray array = new AtomicIntegerArray(2); // 下标为0的元素,期待值是0,更新值是1 array.compareAndSet(0,0,1); System.out.println(array.get(0)); ---------------输出结果------------------ 1
属性原子类
AtomicIntegerFieldUpdater AtomicLongFieldUpdater AtomicReferenceFieldUpdater
public class Main { public static void main(String[] args) { AtomicReferenceFieldUpdater<Test,String> fieldUpdater = AtomicReferenceFieldUpdater.newUpdater(Test.class,String.class,"name"); Test test = new Test("hello world"); fieldUpdater.compareAndSet(test,"hello world","siting"); System.out.println(fieldUpdater.get(test)); System.out.println(test.name); } } class Test{ Test(String name){ this.name = name; } public volatile String name; } ---------------输出结果------------------ siting siting累加器
Striped64 LongAccumulator LongAdder //accumulatorFunction:运算规则,identity:初始值 public LongAccumulator(LongBinaryOperator accumulatorFunction,long identity)
public static void main(String[] args) throws Exception { LongAccumulator accumulator = new LongAccumulator(Long::sum, 0); for(int i=0;i<100000;i++){ CompletableFuture.runAsync(() -> accumulator.accumulate(1)); } Thread.sleep(1000); //等待全部CompletableFuture线程执行完成,再获取 System.out.println(accumulator.get()); } ---------------输出结果------------------ 100000同步组件的实现原理
java的多数同步组件会在内部维护一个状态值,和原子组件一样,修改状态值时一般也是通过cas来实现。而状态修改的维护工作被Doug Lea抽象出AbstractQueuedSynchronizer(AQS)来实现
AQS的原理可以看下之前写的一篇文章:详解锁原理,synchronized、volatile+cas底层实现[2]
同步组件
ReentrantLock、ReentrantReadWriteLock
ReentrantLock lock = new ReentrantLock(); if(lock.tryLock()){ //业务逻辑 lock.unlock(); }public static void main(String[] args) throws Exception { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); if(lock.readLock().tryLock()){ //读锁 //业务逻辑 lock.readLock().unlock(); } if(lock.writeLock().tryLock()){ //写锁 //业务逻辑 lock.writeLock().unlock(); } }Semaphore实现原理和使用场景
public static void main(String[] args) throws Exception { Semaphore semaphore = new Semaphore(2); for (int i = 0; i < 3; i++) CompletableFuture.runAsync(() -> { try { System.out.println(Thread.currentThread().toString() + " start "); if(semaphore.tryAcquire(1)){ Thread.sleep(1000); semaphore.release(1); System.out.println(Thread.currentThread().toString() + " 无阻塞结束 "); }else { System.out.println(Thread.currentThread().toString() + " 被阻塞结束 "); } } catch (Exception e) { throw new RuntimeException(e); } }); //保证CompletableFuture 线程被执行,主线程再结束 Thread.sleep(2000); } ---------------输出结果------------------ Thread[ForkJoinPool.commonPool-worker-19,5,main] start Thread[ForkJoinPool.commonPool-worker-5,5,main] start Thread[ForkJoinPool.commonPool-worker-23,5,main] start Thread[ForkJoinPool.commonPool-worker-23,5,main] 被阻塞结束 Thread[ForkJoinPool.commonPool-worker-5,5,main] 无阻塞结束 Thread[ForkJoinPool.commonPool-worker-19,5,main] 无阻塞结束可以看出三个线程,因为信号量设定为2,第三个线程是无法获取信息成功的,会打印阻塞结束
CountDownLatch实现原理和使用场景
public static void main(String[] args) throws Exception { CountDownLatch count = new CountDownLatch(2); for (int i = 0; i < 2; i++) CompletableFuture.runAsync(() -> { try { Thread.sleep(1000); System.out.println(" CompletableFuture over "); count.countDown(); } catch (Exception e) { throw new RuntimeException(e); } }); //等待CompletableFuture线程的完成 count.await(); System.out.println(" main over "); } ---------------输出结果------------------ CompletableFuture over CompletableFuture over main overCyclicBarrier实现原理和使用场景
public static void main(String[] args) throws Exception { CyclicBarrier barrier = new CyclicBarrier(2); CompletableFuture.runAsync(()->{ try { System.out.println("CompletableFuture run start-"+ Clock.systemUTC().millis()); barrier.await(); //需要等待main线程也执行到await状态才能继续执行 System.out.println("CompletableFuture run over-"+ Clock.systemUTC().millis()); }catch (Exception e){ throw new RuntimeException(e); } }); Thread.sleep(1000); //和CompletableFuture线程相互等待 barrier.await(); System.out.println("main run over!"); } ---------------输出结果------------------ CompletableFuture run start-1609822588881 main run over! CompletableFuture run over-1609822589880StampedLock
//获取读锁,自旋获取,返回一个戳值 public long readLock() //尝试加读锁,不成功返回0 public long tryReadLock() //解锁 public void unlockRead(long stamp) //获取写锁,自旋获取,返回一个戳值 public long writeLock() //尝试加写锁,不成功返回0 public long tryWriteLock() //解锁 public void unlockWrite(long stamp) //尝试乐观读读取一个时间戳,并配合validate方法校验时间戳的有效性 public long tryOptimisticRead() //验证stamp是否有效 public boolean validate(long stamp)
public static void main(String[] args) throws Exception { StampedLock stampedLock = new StampedLock(); long stamp = stampedLock.tryOptimisticRead(); //判断版本号是否生效 if (!stampedLock.validate(stamp)) { //获取读锁,会空转 stamp = stampedLock.readLock(); long writeStamp = stampedLock.tryConvertToWriteLock(stamp); if (writeStamp != 0) { //成功转为写锁 //fixme 业务操作 stampedLock.unlockWrite(writeStamp); } else { stampedLock.unlockRead(stamp); //尝试获取写读 stamp = stampedLock.tryWriteLock(); if (stamp != 0) { //fixme 业务操作 stampedLock.unlockWrite(writeStamp); } } } }关于Java中如何使用原子组件和同步组件问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注天达云行业资讯频道了解更多相关知识。