2017年11月12日 星期日

說說 C/C++ 網路伺服器使用的各種多工器 (Multiplexer)

若想單單只靠作業系統提供的 API 來實作 C/C++ 網路伺服器程序,首先面臨的問題就是該怎麼設計連線的多工處理 -- 同時間與伺服器建立的連線可能成千上萬,怎麼有效地查覺誰送了資料過來需要回應? 在通訊和資訊網路的領域裡,像這樣「把眾多輸入訊號合併匯集後循序處理」的過程稱為「多工」(Multiplexing,大陸譯為「多路復用」)。因此,由作業系統提供,判斷現有連線中是否有待處理事件的 API ,即稱為「多工器」(Multiplexer)。

我在學校裡只學到一種最基本的多工器,就是 select()。踏入業界幾經歷練,才又陸續接觸到 poll(), epoll(), kqueue(), IOCP 等等的多工器。麻煩的地方在於每個作業系統支援的多工器種類都不一致。就算是以現今 2017 年來看,也只有最古老的 select() 幾乎能保證被各作業系統支援。因此,如果跨平台運行是伺服器的一個主要考量點,那麼伺服器端通常都還要自行開發一層網路事件分派層來統整各平台上多工器的使用差異,或著直接利用現用的第三方函式庫如 boost.asio ACE 等。

我雖能力一般水平有限,但對於想一窺全貌但不知何處入門的同好們,提供一點走馬看花式的導覽,還是可勝任的。




為了避免贅述,我假設看倌們都了解 socket programming 的基本概念:

  • socket 是個什麼東西? file descriptor (fd) 又是什麼?
  • socket programming 的基本操作: listen, accept, connect, recv, send。
  • 什麼情況下才需要謹慎地考慮多工設計? 實作「使用具連線概念 (Connection-oriented) 的通訊協議」的「伺服器端」程式,例如 TCP 伺服器端。反之,對於 TCP 的客戶端、或是 UDP 的伺服器或客戶端,因為面對的 socket 數量上限是可以預期的,就算直接使用 select() 也無妨。
那麼,以下導覽開始。


select


select 是最古老、也是最廣泛被所有平台支援的多工器。多數的平台上它的接口長得像:

int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);

中間那三個 fd_set 是一個不透明的「socket 容器」,平台會另外提供專屬的巨集來對它們操作:

void FD_CLR(int fd, fd_set *set);   // 在容器中將指定的 socket (fd) 解除標記
int  FD_ISSET(int fd, fd_set *set); // 測試指定的 socket 看它是否被標記
void FD_SET(int fd, fd_set *set);   // 在容器中將指定的 socket 設定為已標記
void FD_ZERO(fd_set *set);          // 清空整個容器

常見的使用方式大致是這樣的:

while(server_running)
{
  fd_set rdset;
  int cnt, idx, max_fd;

  // 將容器清空
  FD_ZERO(&rdset);
  // 逐一把要監控的 socket 在容器中設定標記。同時把遇見過最大的 socket 值記到 max_fd 裡。
  for (idx=0; idx < lenof_sockets_to_read; ++idx)
  {
    int fd = sockets_to_read[idx];
    FD_SET(fd, &rdset);
    if (fd > max_fd)
      max_fd = fd;
  }

  // 容器內容準備好之後,呼叫 select()
  cnt = select(max_fd + 1, &rdset, NULL, NULL, NULL);

  // 函數返回時,容器中只有「有資料可讀取」的 socket 會處於標記的狀態
  for (idx=0; idx < lenof_sockets_to_read; ++idx)
  {
    if (FD_ISSET(sockets_to_read[idx], &rdset))
    {
      // 由 socket 中讀取資料 (recv)
    }
  }
}

伺服器處於一個常態迴圈中,每次迴圈都會建立 fd_set 容器,你需要把想監看「是否有資料可讀取」的 socket (fd) 加進去。然後呼叫 select,把填好的 fd_set 做為 readfds 參數傳入,待 select 返回時,再走訪容器中那些被你加入的 socket 們,看看它們是否被標記,有的話即代表有資料可讀取。select 的最後一個參數是一個 timeout 的設定,你可以傳入 NULL,代表一定要等到任何一個 socket 有資料可讀才返回。也可以給一個時間,時間到時就算沒資料可讀也會返回。

select 的第三、四個傳入參數也是 socket 容器,分別裝載想監看「可以進行寫入」、以及「發生了例外」這兩個事件的 socket fd。依我自身的經驗,很少看到有人使用這兩個參數,通常都直接給 NULL,具體的原因請待我稍後說明。

