brpc源码解析:butex

butex可以说是与brpc架构紧密结合

简介

butex是brpc协程(bthread)间的一种同步机制,与mutex/pthread关系类似。linux中的mutex是由futex来实现的,butex也是参考futex的设计的。

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// C++ wrapper that owns one bthread_mutex_t and exposes the
// lock()/unlock()/try_lock() trio on top of the bthread_mutex_* C functions.
class Mutex {
public:
    typedef bthread_mutex_t* native_handler_type;

    // Initializes the underlying mutex. Throws std::system_error carrying the
    // error code returned by bthread_mutex_init when that code is non-zero.
    Mutex() {
        const int rc = bthread_mutex_init(&_mutex, NULL);
        if (rc == 0) {
            return;
        }
        throw std::system_error(std::error_code(rc, std::system_category()),
                                "Mutex constructor failed");
    }

    // Destroys the underlying mutex; the result is checked with CHECK_EQ.
    ~Mutex() { CHECK_EQ(0, bthread_mutex_destroy(&_mutex)); }

    // Gives access to the wrapped bthread_mutex_t.
    native_handler_type native_handler() { return &_mutex; }

    // Acquires the mutex. Throws std::system_error carrying the error code
    // returned by bthread_mutex_lock when that code is non-zero.
    void lock() {
        const int rc = bthread_mutex_lock(&_mutex);
        if (rc == 0) {
            return;
        }
        throw std::system_error(std::error_code(rc, std::system_category()),
                                "Mutex lock failed");
    }

    // Releases the mutex; the return value of bthread_mutex_unlock is ignored.
    void unlock() { bthread_mutex_unlock(&_mutex); }

    // Returns true iff bthread_mutex_trylock reported success (0).
    bool try_lock() { return bthread_mutex_trylock(&_mutex) == 0; }

    // TODO(chenzhangyi01): Complement interfaces for C++11
private:
    DISALLOW_COPY_AND_ASSIGN(Mutex);
    bthread_mutex_t _mutex;
};


// Definition of bthread_mutex_t, the mutex type operated on by the
// bthread_mutex_* functions.
typedef struct {
// Pointer to the butex word that stores the lock state; assigned from
// butex_create_checked<unsigned>() and zeroed in bthread_mutex_init.
unsigned* butex;
// Contention-site record; invalidated by bthread_mutex_init and read,
// then invalidated again, by bthread_mutex_unlock.
bthread_contention_site_t csite;
} bthread_mutex_t;
  1. 创建Mutex对象时,会调用bthread_mutex_init(&_mutex, NULL)函数来初始化_mutex,与pthread中的pthread_mutex_init一样
  2. Mutex::lock时调用bthread_mutex_lock,Mutex::unlock则调用bthread_mutex_unlock,基本与pthread的mutex的lock与unlock方法一致

深入细节

bthread_mutex_init

bthread_mutex_init -> butex_create_checked<unsigned>() -> butex_create() -> butil::get_object<Butex>(),最终返回的是 &b->value,即Butex对象中value成员变量的地址。需要注意这里的value是一个butil::atomic<int>对象,后续对它的读写会涉及内存可见性(memory order)相关的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// Initializes `*m`: marks its contention site invalid, allocates a butex word
// and stores 0 in it. The attribute argument is unnamed and ignored.
// Returns 0 on success, ENOMEM when no butex could be created.
int bthread_mutex_init(bthread_mutex_t* __restrict m,
                       const bthread_mutexattr_t* __restrict) {
    bthread::make_contention_site_invalid(&m->csite);
    unsigned* const word = bthread::butex_create_checked<unsigned>();
    m->butex = word;
    if (word == NULL) {
        return ENOMEM;
    }
    // A zeroed word is the initial state of the mutex.
    *word = 0;
    return 0;
}


// Typed front-end for butex_create(): hands the new butex word back as a T*.
// BAIDU_CASSERT checks that T has the same size as int before the cast.
template <typename T> T* butex_create_checked() {
    BAIDU_CASSERT(sizeof(T) == sizeof(int), sizeof_T_must_equal_int);
    void* const raw_word = butex_create();
    return static_cast<T*>(raw_word);
}

