RISC-V OSを作ろう (10) ~ マルチコア (タスクスケジューリング)

執筆者 : 高橋 浩和

※ 「RISC-V OSを作ろう」連載記事一覧はこちら
※ 「RISC-V OS」のコードはgithubにて公開しています。


はじめに

今回は、マルチコアOSとして起動したRISC-V OS上で、タスクをスケジューリングする仕組みを見ていきます。

タスク定義

シングルコア環境であれば、CurrentTask変数を見ることによりどのタスクが実行中であるかを簡単に識別できましたが、マルチコア環境ではこの方法は使えません。

マルチコア環境用にタスク管理用のデータ構造TaskControlcoreidメンバ追加します。実行中のタスクはcoreidメンバにコア番号を登録し、実行されていないタスクにはCORE_UNASSIGNEDという値を持たせます。スケジューラは、このcoreidメンバを参照することによりタスクが実行中であるか否かを判断できます。

アイドルタスクはコア数分用意します。アイドルタスクを除きREADY状態のタスクが居なく無くなった時、全てのコアでアイドルタスクを実行する必要があります。

typedef enum {
    TASK1 = 0,
    TASK2,
    TASK3,
    TASK4,
    TASK5,
    TASKIDLE,
    TASKIDLE_CORE0 = TASKIDLE,
    TASKIDLE_CORE1,
    TASKIDLE_CORE2,
    NUMBER_OF_TASKS,
} TaskIdType;

struct TaskControl {
    enum { READY, BLOCKED } state;
    CoreIdType coreid;
    void (*entry)(void);
    long time_slice;
    long remaining_time;
    int expire;
    SemIdType target_sem;
    unsigned long sp;
    unsigned long task_kstack[KSTACKSIZE];
} TaskControl[NUMBER_OF_TASKS] = {
    {.coreid = CORE_UNASSIGNED, .entry = Task1, .state = READY, .time_slice = 2},
    {.coreid = CORE_UNASSIGNED, .entry = Task2, .state = READY, .time_slice = 4},
    {.coreid = CORE_UNASSIGNED, .entry = Task3, .state = READY, .time_slice = 1},
    {.coreid = CORE_UNASSIGNED, .entry = Task4, .state = READY, .time_slice = 3},
    {.coreid = CORE_UNASSIGNED, .entry = Task5, .state = READY, .time_slice = 1},
    {.coreid = CORE_UNASSIGNED, .entry = Idle, .state = READY, .time_slice = 1},
    {.coreid = CORE_UNASSIGNED, .entry = Idle, .state = READY, .time_slice = 1},
    {.coreid = CORE_UNASSIGNED, .entry = Idle, .state = READY, .time_slice = 1},
};

スケジューラ

次に実行するタスクの選択時(ChooseNextTask関数)にマルチコアを意識します。実行状態のタスク(いずれかのコアのカレントタスク)のTaskControlのcoreidメンバには、そのタスクを実行しているコアのIDを登録し、それ以外のタスクのcoreidメンバにはCORE_UNASSIGNEDを設定します。

  • ChooseNextTask関数はREADY状態のタスクのうち、どこのコアでも実行されていない(CORE_UNASSIGNED)タスクを次に動作するタスクとして選択するようにします(1)。
  • ChooseNextTask関数は実行対象から外れたタスクのcoreidメンバをCORE_UNASSIGNEDにし(2)、選択したタスクのcoreidにカレントコアのIDを登録します(3)。
  • 次に実行すべきタスクが見つからない時は、IDLEタスクを選択します。それぞれのコア専用のIDLEタスクが用意されているので、そのタスクを選びます(4)。
stateメンバ coreidメンバ タスク状態
READY コアID 実行中、いずれかのコアのCurrentTaskとなっている
READY CORE_UNASSIGNED 実行可能状態、ChooseNextTask関数が選択可能なタスク
BLOCKED CORE_UNASSIGNED 待機状態
static TaskIdType ChooseNextTask(void)
{
    /* roundrobin scheduling */
    TaskIdType task = CurrentTask;

    TaskControl[task].coreid = CORE_UNASSIGNED;             (2)

    do {
        task = (task + 1) % NUMBER_OF_TASKS;
    } while ((TaskControl[task].state != READY || TaskControl[task].coreid != CORE_UNASSIGNED || task >= TASKIDLE) && task != CurrentTask);  (1)

    if (TaskControl[task].state != READY) {
        task = TASKIDLE + ThisCore;                         (4)
    }
    TaskControl[task].coreid = ThisCore;                    (3)

    return task;
}

タイマ割り込み

