sayi.github.com icon indicating copy to clipboard operation
sayi.github.com copied to clipboard

并发(三)基础之底层技术篇

Open Sayi opened this issue 7 years ago • 0 comments

本文主要针对一些基础算法和底层类(LockSupport)作一个介绍,很多并发的高级特性都是依赖于这些技术。

位操作

位操作作为一个编程技巧或者用二进制思维解决算术问题在许多源码中都出现过,我们不妨回顾一下Java中的位操作。

移位操作

  • 左移<<:符号位不移动,低位补0,高位丢弃。相当于乘以2的N次方
  • 右移>>:符号位不移动,低位丢弃,高位补0。相当于除以2的N次方
  • 无符号右移>>>:最高一位为符号位,也会跟着右移,低位丢弃,高位补0。

位运算

  • 与&:同时为1,才为1,否则为0。
  • 或|:有一个为1则为1,否则为0。
  • 非~:1为0,0为1,相反。
  • 抑或^:相同为1,相反为0。

示例

基础知识读起来很简单,具体拿来用的时候还是需要掌握一些技巧,我们以一个整型变量为例,无符号的高16位和无符号的低16位分别存储两个整数。

所以第一个问题就是怎么通过位操作获得高16位和低16位的值?可以通过无符号右移16位获得高16位的值,获得低16位就要想办法把高16位变为0,我们可能通过与操作,掩码是0000 0000 0000 0000 1111 1111 1111 1111。

int a = 10;
// 无符号的高16位值
System.out.println(a >>> 16);

// 1111 1111 1111 1111
int mask = (1 << 16) - 1;
// 无符号的低16位值
System.out.println(a & mask);

这两个无符号整数该怎么加一呢?对于低16位直接+1,上线是2^16-1=65535,对于高16位,+1其实就是增加2^16=65536。

// 高位加1
System.out.println((a + (1 << 16)) >>> 16);
// 低位加1
System.out.println((a + 1 ) & mask);

Atomic*、CAS

JDK提供了一些原子类,它让我们避免使用synchronized关键字来解决同步问题,我们以AtomicInteger源码为例来介绍其中的核心方法。

我们先来看看声明的一些属性:

public class AtomicInteger extends Number implements java.io.Serializable {

  // setup to use Unsafe.compareAndSwapInt for updates
  private static final Unsafe unsafe = Unsafe.getUnsafe();
  private static final long valueOffset;

  static {
    try {
      valueOffset = unsafe.objectFieldOffset
        (AtomicInteger.class.getDeclaredField("value"));
    } catch (Exception ex) { throw new Error(ex); }
  }

  private volatile int value;

sun.misc.Unsafe通过内存地址偏移量来操作数据,关于Unsafe参见国外的这一篇文章:

Java Magic. Part 4: sun.misc.Unsafe:http://mishadoff.com/blog/java-magic-part-4-sun-dot-misc-dot-unsafe/

volatile int value 正是存储这个整型变量的值,我们知道volatile声明的变量的读和写都是原子的,并且是可见的,所以不会存在内存一致性问题,我们来看看没有任何多余动作的读和写:

public final int get() {
  return value;
}

public final void set(int newValue) {
  value = newValue;
}

AtomicInteger类的作用就是除了读写之外,提供更多的关于整型的原子操作,比如累加操作:

 /**
 * Atomically sets the value to the given updated value
 * if the current value {@code ==} the expected value.
 *
 * @param expect the expected value
 * @param update the new value
 * @return {@code true} if successful. False return indicates that
 * the actual value was not equal to the expected value.
 */
public final boolean compareAndSet(int expect, int update) {
  return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
public final int getAndIncrement() {
  return unsafe.getAndAddInt(this, valueOffset, 1);
}
public final int getAndDecrement() {
  return unsafe.getAndAddInt(this, valueOffset, -1);
}
public final int getAndAdd(int delta) {
  return unsafe.getAndAddInt(this, valueOffset, delta);
}
public final int incrementAndGet() {
  return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
public final int getAndSet(int newValue) {
  return unsafe.getAndSetInt(this, valueOffset, newValue);
}
public final int decrementAndGet() {
  return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
}

我们注意到其中有一段特别关键的代码:unsafe.compareAndSwapInt,类似的还有unsafe.compareAndSwapObject等,这就不得不说一种很重要的实现无锁同步的机制:Compare And Swap,即CAS。

CAS是当更新某个V的值时,会比较是否和期望值相同,当且仅当它们相同则更新,否则返回V的最新值,这就有效的防止了在我取得期望值后,V的值被其它线程修改,可以理解成为乐观锁,所有操作都可以执行,但是当更新时如果发现被其它线程更新过,则不允许执行更新。因为1.8用了Unsafe这个类,我们不妨来看看1.7中AtomicInteger的getAndIncrement方法:

public final int getAndIncrement() {
  for (;;) {
    int current = get();
    int next = current + 1;
    if (compareAndSet(current, next))
      return current;
  }
}
public final boolean compareAndSet(int expect, int update) {
  return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

for循环给出了一个使用CAS很好的示例,不断比较期望值与获取的值是否相同,不相同,则重新获取进而操作。

在使用CAS时,会带来新的问题:ABA问题,即V的值从A修改成B,再修改成A,所以CAS不会察觉到这个变量被修改过,虽然大多数情况下这都不会带来什么问题,但是我们也可以通过记录这个变量的修改次数来解决这个问题。

LockSupport

我们已经知道,Object.wait和synchronized都可以产生阻塞,这些实现都是JVM层面,JDK提供了基础线程阻塞原语LockSupport,它更底层并且能创造出更高级的同步器和锁,接下来我们就来看看LockSupport的源码。


public class LockSupport {
  private LockSupport() {} // Cannot be instantiated.

  private static void setBlocker(Thread t, Object arg) {
    // Even though volatile, hotspot doesn't need a write barrier here.
    UNSAFE.putObject(t, parkBlockerOffset, arg);
  }

  public static void unpark(Thread thread) {
    if (thread != null)
      UNSAFE.unpark(thread);
  }

  public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
  }

  public static void parkNanos(Object blocker, long nanos) {
    if (nanos > 0) {
      Thread t = Thread.currentThread();
      setBlocker(t, blocker);
      UNSAFE.park(false, nanos);
      setBlocker(t, null);
    }
  }

  public static void parkUntil(Object blocker, long deadline) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(true, deadline);
    setBlocker(t, null);
  }

  public static Object getBlocker(Thread t) {
    if (t == null)
      throw new NullPointerException();
    return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
  }

  public static void park() {
    UNSAFE.park(false, 0L);
  }

  public static void parkNanos(long nanos) {
    if (nanos > 0)
      UNSAFE.park(false, nanos);
  }

  public static void parkUntil(long deadline) {
    UNSAFE.park(true, deadline);
  }

  // Hotspot implementation via intrinsics API
  private static final sun.misc.Unsafe UNSAFE;
  private static final long parkBlockerOffset;
  private static final long SEED;
  private static final long PROBE;
  private static final long SECONDARY;
  static {
    try {
      UNSAFE = sun.misc.Unsafe.getUnsafe();
      Class<?> tk = Thread.class;
      parkBlockerOffset = UNSAFE.objectFieldOffset
        (tk.getDeclaredField("parkBlocker"));
      SEED = UNSAFE.objectFieldOffset
        (tk.getDeclaredField("threadLocalRandomSeed"));
      PROBE = UNSAFE.objectFieldOffset
        (tk.getDeclaredField("threadLocalRandomProbe"));
      SECONDARY = UNSAFE.objectFieldOffset
        (tk.getDeclaredField("threadLocalRandomSecondarySeed"));
    } catch (Exception ex) { throw new Error(ex); }
  }
}

从源码中我们看到很重要的几个方法:

  • park(Object blocker) 提供了阻塞当前线程的操作
  • unpark(Thread thread) 提供了唤醒某个线程的操作

park和unpark需要配合使用,注意到park()不建议使用,因为它没有提供线程阻塞的同步对象参数,通过这个参数我们能更清晰的知道当前阻塞的对象。接下来我们不妨看一下源码注释中给的例子先进先出锁的例子:

class FIFOMutex {
   private final AtomicBoolean locked = new AtomicBoolean(false);
   private final Queue<Thread> waiters
     = new ConcurrentLinkedQueue<Thread>();

   public void lock() {
     boolean wasInterrupted = false;
     Thread current = Thread.currentThread();
     waiters.add(current);

     // Block while not first in queue or cannot acquire lock
     while (waiters.peek() != current ||
            !locked.compareAndSet(false, true)) {
       LockSupport.park(this);
       if (Thread.interrupted()) // ignore interrupts while waiting
         wasInterrupted = true;
     }

     waiters.remove();
     if (wasInterrupted)          // reassert interrupt status on exit
       current.interrupt();
   }

   public void unlock() {
     locked.set(false);
     LockSupport.unpark(waiters.peek());
   }
 }

我们必须注意,LockSupport.park(obj)方法也必须在守护循环代码块中,因为也可能会虚假唤醒,就和Object.wait一样。

Sayi avatar Oct 15 '18 10:10 Sayi