執筆者 : 高橋 浩和
※ 「RISC-V OSを作ろう」連載記事一覧はこちら
※ 「RISC-V OS」のコードはgithubにて公開しています。
はじめに
今回は、マルチコアOSとして起動したRISC-V OS上で、タスクをスケジューリングする仕組みを見ていきます。
タスク定義
シングルコア環境であれば、CurrentTask変数を見ることによりどのタスクが実行中であるかを簡単に識別できましたが、マルチコア環境ではこの方法は使えません。
マルチコア環境用にタスク管理用のデータ構造TaskControl
にcoreid
メンバ追加します。実行中のタスクはcoreid
メンバにコア番号を登録し、実行されていないタスクにはCORE_UNASSIGNEDという値を持たせます。スケジューラは、このcoreid
メンバを参照することによりタスクが実行中であるか否かを判断できます。
アイドルタスクはコア数分用意します。アイドルタスクを除きREADY状態のタスクが居なく無くなった時、全てのコアでアイドルタスクを実行する必要があります。
typedef enum { TASK1 = 0, TASK2, TASK3, TASK4, TASK5, TASKIDLE, TASKIDLE_CORE0 = TASKIDLE, TASKIDLE_CORE1, TASKIDLE_CORE2, NUMBER_OF_TASKS, } TaskIdType; struct TaskControl { enum { READY, BLOCKED } state; CoreIdType coreid; void (*entry)(void); long time_slice; long remaining_time; int expire; SemIdType target_sem; unsigned long sp; unsigned long task_kstack[KSTACKSIZE]; } TaskControl[NUMBER_OF_TASKS] = { {.coreid = CORE_UNASSIGNED, .entry = Task1, .state = READY, .time_slice = 2}, {.coreid = CORE_UNASSIGNED, .entry = Task2, .state = READY, .time_slice = 4}, {.coreid = CORE_UNASSIGNED, .entry = Task3, .state = READY, .time_slice = 1}, {.coreid = CORE_UNASSIGNED, .entry = Task4, .state = READY, .time_slice = 3}, {.coreid = CORE_UNASSIGNED, .entry = Task5, .state = READY, .time_slice = 1}, {.coreid = CORE_UNASSIGNED, .entry = Idle, .state = READY, .time_slice = 1}, {.coreid = CORE_UNASSIGNED, .entry = Idle, .state = READY, .time_slice = 1}, {.coreid = CORE_UNASSIGNED, .entry = Idle, .state = READY, .time_slice = 1}, };
スケジューラ
次に実行するタスクの選択時(ChooseNextTask
関数)にマルチコアを意識します。実行状態のタスク(いずれかのコアのカレントタスク)のTaskControlのcoreidメンバには、そのタスクを実行しているコアのIDを登録し、それ以外のタスクのcoreidメンバにはCORE_UNASSIGNEDを設定します。
ChooseNextTask
関数はREADY状態のタスクのうち、どこのコアでも実行されていない(CORE_UNASSIGNED)タスクを次に動作するタスクとして選択するようにします(1)。ChooseNextTask
関数は実行対象から外れたタスクのcoreidメンバをCORE_UNASSIGNEDにし(2)、選択したタスクのcoreidにカレントコアのIDを登録します(3)。- 次に実行すべきタスクが見つからない時は、IDLEタスクを選択します。それぞれのコア専用のIDLEタスクが用意されているので、そのタスクを選びます(4)。
stateメンバ | coreidメンバ | タスク状態 |
---|---|---|
READY | コアID | 実行中、いずれかのコアのCurrentTaskとなっている |
READY | CORE_UNASSIGNED | 実行可能状態、ChooseNextTask関数が選択可能なタスク |
BLOCKED | CORE_UNASSIGNED | 待機状態 |
static TaskIdType ChooseNextTask(void) { /* roundrobin scheduling */ TaskIdType task = CurrentTask; TaskControl[task].coreid = CORE_UNASSIGNED; (2) do { task = (task + 1) % NUMBER_OF_TASKS; } while ((TaskControl[task].state != READY || TaskControl[task].coreid != CORE_UNASSIGNED || task >= TASKIDLE) && task != CurrentTask); (1) if (TaskControl[task].state != READY) { task = TASKIDLE + ThisCore; (4) } TaskControl[task].coreid = ThisCore; (3) return task; }
タイマ割り込み
ラウンドロビンスケジューリング処理は各々のコアで行ないます(A)。そのコアのカレントタスクのタイムスライス値の計算を行ない、タイムスライスを使い切ったら再スケジュールを要求します。
時限待ちをしているBLOCKED状態のタスクの制御はコア0が代表して行なうこととします(B)。BLOCKED状態である全てのタスクのexpireメンバの値から1を引き、expireメンバが0になった時点でそのタスクの状態をREADYに変更し、タスクの待機状態を解除します。
int Timer(void) { TaskIdType task; _print_message("Timer\n"); do { *reg_mtimecmp += INTERVAL; } while ((long)(*reg_mtime - *reg_mtimecmp) >= 0); if (ThisCore == CORE0) { (B) for (task = 0; task < NUMBER_OF_TASKS; task++) { if (TaskControl[task].state == BLOCKED && TaskControl[task].expire > 0) { if (--TaskControl[task].expire == 0) { TaskSetReady(task); } } } } if (--TaskControl[CurrentTask].remaining_time <= 0) { (A) TaskControl[CurrentTask].remaining_time = TaskControl[CurrentTask].time_slice; return TRUE; } return FALSE; }
コア間割り込み
Snoozeシステムコールにより時限待ち状態になっていたタスクの解除、セマフォ取得待ちになっていたタスクの待ち合わせ解除時にタスクがREADY状態に遷移します。このとき全コアにコア間割り込みを発生させ(broardcast_IPI
関数)、再スケジューリングを要求します。
今回はアルゴリズムを単純にするために無条件で全コアに再スケジューリングを要求する実装を採用しましたが、相手コアの状態を見て、負荷が低い時のみ再スケジューリングを要求するという実装も可能です。
static void TaskSetReady(TaskIdType task) { MemBarrier(); TaskControl[task].state = READY; broardcast_IPI(); }
RISC-Vのソフトウェア割り込みをコア間割り込みとして利用します。コア毎にソフトウェア割り込みを発生させるためレジスタ*1が用意されており、ここに1を書き込むことで目的のコアに割り込みを発生させることができます(raise_IPI
関数)*2。
複数のコアから同時にある1つのコアにコア間割り込みを要求するタイミングが発生することもあります。その場合はコア間割り込みが縮退し、1回の割込み発生となります。しかし今回の用途では特に問題にならないため、対処は特に必要ありません。
volatile unsigned int * const reg_msip_base = ((unsigned int *)0x2000000U); void raise_IPI(CoreIdType core) { *(reg_msip_base + core) = 1U; } void broardcast_IPI(void) { CoreIdType core = ThisCore + 1; do { raise_IPI(core); core = (core + 1) % NUMBER_OF_CORES; } while (core != ThisCore); }
コア間割り込みが発生するとInterCoreInt
関数が呼び出されます。InterCoreInt
関数は、自コアが忙しければ再スケジューリング要求を無視することとします。
再スケジューリングによるオーバヘッドを減らすためにこのような実装にしましたが、これはOS実装の方針次第です。
InterCoreInt
関数が終了する前にコア間割り込みをキャンセルする必要があります(clear_IPI
関数)。この処理を行ないと、コア間割り込みが無限に発生し続けます。
static void clear_IPI(void) { *(reg_msip_base + ThisCore) = 0U; } int InterCoreInt(void) { _print_message("Inter-Core Interrupt\n"); clear_IPI(); if (CurrentTask >= TASKIDLE) { return TRUE; /* reschedule request */ } return FALSE; }
スピンロック
コア間でアクセスが競合する資源はスピンロックで保護します。ロックを握ったコアのみが資源へのアクセスを許可されます。
ロック手順
SpinLock
関数は引数で指定されたロック変数のロックを行ないます。ロック試行にはTestAndSet
という関数を用います。TestAndSet
関数はロックに成功した場合はゼロを、ロックに失敗した場合は非ゼロを返します。SpinLock
関数は、既に他コアがロックを握っていた場合(TestAndSet
関数が失敗した場合)、自コアがロックを握れるまで何度もロックを試みます。
TestAndSet
関数でロック変数に設定する値としては「コア番号+1」を指定するようにしました。どこのコアからロックを行なっても常に1を設定する実装でも構わないのですが、問題発生時の解析の手助けとなるようにこのような実装にしました。
ロック失敗後のロック再試行は少し時間を空けて行なう方が良いです。連続したロック再試行はメモリバスに負担を掛け、他コアや各種コントローラからのメモリアクセスに悪影響を与えます。
ここではPause
関数を呼び出すことで実現しています。Pause
関数はfence.i
命令を実行する関数です。fence.i
命令は、命令パイプラインをフラッシュします。fence.i
命令実行前にフェッチした命令列が破棄され、再度フェッチし直されます。今回はfence.i
命令を使いましたが、ローカルコア内だけで完結する結果に影響を与えない命令(命令列)であれば、どんな処理でも構いません。
typedef volatile int Lock_t; void SpinLock(Lock_t *lock) { while (TestAndSet(lock, ThisCore + 1)) { Pause(); } }
TestAndSet
関数は次のC関数の操作を不可分(アトミック)に行ないます。lockが指す変数が0であるか否かの確認と、lockが指す変数の更新を同時に行ないます。
int TestAndSet(Lock_t *lock, int newval) { if ( *lock == 0 ) { *lock = newval; return 0; } else { return 1; } }
この処理をC記述で実現することはできないため、マルチコア用の命令を用いて実現します。ここでは、load-reserved命令・store-conditional命令を利用して実現します。
.global TestAndSet .type TestAndSet,@function .balign 4 TestAndSet: mv a2, a0 lr.w a0, (a2) bne a0, zero, 1f sc.w a0, a1, (a2) 1: ret .size TestAndSet,.-TestAndSet
lr.w
命令(load-reserved命令)でロック変数の値を読み込みます。この時同時にロック変数のアドレスがこのコアからの監視対象となります。正確にはロック変数を囲む一定のアドレス区間が監視対象(reserved状態)になります。ロック変数から読み出した値が0以外であった場合は既に他のコアによってロックされていることを意味するため、ここでTestAndSet
関数をエラー終了させます。
sc.w
命令(store-conditional命令)は、lr.w
命令実行後にロック変数のあるアドレス区間に書き込みが行なわれていなかった時(reserved状態を維持)に限りロック変数の値を更新します。lr.w
命令とsc.w
命令の間に他コアからこのアドレス範囲への書き込みがあった場合、lr.w
命令を実行したコアはこれを検出しreserved状態を解除します。これにより後に続くsc.w
命令は失敗することになります。
この仕組みはload-link/store-conditionalとも呼ばれており、他のCPUアーキテクチャでも採用されているメジャーな方法です。Wikipediaの説明も参考にしてください。
RISC-Vは、AMOと呼ばれるアトミック更新命令も提供しており、この命令を利用してTestAndSet
関数を実装することも可能です。
アンロック手順
アンロック処理はロック変数の単純なゼロクリアで実現します。ただし、ロック変数クリアの前に行なわれたメモリ更新処理が全て完了すること保証するために、ロック変数クリア直前にMemBarrier
関数呼び出しを置きます。
void SpinUnlock(Lock_t *lock) { MemBarrier(); *lock = 0; }
スケジューラ間の排他
スケジューラは全てのコア上で独立して動作します。複数のスケジューラが同時に同じタスクに実行権を与えようとしてしまうことを防ぐために、ロック変数lock_sched
を用意し、そのロック変数を握ることができたスケジューラのみが再スケジューリングの権限を得られるようにします。
static Lock_t lock_sched; void _Schedule(void) { SpinLock(&lock_sched); TaskIdType from = CurrentTask; CurrentTask = ChooseNextTask(); if (from != CurrentTask) { TaskSwitch(&TaskControl[from], &TaskControl[CurrentTask]); } SpinUnlock(&lock_sched); }
初めて動作するタスクはTaskEntry
から動き始めますが、この時lock_sched
によるロックの解除を行なう必要があります。このタスクはスケジューラ_Schedule
関数の出口を通らないため、ひとつ前に動いていたタスクが_Schedule
関数の先頭で握ったロックをTaskEntry
で解放してあげる必要があります。
static void TaskEntry(void) { SpinUnlock(&lock_sched); TaskStart(TaskControl[CurrentTask].entry, &task_ustack[CurrentTask][USTACKSIZE]); }
また、OS起動時もlock_sched
によるロックを握った後に初期タスクを起動する必要があります。この初期タスクは_Schedule
を経由せずTaskEntry
を呼び出します。TaskEntry
のlock_sched
ロック解除と対になるロック取得操作はここで行なう必要があります。
void main(void) { : : SpinLock(&lock_sched); CurrentTask = TASKIDLE + ThisCore; TaskControl[CurrentTask].coreid = ThisCore; load_context(&TaskControl[CurrentTask].sp); }
セマフォの排他
1つのセマフォを複数コアから同時に操作できないようセマフォ操作前にロックを取得するようにします。ロック変数は、それぞれのセマフォ毎に用意します(SemaphoreControl
のlock
メンバ)。
struct SemaphoreControl {
TaskIdType owner_task;
Lock_t lock;
} SemaphoreControl[NUMBER_OF_SEMS] = {
:
セマフォ取得・解放操作時は、セマフォのlock
メンバを使いロックします。スケジューラ呼び出し前にはセマフォのロックを解除します。
マルチコア環境では_AcquireSemaphore
にてセマフォ取得が失敗しスケジューラ_Schedule
を呼び出す(2)直前、セマフォのロックを解除(1)した瞬間に他コアで_ReleaseSemaphore
が呼び出されセマフォが解放される可能性があります。これはシングルコア環境では発生しない状態です。
セマフォが解放されると_AcquireSemaphore
実行中のタスクがREADY状態に遷移します(3)。その場合、スケジューラ_Schedule
(2)は次に動作するタスクとして_AcquireSemaphore
を呼び出したタスク自身(カレントタスク)を選択するかもしれません。
void _AcquireSemaphore(SemIdType sem) { SpinLock(&SemaphoreControl[sem].lock); while (SemaphoreControl[sem].owner_task != SEM_AVAILABLE) { TaskControl[CurrentTask].target_sem = sem; TaskControl[CurrentTask].state = BLOCKED; SpinUnlock(&SemaphoreControl[sem].lock); (1) _Schedule(); (2) SpinLock(&SemaphoreControl[sem].lock); } SemaphoreControl[sem].owner_task = CurrentTask; SpinUnlock(&SemaphoreControl[sem].lock); } void _ReleaseSemaphore(SemIdType sem) { TaskIdType task; SpinLock(&SemaphoreControl[sem].lock); SemaphoreControl[sem].owner_task = SEM_AVAILABLE; for (task = 0; task < NUMBER_OF_TASKS; task++) { if (TaskControl[task].state == BLOCKED && TaskControl[task].target_sem == sem) { TaskControl[task].target_sem = NO_SEM; TaskControl[task].expire = 0; /* XXX */ SpinUnlock(&SemaphoreControl[sem].lock); _TaskUnblock(task); (3) SpinLock(&SemaphoreControl[sem].lock); } } SpinUnlock(&SemaphoreControl[sem].lock); }
割り込みエントリ
割り込み共通エントリ
タイマ割り込みとコア間割り込みで共通の割り込みエントリを使用します。 割り込み発生毎にTPレジスタがコアローカル域の先頭を指すように計算し直します。「MSCRATCHレジスタ値 - _CLV_SIZE」がTPの値となります(A)。
割込みスタックはコア毎に用意されています。MSCRATCHレジスタの値から割込みスタックの底のアドレスを求めSPレジスタに設定します。「MSCRATCHレジスタ値 + _STACK_SIZE」がSPの値となります(B)(C)。発生したコア用の割込みスタックはそのコア用のメモリ領域内にあります。MSCRACHレジスタの値を元に割込みスタックの位置を計算します。
.balign 4 int_handler: csrrw tp, mscratch, tp sd t0, 5*8(tp) sd t1, 6*8(tp) sd t2, 7*8(tp) sd t3, 8*8(tp) mv t3, tp (B) la t0, _CLV_SIZE sub t0, tp, t0 csrrw tp, mscratch, tp mv tp, t0 /* restore tp, which might have been broken */ (A) : csrr a0, mcause (D) : mv s0, sp la t0, _STACK_SIZE add sp, t3, t0 (C) jal InterruptHandler (E) : :
割り込み共通ハンドラ
割り込み発生時、一旦すべての割り込みをInterruptHandler
ハンドラで受け付け、そこからタイマ割り込みハンドラTimer
、コア間割り込みハンドラInterCore
を呼び出すようにしました*3。割り込みエントリint_handler
は、mcauseレジスタの値を引数としてInterruptHandler
ハンドラを呼び出します(D)(E)。
#define INT_MMODE_SOFT 3 #define INT_MMODE_TIMER 7 int InterruptHandler(unsigned long cause) { switch ((unsigned short)cause) { case INT_MMODE_SOFT: return InterCoreInt(); break; case INT_MMODE_TIMER: return Timer(); break; default: break; } return FALSE; }
print_message関数
print_message関数をマルチコア用に拡張します。
- どのコアからの出力であるか識別できるように、出力文字にコア番号を追加することとします(1)。
- 複数のコアから同時に出力した時、出力が交じらないようにします。ロック変数
lock_uart
を用意し、このロックを握ったコアのみが出力可能という実装にします(2)。
従来の_print_message
関数を__print_message
関数に名前を変更し、新規に作り直した_print_message
関数の中でロックの制御およびコア番号出力の追加を行なった上で__print_message
関数を呼び出すようにします。
void _print_message(const char *s, ...) { unsigned long coreid = ThisCore; static Lock_t lock_uart; SpinLock(&lock_uart); (2) __print_message("core%x: ", &coreid); (1) va_list ap; va_start (ap, s); __print_message(s, ap); va_end (ap); SpinUnlock(&lock_uart); (2) }
動かしてみよう
ここで紹介したプログラムはgithubにて公開しています。ここからプログラムをダウンロードし、添付されたmakefileを利用してビルドします。いつまでも名前がa.out OSのままでは寂しいので、食事をする哲学者を動かすOSということでsophiaというOS名にしました。
$ make riscv64-unknown-elf-gcc -O2 -mcmodel=medany -ffreestanding -g -c main.c riscv64-unknown-elf-gcc -O2 -mcmodel=medany -ffreestanding -g -c primitives.s riscv64-unknown-elf-gcc -O2 -mcmodel=medany -ffreestanding -g -c start.s riscv64-unknown-elf-gcc -O2 -mcmodel=medany -ffreestanding -g -c syscall.s riscv64-unknown-elf-gcc -O2 -mcmodel=medany -ffreestanding -g -msmall-data-limit=0 -c application.c riscv64-unknown-elf-ld main.o primitives.o start.o syscall.o application.o -T riscv-virt.lds -o sophia
qemu起動時のオプションに-smp 3
を指定し、3コアでOSを起動します。
3つのコアが起動し、それぞれのコア上でタスクが動作していることが分かります。ここまでの連載で使ってきたタスク群をマルチコア環境で動かしてみます。5タスクに対してコア3つを用意することはオーバスペックです。もっと多くのタスクを定義し、どのように動作するか確認してみてください。
$ qemu-system-riscv64 -smp 3 -nographic -machine virt -m 128M -kernel sophia -bios none core1: Core1 started. core2: Core2 started. core0: Core0 started. core2: Task2 Eating core0: Task1 Meditating core0: Task4 Eating core1: Timer core0: Timer core2: Timer core0: Timer core2: Timer core1: Timer core0: Task1 Meditating core2: Inter-Core Interrupt core1: Inter-Core Interrupt core2: Timer core1: Timer core0: Timer core2: Inter-Core Interrupt core1: Inter-Core Interrupt core0: Task1 Meditating core2: Timer core1: Timer core0: Timer core2: Timer core1: Timer core0: Timer core2: Inter-Core Interrupt core1: Inter-Core Interrupt core0: Task1 Eating core2: Task3 Meditating core2: Task5 Meditating core2: Inter-Core Interrupt core0: Inter-Core Interrupt core1: Task4 Eating core0: Timer core2: Timer
最後に
少しでも単純で短いコードとなるように実装してきましたが、メモリ保護やマルチコアの機能まで実装すると、そこそこ複雑なコードになってしまいました。
READY状態かつTaskControlのcoreidがCORE_UNASSIGNEDのタスク以外はスケジューラが無視することを前提に、ロック変数lock_sched
による排他は最小限にしましたが、もしかしたら排他処理に考慮漏れがあるかもしれません。
次回の記事では、OSをRISC-V SBI(Supervisor Binary Interface)に対応させます。OSをスーパバイザモードで動かすことになります。更にその後の連載では、ハイパバイザ実装に切り込む予定です。