Java 锁与线程的那些事
in Java 进击 with 0 comment

Java 锁与线程的那些事

in Java 进击 with 0 comment

Java 锁与线程的那些事

一、引言

引言:“操作系统的线程状态和 java 的线程状态有什么关系?” 这是校招时被问到的一个问题。当时只顾着看博文、面经等零散的资料,没有形成系统的知识体系,一时语塞,答的不是很对。在网上也没找到足够细致的讲解博文,于是整理出了这篇内容。

Java 的线程状态牵扯到了同步语义,要探讨 Java 的线程状态的,必不可免要回顾其锁机制。因此本文的主要分为两大块:一是 Synchronized 源码粗析,分析了各类锁的进入、释放、升级过程,并大致说明了 monitor 原理;二是介绍了线程的实现方式和 Java 线程状态转换的部分细节。

P.S. 本文内容较啰嗦,时间不充裕的同学可以直接看 2.6 小结3.3 小结

二、Synchronized 锁

Java 采用 synchronized 关键字、以互斥同步的方式的解决线程安全问题,那么什么是线程安全呢?这里引用《Java 并发编程实战》作者 Brian Goetz 给出的定义:

当多个线程同时访问一个对象时,如果不用考虑这些线程在运行时环境下的调度和交替执行,也不需要进行额外的同步,或者在调用方进行任何其他的协调操作,调用这个对象的行为都可以获得正确的结果,那就称这个对象是线程安全的。 —— Brian Goetz

2.1 Synchronized 的使用

先写过个 demo,大致过一下 synchronized 的使用,包含同步代码块、实例方法和静态方法。

  public synchronized void test1(){
  }

  public void test2(){
    synchronized(new Test()){
    }
  }

  public static synchronized void test3(){
  }

反编译可查看字节码:

  public synchronized void test1();
    descriptor: ()V
    flags: ACC_PUBLIC, ACC_SYNCHRONIZED    // here

  public void test2();
    descriptor: ()
    flags: ACC_PUBLIC
    Code:
      stack=2, locals=3, args_size=1
         0: new           #2                  // class com/easy/helloworld/Test
         3: dup
         4: invokespecial #3                  // Method "<init>":()V
         7: dup
         8: astore_1
         9: monitorenter                   // here
        10: aload_1
        11: monitorexit                    // here
        12: goto          20
        15: astore_2
        16: aload_1
        17: monitorexit                    // here
        18: aload_2
        19: athrow
        20: return

  public static synchronized void test3();
    descriptor: ()V
    flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED   // here

可以观察到:

2.2 Moniterenter、Moniterexit

monitorenter 和 monitorexit 这两个 jvm 指令,主要是基于 Mark WordObject monitor 来实现的。

在 JVM 中,对象在内存中分为三块区域:

在 JDK 1.6 之前,synchronized 只有传统的锁机制,直接关联到 monitor 对象,存在性能上的瓶颈。在 JDK 1.6 后,为了提高锁的获取与释放效率,JVM 引入了两种锁机制:偏向锁和轻量级锁。它们的引入是为了解决在没有多线程竞争或基本没有竞争的场景下因使用传统锁机制带来的性能开销问题。这几种锁的实现和转换正是依靠对象头中的 Mark Word

2.3 偏向锁

引入偏向锁的目的:在没有多线程竞争的情况下,尽量减少不必要的轻量级锁的执行。轻量级锁的获取及释放依赖多次 CAS 原子指令,而偏向锁只依赖一次 CAS 原子指令。但在多线程竞争时,需要进行偏向锁撤销步骤,因此其撤销的开销必须小于节省下来的 CAS 开销,否则偏向锁并不能带来收益。JDK 1.6 中默认开启偏向锁,可以通过 - XX:-UseBiasedLocking 来禁用偏向锁。

2.3.1 进入偏向锁

关于 HotSpot 虚拟机中获取锁的入口,网上主要有两种看法:一为 interpreterRuntime.cpp#monitorenter#1608;二为 bytecodeInterpreter.cpp#1816。在 HotSpot 的中,有两处地方对 monitorenter 指令进行解析:一个是 bytecodeInterpreter.cpp#1816 ,另一个在 templateTable_x86_64.cpp#3667。其中,bytecodeInterpreter 是 JVM 中的字节码解释器, templateInterpreter 为模板解释器。HotSpot 对运行效率有着极其执着的追求,显然会倾向于用模板解释器来实现。R 大的读书笔记中有说明,HotSpot 中只用到了模板解释器,并没有用到字节码解释器。因此,本文认为 montorenter 的解析入口为 templateTable_x86_64.cpp#3667

但模板解释器 templateInterpreter 都是汇编代码,不易读,且实现逻辑与字节码解释器 bytecodeInterpreter 大体一致。因此本文的源码都以 bytecodeInterpreter 来说明,借此窥探 synchronized 的实现原理。在看代码之前,先介绍几个在偏向锁中会被大量应用的概念,以便后续理解:

prototype_header:JVM 中的每个类有一个类似 mark wordprototype_header,用来标记该 class 的 epoch 和偏向开关等信息。

匿名偏向状态:锁对象 mark word 标志位为 101,且存储的 Thread ID 为空时的状态 (即锁对象为偏向锁,且没有线程偏向于这个锁对象)。

Atomic::cmpxchg_ptr:CAS 函数。这个方法有三个参数,依次为 exchange_valuedestcompare_value。如果 dest 的值为 compare_value 则更新为 exchange_value,并返回 compare_value。否则,不更新并返回实际原值

接下来开始源码实现分析,HotSpot 中偏向锁的具体实现可参考 bytecodeInterpreter.cpp#1816,代码如下:

CASE(_monitorenter): {  
  //锁对象
  oop lockee = STACK_OBJECT(-1);
  // derefing's lockee ought to provoke implicit null check
  CHECK_NULL(lockee);
  // 步骤1
  // 在栈中找到第一个空闲的Lock Record
  // 会找到栈中最高的
  BasicObjectLock* limit = istate->monitor_base();
  BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
  BasicObjectLock* entry = NULL;
  while (most_recent != limit ) {
    if (most_recent->obj() == NULL) entry = most_recent;
    else if (most_recent->obj() == lockee) break;
    most_recent++;
  }
  // entry不为null,代表还有空闲的Lock Record
  if (entry != NULL) {
    // 将Lock Record的obj指针指向锁对象
    entry->set_obj(lockee);
    int success = false;
    uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;
    // markoop即对象头的mark word
    markOop mark = lockee->mark();
    intptr_t hash = (intptr_t) markOopDesc::no_hash;
    // 步骤2
    // implies UseBiasedLocking
    // 如果为偏向模式,即判断标识位是否为101
    if (mark->has_bias_pattern()) {
      ...
      // 一顿操作
      anticipated_bias_locking_value =
        (((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &
        ~((uintptr_t) markOopDesc::age_mask_in_place);
      // 步骤3
      if  (anticipated_bias_locking_value == 0) {
        // already biased towards this thread, nothing to do
        // 偏向的是自己,啥都不做
        if (PrintBiasedLockingStatistics) {
          (* BiasedLocking::biased_lock_entry_count_addr())++;
        }
        success = true;
      }
      // class的prototype_header不是偏向模式
      else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
        // 尝试撤销偏向
                ...
      }
      // epoch过期,重新偏向
      else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
        // try rebias
                ...
        success = true;    
      }
      else {
        // try to bias towards thread in case object is anonymously biased
        // 尝试偏向该线程,只有匿名偏向能成功
        // 构建了匿名偏向的mark word
        markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |(uintptr_t)markOopDesc::age_mask_in_place |epoch_mask_in_place));
        if (hash != markOopDesc::no_hash) {
          header = header->copy_set_hash(hash);
        }
        // 用「或」操作设置thread ID
        markOop new_header = (markOop) ((uintptr_t) header | thread_ident);、
        // 只有匿名偏向才能成功
        if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {
          // cas修改成功    
          if (PrintBiasedLockingStatistics)
            (* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
        }
        else {
          // 失败说明存在竞争,进入monitorenter
          CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
        }
        success = true;
      }
    }
    // 步骤4
    // traditional lightweight locking
    // false走轻量级锁逻辑
    if (!success) {
      // 构造一个无锁状态的Displaced Mark Word,并将lock record指向它
      markOop displaced = lockee->mark()->set_unlocked();
      entry->lock()->set_displaced_header(displaced);
      bool call_vm = UseHeavyMonitors;
      if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
        // 如果CAS替换不成功,代表锁对象不是无锁状态,这时候判断下是不是锁重入
        // Is it simple recursive case?
        if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
          // 如果是锁重入,则直接将Displaced Mark Word设置为null
          // 轻量级锁重入是使用lock record的数量来计入的
          entry->lock()->set_displaced_header(NULL);
        } else {
          CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
        }
      }
    }
    UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
  } else {
    // 没拿到lock record,重新执行
    istate->set_msg(more_monitors);
    UPDATE_PC_AND_RETURN(0); // Re-execute
  }
}

偏向锁流程:

步骤 1、从当前线程的栈中找到一个空闲的 Lock Record,并指向当前锁对象。

步骤 2、获取对象的 markOop 数据 mark,即对象头的 Mark Word;

步骤 3、判断锁对象的 mark word 是否是偏向模式,即低 3 位是否为 101。若不是,进入步骤 4。若是,计算 anticipated_bias_locking_value,判断偏向状态:

步骤 3.1anticipated_bias_locking_value 若为 0,代表偏向的线程是当前线程mark word 的 epoch 等于 class 的 epoch,这种情况下直接执行同步代码块,什么都不用做。

步骤 3.2、判断 class 的 prototype_header 是否为非偏向模式。若为非偏向模式,CAS 尝试将对象恢复为无锁状态。无论 cas 是否成功都会进入轻量级锁逻辑。

步骤 3.3、如果 epoch 偏向时间戳已过期,则需要重偏向。利用 CAS 指令将锁对象的 mark word 替换为一个偏向当前线程且 epoch 为类的 epoch 的新的 mark word

步骤 3.4、CAS 将偏向线程改为当前线程,如果当前是匿名偏向(即对象头中的 bit field 存储的 Thread ID 为空)且无并发冲突,则能修改成功获取偏向锁,否则进入锁升级的逻辑。

步骤 4、走到一步会进行轻量级锁逻辑。构造一个无锁状态的 mark word,然后存储到 Lock Record。设置为无锁状态的原因是:轻量级锁解锁时是将对象头的 mark wordcas 替换为 Lock Record 中的 Displaced Mark Word,所以设置为无锁状态。如果是锁重入,则将 Lock RecordDisplaced Mark Word 设置为 null,放到栈帧中,起到计数作用。

以上是偏向锁加锁的大致流程,如果当前锁已偏向其他线程 || epoch 值过期 || class 偏向模式关闭 || 获取偏向锁的过程中存在并发冲突,都会进入到 InterpreterRuntime::monitorenter 方法, 在该方法中会进行偏向锁撤销和升级。流程如下图所示:

偏向锁

Issue:有的同学可能会问了,对象一开始不是无锁状态吗,为什么上述偏向锁逻辑没有判断无锁状态的锁对象(001)?

只有匿名偏向的对象才能进入偏向锁模式。JVM 启动时会延时初始化偏向锁,默认是 4000ms。初始化后会将所有加载的 Klass 的 prototype header 修改为匿名偏向样式。当创建一个对象时,会通过 Klass 的 prototype_header 来初始化该对象的对象头。简单的说,偏向锁初始化结束后,后续所有对象的对象头都为匿名偏向样式,在此之前创建的对象则为无锁状态。而对于无锁状态的锁对象,如果有竞争,会直接进入到轻量级锁。这也是为什么 JVM 启动前 4 秒对象会直接进入到轻量级锁的原因。

为什么需要延迟初始化?

JVM 启动时必不可免会有大量 sync 的操作,而偏向锁并不是都有利。如果开启了偏向锁,会发生大量锁撤销和锁升级操作,大大降低 JVM 启动效率。

因此,我们可以明确地说,只有锁对象处于匿名偏向状态,线程才能拿到到我们通常意义上的偏向锁。而处于无锁状态的锁对象,只能进入到轻量级锁状态。

2.3.2 偏向锁的撤销

偏向锁的 撤销(revoke)是一个很特殊的操作,为了执行撤销操作,需要等待全局安全点,此时所有的工作线程都停止了执行。偏向锁的撤销操作并不是将对象恢复到无锁可偏向的状态(注意区分偏向锁撤销和释放这两个概念,撤销的触发见上图),而是在偏向锁的获取过程中,发现竞争时,直接将一个被偏向的对象升级到被加了轻量级锁的状态。这个操作的具体完成方式如下:

IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))  
  ...
  Handle h_obj(thread, elem->obj());
  assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
         "must be NULL or an object");
    // 开启了偏向锁
  if (UseBiasedLocking) {
    // Retry fast entry if bias is revoked to avoid unnecessary inflation
    ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
  } else {
    ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
  }
  ...

如果开启了 JVM 偏向锁,则会进入到 ObjectSynchronizer::fast_enter 方法中。

void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {  
 //再次校验
 if (UseBiasedLocking) {
    if (!SafepointSynchronize::is_at_safepoint()) {
      //不在安全点的执行
      BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
      if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
        return;
      }
    } else {
      assert(!attempt_rebias, "can not rebias toward VM thread");    
      //批量撤销,底层调用bulk_revoke_or_rebias_at_safepoint
      BiasedLocking::revoke_at_safepoint(obj);
    }
    assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
 }
 slow_enter (obj, lock, THREAD) ;
}

主要看 BiasedLocking::revoke_and_rebias 方法。这个方法的主要作用像它的方法名:撤销或者重偏向。第一个参数封装了锁对象和当前线程,第二个参数代表是否允许重偏向,这里是 true。

BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {  
  assert(!SafepointSynchronize::is_at_safepoint(), "must not be called while at safepoint");
  markOop mark = obj->mark(); //获取锁对象的对象头
  if (mark->is_biased_anonymously() && !attempt_rebias) {
    // 如果锁对象为匿名偏向状态且不允许重偏向下,进入该分支。在一个非全局安全点进行偏向锁撤销
    markOop biased_value       = mark;
    // 创建一个匿名偏向的markword
    markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
    // 通过cas重新设置偏向锁状态
    markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
    if (res_mark == biased_value) {// 如果CAS成功,返回偏向锁撤销状态
      return BIAS_REVOKED;
    }
  } else if (mark->has_bias_pattern()) {
    // 锁为偏向模式(101)会走到这里 
    Klass* k = obj->klass(); 
    markOop prototype_header = k->prototype_header();
    // 如果对应class关闭了偏向模式
    if (!prototype_header->has_bias_pattern()) {
      markOop biased_value       = mark;
      // CAS更新对象头markword为非偏向锁
      markOop res_mark = (markOop) Atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark);
      assert(!(*(obj->mark_addr()))->has_bias_pattern(), "even if we raced, should still be revoked");
      return BIAS_REVOKED; // 返回偏向锁撤销状态
    } else if (prototype_header->bias_epoch() != mark->bias_epoch()) {
      // 如果epoch过期,则进入当前分支
      if (attempt_rebias) {
        // 如果允许重偏
        assert(THREAD->is_Java_thread(), "");
        markOop biased_value       = mark;
        markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch());
        // 通过CAS操作, 将本线程的 ThreadID 、时间戳、分代年龄尝试写入对象头中
        markOop res_mark = (markOop) Atomic::cmpxchg_ptr(rebiased_prototype, obj->mark_addr(), mark);
        if (res_mark == biased_value) { //CAS成功,则返回撤销和重新偏向状态
          return BIAS_REVOKED_AND_REBIASED;
        }
      } else {
        // 如果不允许尝试获取偏向锁,进入该分支取消偏向
        // 通过CAS操作更新分代年龄
        markOop biased_value       = mark;
        markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
        markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
        if (res_mark == biased_value) { //如果CAS操作成功,返回偏向锁撤销状态
          return BIAS_REVOKED;
        }
      }
    }
  }
  //执行到这里有以下两种情况:
  //1.对象不是偏向模式
  //2.上面的cas操作失败
  HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias);
  if (heuristics == HR_NOT_BIASED) {
    // 非偏向从这出去
    // 轻量级锁、重量级锁
    return NOT_BIASED;
  } else if (heuristics == HR_SINGLE_REVOKE) {
    // 撤销单个线程
    // Mark,最常见的执行分支
    // Mark,最常见的执行分支
    // Mark,最常见的执行分支
    Klass *k = obj->klass();
    markOop prototype_header = k->prototype_header();
    if (mark->biased_locker() == THREAD &&
        prototype_header->bias_epoch() == mark->bias_epoch()) {
      // 偏向当前线程且不过期
      // 这里撤销的是偏向当前线程的锁,调用Object#hashcode方法时也会走到这一步
      // 因为只要遍历当前线程的栈就能拿到lock record了,所以不需要等到safe point再撤销。
      ResourceMark rm;
      if (TraceBiasedLocking) {
        tty->print_cr("Revoking bias by walking my own stack:");
      }
      BiasedLocking::Condition cond = revoke_bias(obj(), false, false, (JavaThread*) THREAD);
      ((JavaThread*) THREAD)->set_cached_monitor_info(NULL);
      assert(cond == BIAS_REVOKED, "why not?");
      return cond;
    } else {
      // 下面代码最终会在safepoint调用revoke_bias方法撤销偏向
      VM_RevokeBias revoke(&obj, (JavaThread*) THREAD);
      VMThread::execute(&revoke);
      return revoke.status_code();
    }
  }
  assert((heuristics == HR_BULK_REVOKE) ||
         (heuristics == HR_BULK_REBIAS), "?");
   //批量撤销、批量重偏向的逻辑
  VM_BulkRevokeBias bulk_revoke(&obj, (JavaThread*) THREAD,
                                (heuristics == HR_BULK_REBIAS),
                                attempt_rebias);
  VMThread::execute(&bulk_revoke);
  return bulk_revoke.status_code();
}

这块代码注释写的算是比较清楚,只简单介绍下最常见的情况:锁已经偏向线程 A,此时线程 B 尝试获取锁。这种情况下会走到 Mark 标记的分支。如果需要撤销的是当前线程,只要遍历当前线程的栈就能拿到 lock record,可以直接调用 revoke_bias,不需要等到 safe point 再撤销。在调用 Object#hashcode 时,也会走到该分支将为偏向锁的锁对象直接恢复为无锁状态。若不是当前线程,会被 push 到 VM Thread 中等到 safepoint 的时候再执行。

VMThread 内部维护了一个 VMOperationQueue 类型的队列,用于保存内部提交的 VM 线程操作 VM_operation。GC、偏向锁的撤销等操作都是在这里被执行。

撤销调用的 revoke_bias 方法的代码就不贴了。大致逻辑是:

步骤 1、查看偏向的线程是否存活,如果已经死亡,则直接撤销偏向锁。JVM 维护了一个集合存放所有存活的线程,通过遍历该集合判断某个线程是否存活。

步骤 2、偏向的线程是否还在同步块中,如果不在,则撤销偏向锁。如果在同步块中,执行步骤 3。这里是否在同步块的判断基于上文提到的偏向锁的重入计数方式:在偏向锁的获取中,每次进入同步块的时候都会在栈中找到第一个可用(即栈中最高的)的 Lock Record,将其 obj 字段指向锁对象。每次解锁的时候都会把最低的 Lock Record 移除掉,所以可以通过遍历线程栈中的 Lock Record 来判断是否还在同步块中。轻量级锁的重入也是基于 Lock Record 的计数来判断。

步骤 3、升级为轻量级锁。将偏向线程所有相关 Lock RecordDisplaced Mark Word 设置为 null,再将最高位的 Lock RecordDisplaced Mark Word 设置为无锁状态,然后将对象头指向最高位的 Lock Record。这里没有用到 CAS 指令,因为是在 safepoint,可以直接升级成轻量级锁。

2.3.3 偏向锁的释放

偏向锁的释放可参考 bytecodeInterpreter.cpp#1923,这里也不贴了。偏向锁的释放只要将对应 Lock Record 释放就好了,但这里的释放并不会将 mark word 里面的 thread ID 去掉,这样做是为了下一次更方便的加锁。而轻量级锁则需要将 Displaced Mark Word 替换到对象头的 mark word 中。如果 CAS 失败或者是重量级锁则进入到 InterpreterRuntime::monitorexit 方法中。

2.3.4 批量重偏向与撤销