ラウンドロビンスケジューリング処理は各々のコアで行ないます(A)。そのコアのカレントタスクのタイムスライス値の計算を行ない、タイムスライスを使い切ったら再スケジュールを要求します。

時限待ちをしているBLOCKED状態のタスクの制御はコア0が代表して行なうこととします(B)。BLOCKED状態である全てのタスクのexpireメンバの値から1を引き、expireメンバが0になった時点でそのタスクの状態をREADYに変更し、タスクの待機状態を解除します。

int Timer(void)
{
    TaskIdType task;

    _print_message("Timer\n");

    do {
        *reg_mtimecmp += INTERVAL;
    } while ((long)(*reg_mtime - *reg_mtimecmp) >= 0);

    if (ThisCore == CORE0) {                                (B)
        for (task = 0; task < NUMBER_OF_TASKS; task++) {
            if (TaskControl[task].state == BLOCKED && TaskControl[task].expire > 0) {
                if (--TaskControl[task].expire == 0) { 
                    TaskSetReady(task);
                }
            }
        }
    }

    if (--TaskControl[CurrentTask].remaining_time <= 0) {   (A)
        TaskControl[CurrentTask].remaining_time = TaskControl[CurrentTask].time_slice;
        return TRUE;
    }

    return FALSE;
}

コア間割り込み

Snoozeシステムコールにより時限待ち状態になっていたタスクの解除、セマフォ取得待ちになっていたタスクの待ち合わせ解除時にタスクがREADY状態に遷移します。このとき全コアにコア間割り込みを発生させ(broardcast_IPI関数)、再スケジューリングを要求します。

今回はアルゴリズムを単純にするために無条件で全コアに再スケジューリングを要求する実装を採用しましたが、相手コアの状態を見て、負荷が低い時のみ再スケジューリングを要求するという実装も可能です。

static void TaskSetReady(TaskIdType task)
{
    MemBarrier();
    TaskControl[task].state = READY;
    broardcast_IPI();
}

RISC-Vのソフトウェア割り込みをコア間割り込みとして利用します。コア毎にソフトウェア割り込みを発生させるためレジスタ*1が用意されており、ここに1を書き込むことで目的のコアに割り込みを発生させることができます(raise_IPI関数)*2

複数のコアから同時にある1つのコアにコア間割り込みを要求するタイミングが発生することもあります。その場合はコア間割り込みが縮退し、1回の割込み発生となります。しかし今回の用途では特に問題にならないため、対処は特に必要ありません。

volatile unsigned int * const reg_msip_base = ((unsigned int *)0x2000000U);

void raise_IPI(CoreIdType core)
{
    *(reg_msip_base + core) = 1U;
}

void broardcast_IPI(void)
{
    CoreIdType core = ThisCore + 1;
    do {
        raise_IPI(core);
        core = (core + 1) % NUMBER_OF_CORES;
    } while (core != ThisCore);
}

コア間割り込みが発生するとInterCoreInt関数が呼び出されます。InterCoreInt関数は、自コアが忙しければ再スケジューリング要求を無視することとします。 再スケジューリングによるオーバヘッドを減らすためにこのような実装にしましたが、これはOS実装の方針次第です。

InterCoreInt関数が終了する前にコア間割り込みをキャンセルする必要があります(clear_IPI関数)。この処理を行ないと、コア間割り込みが無限に発生し続けます。

static void clear_IPI(void)
{
    *(reg_msip_base + ThisCore) = 0U;
}

int InterCoreInt(void)
{
    _print_message("Inter-Core Interrupt\n");

    clear_IPI();

    if (CurrentTask >= TASKIDLE) {
        return TRUE;   /* reschedule request */
    }
    return FALSE;
}

スピンロック

コア間でアクセスが競合する資源はスピンロックで保護します。ロックを握ったコアのみが資源へのアクセスを許可されます。

ロック手順

SpinLock関数は引数で指定されたロック変数のロックを行ないます。ロック試行にはTestAndSetという関数を用います。TestAndSet関数はロックに成功した場合はゼロを、ロックに失敗した場合は非ゼロを返します。SpinLock関数は、既に他コアがロックを握っていた場合(TestAndSet関数が失敗した場合)、自コアがロックを握れるまで何度もロックを試みます。

TestAndSet関数でロック変数に設定する値としては「コア番号+1」を指定するようにしました。どこのコアからロックを行なっても常に1を設定する実装でも構わないのですが、問題発生時の解析の手助けとなるようにこのような実装にしました。

ロック失敗後のロック再試行は少し時間を空けて行なう方が良いです。連続したロック再試行はメモリバスに負担を掛け、他コアや各種コントローラからのメモリアクセスに悪影響を与えます。