從上述的使用方式,不難發現這其實是一種主動輪詢 (Polling) 的方式。因此當你所關注的 socket 數量增加時,每次耗費在建立 fd_set 、以及在 select 返回後逐一處理被標記的 socket 的時間都會線性增加。這是 select 的一大缺點。然而更為致命的是,fd_set 這個容器的容量是有上限的。Windows 上預設只有 64 個,不過可向上調整。而在大部分 UNIX 平台上,上限是定死在 1024 個。如果你的設計是打算以一個伺服器程序來服務所有連線,那麼你能承接的連線數首先就會受到這個限制而無法提高。


poll


poll 的出現僅僅是為了解決 select 的 fd_set 容量上限的問題。除此之外的性質都與 select 一致。它的接口長得像這樣:

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

struct pollfd {
    int   fd;         /* file descriptor */
    short events;     /* requested events */
    short revents;    /* returned events */
};

我們改為自行維護一個 struct pollfd 的「陣列」(fds),大小由我們自行決定。每一個 struct pollfd 即代表一個監看項目,需要把監看的 socket 填在 fd、以及把關注它的哪些事件填在 events。呼叫 poll 時,傳入 struct pollfd 列陣的開端、以及監看項目總數 (nfds)。當 poll 返回時,我們再逐一走訪 struct pollfd 陣列,由 poll 所回報的結果會填在 revents 內。

若將上面 select 中提到的範例改以 poll 來實作,會像是:

// 自行定義容器上限 (MAX_POSSIBLE_POLLFDS),準備好容器空間
struct pollfd *fds_array = (struct pollfd*)malloc(
      sizeof(struct pollfd) * MAX_POSSIBLE_POLLFDS);

while(server_running)
{
  int cnt, idx;

  // 逐一把要監控的 socket 填在容器中,設定 events 為 POLLIN 代表只關注有資料可讀
  for (idx=0; idx < lenof_sockets_to_read; ++idx)
  {
    fds_array[idx].fd = sockets_to_read[idx];
    fds_array[idx].events = POLLIN;
    fds_array[idx].revents = 0;
  }

  // 呼叫 poll,給它容器的開端、以及有多少個監視項目
  cnt = poll(fds_array, lenof_sockets_to_read, -1);

  // 待 poll 返回後,逐一檢查每筆監視項目,看看是否有關注的事件發生
  for (idx=0; idx < lenof_sockets_to_read; ++idx)
  {
    if (fds_array[idx].revents & POLLIN)
    {
      // 由 socket 中讀取資料 (recv)
    }
  }
}

free(fds_array);


除了解決容器上限的問題,本質上還是主動式輪詢的運作方式。因此 socket 數量一高,poll 跟 select 其實是一樣低效的。

poll 在 UNIX 平台上的支援度與 select 同樣廣泛,而 Windows 平台則是直到 Windows Vista 之後才提供 WSAPoll 這個 poll 的等價實作。


fork() 與 c10k 問題


看到這也許會產生疑問: 既然前述的 select 與 poll,在 socket 數量大量成長時效率會愈發降低,那麼早些年 (2000 年左右) 的那些大規模  HTTP、BBS 等站台是怎麼解決這個問題的?

早期 UNIX 平台上主流的網路伺服器程式架構方式跟 Windows 平台有一個很大的區別。UNIX 平台上有個由 Kernel 提供的系統函數名為 fork(),它的作用是 -- 當程序執行到 fork() 時,會立刻像細胞分裂一樣,在系統上產生另一個跟自己一模一樣的分身程序。原程序中、無論是記憶體內容 (Heap 與 Stack)、以及大部分已開啟的 file descriptor 都會被完整複製 (各平台會有不同的例外情況,請各見平台文件)。兩個程序都會接著由 fork() 返回處再往下執行,雙方唯一的差別是 fork() 的返回值不同 -- 原程序的 fork() 返回值是分身程序的 pid,而分身程序得到的 fork() 返回值是 0。Windows 並沒有提供與 fork() 等價的功能,關於這一點可以參考 Cygwin 的 FAQ (註: Cygwin 是致力於在 Windows 平台上實現 POSIX 環境的專案,實作很多 UNIX 獨有的系統函數,其中包含不完美的 fork() ) 。

利用 fork() 的特性,UNIX 網路伺服器的接聽連線流程大致上會設計成這樣:

// 省略 server_socket 產生與 bind 動作
listen(server_socket, 50);