从上节偏向锁的加锁解锁过程中可以看出,当只有一个线程反复进入同步块时,偏向锁带来的性能开销基本可以忽略,但是当有其他线程尝试获得锁时,就需要等到 safe point 时将偏向锁撤销为无锁状态或升级为轻量级 / 重量级锁。因此,JVM 中增加了一种批量重偏向 / 撤销的机制以减少锁撤销的开销,而 mark word 中的 epoch 也是在这里被大量应用,这里不展开说明。但无论怎么优化,偏向锁的撤销仍有一定不可避免的成本。如果业务场景存在大量多线程竞争,那偏向锁的存在不仅不能提高性能,而且会导致性能下降(偏向锁并不都有利,jdk15 默认不开启)。

2.4 轻量级锁

引入轻量级锁的目的:在多线程交替执行同步块的情况下,尽量避免重量级锁使用的操作系统互斥量带来的开销,但是如果多个线程在同一时刻进入临界区,会导致轻量级锁膨胀升级重量级锁,所以轻量级锁的出现并非是要替代重量级锁。

2.4.1 进入轻量级锁

轻量级锁在上文或多或少已经涉及到,其获取流程入口为 bytecodeInterpreter.cpp#1816。前大半部分都是偏向锁逻辑,还有一部分为轻量级锁逻辑。在偏向锁逻辑中,cas 失败会执行到 InterpreterRuntime::monitorenter。在轻量级锁逻辑中,如果当前线程不是轻量级锁的重入,也会执行到 InterpreterRuntime::monitorenter。我们再看看 InterpreterRuntime::monitorenter 方法:

IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))  
  ...
  Handle h_obj(thread, elem->obj());
  assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
         "must be NULL or an object");
  if (UseBiasedLocking) {
    // Retry fast entry if bias is revoked to avoid unnecessary inflation
    ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
  } else {
    ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
  }
  ...
IRT_END  

fast_enter 的流程在偏向锁的撤销小节中已经分析过,主要逻辑为 revoke_and_rebias:如果当前是偏向模式且偏向的线程还在使用锁,会将锁的 mark word 改为轻量级锁的状态,并将偏向的线程栈中的 Lock Record 修改为轻量级锁对应的形式(此时 Lock Record 是无锁状态),且返回值不是 BIAS_REVOKED_AND_REBIASED,会继续执行 slow_enter

我们直接看 slow_enter 的流程:

void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {  
  // 步骤1
  markOop mark = obj->mark();
  assert(!mark->has_bias_pattern(), "should not see bias pattern here");
  // 步骤2
  // 如果为无锁状态
  if (mark->is_neutral()) {
    // 步骤3
    // 设置mark word到栈 
    lock->set_displaced_header(mark);
    // CAS更新指向栈中Lock Record的指针
    if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
      TEVENT (slow_enter: release stacklock) ;
      return ;
    }
    // Fall through to inflate() ... cas失败走下面锁膨胀方法
  } else if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
    // 步骤4
    // 为轻量级锁且owner为当前线程
    assert(lock != mark->locker(), "must not re-lock the same lock");
    assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
    // 设置Displaced Mark Word为null,重入计数用
    lock->set_displaced_header(NULL);
    return;
  }
  // 步骤5
  // 走到这一步说明已经是存在多个线程竞争锁了,需要膨胀或已经是重量级锁
  lock->set_displaced_header(markOopDesc::unused_mark());
  // 进入、膨胀到重量级锁的入口
  // 膨胀后再调用monitor的enter方法竞争锁
  ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}

步骤 1markOop mark = obj->mark() 方法获取对象的 markOop 数据 mark;

步骤 2mark->is_neutral() 方法判断 mark 是否为无锁状态,标识位 001

步骤 3、如果 mark 处于无锁状态,把 mark 保存到 BasicLock 对象 (Lock Record 的属性) 的 displaced_header 字段;

步骤 3.1、通过 CAS 尝试将 Mark Word 更新为指向 BasicLock 对象的指针,如果更新成功,表示竞争到锁,则执行同步代码,否则执行步骤 4;

步骤 4、如果是重入,则设置 Displaced Mark Word 为 null。

步骤 5、到这说明有多个线程竞争轻量级锁,轻量级锁需要膨胀升级为重量级锁;

结合上文偏向锁的流程,可以整理得到如下的流程图:

轻量级锁

2.4.2 轻量级锁的释放

轻量级锁释放的入口在 bytecodeInterpreter.cpp#1923

轻量级锁释放时需要将 Displaced Mark Word 替换回对象头的 mark word 中。如果 CAS 失败或者是重量级锁则进入到 InterpreterRuntime::monitorexit 方法中。monitorexit 直接调用 slow_exit 方法释放 Lock Record。直接看 slow_exit

IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem))  
  Handle h_obj(thread, elem->obj());
  assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
         "must be NULL or an object");
  if (elem == NULL || h_obj()->is_unlocked()) {
    THROW(vmSymbols::java_lang_IllegalMonitorStateException());
  }
  // 直接调用slow_exit
  ObjectSynchronizer::slow_exit(h_obj(), elem->lock(), thread);
  // Free entry. This must be done here, since a pending exception might be installed on
  // exit. If it is not cleared, the exception handling code will try to unlock the monitor again.
  elem->set_obj(NULL);
IRT_END

void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {  
  fast_exit (object, lock, THREAD) ;
}

void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {  
  ...
  // displaced header就是对象mark word的拷贝
  markOop dhw = lock->displaced_header();
  markOop mark ;
  if (dhw == NULL) {
     // 什么也不做
     // Recursive stack-lock. 递归堆栈锁
     // Diagnostics -- Could be: stack-locked, inflating, inflated. 
     ...
     return ;
  }
  mark = object->mark() ;
  // 此处为轻量级锁的释放过程,使用CAS方式解锁。
  // 如果对象被当前线程堆栈锁定,尝试将displaced header和锁对象中的MarkWord替换回来。
  // If the object is stack-locked by the current thread, try to
  // swing the displaced header from the box back to the mark.
  if (mark == (markOop) lock) {
     assert (dhw->is_neutral(), "invariant") ;
     if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) {
        TEVENT (fast_exit: release stacklock) ;
        return;
     }
  }
  //走到这里说明已经是重量级锁或者解锁时发生了竞争,膨胀后再调用monitor的exit方法释放
  ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ;
}

最后执行的是如果是 fast_exit 方法。如果是轻量级锁,尝试 cas 替换 mark word。若解锁时有竞争,会调用 inflate 方法进行重量级锁膨胀,升级到到重量级锁后再执行 exit 方法。

2.5 重量级锁

2.5.1 重量级锁的进入