ここではPause関数を呼び出すことで実現しています。Pause関数はfence.i命令を実行する関数です。fence.i命令は、命令パイプラインをフラッシュします。fence.i命令実行前にフェッチした命令列が破棄され、再度フェッチし直されます。今回はfence.i命令を使いましたが、ローカルコア内だけで完結する結果に影響を与えない命令(命令列)であれば、どんな処理でも構いません。

typedef volatile int Lock_t;

void SpinLock(Lock_t *lock)
{
    while (TestAndSet(lock, ThisCore + 1)) {
        Pause();
    }
}

TestAndSet関数は次のC関数の操作を不可分(アトミック)に行ないます。lockが指す変数が0であるか否かの確認と、lockが指す変数の更新を同時に行ないます。

int TestAndSet(Lock_t *lock, int newval)
{
    if ( *lock == 0 ) {
        *lock = newval;
        return 0;
    } else {
        return 1;
    }
}

この処理をC記述で実現することはできないため、マルチコア用の命令を用いて実現します。ここでは、load-reserved命令・store-conditional命令を利用して実現します。

    .global TestAndSet
    .type TestAndSet,@function
    .balign 4
TestAndSet:
    mv    a2, a0
    lr.w  a0, (a2)
    bne   a0, zero, 1f
    sc.w  a0, a1, (a2)
1:
    ret
    .size TestAndSet,.-TestAndSet

lr.w命令(load-reserved命令)でロック変数の値を読み込みます。この時同時にロック変数のアドレスがこのコアからの監視対象となります。正確にはロック変数を囲む一定のアドレス区間が監視対象(reserved状態)になります。ロック変数から読み出した値が0以外であった場合は既に他のコアによってロックされていることを意味するため、ここでTestAndSet関数をエラー終了させます。

sc.w命令(store-conditional命令)は、lr.w命令実行後にロック変数のあるアドレス区間に書き込みが行なわれていなかった時(reserved状態を維持)に限りロック変数の値を更新します。lr.w命令とsc.w命令の間に他コアからこのアドレス範囲への書き込みがあった場合、lr.w命令を実行したコアはこれを検出しreserved状態を解除します。これにより後に続くsc.w命令は失敗することになります。

load-reserved命令とstore-conditional命令

この仕組みはload-link/store-conditionalとも呼ばれており、他のCPUアーキテクチャでも採用されているメジャーな方法です。Wikipediaの説明も参考にしてください。

RISC-Vは、AMOと呼ばれるアトミック更新命令も提供しており、この命令を利用してTestAndSet関数を実装することも可能です。

アンロック手順

アンロック処理はロック変数の単純なゼロクリアで実現します。ただし、ロック変数クリアの前に行なわれたメモリ更新処理が全て完了すること保証するために、ロック変数クリア直前にMemBarrier関数呼び出しを置きます。

void SpinUnlock(Lock_t *lock)
{
    MemBarrier();
    *lock = 0;
}

スケジューラ間の排他

スケジューラは全てのコア上で独立して動作します。複数のスケジューラが同時に同じタスクに実行権を与えようとしてしまうことを防ぐために、ロック変数lock_schedを用意し、そのロック変数を握ることができたスケジューラのみが再スケジューリングの権限を得られるようにします。

static Lock_t lock_sched;

void _Schedule(void)
{
    SpinLock(&lock_sched);
    TaskIdType from = CurrentTask;
    CurrentTask = ChooseNextTask();
    if (from != CurrentTask) {
        TaskSwitch(&TaskControl[from], &TaskControl[CurrentTask]); 
    }
    SpinUnlock(&lock_sched);
}

初めて動作するタスクはTaskEntryから動き始めますが、この時lock_schedによるロックの解除を行なう必要があります。このタスクはスケジューラ_Schedule関数の出口を通らないため、ひとつ前に動いていたタスクが_Schedule関数の先頭で握ったロックをTaskEntryで解放してあげる必要があります。

static void TaskEntry(void)
{
    SpinUnlock(&lock_sched);
    TaskStart(TaskControl[CurrentTask].entry, &task_ustack[CurrentTask][USTACKSIZE]);
}

また、OS起動時もlock_schedによるロックを握った後に初期タスクを起動する必要があります。この初期タスクは_Scheduleを経由せずTaskEntryを呼び出します。TaskEntrylock_schedロック解除と対になるロック取得操作はここで行なう必要があります。