while(server_running)
{
  struct sockaddr_in client_addr;
  socklen_t addrlen = sizeof(struct sockaddr_in);
  int client_socket;
  int client_pid;

  // 接聽 client 連線
  client_socket = accept(server_socket,
      (struct sockaddr*)&client_addr, &addrlen);

  //  將目前程序一分為二。注意兩個程序都擁有 server_socket 與 client_socket,需各自關閉不需要的。
  client_pid = fork();

  if (0 == client_pid)
  {
    // fork() 返回 0 -- 我是被分裂出來的程序,關閉 server_socket 並且開始服務 client
    close(server_socket);
    perform_client_serving();
  }
  else
  {
    // fork() 返回 pid -- 我是原程序,關閉 client_socket,繼續接聽下一個 client 連線
    close(client_socket);
  }
}

因此,每接聽一個新的連線,系統上就會多出一個程序來服務該 client。而每個服務 client 的程序中,通常只需關心一個 socket 而已。這樣一來就不會導致「同一個程序要處理的 socket 數量愈來愈多」的情況,因此就不必在意 select 的低效輪詢特質。同時也因為每個服務 client 的邏輯都已獨立成為程序,就算服務邏輯發生錯誤而導致程序異常中止,也不會影響到服務其它 client 的程序。即便是現今也還有許多伺服器採用這樣的方式在運作,例如 Apache Web Server (Prefork MPM)

採用 fork() 來架構的伺服器會有兩個問題: 其一,不方便移植到 Windows 平台。理由前面說過是因為 Windows 平台並無原生提供 fork() 的等價實作。其二,在連線數量成長到數萬級的規模時,作業系統中也必然存在數萬個程序。而作業系統為了公平服務所有程序,必需常態性地進行 context-switch 以盡可能地讓所有程序都能被 CPU 執行到。而這個 context-switch 動作本身是有微小時間消耗的,當數以萬次的 context-switch 累計起來就會產生可觀的浪費。這個議題即稱為 c10k (同時處理 10k 條以上連線)。

前面說明 select 時有提到「是否可寫入」、以及「是否發生例外」這兩個事件似乎比較少有人監看。較少有人關心「是否可寫入」的理由是,就算在不可寫入的情況下 (例如網卡 buffer 已滿) 硬是去呼叫 send(),也只是阻塞直到寫入完成而已。而在使用 fork() 架構的伺服器,就算某個 client 服務程序發生阻塞也只會影響到它負責的那條連線,並不產生全局影響。因此「無視是否可寫入而總是直接呼叫 send()」 也是可以接受的。而「是否發生例外」不被人關注的理由是因為太罕用了 -- 以 TCP 來說,除非你預期要接收 Out-of-band 資料才會關心這個事件。



同步與異步 (Synchronous  v.s. Asynchronous)


為了解決 c10k 的問題,必需回過頭解決 select 與 poll 的根本缺陷 -- 主動輪詢。而 select 與 poll 需要主動輪詢的理由是什麼? 是因為若不先對現有 sockets 偵測是否可讀/寫的話,直接對它們做 recv/send 動作可能會導致線程陷入阻塞。那換個角度想,若把「進行 socket 操作會阻塞」的特質移除,是不是就可以避免主動輪詢呢? 於是乎「異步 I/O 處理模型」漸漸被發展出來,用以取代傳統的「同步 I/O 處理模型」。關於「同步 I/O」與「異步 I/O」,微軟在 MSDN 上有給出一個定義:

  • 同步 I/O: 發起 I/O 動作的線程會阻塞、等到動作完畢後再繼續。
  • 異步 I/O: 發起 I/O 動作的線程僅僅只有提出 I/O 要求給某個中間人 (MSDN 文件內,此中間人為 Kernel),要求提出之後,原線程就可以去處理其它事了。而中間人進行完 I/O 操作後會發送消息通知原線程處理。
前面提的 select 與 poll 當然是屬於同步 I/O。而各作業系統為了達到「異步 I/O」的目標,提出的對策各有千秋,接下來會逐一說明。不過在這之前,以下幾節會需要先行解釋一些名詞。


阻塞與非阻塞 (Blocking v.s. Non-blocking)


每個 socket 在創建的時候,預設都是會阻塞 (Blocking) 的模式 -- 對它們進行 connect, accept, send, recv 等動作時,可能會導致呼叫的線程暫停執行,直到動作完畢或逾時之後再繼續。然而 socket 創建後是可以切換為非阻塞模式 (Non-Blocking) 的,在 Windows 平台上是透過 ioctlsocket() 這個函數去設定 socket 的 FINOBIO 的值在 UNIX 平台上則統一以 fcntl() 系統函數來對 socket 添加 O_NONBLOCK

// Windows
u_long nonBlocking = (u_long)1;
ioctlsocket(socket, FIONBIO, &nonBlocking);