// Create a butex which is a futex-like 32-bit primitive for synchronizing
// bthreads/pthreads.
// Returns a pointer to 32-bit data, NULL on failure.
// NOTE: all butexes are private(not inter-process).
void* butex_create() {
// 从objectpool中拿一个Butex对象
Butex* b = butil::get_object<Butex>();
if (b) {
return &b->value;
}
return NULL;
}


// State behind one butex: the value word handed out by butex_create() plus
// the list of waiters blocked on that word.
struct BAIDU_CACHELINE_ALIGNMENT Butex {
Butex() {}
~Butex() {}

// The word whose address butex_create() returns to its caller.
butil::atomic<int> value;
// Waiters appended by wait_for_butex() and removed by butex_wake().
ButexWaiterList waiters;
// Taken (via BAIDU_SCOPED_LOCK) while `waiters` is inspected or modified.
internal::FastPthreadMutex waiter_lock;
};

bthread_mutex_lock/bthread_mutex_unlock

首先需要注意,MutexInternal这个结构是对butil::atomic<int> Butex::value这32位的拆分(locked、contended各占一个字节,外加两个字节的padding),所以修改split->locked其实就是在修改butex->value。

没有锁竞争的情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Locks `m`.
// Fast path: atomically set the `locked` byte of the butex word; if the byte
// was previously 0 the mutex was free and now belongs to the caller.
// Otherwise the work is delegated to mutex_lock_contended().
// Returns 0 on the fast path, else whatever mutex_lock_contended() returns.
int bthread_mutex_lock(bthread_mutex_t* m) {
    // View the butex word through its MutexInternal layout.
    bthread::MutexInternal* const parts =
        reinterpret_cast<bthread::MutexInternal*>(m->butex);
    if (parts->locked.exchange(1, butil::memory_order_acquire)) {
        // Somebody already holds the lock.
        return bthread::mutex_lock_contended(m);
    }
    // `locked` was 0 (its initial value), so there was nothing to wait for.
    return 0;
}

// Implement bthread_mutex_t related functions
// Byte-level view of the mutex's butex word: bthread_mutex_lock casts
// bthread_mutex_t::butex to MutexInternal* so it can touch `locked` alone.
struct MutexInternal {
// Set to 1 by bthread_mutex_lock's exchange; 0 in a freshly initialized mutex.
butil::static_atomic<unsigned char> locked;
// 1 in MUTEX_CONTENDED_RAW, 0 in MUTEX_LOCKED_RAW.
butil::static_atomic<unsigned char> contended;
// Fills the rest of the struct; always 0 in the predefined raw values.
unsigned short padding;
};

// Raw images of the mutex word: locked and marked contended
// (locked=1, contended=1) and locked without contention (locked=1, contended=0).
const MutexInternal MUTEX_CONTENDED_RAW = {{1},{1},0};
const MutexInternal MUTEX_LOCKED_RAW = {{1},{0},0};
// Define as macros rather than constants which can't be put in read-only
// section and affected by initialization-order fiasco.
// Each macro reads the corresponding MutexInternal image as one `unsigned`.
#define BTHREAD_MUTEX_CONTENDED (*(const unsigned*)&bthread::MUTEX_CONTENDED_RAW)
#define BTHREAD_MUTEX_LOCKED (*(const unsigned*)&bthread::MUTEX_LOCKED_RAW)

下面是bthread_mutex_unlock的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Unlocks `m` by exchanging the whole butex word to 0 and, unless the previous
// word was exactly BTHREAD_MUTEX_LOCKED, wakes one waiter via butex_wake().
// When a valid contention site was saved, the time spent in butex_wake() is
// added to it and the record is submitted. Always returns 0.
int bthread_mutex_unlock(bthread_mutex_t* m) {
    butil::atomic<unsigned>* const word =
        reinterpret_cast<butil::atomic<unsigned>*>(m->butex);
    // Move the contention record out of the mutex before releasing it.
    bthread_contention_site_t site = {0, 0};
    if (bthread::is_contention_site_valid(m->csite)) {
        site = m->csite;
        bthread::make_contention_site_invalid(&m->csite);
    }
    const unsigned old_state = word->exchange(0, butil::memory_order_release);
    // CAUTION: the mutex may be destroyed, check comments before butex_create
    if (old_state != BTHREAD_MUTEX_LOCKED) {
        // The word was not the plain "locked" image, so wake up one waiter.
        if (bthread::is_contention_site_valid(site)) {
            const int64_t wake_begin_ns = butil::cpuwide_time_ns();
            bthread::butex_wake(word);
            const int64_t wake_end_ns = butil::cpuwide_time_ns();
            site.duration_ns += wake_end_ns - wake_begin_ns;
            bthread::submit_contention(site, wake_end_ns);
        } else {
            bthread::butex_wake(word);
        }
    }
    // Without contention (old_state == BTHREAD_MUTEX_LOCKED) nothing else
    // had to be done after the exchange.
    return 0;
}

