2017年11月23日 星期四

C/C++ 如何實現 coroutine (fiber)?

近幾年不論在 Lua 、或 Unity C# 中,都能看到大量 coroutine 的應用範例。一旦了解 coroutine 的工作原理,很難不愛上它。C/C++ 中其實也可以實現 coroutine,但不知為何很少看到大規模應用,也或許是因為 multi-threading 的提倡而被冷落。結果反倒是在 Lua、Unity 等等以 single thread 應用為主的場合才又重新獲得關注。



coroutine 是個什麼東西,在 Google 上已可以找到非常多科普文章。如果已經對 thread 有所了解的人,可簡單將 coroutine 想象成類似 thread 的玩意 -- coroutine 跟 thread 一樣有自己的 "context" (程式執行的狀態),兩者差異之處在於:

  • 多個 thread 間、彼此交替執行 (context-switching) 的動作是由作業系統來強制執行的。因此編寫 thread 程式碼時,是無法預期某個 thread 何時被 CPU 執行的。也因如此,對於某些「同時間只能被一個 thread 來讀寫」的資料,需要特別進行上鎖來保護。
  • 而多個 coroutine 間的交替執行,則是在 coroutine 的內容中由我們主動安排的,通常這個動作會稱為 yield (讓位);編寫 coroutine 程式碼時,程式設計者要手動排定何時該呼叫 yield、將 CPU 讓出來去執行下一個 coroutine、下一個 coroutine 再呼叫 yield ... 如此在所有 coroutines 中反覆接替讓位。等到再輪回原 coroutine 時,會由上次呼叫 yield 的下一行恢復執行,而 Call Stack、區域變數的內容如舊。也因為程式設計者可以預期執行切換的時間點,因此不需要對資料進行上鎖保護。
  • coroutine 是依附於 thread 底下的。
如果多個 coroutine 中,其中有少數不肯配合,遲遲不呼叫 yield,那麼其它 coroutine 就會被堵住無法進行,整體來看就會退化為批次作業。因此 coroutine 又被譯為「協程」-- 需要所有人同心協力,才能營造出平行處理的美麗幻象。

那麼所謂的 "context" (程式執行的狀態) 究竟是個什麼玩意呢? 不管是對 thread 或 coroutine 來說,context 實際上就只是一組數據資料,內容主要是以下兩項:
  • 一份 CPU 內所有暫存器內容的快照 (snapshot),以及
  • 一塊預先配置好、用為 Stack 的記憶體空間

而 context-switching 這個動作,其實就只是把目前 CPU 上所有暫存器的內容做快照,存回 A context、然後把 B context 內,先前抓下的 CPU 快照內容讀出、置換回 CPU 暫存器中,然後讓 CPU 繼續執行。經過這個動作後,外界看起來就會像是 B thread (或 coroutine) 恢復執行、而 A thread (或 coroutine) 被暫停執行的表像。在大部分的 CPU 中,Stack 的位置也會存放在其中一個暫存器中,因此只要置換了暫存器的值,就連同 Stack 也置換了。

要直接操作 CPU 暫存器難免會需要組合語言的知識。但幸運的是,大部分的平台都已提供專門的 API 來進行上述 context-switching 的動作,因此我們無須煩惱如何在不同 CPU 上實作暫存器內容快照、或置換內容的問題。Boost 函式庫中甚至有一個專門處理 context 的函式庫,實現了多數主流 CPU 架構中的 context 操作。

我自己也曾經有寫過跨平台的 coroutine 實作,原始碼置於 GitHub 上 (註)。內部透過兩種方式實現 context-switching:

  1. 統一在所有平台上透過 Boost.Context 來實現
  2. 在 Windows 平台上以 Fiber API 來實現;在 Un*x 平台上以 makeconext()swapcontext() 來實現
下面就逐一講講細節。

註: 我的 coroutine 實作是內含在 libxpf 這個專案裡的這幾個檔案,其餘部分可無視:
include/xpf/coroutine.h

src/coroutine.cpp
src/platform/coroutine_boost_fcontext.hpp
src/platform/coroutine_windows.hpp
src/platform/coroutine_posix.hpp
external/boost/context/asm

首先,支援 coroutine 最為完整的平台要說是 Windows,早在 Windows XP 的時代就支援了 coroutine。但名字有點不同,那時不叫 coroutine,而是叫 Fiber (比 thread 還細的意思,又譯為 "纖程")。由 MSDN 的文件,可得知 Fiber 相關的操作大致上就這六個:

  • ConvertThreadToFiber : 在一個完全沒有 fiber 的 thread 中創建出第一條 fiber。可傳入一個 userdata 後續在產出的 fiber 中可透過 GetFiberData() 來將它取回。返回 fiber 物件,這個 fiber 物件就是目前正執行中的 fiber。 (GetCurrentFiber() 會回傳同一個 fiber 物件)
  • CreateFilber : 在 thread 中產生第二、三、... 條 fiber (第一條必需由 ConvertThreadToFiber 產生)。可指定 fiber 的 body 函數、userdata、以及 Stack 大小。產生出的 fiber 物件並不會執行,需等到明確地呼叫了 SwitchToFiber()。
  • SwitchToFiber : 將指定的 fiber 切換為執行狀態,而呼叫此 API 的 fiber 則暫停執行。此動作即為上述所謂的「讓位」 (yield)。
  • DeleteFiber : 刪除 fiber。
  • GetCurrentFiber : 取得目前正執行中的 fiber 物件。
  • GetFiberData : 取得創建 fiber 時提供的 userdata。
透過上述六個 API 來實現等價於 Lua 、Unity C# 中 coroutine 機制的方法大致上是: 先透過 ConvertThreadToFiber() 產生出第一個 fiber,把它用作為其它 fiber 的管理者,按一定的順序將執行權交付給其它 fiber (coroutine):