// Linux, *BSD, MacOSX, iOS
int flags = fcntl(socket, F_GETFL);
fcntl(socket, F_SETFL, flags | O_NONBLOCK);

一旦你把 socket 切換為非阻塞模式之後,對它進行 connect, accept, send, recv 等動作就保證會立刻返回。若動作成功則比照阻塞模式時的處理方式;若動作無法完成 (將會陷入等待狀態),則會得到特殊的錯誤碼,你必需自行再安排時間重新進行一次動作。不幸的是這個錯誤碼有些歷史包袱,你必需判斷錯誤碼是否為 EAGAIN 或 EWOULDBLOCK,把它們都當成是動作暫時無法完成的情況。

將 socket 切換為非阻塞模式的常見應用有:

  1. 構造一個 TCP Client 端時,因為需操作的 socket 只有一個,配合非阻塞模式的 socket,可以簡單地把 socket 動作合併到主線程的事件迴圈中 (若為阻塞模式通常必須另開線程來操作 socket)。很適合 GUI 或 Game 程式等等以單線程事件迴圈為主體的應用。
  2. 配合 Reactor 多工器 (見下節說明) 來實現異步 I/O 處理模型。
請注意不要把 socket 的非阻塞模式 (Non-blocking) 與異步 I/O 劃上等號,它們是兩個不同層次的獨立概念。


Proactor 與 Reactor


上面提到,每個平台是各自獨立地往「異步 I/O」的目標發展。為了達到這個終極目標,必需對傳統的 select/poll 多工器進行改良。各平台改良後的多工器,依照其使用的方式,可歸納成兩種設計模式 (pattern),即 Proactor 與 Reactor。網上詳細解釋這二者的文章蠻多的,這邊試著用一個比較能快速意會,但不一定精確的方式來描述它們:

Proactor 多工器

  • 應用程式透過 Proactor 多工器對 socket 提出操作 (recv/send 等等) 之後,保證立即返回不會阻塞。
  • Proactor 背後有一組工作線程 (thread pool) 來實際執行這些操作。
  • 操作完成後,工作線程將結果 (無論成功或失敗) 統一添加到一個 thread-safe 的消息容器中。應用程式需要週期性地由消息容器中取出消息並且進行適當後續處理。
Reactor 多工器
  • Reactor 多工器僅僅是在 socket 「可以進行某些操作時」來通知你。它本身並不會直接幫你執行 socket 操作。
  • 「可以進行某些操作」的通知也會統一放置在一個 thread-safe 的容器裡。應用程式需要週期性地由容器中取出通知並進行實際的操作。

不難發現 Proactor 提供的「服務」是比較高層次的 -- 我們只管下達命令 (提出 socket 操作),然後就等結果就好。相比之下,Reactor 像是只提供了「半套服務」 -- 它只負責通知我們什麼時候該行動,真正執行操作的人還是我們。

也正因為 Proactor 比起 Reactor 來說是比較高層次的行為,所以如果想要建構一個跨平台的「異步 I/O 」網路層,建議是設計為 Proactor 模式,而在只提供 Reactor 多工器的平台上自行用 Reactor 模擬 Proactor 的運作。

我有試做了一個異步 I/O 網路層的 jnet 函式庫,對外它是以 Proactor 的模式在工作的,然而它在 Windows 上是以 IOCP 實現、而在 UNIX 上分別是以 epoll 、kqueue 兩種 Reactor 為基底來模擬 Proactor 行為。其源碼可作為參考提供給同好們。

下面開始會逐一說明各平台上的 Proactor 與 Reactor 多工器。以及 jnet 中是如何用 Reactor 模擬出 Proactor 的行為。


IOCP (Proactor)


IOCP 的全名為 I/O Completion Port,中文譯為「輸入輸出完成埠」,個人覺得蠻拗口所以偏好以 IOCP 來稱呼。IOCP 是 Windows 平台上獨有的設計,早在 Windows NT 3.5 時就已存在,其設計目的就是為了實現異步 I/O。IOCP 同時也是 Proactor 模式的代表性範本。以下我們一邊說說使用它的流程,一邊就可以逐漸了解它是個什麼東西。

首先是產生一個 IOCP 物件的方式:

HANDLE hIocp = CreateIoCompletionPort(NULL, NULL, userdata, 0);

在 Windows 平台上,「HANDLE」的存在意義類似於 UNIX 平台上的「File Descriptor (fd)」,它就是一個操作對象,背後通常是檔案、socket、或是某些系統物件。

產生了 IOCP 物件之後,也是透過同一個 API 來關連既有的 socket (也是一種 HANDLE) 進 IOCP 中:

CreateIoCompletionPort((HANDLE)socket, hIocp, userdata, 0);

也就是說,當 CreateIoCompletionPort 的前兩個參數皆為 NULL 時,代表要產生新的 IOCP 物件;而皆為非 NULL 值時,代表要關聯一個 HANDLE 進指定的 IOCP 物件中。第三個參數是任由應用程式指派的 userdata,其後所有由這個 IOCP 所回報的消息中都會帶著這個值。而最後一個參數是控制 IOCP 內部能允許有多少線程並發運作,或不關心可直接給 0 來採用預設值。

只要是被關聯進 IOCP 的 socket,後續對它們進行 WSARecvWSASend 等等異步 I/O 專用的操作時,保證呼叫後立即返回、不會產生阻塞。而具體的完成結果則會集中到 IOCP 內部的一個消息佇列中。對這個消息佇列提取完成結果的 API 為 GetQueuedCompletionStatus :

DWORD       numOfBytes;
ULONG_PTR   userdata;
OVERLAPPED* pOverlapped;

GetQueuedCompletionStatus(hIocp, &numOfBytes, &userdata, &pOverlapped, 0);

應用程式應定期透過以上 API 向 IOCP 提取完成結果、逐一對其做出因應處理。
也因為這些完成結果像船一樣一班班地向碼頭停靠,所以稱為 I/O Completion Port。

所有異步 I/O 操作 API (WSARecv、WSASend 等) 都有一個特性,就是它們的最後兩個參數必定帶有 "Overlapped" 的字樣。Overlapped 一詞實際上指的是 "Overlapped I/O",也是 "Asynchronous I/O" 的同義詞 (見MSDN說明)。舉 WSARecv 為例,其接口為:

int WSARecv(
  SOCKET                             s,
  LPWSABUF                           lpBuffers,
  DWORD                              dwBufferCount,
  LPDWORD                            lpNumberOfBytesRecvd,
  LPDWORD                            lpFlags,
  LPWSAOVERLAPPED                    lpOverlapped,     
  LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);

這些異步 I/O 操作 API 的倒數第二個參數是一個指向 WSAOVERLAPPED 結構的指針,這個結構是異步 I/O 動作進行時,給作業系統用以暫存狀態、進度用的。我們需要先把它的內容歸零再指派給異步 I/O 操作的 API。而最後一個參數可以指派一個回呼函數 (callback),當操作的目標 socket 並沒有關聯上任何 IOCP 時,可以靠這個回呼函數來得知操作完成的狀態;然而在有關聯 IOCP 的情況下,此值通常給 NULL。

除了 WSARecv 與 WSASend 之外,還有其它異步 I/O 操作的 API,可詳見此頁最末的一覽表。其中會發現並未提及與 connect 與 accept 等價的異步 I/O 版本。但實際上它們卻是存在的,名為 ConnectEx AcceptEx,只是因為不存在於原有的 Windows Sockets 規格書中,因此要透過 WSAIoctl 來間接提取 API 的函數指針出來才能使用 (熟悉 OpenGL 開發環境的朋友必定會心一笑)。提取方式請見以下:

DWORD dwBytes = 0;
GUID GuidAcceptEx = WSAID_ACCEPTEX;
GUID GuidConnectEx = WSAID_CONNECTEX;

LPFN_ACCEPTEX acceptEx;    // 將會取到 AcceptEx 的函數指標
LPFN_CONNECTEX connectEx;  // 將會取到 ConnectEx 的函數指標

WSAIoctl(socket, SIO_GET_EXTENSION_FUNCTION_POINTER,
 &GuidAcceptEx, sizeof(GuidAcceptEx),
 &acceptEx, sizeof(LPFN_ACCEPTEX),
 &dwBytes, NULL, NULL);


WSAIoctl(socket, SIO_GET_EXTENSION_FUNCTION_POINTER,
 &GuidConnectEx, sizeof(GuidConnectEx),
 &connectEx, sizeof(LPFN_CONNECTEX),
 &dwBytes, NULL, NULL);

如此一來 socket 的四項操作的異步 I/O 版本就都齊備了。如果需要取消正在進行中、但尚未完成的異步操作 -- 例如說要主動切斷某個 Client 連線,則可透過 CancelIoEx 這個 API來達成。