在没有锁竞争的情况下,lock操作只是把locked这个字节置为1;unlock时把整个Butex::value exchange为0,若exchange返回的旧值prev等于BTHREAD_MUTEX_LOCKED([{1},{0},0]),则说明没有其他bthread在等待这把锁,无需唤醒任何waiter,直接返回即可

锁竞争的情况

bthread_mutex_lock在锁已被持有的情况下,会执行mutex_lock_contended的逻辑,进而调用butex_wait:butex_wait先在栈上创建ButexBthreadWaiter,再通过TaskGroup::sched切换到其他bthread,并由wait_for_butex把该waiter塞到Butex的waiter队列里面,当前bthread就此被挂起

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Slow path of bthread_mutex_lock(): repeatedly marks the mutex word as
// contended and waits on the butex until an exchange shows that the lock bit
// was clear, which means the caller has just taken the lock.
// Returns 0 once the lock is acquired, or errno when butex_wait() fails with
// anything other than EWOULDBLOCK or EINTR.
inline int mutex_lock_contended(bthread_mutex_t* m) {
    butil::atomic<unsigned>* const word =
        reinterpret_cast<butil::atomic<unsigned>*>(m->butex);
    for (;;) {
        // While the lock stays held, the first contender's exchange returns
        // BTHREAD_MUTEX_LOCKED and later ones return BTHREAD_MUTEX_CONTENDED.
        const unsigned old_state = word->exchange(BTHREAD_MUTEX_CONTENDED);
        if ((old_state & BTHREAD_MUTEX_LOCKED) == 0) {
            return 0;
        }
        if (bthread::butex_wait(word, BTHREAD_MUTEX_CONTENDED, NULL) >= 0) {
            continue;
        }
        // a mutex lock should ignore interruptions in general since
        // user code is unlikely to check the return value.
        if (errno == EWOULDBLOCK || errno == EINTR/*note*/) {
            continue;
        }
        return errno;
    }
}

