intbthread_mutex_init(bthread_mutex_t* __restrict m, constbthread_mutexattr_t* __restrict){ bthread::make_contention_site_invalid(&m->csite); m->butex = bthread::butex_create_checked<unsigned>(); if (!m->butex) { return ENOMEM; } *m->butex = 0; return0; }
// Check width of user type before casting. template <typename T> T* butex_create_checked(){ BAIDU_CASSERT(sizeof(T) == sizeof(int), sizeof_T_must_equal_int); returnstatic_cast<T*>(butex_create()); }
// Create a butex which is a futex-like 32-bit primitive for synchronizing // bthreads/pthreads. // Returns a pointer to 32-bit data, NULL on failure. // NOTE: all butexes are private(not inter-process). void* butex_create(){ // 从objectpool中拿一个Butex对象 Butex* b = butil::get_object<Butex>(); if (b) { return &b->value; } returnNULL; }
// Raw bit patterns for the two locked states of a bthread mutex.
// NOTE(review): the initializers suggest a layout of {locked}{contended}
// followed by padding — confirm against MutexInternal's declaration.
const MutexInternal MUTEX_CONTENDED_RAW = {{1},{1},0};  // locked, has waiters
const MutexInternal MUTEX_LOCKED_RAW = {{1},{0},0};     // locked, no waiters
// Define as macros rather than constants which can't be put in read-only
// section and affected by initialization-order fiasco.
#define BTHREAD_MUTEX_CONTENDED (*(const unsigned*)&bthread::MUTEX_CONTENDED_RAW)
#define BTHREAD_MUTEX_LOCKED (*(const unsigned*)&bthread::MUTEX_LOCKED_RAW)
inlineintmutex_lock_contended(bthread_mutex_t* m){ butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex; // 由于当前已经有锁,那么锁住后 // 第一个调用whole->exchange(BTHREAD_MUTEX_CONTENDED)将返回BTHREAD_MUTEX_LOCKED // 后续调用的,将返回BTHREAD_MUTEX_CONTENDED while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) { if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0 && errno != EWOULDBLOCK && errno != EINTR/*note*/) { // a mutex lock should ignore interrruptions in general since // user code is unlikely to check the return value. return errno; } } return0; }
intbutex_wait(void* arg, int expected_value, const timespec* abstime){ Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value); if (b->value.load(butil::memory_order_relaxed) != expected_value) { errno = EWOULDBLOCK; // Sometimes we may take actions immediately after unmatched butex, // this fence makes sure that we see changes before changing butex. butil::atomic_thread_fence(butil::memory_order_acquire); return-1; } TaskGroup* g = tls_task_group; ButexBthreadWaiter bbw; // tid is 0 iff the thread is non-bthread bbw.tid = g->current_tid(); bbw.container.store(NULL, butil::memory_order_relaxed); bbw.task_meta = g->current_task(); bbw.sleep_id = 0; bbw.waiter_state = WAITER_STATE_READY; bbw.expected_value = expected_value; bbw.initial_butex = b; bbw.control = g->control();
// release fence matches with acquire fence in interrupt_and_consume_waiters // in task_group.cpp to guarantee visibility of `interrupted'. bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release); g->set_remained(wait_for_butex, &bbw); // 调度其他bthread执行 TaskGroup::sched(&g);
// erase_from_butex_and_wakeup (called by TimerThread) is possibly still // running and using bbw. The chance is small, just spin until it's done. BT_LOOP_WHEN(unsleep_if_necessary(&bbw, get_global_timer_thread()) < 0, 30/*nops before sched_yield*/); // If current_waiter is NULL, TaskGroup::interrupt() is running and using bbw. // Spin until current_waiter != NULL. BT_LOOP_WHEN(bbw.task_meta->current_waiter.exchange( NULL, butil::memory_order_acquire) == NULL, 30/*nops before sched_yield*/);
bool is_interrupted = false; if (bbw.task_meta->interrupted) { // Race with set and may consume multiple interruptions, which are OK. bbw.task_meta->interrupted = false; is_interrupted = true; } // If timed out as well as value unmatched, return ETIMEDOUT. if (WAITER_STATE_TIMEDOUT == bbw.waiter_state) { errno = ETIMEDOUT; return-1; } elseif (WAITER_STATE_UNMATCHEDVALUE == bbw.waiter_state) { errno = EWOULDBLOCK; return-1; } elseif (is_interrupted) { errno = EINTR; return-1; } return0; }
// 将bw塞到当前butex的waiter队列中 staticvoidwait_for_butex(void* arg){ ButexBthreadWaiter* const bw = static_cast<ButexBthreadWaiter*>(arg); Butex* const b = bw->initial_butex; // 1: waiter with timeout should have waiter_state == WAITER_STATE_READY // before they're queued, otherwise the waiter is already timedout // and removed by TimerThread, in which case we should stop queueing. // // Visibility of waiter_state: // [bthread] [TimerThread] // waiter_state = TIMED // tt_lock { add task } // tt_lock { get task } // waiter_lock { waiter_state=TIMEDOUT } // waiter_lock { use waiter_state } // tt_lock represents TimerThread::_mutex. Visibility of waiter_state is // sequenced by two locks, both threads are guaranteed to see the correct // value. { BAIDU_SCOPED_LOCK(b->waiter_lock); if (b->value.load(butil::memory_order_relaxed) != bw->expected_value) { bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE; } elseif (bw->waiter_state == WAITER_STATE_READY/*1*/ && !bw->task_meta->interrupted) { b->waiters.Append(bw); bw->container.store(b, butil::memory_order_relaxed); return; } } // b->container is NULL which makes erase_from_butex_and_wakeup() and // TaskGroup::interrupt() no-op, there's no race between following code and // the two functions. The on-stack ButexBthreadWaiter is safe to use and // bw->waiter_state will not change again. unsleep_if_necessary(bw, get_global_timer_thread()); tls_task_group->ready_to_run(bw->tid); // FIXME: jump back to original thread is buggy. // // Value unmatched or waiter is already woken up by TimerThread, jump // // back to original bthread. // TaskGroup* g = tls_task_group; // ReadyToRunArgs args = { g->current_tid(), false }; // g->set_remained(TaskGroup::ready_to_run_in_worker, &args); // // 2: Don't run remained because we're already in a remained function // // otherwise stack may overflow. // TaskGroup::sched_to(&g, bw->tid, false/*2*/); }
What does bthread_mutex_unlock do when the mutex still has waiters? It calls butex_wake, which removes one waiter from butex->waiters and pushes that waiter's tid onto the scheduling run queue; at that point `whole` is in the state [{0},{0},0], i.e. unlocked with the contention bit cleared.
inlineintmutex_lock_contended(bthread_mutex_t* m){ butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex; while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) { if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0 && errno != EWOULDBLOCK && errno != EINTR/*note*/) { // a mutex lock should ignore interrruptions in general since // user code is unlikely to check the return value. return errno; } } return0; }