補充一點:整個 IOCP 的使用流程中,會發現只有呼叫 CreateIoCompletionPort 時可以夾帶一個 userdata 進去,而 GetQueuedCompletionStatus 時可以取回這個 userdata。除此之外似乎沒辦法針對每一個異步操作個別夾帶 userdata,如果無法把「異步 I/O 結果」與「這是哪個 socket 上的哪個操作」關聯上,整個處理邏輯是無法完成的。通常解法是去「擴充」 WSAOVERLAPPED 的結構,使它「最前面長得跟 WSAOVERLAPPED 一樣」就可以了。因為是丟指針給異步 I/O API,它們也只會利用到前面 sizeof(WSAOVERLAPPED) 長度的內容,因此以下這兩種擴充方式都是可以的:

// C/C++

struct MYOVERLAPPED
{
  WSAOVERLAPPED overlapped;
  // ... 自行新增的客製化資料
};


// C++

struct MYOVERLAPPED : public WSAOVERLAPPED
{
  // 自行定義的客製化資料
  // 注意: 需保持 POD 
};

呼叫異步 I/O API 時,改用 MYOVERLAPPED 的指針來取代 WSAOVERLAPPED 指針;處理完成消息時,也先將 WSAOVERLAPPED 指針 cast 回 MYOVERLAPPED 再處理。在我們擴充出來的欄位裡夾帶一些資訊,就可以分判出這個操作的結果是屬於哪個 socket 的哪種操作。

(詳細流程可參考 jnet 中的 asio_iocp.cpp 的內容)



POSIX aio (Proactor)


POSIX aio 是整合在 glibc 內、與 IOCP 等價的 Proactor 模式多工器。主流 UNIX 平台如 Linux、BSD、以及 MacOS/iOS 平台上都有提供 aio 支援。Android 是例外 -- 雖它也是基於 Linux,但礙於授權條款並沒有整合 glibc,因此尚未提供 aio 支援。

要對一個現有的 socket 透過 aio 發起異步 I/O,只需要填寫一個 aiocb (AIO Control-Block) 後直接丟給 aio_readaio_write 即可。以下舉 aio_read 為例:

struct aiocb {
    int             aio_fildes;     /* File descriptor */
    off_t           aio_offset;     /* File offset */
    volatile void  *aio_buf;        /* Location of buffer */
    size_t          aio_nbytes;     /* Length of transfer */
    int             aio_reqprio;    /* Request priority */
    struct sigevent aio_sigevent;   /* Notification method */
    int             aio_lio_opcode; /* Operation to be performed;
                                       lio_listio() only */
};