// Suspends the calling bthread on the butex whose value word is `arg`,
// provided that word currently equals `expected_value`.
// Returns 0 unless one of the conditions below holds, in which case it
// returns -1 and sets errno:
//   EWOULDBLOCK - the word did not equal expected_value,
//   ETIMEDOUT   - waiter_state ended up as WAITER_STATE_TIMEDOUT,
//   EINTR       - the task's `interrupted` flag was set.
// NOTE(review): `abstime` is never read in this excerpt and sleep_id stays 0;
// the timer setup is presumably omitted here -- confirm against the full source.
// NOTE(review): tls_task_group is dereferenced without a NULL check here;
// presumably callers outside a bthread worker are handled in code not shown.
int butex_wait(void* arg, int expected_value, const timespec* abstime) {
// Recover the owning Butex from the address of its `value` member.
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
if (b->value.load(butil::memory_order_relaxed) != expected_value) {
errno = EWOULDBLOCK;
// Sometimes we may take actions immediately after unmatched butex,
// this fence makes sure that we see changes before changing butex.
butil::atomic_thread_fence(butil::memory_order_acquire);
return -1;
}
TaskGroup* g = tls_task_group;
// The waiter lives on this bthread's stack for the duration of the wait.
ButexBthreadWaiter bbw;
// tid is 0 iff the thread is non-bthread
bbw.tid = g->current_tid();
bbw.container.store(NULL, butil::memory_order_relaxed);
bbw.task_meta = g->current_task();
bbw.sleep_id = 0;
bbw.waiter_state = WAITER_STATE_READY;
bbw.expected_value = expected_value;
bbw.initial_butex = b;
bbw.control = g->control();


// release fence matches with acquire fence in interrupt_and_consume_waiters
// in task_group.cpp to guarantee visibility of `interrupted'.
bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
// Register wait_for_butex(&bbw) as the "remained" callback; presumably it
// runs once this bthread has been switched out -- confirm in TaskGroup::sched.
g->set_remained(wait_for_butex, &bbw);
// Schedule other bthreads to run.
TaskGroup::sched(&g);

// erase_from_butex_and_wakeup (called by TimerThread) is possibly still
// running and using bbw. The chance is small, just spin until it's done.
BT_LOOP_WHEN(unsleep_if_necessary(&bbw, get_global_timer_thread()) < 0,
30/*nops before sched_yield*/);

// If current_waiter is NULL, TaskGroup::interrupt() is running and using bbw.
// Spin until current_waiter != NULL.
BT_LOOP_WHEN(bbw.task_meta->current_waiter.exchange(
NULL, butil::memory_order_acquire) == NULL,
30/*nops before sched_yield*/);

bool is_interrupted = false;
if (bbw.task_meta->interrupted) {
// Race with set and may consume multiple interruptions, which are OK.
bbw.task_meta->interrupted = false;
is_interrupted = true;
}
// If timed out as well as value unmatched, return ETIMEDOUT.
// Timeout is checked first, then value mismatch, then interruption.
if (WAITER_STATE_TIMEDOUT == bbw.waiter_state) {
errno = ETIMEDOUT;
return -1;
} else if (WAITER_STATE_UNMATCHEDVALUE == bbw.waiter_state) {
errno = EWOULDBLOCK;
return -1;
} else if (is_interrupted) {
errno = EINTR;
return -1;
}
return 0;
}

// Push bw onto the waiter queue of its butex.
// `arg` is the ButexBthreadWaiter prepared by butex_wait(). The waiter is
// queued only if the butex word still equals bw->expected_value, the waiter
// is still WAITER_STATE_READY and the task has not been interrupted;
// otherwise the waiting bthread is made runnable again via ready_to_run().
static void wait_for_butex(void* arg) {
ButexBthreadWaiter* const bw = static_cast<ButexBthreadWaiter*>(arg);
Butex* const b = bw->initial_butex;
// 1: waiter with timeout should have waiter_state == WAITER_STATE_READY
// before they're queued, otherwise the waiter is already timedout
// and removed by TimerThread, in which case we should stop queueing.
//
// Visibility of waiter_state:
// [bthread] [TimerThread]
// waiter_state = TIMED
// tt_lock { add task }
// tt_lock { get task }
// waiter_lock { waiter_state=TIMEDOUT }
// waiter_lock { use waiter_state }
// tt_lock represents TimerThread::_mutex. Visibility of waiter_state is
// sequenced by two locks, both threads are guaranteed to see the correct
// value.
{
BAIDU_SCOPED_LOCK(b->waiter_lock);
// Re-check the word under the lock: it may have changed since butex_wait()
// compared it without holding waiter_lock.
if (b->value.load(butil::memory_order_relaxed) != bw->expected_value) {
bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE;
} else if (bw->waiter_state == WAITER_STATE_READY/*1*/ &&
!bw->task_meta->interrupted) {
b->waiters.Append(bw);
bw->container.store(b, butil::memory_order_relaxed);
// Queued: the bthread stays suspended until something wakes it.
return;
}
}

// bw->container is NULL which makes erase_from_butex_and_wakeup() and
// TaskGroup::interrupt() no-op, there's no race between following code and
// the two functions. The on-stack ButexBthreadWaiter is safe to use and
// bw->waiter_state will not change again.
unsleep_if_necessary(bw, get_global_timer_thread());
// Not queued: put the waiting bthread back on the run queue.
tls_task_group->ready_to_run(bw->tid);
// FIXME: jump back to original thread is buggy.

// // Value unmatched or waiter is already woken up by TimerThread, jump
// // back to original bthread.
// TaskGroup* g = tls_task_group;
// ReadyToRunArgs args = { g->current_tid(), false };
// g->set_remained(TaskGroup::ready_to_run_in_worker, &args);
// // 2: Don't run remained because we're already in a remained function
// // otherwise stack may overflow.
// TaskGroup::sched_to(&g, bw->tid, false/*2*/);
}

