posix-futex

posix-futex解析

Futex(Fast Userspace muTEX),又名快速用户区互斥,是一个在Linux上实现锁定和构建高级抽象锁如信号量和POSIX互斥的基本工具。在Unikraft中,也基本实现了该功能。
本模块基本实现了wait、wake、requeue基本操作,同时提供了futex系统调用。

数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct uk_futex {

    uint32_t *uaddr; /** The futex address. */

    struct uk_thread *thread; /** The thread waiting on the futex. */

    struct uk_list_head list_node; /** The list containing all the futexes

                     * on which the threads are waiting.

                     */

};

static UK_LIST_HEAD(futex_list);

static uk_spinlock futex_list_lock = UK_SPINLOCK_INITIALIZER();
  • uaddr : 锁地址
  • thread : 等待该锁的线程
  • list_node : List包括所有等待中的线程的锁
  • futex_list : 存放futex的List
  • futex_list_lock : List的自旋锁。

futex_wait

输入的三个参数说明如下所示。

  • uaddr : futex用户空间地址
  • val : 期望的值
  • tm : timespec结构体,包含线程被阻塞后的超时时间。如果为空,则线程无限期阻塞。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    static int futex_wait(uint32_t *uaddr, uint32_t val, const struct timespec *tm)

    {

        // irqf : 中断标志位 用于保存中断标志

        unsigned long irqf;

        // itr, tmp : List头

        struct uk_list_head *itr, *tmp;

        // futex : 锁

        struct uk_futex *f_tmp;

        // current : 当前调用的线程

        struct uk_thread *current = uk_thread_current();

        // f : 锁

        struct uk_futex f = {.uaddr = uaddr, .thread = current};



        // 从指定内存读取变量

        if (ukarch_load_n(uaddr) == val) {

            irqf = ukplat_lcpu_save_irqf(); // 关中断,保存标志位

            uk_spin_lock(&futex_list_lock); // 对futex_List上锁



            /* Enqueue thread to wait list */

            // 将f添加到futex_list的队尾, f.list_node在哪被赋值?

            uk_list_add_tail(&f.list_node, &futex_list);

            // 有始有终

            uk_spin_unlock(&futex_list_lock);

            ukplat_lcpu_restore_irqf(irqf);



            if (tm)

                /* Block for at most timeout nanosecs */

                uk_thread_block_timeout(current, tm->tv_nsec);

            else

                /* Block indefinitely */

                uk_thread_block(current);



            // 让出CPU

            uk_sched_yield();

            // 如果没回来,说明等待的条件已经满足

            // 此时过了一定时间,回到该阻塞线程内 说明已经超时

            irqf = ukplat_lcpu_save_irqf();

            uk_spin_lock(&futex_list_lock);



            /* If the futex is still in the wait list, then it timed out */

            uk_list_for_each_safe(itr, tmp, &futex_list) {

                f_tmp = uk_list_entry(itr, struct uk_futex, list_node);



                if (f_tmp->uaddr == uaddr && f_tmp->thread == current) {

                    /* Remove the thread from the futex list */

                    uk_list_del(&f_tmp->list_node);

                    uk_spin_unlock(&futex_list_lock);

                    ukplat_lcpu_restore_irqf(irqf);



                    return -ETIMEDOUT;

                }

            }



            uk_spin_unlock(&futex_list_lock);

            ukplat_lcpu_restore_irqf(irqf);



            return 0;

        }



        /* Futex word does not contain expected val */

        return -EAGAIN;

    }
    总结一下futex_wait流程。
  • 检查*uaddr的值是否等于val,如果不相等则立刻返回。
  • 对队列上自旋锁
  • 将futex加入到等待队列
  • 阻塞对应线程
  • 等待阻塞时间结束,唤醒线程。
    与Linux的futex_wait相比,unikraft简单很多,全局的等待队列也不是使用hash实现。

futex_wake

futex_wake就比较简单,uaddr是对应期望值的地址,val是最大唤醒线程数(与wait函数中的不一样)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
static int futex_wake(uint32_t *uaddr, uint32_t val)