void main(void) {
             :
             :
    SpinLock(&lock_sched);
    CurrentTask = TASKIDLE + ThisCore;
    TaskControl[CurrentTask].coreid = ThisCore;
    load_context(&TaskControl[CurrentTask].sp);
}

lock_sched獲得・解放のタイミング

セマフォの排他

1つのセマフォを複数コアから同時に操作できないようセマフォ操作前にロックを取得するようにします。ロック変数は、それぞれのセマフォ毎に用意します(SemaphoreControllockメンバ)。

struct SemaphoreControl {
    TaskIdType owner_task;
    Lock_t lock;
} SemaphoreControl[NUMBER_OF_SEMS] = {
            :

セマフォ取得・解放操作時は、セマフォのlockメンバを使いロックします。スケジューラ呼び出し前にはセマフォのロックを解除します。

マルチコア環境では_AcquireSemaphoreにてセマフォ取得が失敗しスケジューラ_Scheduleを呼び出す(2)直前、セマフォのロックを解除(1)した瞬間に他コアで_ReleaseSemaphoreが呼び出されセマフォが解放される可能性があります。これはシングルコア環境では発生しない状態です。

セマフォが解放されると_AcquireSemaphore実行中のタスクがREADY状態に遷移します(3)。その場合、スケジューラ_Schedule(2)は次に動作するタスクとして_AcquireSemaphoreを呼び出したタスク自身(カレントタスク)を選択するかもしれません。

void _AcquireSemaphore(SemIdType sem)
{
    SpinLock(&SemaphoreControl[sem].lock);
    while (SemaphoreControl[sem].owner_task != SEM_AVAILABLE) {
        TaskControl[CurrentTask].target_sem = sem;
        TaskControl[CurrentTask].state = BLOCKED;
        SpinUnlock(&SemaphoreControl[sem].lock);   (1)
        _Schedule();                               (2)
        SpinLock(&SemaphoreControl[sem].lock);
    }
    SemaphoreControl[sem].owner_task = CurrentTask;
    SpinUnlock(&SemaphoreControl[sem].lock);
}

void _ReleaseSemaphore(SemIdType sem)
{
    TaskIdType task;
    SpinLock(&SemaphoreControl[sem].lock);
    SemaphoreControl[sem].owner_task = SEM_AVAILABLE;
    for (task = 0; task < NUMBER_OF_TASKS; task++) {
        if (TaskControl[task].state == BLOCKED && TaskControl[task].target_sem == sem) {
            TaskControl[task].target_sem = NO_SEM;
            TaskControl[task].expire = 0; /* XXX */
            SpinUnlock(&SemaphoreControl[sem].lock);
            _TaskUnblock(task);                    (3)
            SpinLock(&SemaphoreControl[sem].lock);
        }
    }
    SpinUnlock(&SemaphoreControl[sem].lock);
}

割り込みエントリ

割り込み共通エントリ

タイマ割り込みとコア間割り込みで共通の割り込みエントリを使用します。 割り込み発生毎にTPレジスタがコアローカル域の先頭を指すように計算し直します。「MSCRATCHレジスタ値 - _CLV_SIZE」がTPの値となります(A)。

割込みスタックはコア毎に用意されています。MSCRATCHレジスタの値から割込みスタックの底のアドレスを求めSPレジスタに設定します。「MSCRATCHレジスタ値 + _STACK_SIZE」がSPの値となります(B)(C)。発生したコア用の割込みスタックはそのコア用のメモリ領域内にあります。MSCRACHレジスタの値を元に割込みスタックの位置を計算します。

    .balign 4
int_handler:
    csrrw tp, mscratch, tp
    sd    t0, 5*8(tp)
    sd    t1, 6*8(tp)
    sd    t2, 7*8(tp)
    sd    t3, 8*8(tp)
    mv    t3, tp                                (B)

    la    t0, _CLV_SIZE
    sub   t0, tp, t0
    csrrw tp, mscratch, tp
    mv    tp, t0  /* restore tp, which might have been broken */  (A)
            :
    csrr  a0, mcause                            (D)
            :
    mv    s0, sp
    la    t0, _STACK_SIZE
    add   sp, t3, t0                            (C)
    jal   InterruptHandler                      (E)
            :
            :

割り込み共通ハンドラ

割り込み発生時、一旦すべての割り込みをInterruptHandlerハンドラで受け付け、そこからタイマ割り込みハンドラTimer、コア間割り込みハンドラInterCoreを呼び出すようにしました*3。割り込みエントリint_handlerは、mcauseレジスタの値を引数としてInterruptHandlerハンドラを呼び出します(D)(E)。

#define INT_MMODE_SOFT    3
#define INT_MMODE_TIMER   7

int InterruptHandler(unsigned long cause)
{
    switch ((unsigned short)cause) {
    case INT_MMODE_SOFT:
        return InterCoreInt();
        break;
    case INT_MMODE_TIMER:
        return Timer();
        break;
    default:
        break;
    }
    return FALSE;
}

print_message関数

print_message関数をマルチコア用に拡張します。

  • どのコアからの出力であるか識別できるように、出力文字にコア番号を追加することとします(1)。
  • 複数のコアから同時に出力した時、出力が交じらないようにします。ロック変数lock_uartを用意し、このロックを握ったコアのみが出力可能という実装にします(2)。

従来の_print_message関数を__print_message関数に名前を変更し、新規に作り直した_print_message関数の中でロックの制御およびコア番号出力の追加を行なった上で__print_message関数を呼び出すようにします。

void _print_message(const char *s, ...)
{
    unsigned long coreid = ThisCore;
    static Lock_t lock_uart;

    SpinLock(&lock_uart);                  (2)

    __print_message("core%x: ", &coreid);  (1)

    va_list ap;
    va_start (ap, s);
    __print_message(s, ap);
    va_end (ap);

    SpinUnlock(&lock_uart);                (2)
}

動かしてみよう

ここで紹介したプログラムはgithubにて公開しています。ここからプログラムをダウンロードし、添付されたmakefileを利用してビルドします。いつまでも名前がa.out OSのままでは寂しいので、食事をする哲学者を動かすOSということでsophiaというOS名にしました。

$ make
riscv64-unknown-elf-gcc -O2 -mcmodel=medany -ffreestanding -g -c main.c
riscv64-unknown-elf-gcc -O2 -mcmodel=medany -ffreestanding -g -c primitives.s
riscv64-unknown-elf-gcc -O2 -mcmodel=medany -ffreestanding -g -c start.s
riscv64-unknown-elf-gcc -O2 -mcmodel=medany -ffreestanding -g -c syscall.s
riscv64-unknown-elf-gcc -O2 -mcmodel=medany -ffreestanding -g -msmall-data-limit=0 -c application.c
riscv64-unknown-elf-ld main.o primitives.o start.o syscall.o application.o -T riscv-virt.lds -o sophia

qemu起動時のオプションに-smp 3を指定し、3コアでOSを起動します。

3つのコアが起動し、それぞれのコア上でタスクが動作していることが分かります。ここまでの連載で使ってきたタスク群をマルチコア環境で動かしてみます。5タスクに対してコア3つを用意することはオーバスペックです。もっと多くのタスクを定義し、どのように動作するか確認してみてください。

$ qemu-system-riscv64 -smp 3 -nographic -machine virt -m 128M -kernel sophia -bios none
core1: Core1 started.
core2: Core2 started.
core0: Core0 started.
core2: Task2 Eating
core0: Task1 Meditating
core0: Task4 Eating
core1: Timer
core0: Timer
core2: Timer
core0: Timer
core2: Timer
core1: Timer
core0: Task1 Meditating
core2: Inter-Core Interrupt
core1: Inter-Core Interrupt
core2: Timer
core1: Timer
core0: Timer
core2: Inter-Core Interrupt
core1: Inter-Core Interrupt
core0: Task1 Meditating
core2: Timer
core1: Timer
core0: Timer
core2: Timer
core1: Timer
core0: Timer
core2: Inter-Core Interrupt
core1: Inter-Core Interrupt
core0: Task1 Eating
core2: Task3 Meditating
core2: Task5 Meditating
core2: Inter-Core Interrupt
core0: Inter-Core Interrupt
core1: Task4 Eating
core0: Timer
core2: Timer

最後に

少しでも単純で短いコードとなるように実装してきましたが、メモリ保護やマルチコアの機能まで実装すると、そこそこ複雑なコードになってしまいました。

READY状態かつTaskControlのcoreidがCORE_UNASSIGNEDのタスク以外はスケジューラが無視することを前提に、ロック変数lock_schedによる排他は最小限にしましたが、もしかしたら排他処理に考慮漏れがあるかもしれません。

次回の記事では、OSをRISC-V SBI(Supervisor Binary Interface)に対応させます。OSをスーパバイザモードで動かすことになります。更にその後の連載では、ハイパバイザ実装に切り込む予定です。

*1:CLINT コアローカル割込みコントローラのレジスタ

*2:割り込み番号3番 マシンソフトウェア割り込みが発生する

*3:せっかくベクタモード指定でトラップベクタテーブルを登録するようにしていたのですが、あまり有効に使えませんでした。