bthread_mutex_unlock在有锁竞争的情况下又是如何的呢?
调用butex_wake:从butex->waiters中移除队首的waiter,然后将waiter->tid重新交给调度器执行,此时whole的状态为[{0},{0},0]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// Unlocks `m` by exchanging the whole butex word to 0 and, unless the previous
// word was exactly BTHREAD_MUTEX_LOCKED, wakes one waiter via butex_wake().
// When a valid contention site was saved, the time spent in butex_wake() is
// added to it and the record is submitted. Always returns 0.
int bthread_mutex_unlock(bthread_mutex_t* m) {
    butil::atomic<unsigned>* const word =
        reinterpret_cast<butil::atomic<unsigned>*>(m->butex);
    // Move the contention record out of the mutex before releasing it.
    bthread_contention_site_t site = {0, 0};
    if (bthread::is_contention_site_valid(m->csite)) {
        site = m->csite;
        bthread::make_contention_site_invalid(&m->csite);
    }
    const unsigned old_state = word->exchange(0, butil::memory_order_release);
    // CAUTION: the mutex may be destroyed, check comments before butex_create
    if (old_state != BTHREAD_MUTEX_LOCKED) {
        // The word was not the plain "locked" image, so wake up one waiter.
        if (bthread::is_contention_site_valid(site)) {
            const int64_t wake_begin_ns = butil::cpuwide_time_ns();
            bthread::butex_wake(word);
            const int64_t wake_end_ns = butil::cpuwide_time_ns();
            site.duration_ns += wake_end_ns - wake_begin_ns;
            bthread::submit_contention(site, wake_end_ns);
        } else {
            bthread::butex_wake(word);
        }
    }
    return 0;
}

int butex_wake(void* arg) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
ButexWaiter* front = NULL;
{
BAIDU_SCOPED_LOCK(b->waiter_lock);
if (b->waiters.empty()) {
return 0;
}
front = b->waiters.head()->value();
front->RemoveFromList();
front->container.store(NULL, butil::memory_order_relaxed);
}
if (front->tid == 0) {
wakeup_pthread(static_cast<ButexPthreadWaiter*>(front));
return 1;
}
ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
unsleep_if_necessary(bbw, get_global_timer_thread());
TaskGroup* g = tls_task_group;
if (g) {
TaskGroup::exchange(&g, bbw->tid);
} else {
bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
}
return 1;
}

原来被挂起的bthread被重新调度后,butex_wait返回0,重新执行这个while循环。此时whole=[{0},{0},0](假设期间没有其他bthread抢到锁),exchange返回的旧值中locked位为0,因此跳出循环并返回0,本bthread获取到锁,锁的状态变为whole=[{1},{1},0]。当该bthread之后调用bthread_mutex_unlock时,锁的状态被置为0,但因为之前的状态是[{1},{1},0]而不是BTHREAD_MUTEX_LOCKED,所以会再调用一次butex_wake;如果此时waiter队列为空,butex_wake直接返回0

1
2
3
4
5
6
7
8
9
10
11
12
// Slow path of bthread_mutex_lock(): repeatedly marks the mutex word as
// contended and waits on the butex until an exchange shows that the lock bit
// was clear, which means the caller has just taken the lock.
// Returns 0 once the lock is acquired, or errno when butex_wait() fails with
// anything other than EWOULDBLOCK or EINTR.
inline int mutex_lock_contended(bthread_mutex_t* m) {
    butil::atomic<unsigned>* const word =
        reinterpret_cast<butil::atomic<unsigned>*>(m->butex);
    for (;;) {
        const unsigned old_state = word->exchange(BTHREAD_MUTEX_CONTENDED);
        if ((old_state & BTHREAD_MUTEX_LOCKED) == 0) {
            return 0;
        }
        if (bthread::butex_wait(word, BTHREAD_MUTEX_CONTENDED, NULL) >= 0) {
            continue;
        }
        // a mutex lock should ignore interruptions in general since
        // user code is unlikely to check the return value.
        if (errno == EWOULDBLOCK || errno == EINTR/*note*/) {
            continue;
        }
        return errno;
    }
}