riscv-osqlock

An introduction to how osq_lock works on RISC-V. (WIP)

riscv functions

First, let's look at the RISC-V primitives that osq_lock ends up using.

__xchg

The __xchg macro lives in arch/riscv/include/asm/cmpxchg.h.

#define __xchg(ptr, new, size) \
({ \
	__typeof__(ptr) __ptr = (ptr); \
	__typeof__(new) __new = (new); \
	__typeof__(*(ptr)) __ret; \
	switch (size) { \
	case 4: \
		__asm__ __volatile__ ( \
			"	amoswap.w.aqrl %0, %2, %1\n" \
			: "=r" (__ret), "+A" (*__ptr) \
			: "r" (__new) \
			: "memory"); \
		break; \
	case 8: \
		__asm__ __volatile__ ( \
			"	amoswap.d.aqrl %0, %2, %1\n" \
			: "=r" (__ret), "+A" (*__ptr) \
			: "r" (__new) \
			: "memory"); \
		break; \
	default: \
		BUILD_BUG(); \
	} \
	__ret; \
})

This is implemented with a single AMO (atomic memory operation) instruction: amoswap.w.aqrl / amoswap.d.aqrl swaps the new value into *ptr and returns the previous value, with both acquire and release (.aqrl) ordering.
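
As a rough illustration of the semantics only (not the kernel code path itself), the 4-byte case behaves like an atomic exchange. A minimal user-space sketch using the GCC/Clang builtins, which on RISC-V compile down to an amoswap instruction; the function name xchg32 is a hypothetical stand-in:

#include <stdint.h>

/* Hypothetical stand-in for __xchg(ptr, new, 4): atomically store new_val
 * into *ptr and return the old contents. __ATOMIC_SEQ_CST approximates the
 * .aqrl (acquire + release) ordering of amoswap.w.aqrl. */
static inline uint32_t xchg32(volatile uint32_t *ptr, uint32_t new_val)
{
	return __atomic_exchange_n(ptr, new_val, __ATOMIC_SEQ_CST);
}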

__cmpxchg_release

The __cmpxchg_release macro also lives in arch/riscv/include/asm/cmpxchg.h.

#define __cmpxchg_release(ptr, old, new, size) \
({ \
	__typeof__(ptr) __ptr = (ptr); \
	__typeof__(*(ptr)) __old = (old); \
	__typeof__(*(ptr)) __new = (new); \
	__typeof__(*(ptr)) __ret; \
	register unsigned int __rc; \
	switch (size) { \
	case 4: \
		__asm__ __volatile__ ( \
			RISCV_RELEASE_BARRIER \
			"0:	lr.w %0, %2\n" \
			"	bne  %0, %z3, 1f\n" \
			"	sc.w %1, %z4, %2\n" \
			"	bnez %1, 0b\n" \
			"1:\n" \
			: "=&r" (__ret), "=&r" (__rc), "+A" (*__ptr) \
			: "rJ" ((long)__old), "rJ" (__new) \
			: "memory"); \
		break; \
	case 8: \
		__asm__ __volatile__ ( \
			RISCV_RELEASE_BARRIER \
			"0:	lr.d %0, %2\n" \
			"	bne %0, %z3, 1f\n" \
			"	sc.d %1, %z4, %2\n" \
			"	bnez %1, 0b\n" \
			"1:\n" \
			: "=&r" (__ret), "=&r" (__rc), "+A" (*__ptr) \
			: "rJ" (__old), "rJ" (__new) \
			: "memory"); \
		break; \
	default: \
		BUILD_BUG(); \
	} \
	__ret; \
})

This one is built on the lr/sc (load-reserved / store-conditional) instructions. The logic is as follows (a C-level sketch of the same semantics follows the list):

  • lr loads the value at ptr into __ret
  • compare it with old
    • if it does not equal old, branch to 1f, leave the macro, and return __ret
    • if it equals old, execute sc to write new into ptr
      • if sc returns a non-zero value, i.e. the sc failed, branch back to 0 and retry the lr
      • if sc returns 0, i.e. the sc succeeded, leave the macro and return __ret
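
A minimal sketch of what that loop computes, written with the GCC/Clang builtins rather than the kernel macro (the function name cmpxchg32_release is illustrative): compare *ptr with old, store new only if they match, and in either case return the value that was observed.

#include <stdbool.h>
#include <stdint.h>

/* Illustrative equivalent of __cmpxchg_release(ptr, old, new, 4):
 * returns the value read from *ptr; new_val is stored only if that value
 * equalled old_val. __ATOMIC_RELEASE mirrors the RISCV_RELEASE_BARRIER
 * placed before the lr/sc loop. */
static inline uint32_t cmpxchg32_release(volatile uint32_t *ptr,
					 uint32_t old_val, uint32_t new_val)
{
	uint32_t expected = old_val;

	__atomic_compare_exchange_n(ptr, &expected, new_val,
				    false, __ATOMIC_RELEASE, __ATOMIC_RELAXED);
	return expected;	/* the old contents of *ptr, like __ret above */
}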

osq_lock

The osq_lock code lives in the following files in the kernel source tree:

kernel/locking/osq_lock.c
include/linux/osq_lock.h
/*
 * An MCS like lock especially tailored for optimistic spinning for sleeping
 * lock implementations (mutex, rwsem, etc).
 */
struct optimistic_spin_node {
	struct optimistic_spin_node *next, *prev;
	int locked; /* 1 if lock acquired */
	int cpu; /* encoded CPU # + 1 value */
};

struct optimistic_spin_queue {
	/*
	 * Stores an encoded value of the CPU # of the tail node in the queue.
	 * If the queue is empty, then it's set to OSQ_UNLOCKED_VAL.
	 */
	atomic_t tail;
};

#define OSQ_UNLOCKED_VAL (0)

/* Init macro and function. */
#define OSQ_LOCK_UNLOCKED { ATOMIC_INIT(OSQ_UNLOCKED_VAL) }

static inline void osq_lock_init(struct optimistic_spin_queue *lock)
{
	atomic_set(&lock->tail, OSQ_UNLOCKED_VAL);
}

osq_lock is maintained with two data structures:

  • optimistic_spin_queue, which holds a single field:
    • tail, which records which CPU queued last. 0 means no CPU, 1 means cpu0.
  • optimistic_spin_node, which holds:
    • a doubly linked list (next/prev) chaining the per-CPU nodes of all CPUs waiting for the lock
    • locked, 1 means this node holds the lock, 0 means it does not
    • cpu, the encoded CPU id: 0 means no CPU, 1 means cpu0 (see the encoding helpers sketched right after this list).
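
For reference, the helpers that convert between a CPU number and the encoded value kept in tail / node->cpu look roughly like this. This is a simplified sketch adapted from kernel/locking/osq_lock.c; treat it as illustrative rather than a verbatim copy, since details may differ across kernel versions.

/* One spin node per CPU; its existence is what allows the unqueue path
 * to touch @prev safely later on. */
static DEFINE_PER_CPU_SHARED_ALIGNED(struct optimistic_spin_node, osq_node);

/* 0 means "no CPU", so the stored value is always cpu_nr + 1. */
static inline int encode_cpu(int cpu_nr)
{
	return cpu_nr + 1;
}

static inline int node_cpu(struct optimistic_spin_node *node)
{
	return node->cpu - 1;
}

/* Map an encoded CPU value back to that CPU's per-cpu node. */
static inline struct optimistic_spin_node *decode_cpu(int encoded_cpu_val)
{
	int cpu_nr = encoded_cpu_val - 1;

	return per_cpu_ptr(&osq_node, cpu_nr);
}

With these helpers in mind, the osq_lock() implementation itself follows: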
bool osq_lock(struct optimistic_spin_queue *lock)
{
	struct optimistic_spin_node *node = this_cpu_ptr(&osq_node);
	struct optimistic_spin_node *prev, *next;
	int curr = encode_cpu(smp_processor_id());
	int old;

	node->locked = 0;
	node->next = NULL;
	node->cpu = curr;

	// Fast path, done with a RISC-V atomic instruction (atomic_xchg -> amoswap).
	// When no CPU holds the lock and the first CPU comes to acquire it:
	//   lock->tail starts as OSQ_UNLOCKED_VAL; the CPU stores curr (its encoded
	//   CPU id) into lock->tail and swaps out the old value, OSQ_UNLOCKED_VAL,
	//   so the acquisition succeeds and we return.
	// When the lock is already taken by cpu0, lock->tail equals 1. If cpu1 now
	// tries to acquire:
	//   it stores 2 into lock->tail and gets 1 back in old, so it knows the
	//   fast path failed and falls through to the queueing path below; and
	//   because old == 1, it also knows the last CPU to queue was cpu0.
	/*
	 * We need both ACQUIRE (pairs with corresponding RELEASE in
	 * unlock() uncontended, or fastpath) and RELEASE (to publish
	 * the node fields we just initialised) semantics when updating
	 * the lock tail.
	 */
	old = atomic_xchg(&lock->tail, curr);
	if (old == OSQ_UNLOCKED_VAL)
		return true;

	prev = decode_cpu(old);
	node->prev = prev;

	/*
	 * osq_lock()			unqueue
	 *
	 * node->prev = prev		osq_wait_next()
	 * WMB				MB
	 * prev->next = node		next->prev = prev // unqueue-C
	 *
	 * Here 'node->prev' and 'next->prev' are the same variable and we need
	 * to ensure these stores happen in-order to avoid corrupting the list.
	 */
	smp_wmb();

	WRITE_ONCE(prev->next, node);

	/*
	 * Normally @prev is untouchable after the above store; because at that
	 * moment unlock can proceed and wipe the node element from stack.
	 *
	 * However, since our nodes are static per-cpu storage, we're
	 * guaranteed their existence -- this allows us to apply
	 * cmpxchg in an attempt to undo our queueing.
	 */

	/*
	 * Wait to acquire the lock or cancellation. Note that need_resched()
	 * will come with an IPI, which will wake smp_cond_load_relaxed() if it
	 * is implemented with a monitor-wait. vcpu_is_preempted() relies on
	 * polling, be careful.
	 */
	if (smp_cond_load_relaxed(&node->locked, VAL || need_resched() ||
				  vcpu_is_preempted(node_cpu(node->prev))))
		return true;

	/* unqueue */
	/*
	 * Step - A -- stabilize @prev
	 *
	 * Undo our @prev->next assignment; this will make @prev's
	 * unlock()/unqueue() wait for a next pointer since @lock points to us
	 * (or later).
	 */

	for (;;) {
		/*
		 * cpu_relax() below implies a compiler barrier which would
		 * prevent this comparison being optimized away.
		 */
		if (data_race(prev->next) == node &&
		    cmpxchg(&prev->next, node, NULL) == node)
			break;

		/*
		 * We can only fail the cmpxchg() racing against an unlock(),
		 * in which case we should observe @node->locked becoming
		 * true.
		 */
		if (smp_load_acquire(&node->locked))
			return true;

		cpu_relax();

		/*
		 * Or we race against a concurrent unqueue()'s step-B, in which
		 * case its step-C will write us a new @node->prev pointer.
		 */
		prev = READ_ONCE(node->prev);
	}

	/*
	 * Step - B -- stabilize @next
	 *
	 * Similar to unlock(), wait for @node->next or move @lock from @node
	 * back to @prev.
	 */

	next = osq_wait_next(lock, node, prev);
	if (!next)
		return false;

	/*
	 * Step - C -- unlink
	 *
	 * @prev is stable because its still waiting for a new @prev->next
	 * pointer, @next is stable because our @node->next pointer is NULL and
	 * it will wait in Step-A.
	 */

	WRITE_ONCE(next->prev, prev);
	WRITE_ONCE(prev->next, next);

	return false;
}
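
To see where these functions sit in practice, here is a heavily simplified sketch of how a sleeping lock's optimistic-spin path uses them. It is loosely modeled on mutex_optimistic_spin() in kernel/locking/mutex.c and assumes CONFIG_MUTEX_SPIN_ON_OWNER (so struct mutex has an osq field); the helper name and the spin-on-owner details are placeholders.

#include <linux/mutex.h>
#include <linux/osq_lock.h>

/* Hypothetical caller: only one CPU at a time gets past osq_lock() and is
 * allowed to spin on the mutex owner; everyone else queues on the OSQ. */
static bool my_optimistic_spin(struct mutex *lock)
{
	if (!osq_lock(&lock->osq))
		return false;	/* gave up: need_resched() or vCPU preempted */

	/* ... spin here waiting for the owner to release the mutex ... */

	osq_unlock(&lock->osq);
	return true;
}

osq_unlock() is the release side: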
void osq_unlock(struct optimistic_spin_queue *lock)
{
	struct optimistic_spin_node *node, *next;
	int curr = encode_cpu(smp_processor_id());

	// Fast path, implemented with the lr/sc sequence behind
	// atomic_cmpxchg_release().
	// When only cpu0 ever queued for the lock:
	//   reading lock->tail gives 1, which equals curr, so OSQ_UNLOCKED_VAL is
	//   stored into lock->tail and we return.
	// When cpu0 and cpu1 both queued and cpu0 holds the lock:
	//   reading lock->tail gives 2, which does not equal curr, so no sc is
	//   performed, we do not return, and we fall through to the unlock logic
	//   below.
	/*
	 * Fast path for the uncontended case.
	 */
	if (likely(atomic_cmpxchg_release(&lock->tail, curr,
					  OSQ_UNLOCKED_VAL) == curr))
		return;

	/*
	 * Second most likely case.
	 */
	node = this_cpu_ptr(&osq_node);
	next = xchg(&node->next, NULL);
	if (next) {
		WRITE_ONCE(next->locked, 1);
		return;
	}

	next = osq_wait_next(lock, node, NULL);
	if (next)
		WRITE_ONCE(next->locked, 1);
}
/*
 * Get a stable @node->next pointer, either for unlock() or unqueue() purposes.
 * Can return NULL in case we were the last queued and we updated @lock instead.
 */
static inline struct optimistic_spin_node *
osq_wait_next(struct optimistic_spin_queue *lock,
	      struct optimistic_spin_node *node,
	      struct optimistic_spin_node *prev)
{
	struct optimistic_spin_node *next = NULL;
	int curr = encode_cpu(smp_processor_id());
	int old;

	/*
	 * If there is a prev node in queue, then the 'old' value will be
	 * the prev node's CPU #, else it's set to OSQ_UNLOCKED_VAL since if
	 * we're currently last in queue, then the queue will then become empty.
	 */
	old = prev ? prev->cpu : OSQ_UNLOCKED_VAL;

	for (;;) {
		if (atomic_read(&lock->tail) == curr &&
		    atomic_cmpxchg_acquire(&lock->tail, curr, old) == curr) {
			/*
			 * We were the last queued, we moved @lock back. @prev
			 * will now observe @lock and will complete its
			 * unlock()/unqueue().
			 */
			break;
		}

		/*
		 * We must xchg() the @node->next value, because if we were to
		 * leave it in, a concurrent unlock()/unqueue() from
		 * @node->next might complete Step-A and think its @prev is
		 * still valid.
		 *
		 * If the concurrent unlock()/unqueue() wins the race, we'll
		 * wait for either @lock to point to us, through its Step-B, or
		 * wait for a new @node->next from its Step-C.
		 */
		if (node->next) {
			next = xchg(&node->next, NULL);
			if (next)
				break;
		}

		cpu_relax();
	}

	return next;
}

To sum up: when only one CPU contends for the lock, optimistic_spin_queue alone is enough, and it simply stores that CPU's encoded id in tail; when multiple CPUs contend, optimistic_spin_queue and optimistic_spin_node work together: tail records the last CPU that queued for the lock, and the per-CPU nodes link all queued CPUs into a list.

References

https://carlyleliu.github.io/2021/Linux%E5%B9%B6%E5%8F%91%E4%B8%8E%E5%90%8C%E6%AD%A5%EF%BC%88%E4%BA%8C%EF%BC%89%E8%87%AA%E6%97%8B%E9%94%81/

https://blog.csdn.net/jianchi88/article/details/123249958