{

    // 相关变量声明

    unsigned long irqf;

    struct uk_list_head *itr, *tmp;

    struct uk_futex *f;

    uint32_t count = 0;



    irqf = ukplat_lcpu_save_irqf();

    uk_spin_lock(&futex_list_lock);



    // 遍历等待List

    uk_list_for_each_safe(itr, tmp, &futex_list) {

        f = uk_list_entry(itr, struct uk_futex, list_node);



        // 若当前等待thread f->uaddr等于uaddr, 即等待函数参数指定的val。

        if (f->uaddr == uaddr) {

            /* Remove the thread from the futex list */

            uk_list_del(&f->list_node);



            /* TODO: Replace with uk_thread_wakeup when the new

             * scheduler API is ready

             */

            uk_thread_wake(f->thread);



            // val 限制了最大唤醒线程数量

            /* Wake at most val threads */

            if (++count >= val)

                break;

        }

    }



    uk_spin_unlock(&futex_list_lock);

    ukplat_lcpu_restore_irqf(irqf);



    return (int) count;

}

futex_cmp_requeue

futex_cmp_requeue函数比较复杂。参数说明如下所示。

  • uaddr : 源futex用户地址
  • val : 唤醒等待线程的数量
  • val2 : 需要重排的线程数量
  • uaddr2 : 目的futex用户地址
  • val3 : uaddr地址期待的变量值
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    static int futex_cmp_requeue(uint32_t *uaddr, uint32_t val, uint32_t val2,

                     uint32_t *uaddr2, uint32_t val3)

    {

        // 相关变量的初始化

        unsigned long irqf;

        struct uk_list_head *itr, *tmp;

        struct uk_futex *f;

        // 唤醒线程数 目的数是val

        int woken_uaddr1;

        uint32_t waiters_uaddr2 = 0;



        // val3 != *(uaddr) 立刻返回

        if (!((uint32_t)val3 == ukarch_load_n(uaddr)))

            return -EAGAIN;



        // 唤醒val个等待的线程

        /* Wake up val waiters on uaddr */

        woken_uaddr1 = futex_wake(uaddr, val);



        // 如果val2 == 0 则不需要requeue 该函数的任务到此结束 返回

        if (!val2)

            return woken_uaddr1;



        // 保存中断标志 对futex_list_lock上锁

        irqf = ukplat_lcpu_save_irqf();

        uk_spin_lock(&futex_list_lock);



        // 将avl2个等待的线程 将其等待的futex锁从uaddr更改为uaddr2

        /* Requeue val2 waiters on uaddr2 */

        uk_list_for_each_safe(itr, tmp, &futex_list) {

            f = uk_list_entry(itr, struct uk_futex, list_node);



            if (f->uaddr == uaddr) {

                /* Requeue thread to uaddr2 */

                uk_list_del(&f->list_node);

                f->uaddr = uaddr2;

                uk_list_add_tail(&f->list_node, &futex_list);



                /* Requeue at most val2 threads */

                if (++waiters_uaddr2 >= val2)

                    break;

            }

        }



        uk_spin_unlock(&futex_list_lock);

        ukplat_lcpu_restore_irqf(irqf);



        // 返回 唤醒线程数和重排数 一定是>=0 代表执行成功

        return woken_uaddr1 + waiters_uaddr2;

    }

最后则是对三个函数封装出同一个接口,通过futex_op进行选择操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
UK_LLSYSCALL_R_DEFINE(int, futex, uint32_t *, uaddr, int, futex_op,

              uint32_t, val, const struct timespec *, timeout,

              uint32_t *, uaddr2, uint32_t, val3)

{

    switch (futex_op) {

    case FUTEX_WAIT:

    case FUTEX_WAIT_PRIVATE:

        return futex_wait(uaddr, val, timeout);



    case FUTEX_WAKE:

    case FUTEX_WAKE_PRIVATE:

        return futex_wake(uaddr, val);



    case FUTEX_FD:

    case FUTEX_REQUEUE:

        return -ENOSYS;



    case FUTEX_CMP_REQUEUE:

        return futex_cmp_requeue(uaddr, val, (unsigned long)timeout,

                     uaddr2, val3);



    default:

        return -ENOSYS;

        /* TODO: other operations? */

    }

}