// AIO Control-Block 應先歸零再填值、使用
struct aiocb cb;
memset(&cb, 0, sizeof(struct aiocb);

char recvbuf[1024];

// 設定操作 socket、I/O 使用的 buffer 以及長度
cb.aio_fildes = socket;
cb.aio_buf = recvbuf;
cb.aio_nbytes = 1024;
// I/O 完成後要如何通知原發起者。此處採用 Signal 通知。
cb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
cb.aio_sigevent.sigev_signo = SIGUSR1;
cb.aio_sigevent.sigev_value.sival_ptr = userdata;

// 進行異步 I/O (讀取)
aio_read(&cb);

aio 在「通知操作完成」這一部分採用比較特別的作法,並不像 IOCP 把通知存放在容器中,而是用以下兩種方式擇一來通知:
  1. 應用程式提供一個函數給 aio 作為 thread body,aio 在每個異步操作完成後,皆會創建一個獨立的線程去執行你提供的 thread body。或
  2. 應用程式指定一個 UNIX Signal 以及 userdata 給 aio,aio 在每個異步操作完成後,皆會向目前程序發送這個 Signal、夾帶該 userdata。
應用程式是靠 aiocb 中的 aio_sigevent.sigev_notify 這個欄位來指定 aio 的通知方式, SIGEV_THREAD 是選項1、SIGEV_SIGNAL 是選項2。一般來說 Signal 通知會較適合網路應用,畢竟每個操作完成都會另開一個 thread 的行為實在有點嚇人。

為了接收 Signal,在執行任何 aio 操作前,程序應已先行註冊好 Signal 處理函數。上述程式範例中是使用 SIGUSR1,那麼在啟始任何 aio 前應該透過 sigaction() 來設定好 SIGUSR1 的處理函數:


void my_aio_sig_handler(int signo, siginfo_t *info, void *unused)
{
    void* userdata = info->si_value.sival_ptr;
    // 取回 userdata,處理 aio 完成的事件
}


struct sigaction sa;
memset(&sa, 0, sizeof(struct sigaction));
sa.sa_sigaction = my_aio_sig_handler;
sa.sa_flags = SA_SIGINFO;
sigaction(SIGUSR1, &sa, NULL);


my_aio_sig_handler 裡的 userdata 是在發起 aio 操作時,在 aiocb 裡面給的那個 userdata。由此可知它的角色與 IOCP 裡的 WSAOVERLAPPED 結構相似。不管你怎麼設計這個 userdata 的結構,必需要能透過它來取回發起操作時帶的那個 aiocb。透過那個 aiocb 指針我們才能透過 aio_return()aio_error() 詢問操作的結果:

void my_aio_sig_handler(int signo, siginfo_t *info, void *unused)
{
    void* userdata = info->si_value.sival_ptr;
    // 取回 userdata,處理 aio 完成的事件

    // 此處假設 userdata 就是發起 aio 時的 aiocb,實際上應是一個你設計的結構,裡面保有 aiocb
    struct aiocb* cb = (struct aiocb*)userdata;

    // 提取 aio 結果
    int ec = aio_error(cb);
    
    // ec 為 0 (成功) 時,提取實際讀取或寫入的長度
    ssize_t bytes = aio_return(cb);
}


最後,如果要中斷目前仍在進行中的異步 I/O,則是要透過 aio_cancel(),給它發起 aio 時的 socket 與 aiocb。中斷後,完成的事件通知一樣會產生,但此時 aio_error() 會得到 ECANCELED 的錯誤訊息可用來作區別。

值得注意的是 aio 並沒有提供與 connect 和 accept 相應的異步版本,因此這兩個操作必需自行解決。一個可行的解決方案是: 先把要操作的 socket 設定為非阻塞模式,呼叫操作之後把它們丟給其它多工器來監看事件 -- connect 完成於否可監看「是否可寫入」的事件;而 accept 完成與否則監看「是否可讀取」的事件。

註: 使用 aio 的程式在連結時要指定 "-lrt" 的參數。


epoll (Reactor)


epoll 是一個由 Linux Kernel 提供的功能,因此所有基於 Linux 的作業系統都必定支援 epoll,其中包含 Android。epoll 的運作機制可視為是 select/poll 的延續 -- 也是用來監看「可讀取」、「可寫入」等事件,但效率更好。

使用 epoll 的第一步是以 epoll_create() 來創建一個代表 epoll 物件的 fd:

int epollfd = epoll_create(1);

epoll_create() 的傳入參數現今無任何作用了,可以隨便給,但必需是一個正整數否則在較舊的 Kernel 中會被視為錯誤。epoll 物件產生之後,就可以用 epoll_ctl() 來關聯 fd 到這個 epoll 物件上:

typedef union epoll_data {
    void        *ptr;
    int          fd;
    uint32_t     u32;
    uint64_t     u64;
} epoll_data_t;

struct epoll_event {
    uint32_t     events;      /* Epoll events */
    epoll_data_t data;        /* User data variable */
};

// 填寫 epoll_event 的內容,設定我們想要監看的事件 (epoll_event 結構如上)
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;

// 將指定的 socket 關聯進 epoll 中
epoll_ctl(epollfd, EPOLL_CTL_ADD, socket, struct epoll_event *event);

epoll_ctl() 的第一個傳入參數即為以 epoll_create() 產生出來的 epoll 物件。第二個參數是要進行什麼操作,可能值為:

  • EPOLL_CTL_ADD - 關聯新的 socket 到此 epoll 物件中
  • EPOLL_CTL_MOD - 變更已關聯的 socket 的監看設定
  • EPOLL_CTL_DEL - 移除已關聯的 socket
第三個參數即為操作的對象 fd,此文中我們主要探討網路應用,因此上述範例中是使用 socket。最後一個參數是一個 epoll_event 的結構,內含兩個內容: 其一是我們希望 epoll 監看這個 fd 的哪些事件 (epoll_event.events)、其二是指定一個 userdata (epoll_event.data),當事件發生時 epoll 會將這個 userdata 夾在通知中回報給我們,方便流程串接。

應用程式必需週期性地呼叫 epoll_wait() 來檢查 epoll 物件是否有通知產生:

struct epoll_event events[10];

int cnt = epoll_wait(epollfd, events, 10, -1);

第一個參數是 epoll 物件,第二、三個參數是我們準備好的通知儲存空間、以及最大能儲存的通知數。epoll_wait() 丟回的通知數最多不會超過這個數目。最後一個則是 timeout 的設定。

相較於 select/poll,epoll 的其中一個改良是包辦了「監視中的 fd 成員名單」 的管理工作 -- 使用 select 時需自行管理 fd_set、而使用 poll 時需自行管理 pollfd 陣列。另一個改良點是簡化了應用程式端「判斷哪些 fd 上有事件發生」的流程 -- select/poll 都必需在返回後再逐個 fd 查一次返回值來判斷,而 epoll 則是把事件排進內部佇列中等待你用 epoll_wait() 來提取。事實上只要你不在 epoll_event.events 中使用 EPOLLET 這個 flag,文件說此時 epoll 就可用作為一個更高效的 poll 來使用。

關於 epoll_event.events 中的 flags,有兩個這邊特別提出來講一下:

EPOLLET : 切換為 Edge-Trigger 監看方式。不指定 EPOLLET 時預設是 Level-Trigger 監看方法。Edge-Trigger 與 Level-Trigger 的詳細說明請參考我的另一篇文章。select/poll 就是 Level-Trigger 監看方式 -- 假設某個 socket 內有資料可讀而你一直不去讀它,不管對它進行幾次 select/poll,都會得到「可讀取」的通知。如果改成 Edge-Trigger 監看方式的話,只有第一次會得到「可讀取」的通知,後續無論再測試幾次都不會再有通知,直到你真的把資料都讀完了,後續再有資料來才會又測得「可讀取」的通知。

EPOLLONESHOT : 一旦你由 epoll_wait() 中取到一個 socket 的通知之後,epoll 會即刻將這個 socket 在 epoll 內被停用,待你後續用 epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, .....) 來將它再度啟用。