重量级锁通过对象内部的监视器(monitor)实现,其依赖于底层操作系统的 Mutex Lock 实现,需要额外的用户态到内核态切换的开销。由上文分析,slow_enter 获取轻量级锁未成功时,会在 inflate 中完成锁膨胀:

ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {  
  ...
  for (;;) {
      const markOop mark = object->mark() ;
      assert (!mark->has_bias_pattern(), "invariant") ;  
      // mark是以下状态中的一种:
      // *  Inflated(重量级锁状态)     - 直接返回
      // *  Stack-locked(轻量级锁状态) - 膨胀
      // *  INFLATING(膨胀中)    - 忙等待直到膨胀完成
      // *  Neutral(无锁状态)      - 膨胀
      // *  BIASED(偏向锁)       - 非法状态,在这里不会出现

      // CASE: inflated
      if (mark->has_monitor()) {
          // 已经是重量级锁状态了,直接返回
          ObjectMonitor * inf = mark->monitor() ;
          ...
          return inf ;
      }
      // CASE: inflation in progress
      if (mark == markOopDesc::INFLATING()) {
         // 正在膨胀中,说明另一个线程正在进行锁膨胀,continue重试
         TEVENT (Inflate: spin while INFLATING) ;
         // 在该方法中会进行spin/yield/park等操作完成自旋动作 
         ReadStableMark(object) ;
         continue ;
      }
      // 当前是轻量级锁,后面分析
      // CASE: stack-locked
          if (mark->has_locker()) {
        ...
      }
      // 无锁状态
      // CASE: neutral
      // 分配以及初始化ObjectMonitor对象
      ObjectMonitor * m = omAlloc (Self) ;
      // prepare m for installation - set monitor to initial state
      m->Recycle();
      m->set_header(mark);
      // owner为NULL
      m->set_owner(NULL);
      m->set_object(object);
      m->OwnerIsThread = 1 ;
      m->_recursions   = 0 ;
      m->_Responsible  = NULL ;
      m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ;       // consider: keep metastats by type/class
        // 用CAS替换对象头的mark word为重量级锁状态
      if (Atomic::cmpxchg_ptr (markOopDesc::encode(m), object->mark_addr(), mark) != mark) {
          // 不成功说明有另外一个线程在执行inflate,释放monitor对象
          m->set_object (NULL) ;
          m->set_owner  (NULL) ;
          m->OwnerIsThread = 0 ;
          m->Recycle() ;
          omRelease (Self, m, true) ;
          m = NULL ;
          continue ;
          // interference - the markword changed - just retry.
          // The state-transitions are one-way, so there's no chance of
          // live-lock -- "Inflated" is an absorbing state.
      }

      ...
      return m ;
}

inflate 其中是一个 for 循环,主要是为了处理多线程同时调用 inflate 的情况。然后会根据锁对象的状态进行不同的处理:

1. 已经是重量级状态,说明膨胀已经完成,返回并继续执行 ObjectMonitor::enter 方法。
2. 如果是轻量级锁则需要进行膨胀操作。
3. 如果是膨胀中状态,则进行忙等待。
4. 如果是无锁状态则需要进行膨胀操作。

轻量级锁膨胀流程如下:

if (mark->has_locker()) {  
  // 步骤1
  // 当前轻量级锁状态,先分配一个ObjectMonitor对象,并初始化值
  ObjectMonitor * m = omAlloc (Self) ;          
  m->Recycle();
  m->_Responsible  = NULL ;
  m->OwnerIsThread = 0 ;
  m->_recursions   = 0 ;
  m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ;   // Consider: maintain by type/class
  // 步骤2
  // 将锁对象的mark word设置为INFLATING (0)状态 
  markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
  if (cmp != mark) {
    omRelease (Self, m, true) ;
    continue ;       // Interference -- just retry
  }
  // 步骤3
  // 栈中的displaced mark word
  markOop dmw = mark->displaced_mark_helper() ;
  assert (dmw->is_neutral(), "invariant") ;
  // 设置monitor的字段
  m->set_header(dmw) ;
  // owner为Lock Record
  m->set_owner(mark->locker());
  m->set_object(object);
  ...
  // 步骤4
  // 将锁对象头设置为重量级锁状态
  object->release_set_mark(markOopDesc::encode(m));
  ...
  return m ;
}

步骤 1、调用 omAlloc 获取一个可用的 ObjectMonitor 对象。在 omAlloc 方法中会先从线程私有monitor 集合 omFreeList 中分配对象,如果 omFreeList 中已经没有 monitor 对象,则从 JVM 全局gFreeList 中分配一批 monitoromFreeList 中;

步骤 2、通过 CAS 尝试将 Mark Word 设置为 markOopDesc:INFLATING,标识当前锁正在膨胀中。如果 CAS 失败,说明同一时刻其它线程已经将 Mark Word 设置为 markOopDesc:INFLATING,当前线程进行自旋等待膨胀完成。

步骤 3、如果 CAS 成功,设置 monitor 的各个字段:设置 monitor 的 header 字段为 displaced mark word,owner 字段为 Lock Record,obj 字段为锁对象等;

步骤 4、设置锁对象头的 mark word 为重量级锁状态,指向第一步分配的 monitor 对象;

2.5.2 monitor 竞争

当锁膨胀 inflate 执行完并返回对应的 ObjectMonitor 时,并不表示该线程竞争到了锁,真正的锁竞争发生在 ObjectMonitor::enter 方法中。

void ATTR ObjectMonitor::enter(TRAPS) {  
  Thread * const Self = THREAD ;
  void * cur ;
  // 步骤1
  // owner为null,如果能CAS设置成功,则当前线程直接获得锁
  cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
  if (cur == NULL) {
     ...
     return ;
  }
  // 如果是重入的情况
  if (cur == Self) {
     // TODO-FIXME: check for integer overflow!  BUGID 6557169.
     _recursions ++ ;
     return ;
  }
  // 步骤2
  // 如果当前线程是之前持有轻量级锁的线程
  // 上节轻量级锁膨胀将owner指向之前Lock Record的指针
  // 这里利用owner判断是否第一次进入。
  if (Self->is_lock_owned ((address)cur)) {
    assert (_recursions == 0, "internal state error");
    // 重入计数重置为1
    _recursions = 1 ;
    // 设置owner字段为当前线程
    _owner = Self ;
    OwnerIsThread = 1 ;
    return ;
  }
  ...
  // 步骤3
  // 在调用系统的同步操作之前,先尝试自旋获得锁
  if (Knob_SpinEarly && TrySpin (Self) > 0) {    
     ...
     //自旋的过程中获得了锁,则直接返回
     Self->_Stalled = 0 ;
     return ;
  }
  ...
  { 
    ...
    // 步骤4
    for (;;) {
      jt->set_suspend_equivalent();
      // 在该方法中调用系统同步操作
      EnterI (THREAD) ;
      ...
    }
    Self->set_current_pending_monitor(NULL); 
  }
  ...
}

步骤 1、当前是无锁、锁重入,简单操作后返回。

步骤 2、当前线程是之前持有轻量级锁的线程,则为首次进入,设置 recursions 为 1,owner 为当前线程,该线程成功获得锁并返回。

步骤 3、先自旋尝试获得锁,尽可能减少同步操作带来的开销。

步骤 4、调用 EnterI 方法。

这里注意,轻量级锁膨胀成功时,会把 owner 字段设置为 Lock Record 的指针,并在竞争时判断。这么做的原因是,假设当前线程 A 持有锁对象的锁,线程 B 进入同步代码块,并把锁对象升级为重量级锁。但此时,线程 A 可能还在执行,并无法感知其持有锁对象的变化。因此,需要线程 B 在执行 ObjectMonitor::enter 时,将自己放入到阻塞等列等待。并需要线程 A 第二次进入、或者退出的时候对 monitor 进行一些操作,以此保证代码块的同步。

这里有个自旋操作,直接看 TrySpin 对应的方法:

// TrySpin对应的方法
int ObjectMonitor::TrySpin_VaryDuration (Thread * Self) {  
    // Dumb, brutal spin.  Good for comparative measurements against adaptive spinning.
    int ctr = Knob_FixedSpin ;  // 固定自旋次数
    if (ctr != 0) {
        while (--ctr >= 0) {
            if (TryLock (Self) > 0) return 1 ;
            SpinPause () ;
        }
        return 0 ;
    }
    // 上一次自旋次数
    for (ctr = Knob_PreSpin + 1; --ctr >= 0 ; ) {
      if (TryLock(Self) > 0) {  // 尝试获取锁
        // Increase _SpinDuration ...
        // Note that we don't clamp SpinDuration precisely at SpinLimit.
        // Raising _SpurDuration to the poverty line is key.
        int x = _SpinDuration ;
        if (x < Knob_SpinLimit) {
           if (x < Knob_Poverty) x = Knob_Poverty ;
           _SpinDuration = x + Knob_BonusB ;
        }
        return 1 ;
      }
      ...
      ...

从方法名和注释可以看出,这就是自适应自旋,和网上说的轻量级锁 cas 失败会自旋的说法并不一致。实际上,无论是轻量级锁 cas 自旋还是重量级锁 cas 自旋,都是在用户态尽可能减少同步操作带来的开销,并没有太多本质上的区别。 到此为止,我们可以再结合上述的内容,整理出如下的状态转换图:

重量级锁

2.5.3 monitor 等待

ObjectMonitor 竞争失败的线程,通过自旋执行 ObjectMonitor::EnterI 方法等待锁的释放,EnterI 方法的部分逻辑实现如下:

void ATTR ObjectMonitor::EnterI (TRAPS) {  
        // 尝试自旋
    if (TrySpin (Self) > 0) {
        ...
        return ;
    }
    ...
    // 将线程封装成node节点中
    ObjectWaiter node(Self) ;
    Self->_ParkEvent->reset() ;
    node._prev   = (ObjectWaiter *) 0xBAD ;
    node.TState  = ObjectWaiter::TS_CXQ ;
    // 将node节点插入到_cxq队列的头部,cxq是一个单向链表
    ObjectWaiter * nxt ;
    for (;;) {
        node._next = nxt = _cxq ;
        if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
        // CAS失败的话 再尝试获得锁,这样可以降低插入到_cxq队列的频率
        if (TryLock (Self) > 0) {
            ...
            return ;
        }
    }
        ...
}

EnterI 大致原理:一个 ObjectMonitor 对象包括两个同步队列(_cxq_EntryList) ,以及一个等待队列_WaitSet。cxq、EntryList 、WaitSet 都是由 ObjectWaiter 构成的链表结构。其中,_cxq 为单向链表,_EntryList 为双向链表。

同步队列、阻塞队列

当一个线程尝试获得重量级锁且没有竞争到时,该线程会被封装成一个 ObjectWaiter 对象插入到 cxq 的队列的队首,然后调用 park 函数挂起当前线程,进入 BLOCKED 状态。当线程释放锁时,会根据唤醒策略,从 cxq 或 EntryList 中挑选一个线程 unpark 唤醒。如果线程获得锁后调用 Object#wait 方法,则会将线程加入到 WaitSet 中,进入 WAITING 或 TIMED_WAITING 状态。当被 Object#notify 唤醒后,会将线程从 WaitSet 移动到 cxq 或 EntryList 中去,进入 BLOCKED 状态。需要注意的是,当调用一个锁对象的 waitnotify 方法时,若当前锁的状态是偏向锁或轻量级锁则会先膨胀成重量级锁。

2.5.4 monitor 释放

当某个持有锁的线程执行完同步代码块时,会进行锁的释放。在 HotSpot 中,通过退出 monitor 的方式实现锁的释放,并通知被阻塞的线程,具体实现位于 ObjectMonitor::exit 方法中。

void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {  
   Thread * Self = THREAD ;
   // 如果_owner不是当前线程
   if (THREAD != _owner) {
     // 轻量级锁膨胀上来,还没调用过enter方法,_owner还指向之前轻量级锁Lock Record的指针。
     if (THREAD->is_lock_owned((address) _owner)) {
       assert (_recursions == 0, "invariant") ;
       _owner = THREAD ;
       _recursions = 0 ;
       OwnerIsThread = 1 ;
     } else {
       // 异常情况:当前不是持有锁的线程
       TEVENT (Exit - Throw IMSX) ;
       assert(false, "Non-balanced monitor enter/exit!");
       if (false) {
          THROW(vmSymbols::java_lang_IllegalMonitorStateException());
       }
       return;
     }
   }
   // 重入计数器还不为0,则计数器-1后返回
   if (_recursions != 0) {
     _recursions--;        // this is simple recursive enter
     TEVENT (Inflated exit - recursive) ;
     return ;
   }
   ...
   //这块开始是唤醒操作
   for (;;) {
     ...
     ...
     ObjectWaiter * w = NULL ;
     // 根据QMode的不同会有不同的唤醒策略,默认为0
     int QMode = Knob_QMode ;
     if (QMode == 2 && _cxq != NULL) {
          ...
          ...

步骤 1、处理 owner 不是当前线程的状况。这里特指之前持有轻量级锁的线程,由于没有调用过 enter,owner 指向仍为 Lock Record 的指针,以及其他异常情况。

步骤 2、重入计数器还不为 0,则计数器 - 1 后返回。

步骤 3、唤醒操作。根据不同的策略(由 QMode 指定),从 cxq 或 EntryList 中获取头节点,通过 ObjectMonitor::ExitEpilog 方法唤醒该节点封装的线程,唤醒操作最终由 unpark 完成。

根据 QMode 的不同 (默认为 0),有不同的处理方式:

QMode = 0:暂时什么都不做;
QMode = 2 且 cxq 非空:取 cxq 队列队首的 ObjectWaiter 对象,调用 ExitEpilog 方法,该方法会唤醒 ObjectWaiter 对象的线程,然后立即返回,后面的代码不会执行了;
QMode = 3 且 cxq 非空:把 cxq 队列插入到 EntryList 的尾部;
QMode = 4 且 cxq 非空:把 cxq 队列插入到 EntryList 的头部;

只有 QMode=2 的时候会提前返回,等于 0、3、4 的时继续执行:

1. 如果 EntryList 的首元素非空,就取出来调用 ExitEpilog 方法,该方法会唤醒 ObjectWaiter 对象的线程,然后立即返回;
2. 如果 EntryList 的首元素为空,就将 cxq 的所有元素放入到 EntryList 中,然后再从 EntryList 中取出来队首元素执行 ExitEpilog 方法,然后立即返回;
3. 被唤醒的线程,继续竞争 monitor;

2.6 本章小节

本章介绍了 Synchronized 的底层实现和锁升级过程。对于锁升级,再看看本文整理的图,一图胜千言:

重量级锁

这里有几个点可以注意一下:

1.HotSpot 中,只用到了模板解释器,并没有用到字节码解释器,monitorenter 的实际入口位于 templateTable_x86_64.cpp#3667。本文的分析是基于字节码解释器的,因此部分结论不能作为实际执行情况。本章的内容只能作为 Synchronized 锁升级原理、各类锁的适用场景的一种 窥探
2. 再次强调,无锁状态只能升级为轻量级锁,匿名偏向状态才能进入到偏向锁。
3. 偏向锁并不都有利,其适用于单个线程重入的场景,原因为:偏向锁的撤销需要进入 safepoint,开销较大。需要进入 safepoint 是由于,偏向锁的撤销需要对锁对象的 lock record 进行操作,而 lock record 要到其他线程的栈帧中遍历寻找。在非 safepoint,栈帧是动态的,会引入更多的问题。目前看来,偏向锁存在的价值是为历史遗留的 Collection 类如 Vector 和 HashTable 等做优化,迟早药丸。Java 15 中默认不开启。
4. 执行 Object 类的 hashcode 方法,偏向锁撤销并且锁会膨胀为轻量级锁或者重量锁。执行 Object 类的 wait/notify/notifyall 方法,偏向锁撤销并膨胀成重量级锁。
5. 轻量级锁适用于两个线程的交替执行场景:线程 A 进入轻量级锁,退出同步代码块并释放锁,会将锁对象恢复为无锁状态;线程 B 再进入锁,发现为无锁状态,会 cas 尝试获取该锁对象的轻量级锁。如果有竞争,则直接膨胀为重量级锁,没有自旋操作,详情看 10。
6. 唤醒策略依赖于 QMode。重量级锁获取失败后,线程会加入 cxq 队列。当线程释放锁时,会从 cxq 或 EntryList 中挑选一个线程唤醒。线程获得锁后调用 Object#wait 方法,则会将线程加入到 WaitSet 中。当被 Object#notify 唤醒后,会将线程从 WaitSet 移动到 cxq 或 EntryList 中去。
7. 重量级锁,会将线程放进等待队列,等待操作系统调度。而偏向锁和轻量级锁,未交由操作系统调度,依然处于用户态,只是采用 CAS 无锁竞争的方式获取锁。CAS 通过 Unsafe 类中 compareAndSwap 方法,jni 调用 C++ 方法,通过汇编指令锁住 cpu 中的北桥信号。
8. 许多文章声称一个对象关联到一个 monitor,这个说法不够准确。如果对象已经是重量级锁了,对象头的确指向了一个 monitor。但对于正在膨胀的锁,会先从线程私有monitor 集合 omFreeList 中分配对象。如果 omFreeList 中已经没有 monitor 对象,再从 JVM 全局gFreeList 中分配一批 monitoromFreeList 中。
9. 在编译期间还有锁消除锁粗化这两步锁优化操作,本章没做介绍。
10. 字节码实现中没有体现轻量级锁自旋逻辑。这可能是模板解释器中的实现,或者是 jvm 在不同平台、不同 jvm 版本的不同实现。但本文分析的字节码链路中没有发现该逻辑,倒是发现了重量级锁会自适应自旋竞争锁。因此个人对轻量级锁自适应自旋的说法存疑,至少 hotspot jdk8u 字节码实现中没有这个逻辑。但两者都是在用户态进行自适应自旋,以尽可能减少同步操作带来的开销,没有太多本质上的区别,并不需要特别关心。

三、线程的实现与状态转换

3.1 线程的实现

(1)内核线程实现

内核线程(Kernel-Level Thread,KLT):由内核来完成线程切换,内核通过调度器对线程进行调度,并负责将线程的任务映射到各个处理器上。 程序一般不会直接去使用内核线程,而是使用内核线程的一种高级接口 —— 轻量级进程(Light Weight Process,LWP),也就是通常意义上的线程。

优点:每个 LWP 都是独立的调度单元。一个 LWP 被阻塞,不影响其他 LWP。

缺点:基于 KLT,耗资源。线程的创建、析构、同步都需要进行系统调用,频繁的用户态、内核态切换。

内核线程

(2) 用户线程实现(User Thread,UT)

广义:非内核线程,都可认为是用户线程。(包括 LWT,虽然 LWT 的大多操作都要映射到 KLT)

狭义:完全建立在用户空间的线程库上,系统内核不能感知线程存在的实现。UT 也只感知到掌管这些 UT 的进程 P。

优点:用户线程的创建、同步、销毁和调度完全在用户态中完成,不需要内核的帮助。

缺点:线程的创建、销毁、切换和调度都是用户必须考虑到问题。“阻塞如何处理”、“多处理器系统中如何将线程映射到其他处理器上” 这类问题解决起来将会异常困难。

内核线程2

(3) 混合实现 混合模式下,即存在用户线程,也存在轻量级进程。用户线程的创建、切换、析构等操作依然廉价,可以支持大规模的用户线程并发,且可以使用内核线程提供的线程调度功能及处理器映射。

内核线程3

线程的实现依赖操作系统支持的线程模型。在主流的操作系统上,hotspot、classic、art 等虚拟机默认是 1:1 的线程模型。在 Solaris 平台上,hotspot 支持 1:1、N:M 两种线程模型。

3.2 线程的转换

首先明确一点,当我们讨论一个线程的状态,指的是 Thread 类中 threadStatus 的值。

private volatile int threadStatus = 0;  

该值映射后对应的枚举为:

public enum State {  
    NEW,
    RUNNABLE,
    BLOCKED,
    WAITING,
    TIMED_WAITING,
    TERMINATED;
}

也就是说,线程的具体状态,看 threadStatus 就行了。

NEW

先要创建 Thread 类的对象,才能谈其状态。

Thread t = new Thread();  

这个时候,线程 t 就处于新建状态。但他还不是 “线程”。

RUNNABLE

然后调用 start () 方法。

t.start();  

调用 start () 后,会执行一个 native 方法创建内核线程,以 linux 为例:

private native void start0();

// 最后走到这
hotspot/src/os/linux/vm/os_linux.cpp  
pthread_create(...);  

这时候才有一个真正的线程创建出来,并即刻开始运行。这个内核线程与线程 t 进行 1:1 的映射。这时候 t 具备运行能力,进入 RUNNABLE 状态。 RUNNABLE 可以细分为 READY 和 RUNNING,两者的区别只是是否等待到了资源并开始运行。
处于 RUNNABLE 且未运行的线程,会进入一个就绪队列中,等待操作系统的调度。处于就绪队列的线程都在等待资源,这个资源可以是 cpu 的时间片、也可以是系统的 IO。 JVM 并不关系 READY 和 RUNNING 这两种状态,毕竟上述的枚举类都不对 RUNNABLE 进行细分。

TERMINATED

当一个线程执行完毕(或者调用已经不建议的 stop 方法),线程的状态就变为 TERMINATED。进入 TERMINATED 后,线程的状态不可逆,无法再复活。

关于 BLOCKED、WAITING、TIMED_WAITING

BLOCKED、WAITING、TIMED_WAITING 都是带有同步语义的状态,我们先看一下 waitnotify 方法的底层实现。
wait 方法的底层实现:

void ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {  
    ...
    ...
  //获得Object的monitor对象
  ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD, obj());
  DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis);
  //调用monitor的wait方法
  monitor->wait(millis, true, THREAD);
    ...
}

  inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
    ...
  if (_WaitSet == NULL) {
    //_WaitSet为null,就初始化_waitSet
    _WaitSet = node;
    node->_prev = node;
    node->_next = node;
  } else {
    //否则就尾插
    ObjectWaiter* head = _WaitSet ;
    ObjectWaiter* tail = head->_prev;
    assert(tail->_next == head, "invariant check");
    tail->_next = node;
    head->_prev = node;
    node->_next = head;
    node->_prev = tail;
  }
}

主要流程:通过 object 获得 objectMonitor,将 Thread 封装成 OjectWaiter 对象,然后 addWaiter 将它插入 waitSet 中,进入 waiting 或 timed_waiting 状态。最后释放锁,并通过底层的 park 方法挂起线程;

notify 方法的底层实现:

void ObjectSynchronizer::notify(Handle obj, TRAPS) {  
    ...
    ...
    ObjectSynchronizer::inflate(THREAD, obj())->notify(THREAD);
}
    //通过inflate方法得到ObjectMonitor对象
    ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
    ...
     if (mark->has_monitor()) {
          ObjectMonitor * inf = mark->monitor() ;
          assert (inf->header()->is_neutral(), "invariant");
          assert (inf->object() == object, "invariant") ;
          assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is inva;lid");
          return inf 
      }
    ...
      }
    //调用ObjectMonitor的notify方法
    void ObjectMonitor::notify(TRAPS) {
    ...
    //调用DequeueWaiter方法移出_waiterSet第一个结点
    ObjectWaiter * iterator = DequeueWaiter() ;
    //将上面DequeueWaiter尾插入_EntrySet或cxq等操作
    ...
    ...
  }

通过 object 获得 objectMonitor,调用 objectMonitor 的 notify 方法。这个 notify 最后会走到 ObjectMonitor::DequeueWaiter 方法,获取 waitSet 列表中的第一个 ObjectWaiter 节点。并根据不同的策略,将取出来的 ObjectWaiter 节点,加入到 EntryListcxq 中。 notifyAll 的实现类似于 notify,主要差别在多了个 for 循环。

由这里以及上一章 2.5.4 monitor 释放小节中可以了解到,notifynotifyAll 并不会立即释放所占有的 ObjectMonitor 对象,其真正释放 ObjectMonitor 的时间点是在执行 monitorexit 指令。

一旦释放 ObjectMonitor 对象了,entryListcxq 中的 ObjectWaiter 节点会依据 QMode 所配置的策略,通过 ExitEpilog 方法唤醒取出来的 ObjectWaiter 节点。被唤醒的线程,继续参与 monitor 的竞争。若竞争失败,重新进入 BLOCKED 状态,再回顾一下 monitor 的核心结构。

同步队列、阻塞队列

既然聊到了 waitnotify,那顺便也看下 joinsleeppark

打开 Thread.join () 的源码:

public final synchronized void join(long millis) throws InterruptedException {  
    ...
  if (millis == 0) {
    while (isAlive()) {
      wait(0);
    }
  } else {
    while (isAlive()) {
      long delay = millis - now;
      if (delay <= 0) {
        break;
      }
      wait(delay);
      now = System.currentTimeMillis() - base;
    }
  }
}

join 的本质仍然是 wait() 方法。在使用 join 时,JVM 会帮我们隐式调用 notify,因此我们不需要主动 notify 唤醒主线程。 而 sleep() 方法最终是调用 SleepEvent 对象的 park 方法:

int os::sleep(Thread* thread, jlong millis, bool interruptible) {  
  //获取thread中的_SleepEvent对象
  ParkEvent * const slp = thread->_SleepEvent ;
  ...
  //如果是允许被打断
  if (interruptible) {
    //记录下当前时间戳,这是时间比较的基准
    jlong prevtime = javaTimeNanos();
    for (;;) {
      //检查打断标记,如果打断标记为true,则直接返回
      if (os::is_interrupted(thread, true)) {
        return OS_INTRPT;
      }
      //线程被唤醒后的当前时间戳
      jlong newtime = javaTimeNanos();
      //睡眠毫秒数减去当前已经经过的毫秒数
      millis -= (newtime - prevtime) / NANOSECS_PER_MILLISEC;
      //如果小于0,那么说明已经睡眠了足够多的时间,直接返回
      if (millis <= 0) {
        return OS_OK;
      }
      //更新基准时间
      prevtime = newtime;
      //调用_SleepEvent对象的park方法,阻塞线程
      slp->park(millis);
    }
  } else {
    //如果不能打断,除了不再返回OS_INTRPT以外,逻辑是完全相同的
    for (;;) {
      ...
      slp->park(millis);
      ...
    }
    return OS_OK ;
  }
}

Thread.sleep 在 jvm 层面上是调用 thread 中 SleepEvent 对象的 park() 方法实现阻塞线程,在此过程中会通过判断时间戳来决定线程的睡眠时间是否达到了指定的毫秒。看到这里,对于 sleepwait 的区别应该会有更深入的理解。

parkunpark 方法也与同步语义无关。每个线程都与一个许可 (permit) 关联。unpark 函数为线程提供 permit,线程调用 park 函数则等待并消耗 permit。park 和 unpark 方法具体实现比较复杂,这里不展开。 到此为止,我们可以整理出如下的线程状态转换图。

内核线程3

3.3 本章小节

Java 将 OS 经典五种状态中的 ready 和 running,统一为 RUNNABLE。将 WAITING(即不可能得到 CPU 运行机会的状态)细分为了 BLOCKED、WAITING、TIMED_WAITING。本章的内容较为简短,因为部分的内容已囊括在第一章中。

这里提个会使人困惑的问题: 使用 socket 时,调用 accept (),read () 等阻塞方法时,线程处于什么状态?

答案是 java 线程处于 RUNNABLE 状态,OS 线程处于 WAITING 状态。因为在 jvm 层面,等待 cpu 时间片和等待 io 资源是等价的。

这里有几个点可以注意一下:

1.JVM 线程状态不代表内核线程状态。
2.BLOCKED 的线程一定处于 entryList 或 cxq 中,而处于 WAITING 和 TIMED WAITING 的线程,可能是由于执行了 sleep 或 park 进入该状态,不一定在 waitSet 中。也就是说,处于 BLOCKED 状态的线程一定是与同步相关。由这可延伸出,调用 jdk 的 lock 并获取不到锁的线程,进入的是 WAITING 或 TIMED_WAITING 状态,而不是 BLOCKED 状态。

四、相关资料

本文主要参考资料:

死磕 Synchronized 底层实现 -- 概论

死磕 Synchronized 底层实现 -- 偏向锁

死磕 Synchronized 底层实现 -- 轻量级锁

死磕 Synchronized 底层实现 -- 重量级锁

本文转自有赞技术