#include <Windows.h>
#include <list>
#include <string>
#include <iostream>

//-----------------------------------------------

typedef void(*coroutine_body_t)(void*);

// 記錄每個 fiber (coroutine) 的細節,用為該 fiber 的 userdata
struct co_record_t {
    coroutine_body_t body_func;
    void* fiber;
    void* param;
    bool finished;
};

// 記錄所有 fibers 的排程資訊
struct co_registry_t {
    void* main_fiber;
    std::list<co_record_t*> coroutines;
};

co_registry_t* _co_registry = 0;

// 每個 coroutine 需要定期呼叫這個 API 把執行權讓給別的 coroutine
void yield()
{
    // 總是將執行權讓給管理者 fiber,讓它來決定下一個執行的 fiber 是誰 
    
    void* cur_fiber = GetCurrentFiber();
    if (!_co_registry || cur_fiber == _co_registry->main_fiber)
        return;

    // 交出執行權,呼叫 yield() 的 fiber 會暫停在這一行,等待再次取得執行權。
    SwitchToFiber(_co_registry->main_fiber);
}

void WINAPI coroutine_body_wrapper(void* param)
{
    co_record_t * p = (co_record_t*)param;

    // 呼叫 coroutine 的 body 函數
    p->body_func(p->param);

    // 按 Windows 設計,Fiber 的 body 函數是絕不能返回的,否則會導致 ExitThread()。
    // 因此此處將 fiber 標記為已完畢,並持續 yield 給管理者 fiber,讓它來把這個 fiber 刪掉。
    p->finished = true;
    while (true)
        yield();
}

// 創建新的 coroutine,可以在 coroutine 中呼叫。傳入 coroutine body 函數以及參數。
void* create_coroutine(coroutine_body_t body_func, void * param)
{
    co_record_t *p = new co_record_t;
    p->body_func = body_func;
    p->param = param;
    p->finished = false;

    void * f = CreateFiber(0, coroutine_body_wrapper, p);
    _co_registry->coroutines.push_back(p);
    p->fiber = f;
    return f;
}

// 刪除 coroutine
void delete_coroutine(void* co)
{
    DeleteFiber(co);
}

//------------------------------------------------

void my_coroutine_a(void * p)
{
    int tab_cnt = (int)p;
    std::string tabs(tab_cnt, '\t');

    for (int i = 0; i < 30; ++i)
    {
        std::cout << tabs << "my_coroutine_a #" << i << std::endl;
        yield();
    }
}

void my_coroutine_b(void * p)
{
    for (int i = 0; i < 50; ++i)
    {
        std::cout << "my_coroutine_b #" << i << std::endl;
        yield();
        if (i == 25)
            create_coroutine(my_coroutine_a, (void*)1);
    }
}

//------------------------------------------------

int main()
{
    // 創建出第一個 fiber,並用作為管理者 fiber。
    _co_registry = new co_registry_t;
    _co_registry->main_fiber = ConvertThreadToFiber(_co_registry);

    // 由此開始就是第一個 fiber 執行中的狀態。

    // 為了演示,此處啟始兩個 coroutine
    create_coroutine(my_coroutine_a, (void*)0);
    create_coroutine(my_coroutine_b, nullptr);

    // 管理者 fiber 的排程迴圈,當已無運作中 fiber 時才會退出。
    while (true)
    {
        if (_co_registry->coroutines.empty())
            break;

        co_record_t* co = (co_record_t*)_co_registry->coroutines.front();
        SwitchToFiber(co->fiber);

        // 每次在 coroutine 呼叫 yield() 後,查看是否已結束,若是則刪除 fiber
        if (co->finished)
        {
            DeleteFiber(co->fiber);
            delete co;
            _co_registry->coroutines.pop_front();
            continue;
        }
        else
        {
            _co_registry->coroutines.pop_front();
            _co_registry->coroutines.push_back(co);
        }
    }

    delete _co_registry;
    return 0;
}


在 Un*x 平台上,並不存在像 Windows Fiber 如此完整、可用來實現 coroutine 的基礎設施,而是提供了更為低階的  makeconext()setcontext()swapcontext() 等等 context 操作。可惜的是這三個 API 已經被 POSIX 標準宣告為待廢棄,然而新的後繼者卻還不見蹤影。幸好截至目前為止,所有主流 PC 平台、Android、以及 iOS 上都還是可以使用的狀態。這三個 API 大致的用途是:


  • makecontext : 呼叫者提供 body 函數、Stack 空間等資訊,透過它來打包進一個 ucontext_t 的不透明資料結構裡,供後續 setcontext() 、 swapcontext() 使用。
  • setcontext : 切換目前 context 到提供的 ucontext_t 中 (事先用 makecontext() 產生)
  • swapcontext : 先將目前的 context 儲存到一個 ucontext_t 中,再將 context 切換到提供的另一個 ucontext_t 中。
我採取的策略是透過上述的 API (實際上只用到兩個,setcontext 的功能完全可被 swapcontext 取代),來實作出等價於 Windows Fiber 的那六個 API。如此一來不管在哪個平台上的使用方式就會統一。具體的實作請直接參考程式碼:



如果「被 POSIX 標示為待廢棄」這件事很令你在意的話,也可以考慮另一個解決方案:抽出 Boost.Context 中的部分程式碼來使用 (註)。Boost.Context 中已有用組合語言來實作 make_fcontext() 與 jump_fcontext() 這兩個 API,這兩者的角色等同於 POSIX 的 makecontext() 與 swapcontext(),因此可以很方便地用為替代品。具體原始碼請參考:

註: Boost 函式庫採用的授權是允許再製 (reproduce) 的。


include/xpf/coroutine.h

沒有留言:

張貼留言