這兩個 flags 的用意在避免「多個線程同時進行 epoll_wait() 時,可能會同時對同一個 fd 的通知事件進行處理」的情況。若設定監看時,同時指定這兩個 flags,那麼就可以保證 epoll 必定只會通知這個 fd 的事件一次而已,等你處理完畢之後再透過 EPOLL_CTL_MOD 開啟。

注意: 當同時指定 EPOLLET 以及 EPOLLONESHOT 時,通常也暗示你需要先將 socket fd 切換為非阻塞 (non-blocking)。因為此時的 socket 事件處理流程會是這樣的:

  1. epoll_wait() 回報 socket 的可讀/寫事件
  2. 應用程式對 socket 進行讀取/寫入,直到動作完畢、或再次得到動作無法完成的回應。
  3. 若 2 的結果是動作無法完成,則再次向 epoll_ctl 註冊該 socket 以監看可讀/寫事件
其中的第 2 點,需求 socket 為非阻塞,否則當動作無法完成時會限入阻塞狀態。


我在 jnet 中有利用 epoll 來實作出仿 IOCP 的 Proactor 多工器,可參考 asio_epoll.cpp

kqueue (Reactor)

kqueue 是類 BSD Kernel 中提供的功能,因此在 FreeBSD、MacOS、iOS 上有支援。它的運作方式與 epoll 高度相似。因此我們都與 epoll 做一個對比來介紹它。

首先是以 kqueue() 創建一個 kqueue 物件 (同 epoll_create()):

int kqfd = kqueue(void);

接下來 epoll_ctl() 與 epoll_wait() 這兩個目的在 kqueue 中被整合成同一個名為 kevent() 的接口:

struct kevent {
    uintptr_t ident;      /* identifier for this event */
    short     filter;      /* filter for event */
    u_short   flags;      /* action flags for kqueue */
    u_int     fflags;      /* filter flag value */
    intptr_t  data;      /* filter data value */
    void      *udata;      /* opaque user data identifier */
};

int kevent(int kq, 
    const struct kevent *changelist, int nchanges, //
          struct kevent *eventlist,  int nevents,  //
    const struct timespec *timeout);

kevent() 的第一個參數即為 kqueue() 產生的 kqueue 物件。第二、三個參數作用如同 epoll_ctl() -- 將 kqueue 與 fd 建立關聯。第四、五個參數作用則如 epoll_wait() -- 取得事件通知。最後一個參數也是控制 timeout。

struct kevent 的角色地位如同 epoll_event -- 設定關聯的 fd 中該監看哪些事件、以及夾帶 userdata 於通知中使控制流程得以串接。當應用於 socket 事件監控時,其 filter 欄位通常指定 EVFILT_READ 或 EVFILT_WRITE,代表監看「可讀取」、「可寫入」事件。此時 ident 欄位需指定 socket fd。而 flags 中也有 EV_CLEAR 與 EV_ONESHOT 分別對應 epoll 中的 EPOLLET 以及 EPOLLONESHOT 功能。udata 欄位是拿來夾帶 userdata 的。

我在 jnet 中有利用 epoll 來實作出仿 IOCP 的 Proactor 多工器,可參考 asio_kqueue.cpp
若直接把 asio_kqueue.cpp 與 asio_epoll.cpp 做 diff,可以發現相異的部分只有少數幾行。

沒有留言:

張貼留言