coroutine 是個什麼東西,在 Google 上已可以找到非常多科普文章。如果已經對 thread 有所了解的人,可簡單將 coroutine 想象成類似 thread 的玩意 -- coroutine 跟 thread 一樣有自己的 "context" (程式執行的狀態),兩者差異之處在於:
- 多個 thread 間、彼此交替執行 (context-switching) 的動作是由作業系統來強制執行的。因此編寫 thread 程式碼時,是無法預期某個 thread 何時被 CPU 執行的。也因如此,對於某些「同時間只能被一個 thread 來讀寫」的資料,需要特別進行上鎖來保護。
- 而多個 coroutine 間的交替執行,則是在 coroutine 的內容中由我們主動安排的,通常這個動作會稱為 yield (讓位);編寫 coroutine 程式碼時,程式設計者要手動排定何時該呼叫 yield、將 CPU 讓出來去執行下一個 coroutine、下一個 coroutine 再呼叫 yield ... 如此在所有 coroutines 中反覆接替讓位。等到再輪回原 coroutine 時,會由上次呼叫 yield 的下一行恢復執行,而 Call Stack、區域變數的內容如舊。也因為程式設計者可以預期執行切換的時間點,因此不需要對資料進行上鎖保護。
- coroutine 是依附於 thread 底下的。
如果多個 coroutine 中,其中有少數不肯配合,遲遲不呼叫 yield,那麼其它 coroutine 就會被堵住無法進行,整體來看就會退化為批次作業。因此 coroutine 又被譯為「協程」-- 需要所有人同心協力,才能營造出平行處理的美麗幻象。
那麼所謂的 "context" (程式執行的狀態) 究竟是個什麼玩意呢? 不管是對 thread 或 coroutine 來說,context 實際上就只是一組數據資料,內容主要是以下兩項:
- 一份 CPU 內所有暫存器內容的快照 (snapshot),以及
- 一塊預先配置好、用為 Stack 的記憶體空間
而 context-switching 這個動作,其實就只是把目前 CPU 上所有暫存器的內容做快照,存回 A context、然後把 B context 內,先前抓下的 CPU 快照內容讀出、置換回 CPU 暫存器中,然後讓 CPU 繼續執行。經過這個動作後,外界看起來就會像是 B thread (或 coroutine) 恢復執行、而 A thread (或 coroutine) 被暫停執行的表像。在大部分的 CPU 中,Stack 的位置也會存放在其中一個暫存器中,因此只要置換了暫存器的值,就連同 Stack 也置換了。
要直接操作 CPU 暫存器難免會需要組合語言的知識。但幸運的是,大部分的平台都已提供專門的 API 來進行上述 context-switching 的動作,因此我們無須煩惱如何在不同 CPU 上實作暫存器內容快照、或置換內容的問題。Boost 函式庫中甚至有一個專門處理 context 的函式庫,實現了多數主流 CPU 架構中的 context 操作。
我自己也曾經有寫過跨平台的 coroutine 實作,原始碼置於 GitHub 上 (註)。內部透過兩種方式實現 context-switching:
- 統一在所有平台上透過 Boost.Context 來實現
- 在 Windows 平台上以 Fiber API 來實現;在 Un*x 平台上以 makeconext()、swapcontext() 來實現
註: 我的 coroutine 實作是內含在 libxpf 這個專案裡的這幾個檔案,其餘部分可無視:
include/xpf/coroutine.h
src/coroutine.cpp
src/platform/coroutine_boost_fcontext.hpp
src/platform/coroutine_windows.hpp
src/platform/coroutine_posix.hpp
external/boost/context/asm
首先,支援 coroutine 最為完整的平台要說是 Windows,早在 Windows XP 的時代就支援了 coroutine。但名字有點不同,那時不叫 coroutine,而是叫 Fiber (比 thread 還細的意思,又譯為 "纖程")。由 MSDN 的文件,可得知 Fiber 相關的操作大致上就這六個:
- ConvertThreadToFiber : 在一個完全沒有 fiber 的 thread 中創建出第一條 fiber。可傳入一個 userdata 後續在產出的 fiber 中可透過 GetFiberData() 來將它取回。返回 fiber 物件,這個 fiber 物件就是目前正執行中的 fiber。 (GetCurrentFiber() 會回傳同一個 fiber 物件)
- CreateFilber : 在 thread 中產生第二、三、... 條 fiber (第一條必需由 ConvertThreadToFiber 產生)。可指定 fiber 的 body 函數、userdata、以及 Stack 大小。產生出的 fiber 物件並不會執行,需等到明確地呼叫了 SwitchToFiber()。
- SwitchToFiber : 將指定的 fiber 切換為執行狀態,而呼叫此 API 的 fiber 則暫停執行。此動作即為上述所謂的「讓位」 (yield)。
- DeleteFiber : 刪除 fiber。
- GetCurrentFiber : 取得目前正執行中的 fiber 物件。
- GetFiberData : 取得創建 fiber 時提供的 userdata。
透過上述六個 API 來實現等價於 Lua 、Unity C# 中 coroutine 機制的方法大致上是: 先透過 ConvertThreadToFiber() 產生出第一個 fiber,把它用作為其它 fiber 的管理者,按一定的順序將執行權交付給其它 fiber (coroutine):
#include <Windows.h> #include <list> #include <string> #include <iostream> //----------------------------------------------- typedef void(*coroutine_body_t)(void*); // 記錄每個 fiber (coroutine) 的細節,用為該 fiber 的 userdata struct co_record_t { coroutine_body_t body_func; void* fiber; void* param; bool finished; }; // 記錄所有 fibers 的排程資訊 struct co_registry_t { void* main_fiber; std::list<co_record_t*> coroutines; }; co_registry_t* _co_registry = 0; // 每個 coroutine 需要定期呼叫這個 API 把執行權讓給別的 coroutine void yield() { // 總是將執行權讓給管理者 fiber,讓它來決定下一個執行的 fiber 是誰 void* cur_fiber = GetCurrentFiber(); if (!_co_registry || cur_fiber == _co_registry->main_fiber) return; // 交出執行權,呼叫 yield() 的 fiber 會暫停在這一行,等待再次取得執行權。 SwitchToFiber(_co_registry->main_fiber); } void WINAPI coroutine_body_wrapper(void* param) { co_record_t * p = (co_record_t*)param; // 呼叫 coroutine 的 body 函數 p->body_func(p->param); // 按 Windows 設計,Fiber 的 body 函數是絕不能返回的,否則會導致 ExitThread()。 // 因此此處將 fiber 標記為已完畢,並持續 yield 給管理者 fiber,讓它來把這個 fiber 刪掉。 p->finished = true; while (true) yield(); } // 創建新的 coroutine,可以在 coroutine 中呼叫。傳入 coroutine body 函數以及參數。 void* create_coroutine(coroutine_body_t body_func, void * param) { co_record_t *p = new co_record_t; p->body_func = body_func; p->param = param; p->finished = false; void * f = CreateFiber(0, coroutine_body_wrapper, p); _co_registry->coroutines.push_back(p); p->fiber = f; return f; } // 刪除 coroutine void delete_coroutine(void* co) { DeleteFiber(co); } //------------------------------------------------ void my_coroutine_a(void * p) { int tab_cnt = (int)p; std::string tabs(tab_cnt, '\t'); for (int i = 0; i < 30; ++i) { std::cout << tabs << "my_coroutine_a #" << i << std::endl; yield(); } } void my_coroutine_b(void * p) { for (int i = 0; i < 50; ++i) { std::cout << "my_coroutine_b #" << i << std::endl; yield(); if (i == 25) create_coroutine(my_coroutine_a, (void*)1); } } //------------------------------------------------ int main() { // 創建出第一個 fiber,並用作為管理者 fiber。 _co_registry = new co_registry_t; _co_registry->main_fiber = ConvertThreadToFiber(_co_registry); // 由此開始就是第一個 fiber 執行中的狀態。 // 為了演示,此處啟始兩個 coroutine create_coroutine(my_coroutine_a, (void*)0); create_coroutine(my_coroutine_b, nullptr); // 管理者 fiber 的排程迴圈,當已無運作中 fiber 時才會退出。 while (true) { if (_co_registry->coroutines.empty()) break; co_record_t* co = (co_record_t*)_co_registry->coroutines.front(); SwitchToFiber(co->fiber); // 每次在 coroutine 呼叫 yield() 後,查看是否已結束,若是則刪除 fiber if (co->finished) { DeleteFiber(co->fiber); delete co; _co_registry->coroutines.pop_front(); continue; } else { _co_registry->coroutines.pop_front(); _co_registry->coroutines.push_back(co); } } delete _co_registry; return 0; }
在 Un*x 平台上,並不存在像 Windows Fiber 如此完整、可用來實現 coroutine 的基礎設施,而是提供了更為低階的 makeconext()、setcontext()、swapcontext() 等等 context 操作。可惜的是這三個 API 已經被 POSIX 標準宣告為待廢棄,然而新的後繼者卻還不見蹤影。幸好截至目前為止,所有主流 PC 平台、Android、以及 iOS 上都還是可以使用的狀態。這三個 API 大致的用途是:
- makecontext : 呼叫者提供 body 函數、Stack 空間等資訊,透過它來打包進一個 ucontext_t 的不透明資料結構裡,供後續 setcontext() 、 swapcontext() 使用。
- setcontext : 切換目前 context 到提供的 ucontext_t 中 (事先用 makecontext() 產生)
- swapcontext : 先將目前的 context 儲存到一個 ucontext_t 中,再將 context 切換到提供的另一個 ucontext_t 中。
我採取的策略是透過上述的 API (實際上只用到兩個,setcontext 的功能完全可被 swapcontext 取代),來實作出等價於 Windows Fiber 的那六個 API。如此一來不管在哪個平台上的使用方式就會統一。具體的實作請直接參考程式碼:
如果「被 POSIX 標示為待廢棄」這件事很令你在意的話,也可以考慮另一個解決方案:抽出 Boost.Context 中的部分程式碼來使用 (註)。Boost.Context 中已有用組合語言來實作 make_fcontext() 與 jump_fcontext() 這兩個 API,這兩者的角色等同於 POSIX 的 makecontext() 與 swapcontext(),因此可以很方便地用為替代品。具體原始碼請參考:
註: Boost 函式庫採用的授權是允許再製 (reproduce) 的。
include/xpf/coroutine.h
沒有留言:
張貼留言