單線程性能秒殺多線程!多路複用IO實現高性能網絡服務

Kqueue和其他的多路複用IO的核心是,單消費者同時監聽不同種類的生產者,從而提供高性能的單線程IO,減少調度開銷。而Kqueue通過在內核態維持狀態提供了更高的性能。
生產者消費者模型
單線程性能秒殺多線程!多路複用IO實現高性能網絡服務

單Producer和單Consumer

生產者/消費者模型是常見的通信模型,通過共享內核緩衝區環形隊列,實現異步的事件通知。雙方只關注緩衝區內的數據,而不關注彼此,因此常常被用於網絡通信。

信號量

為了避免消費者在緩存區未滿時無意義的輪詢,消費者block直到生產者通知。wait時線程設置信號量並且block,notify時內核通知所有等待信號的線程狀態改為RUNNABLE。

事實上就是Linux的pthread_cond_wait和phread_cond_signal原語。consumer之所以要帶鎖wait,是因為在內部進行調度yield_wait前要放掉鎖,否則其他線程無法進入臨界區;喚醒之後重新獲得鎖。(這裏指的鎖是外部事務的鎖)

wait和notify需要增加鎖,防止notify先於wait進行。(這裏的鎖指的是內部事務的鎖)

wait調用的yield_wait在調度時需要臨時釋放並隨後獲取內部事務鎖,否則會阻塞其他的notify造成全員block。

send(bb, msg): acquire(bb.lock) while True: if bb.in - bb.out < N: bb.buf[bb.in mod N] bb.out: msg

Eventcount & Sequencer

這是1979年提出的算法,作為信號量的可替換實現。Sequencer的目的是處理多producer。

semaphores
send(Buffer& buffer,Message msg) { t=TICKET(T); AWAIT(buffer.in, t); AWAIT(buffer.out, READ(buffer.in)-N); buffer[READ(buffer.in)%N]=msg; ADVANCE(in);}receive(Buffer& buffer) { AWAIT(buffer.in, READ(buffer.out)); msg = buffer[READ(buffer.out)%N]; ADVANCE(buffer.out); return msg;}

AWAIT(event*,val) - 比較event.count和val,如果大於則返回,否則存入線程TCB並yield

ADVANCE(event*) - 自增event.count並將所有同event且event.count>val的線程喚醒

TICKET(sequencer*) - 原子性自增序號,目的是處理併發的sender

READ(event*) - 原子性讀event.count,因為可能讀操作涉及多memory cell

send等待in超過ticket,相當於拿排隊鎖輪到自己。然後等待緩存區未滿時寫入數據。

receive等待緩衝區存在數據時讀取數據。

Kqueue

https://people.freebsd.org/~jlemon/papers/kqueue.pdf

問題在於,上面提到的做法本質上都是監聽着一個事件,如果我們想要處理多個監聽事件,操作系統必須提供新的原語,例如每個socket都對應着一個file descriptor,需要同時監聽所有socket的事件。BSD的Kqueue和Linux的epoll都是解決這種問題的方式,本質上它們就是IPC,但是單純從IO的角度看叫做多路複用IO。目前epoll用於netty的底層,是單線程實現高性能網絡的關鍵。

傳統的select和poll僅僅適用於file descriptor,但是無法關注其他IPC機制,例如信號、文件系統變化、異步IO完成、進程存在;並且也不具備scalability。

第一個問題在於參數傳遞,每次都必須傳遞整個事件組,並且動態在內核中創建和銷燬內存。第二個問題在於內核必須遍歷整個fd列表去找活躍的fd。初始遍歷一次確定沒有active的fd才能沉睡,如果沒有active還要再遍歷一次設定回調來喚醒,最後喚醒時還要再遍歷一次來看是哪個fd喚醒了。

問題出在這個syscall無狀態上,無法利用之前的信息,每次都得重新計算。因此Kqueue的機制就在於內核中維持一個隊列儲存狀態。

intkqueue(void);intkevent(int kq,const struct kevent *changelist, int nchanges,struct kevent *eventlist, int nevents,const struct timespec *timeout);struct kevent{ uintpt t ident; // 事件關注對象的ID,kq,ident,filter確定唯一的event // 事件類型,ident,fflags,data應該如何被解釋?u short flags; // 輸入: 增加/減少,使能/禁止, 執行後重置/刪除;輸出: 發生EOF或者ERRORu int fflags; // 活躍時應該怎麼做,是否返回event?intptr t data; // filter和fflags規定的數據傳輸方式void *udata; // 自定義的數據傳輸方式__uint64_t ext[4]; //在末尾增加的額外信息Hint}EV_SET(&kev;, ident, filter, flags, fflags, data, udata);

kevent()用於創建kqueue並且返回對應的capability(權限控制的抽象)。

kevent()用於註冊event,並設定超時,changelist是指kqueue註冊的event如何變化,eventlist則是返回的event。當event觸發時,會調用內核的回調函數,通知進程。

filter

EVFILT READ :poll近似的實現,當socket_buffer大於SO_LOWAT時觸發將size寫入data或者斷連時觸發EOF,幫助應用處理數據。

EVFILT WRITE: 類似READ

EVFILT AIO: aio_read/write請求後通過事件進行aio_error輪詢,事件返回後aio_return

EVFILT SIGNAL: id為信號值,返回data為信號計數,通知後clear

EVFILT VNODE: 監聽文件系統vnode,id為fd, fflags監聽下列事件並返回所有發生事件

NOTE DELETENOTE WRITENOTE EXTENDNOTE ATTRIBNOTE LINKNOTE RENAME

EVFILT PROC:監聽進程狀態,id為PID,fflags監聽下列事件

NOTE EXIT/FORK/EXEC 監聽exit,fork,execve等原語NOTE TRACK 若父進程設定為Track則fork後子進程為CHILD輸出:NOTE CHILD 子進程fork後設定child,並且父進程id存入dataNOTE TRACKERR 無法添加子進程事件,通常因為資源限制

sample

handle_events(){int i, n;struct timespec timeout ={ TMOUT_SEC, TMOUT_NSEC };n = kevent(kq, ch, nchanges,evi, nevents, &timeout;);if (n <= 0) goto error_or_timeout;for (i = 0; i < n; i++) {if (evi.flags & EV_ERROR)/* error */if (evi.filter == EVFILT_READ)readable_fd(evi.ident);if (evi.filter == EVFILT_WRITE)writeable_fd(evi.ident);}...}update_fd(int fd, int action,int filter){EV_SET(&chnchanges;, fd, filter,action == ADD ? EV_ADD : EV_DELETE,0, 0, 0);nchanges++;} Kqueue實現

Knote

計算當前節點的活躍度

鏈接其他knote

存儲自己所在的Kqueue的指針

struct knote { SLIST_ENTRY(knote) kn_link; /* for kq */ SLIST_ENTRY(knote) kn_selnext; /* for struct selinfo */ struct knlist *kn_knlist; /* f_attach populated */ TAILQ_ENTRY(knote) kn_tqe; struct kqueue *kn_kq; /* which queue we are on */ struct kevent kn_kevent; void *kn_hook; int kn_hookid; int kn_status; /* protected by kq lock */#define KN_ACTIVE 0x01 /* event has been triggered */#define KN_QUEUED 0x02 /* event is on queue */#define KN_DISABLED 0x04 /* event is disabled */#define KN_DETACHED 0x08 /* knote is detached */#define KN_MARKER 0x20 /* ignore this knote */#define KN_KQUEUE 0x40 /* this knote belongs to a kq */#define KN_SCAN 0x100 /* flux set in kqueue_scan() */ int kn_influx; int kn_sfflags; /* saved filter flags */ int64_t kn_sdata; /* saved data field */ union { struct file *p_fp; /* file data pointer */ struct proc *p_proc; /* proc pointer */ struct kaiocb *p_aio; /* AIO job pointer */ struct aioliojob *p_lio; /* LIO job pointer */ void *p_v; /* generic other pointer */ } kn_ptr; struct filterops *kn_fop;#define kn_id kn_kevent.ident#define kn_filter kn_kevent.filter#define kn_flags kn_kevent.flags#define kn_fflags kn_kevent.fflags#define kn_data kn_kevent.data#define kn_fp kn_ptr.p_fp};

Kqueue

kp_knlist存所有knode用於GC

kp_head存存儲所有標記為active的knode

kq_knhash存儲iden->descriptor的映射

kq_fdp fd索引的數組(同open file table)用於關閉fd時刪除對應的knode

struct kqueue { struct mtx kq_lock; int kq_refcnt; TAILQ_ENTRY(kqueue) kq_list; TAILQ_HEAD(, knote) kq_head; /* list of pending event */ int kq_count; /* number of pending events */ struct selinfo kq_sel; struct sigio *kq_sigio; struct filedesc *kq_fdp; int kq_state;#define KQ_SEL 0x01#define KQ_SLEEP 0x02#define KQ_FLUXWAIT 0x04 /* waiting for a in flux kn */#define KQ_ASYNC 0x08#define KQ_CLOSING 0x10#define KQ_TASKSCHED 0x20 /* task scheduled */#define KQ_TASKDRAIN 0x40 /* waiting for task to drain */ int kq_knlistsize; /* size of knlist */ struct klist *kq_knlist; /* list of knotes */ u_long kq_knhashmask; /* size of knhash */ struct klist *kq_knhash; /* hash table for knotes */ struct task kq_task; struct ucred *kq_cred;};

Registration

kqueue

kqueue本身作為文件抽象看待,在OFT裏註冊entry創建內核對象並賦予descriptor索引。hash和內部的array並不分配。

kevent

intkevent(int kq, const struct kevent *changelist, int nchanges, struct kevent *eventlist, int nevents, const struct timespec *timeout){ return (((int (*)(int, const struct kevent *, int, struct kevent *, int, const struct timespec *)) __libc_interposing[INTERPOS_kevent])(kq, changelist, nchanges, eventlist, nevents, timeout));}

這裏調用了kqueue_register來對changeList進行註冊。首先根據線程和fd獲取文件的FCB,kq對於fp引用計數++,然後調用實際的註冊函數。註冊的代碼太長了,大體就是先根據尋找knote節點,找不到如果是EV_ADD則增加knote,否則把事件增加到knote上去。

int kqfd_register(int fd, struct kevent *kev, struct thread *td, int mflag){ struct kqueue *kq; struct file *fp; cap_rights_t rights; int error; error = fget(td, fd, cap_rights_init(&rights;, CAP_KQUEUE_CHANGE), &fp;); if (error != 0) return (error); if ((error = kqueue_acquire(fp, &kq;)) != 0) goto noacquire; error = kqueue_register(kq, kev, td, mflag); kqueue_release(kq, 0);noacquire: fdrop(fp, td); return (error);}

Filter

filter的作用就是對於事件源進行過濾,事件源所有的活動都會調用filter,但是隻有符合filter規則的事件才會報告給應用,也就是返回布爾值,同時他也會修改fflags和data產生副作用(上面提到的輸出參數)。filter封裝了事件,kqueue只能詢問他是否活躍,而對事件的細節一無所知。因此只需要增加filter,就能拓展事件的內容。

單線程性能秒殺多線程!多路複用IO實現高性能網絡服務

Activity

在所有觸發這些活動的地方插入hook函數,調用knote()函數遍歷自己維護的klist(註冊的時候維護的),調用filter。

如果事件觸發則激活,通過knote找到其所屬的kqueue,並且將knote加入kqueue的active鏈末尾。如果已經在了,那麼不用增加knote,但是filter還是會記錄activity(e.g.上文提到的副作用)。

這裏有些special case,例如fork需要看是不是TRACK,來判斷是否報告子節點的PID

Additionally, for each knote attached to the parent, check whether user wants to track the new process. If so, attach a new knote to it, and immediately report an event with the child's pid.

首先,激活父進程的knote,然後創建新的knote分配給子節點,並且設置CHILD flag和對應的父進程PID。同時這裏還提到了可能存在事件可能改變data,因此為EXIT額外分配一個節點。

/* * Activate existing knote and register tracking knotes with * new process. * * First register a knote to get just the child notice. This * must be a separate note from a potential NOTE_EXIT * notification since both NOTE_CHILD and NOTE_EXIT are defined * to use the data field (in conflicting ways). */ kev.ident = pid; kev.filter = kn->kn_filter; kev.flags = kn->kn_flags | EV_ADD | EV_ENABLE | EV_ONESHOT | EV_FLAG2; kev.fflags = kn->kn_sfflags; kev.data = kn->kn_id; /* parent */ kev.udata = kn->kn_kevent.udata;/* preserve udata */ error = kqueue_register(kq, &kev;, NULL, M_NOWAIT); if (error) kn->kn_fflags |= NOTE_TRACKERR; /* * Then register another knote to track other potential events * from the new process. */ kev.ident = pid; kev.filter = kn->kn_filter; kev.flags = kn->kn_flags | EV_ADD | EV_ENABLE | EV_FLAG1; kev.fflags = kn->kn_sfflags; kev.data = kn->kn_id; /* parent */ kev.udata = kn->kn_kevent.udata;/* preserve udata */ error = kqueue_register(kq, &kev;, NULL, M_NOWAIT); if (error) kn->kn_fflags |= NOTE_TRACKERR; if (kn->kn_fop->f_event(kn, NOTE_FORK)) KNOTE_ACTIVATE(kn, 0); list->kl_lock(list->kl_lockarg); KQ_LOCK(kq); kn_leave_flux(kn); KQ_UNLOCK_FLUX(kq);

Delivery

kqueue_scan在active鏈末尾加入哨兵,如果scan時扔出了哨兵,那麼遍歷結束。

每次都從active移除一個節點(注意檢查timeout,過期也要移除,DISABLE也是在這裏移除),如果不是ONESHOP,那麼filter帶着query hint重新檢查一遍是否激活,防止途中又被修改。

The rationale for this is the case where data arrives for a socket, which causes the knote to be queued, but the application happens to call read() and empty the socket buffer before calling kevent. If the knote was still queued, then an event would be returned telling the application to read an empty buffer.

確認激活的knote的信息將會拷貝到kevnet通過eventlist返回給應用進行通知。如果ONESHOP則直接從kqueue中移除,否則如果filter看它仍然active,就把它重新放到active鏈末尾(上次掃描的哨兵之後)。直到哨兵被出列,scan完成。

Miscellaneous Notes

1.論文的版本fork的時候不復制kqueue的df除非vfork。如果複製的話需要在fork時進行整個kqueue複製或者標記為COW。(現在不知道是不是這麼做的)

2.kqueue是通過維護klist來對整條鏈涉及的所有進程進行通知的,而不是像poll或者select那樣在sellInfo持有pid。下面這段話看不懂了,沒看過poll不知道啥叫collision。

While this may be a natural outcome from the way knotes are implemented, it also means that the kqueue system is not susceptible to select collisions. As each knote is queued in the active list, only processes sleeping on that kqueue are woken up

3.考慮同一個klist有不同類型的filter,調用knode時應該給予額外信息通知他到底是什麼事件觸發的(例如PROC和SIGNAL容易混淆),因此利用hint確定activity和哪個相關

4. kevent要經歷兩次拷貝,增加了overhead。因此如果採用AIO更好,kernel直接修改user狀態下的control block。那麼為什麼不這麼做呢?根本原因在於如果內核不允許直接寫用户態數據的話,bug會更好定位,同時應用也不需要考慮狀態。

總結

精妙之處在於kqueue維持在內核中,因此socket如果滿了可以直接將knote加入進程kqueue的活躍鏈,而不需要等到下次syscall的時候再檢查。例如,即使我長期不kevent,knote()依然會將他們的activity存儲在knote上並且插入active list,下次只需要遍歷active list而不需要重頭遍歷整個queue。

同時因為kqueue有狀態,進行修改也開銷很小,只需要改變變化的那部分就行了。

看的時候還是有些地方比較難理解,加上源代碼也很複雜,如果有糾錯請指正。

附錄

filechange

struct kevent ev;struct timespec nullts = { 0, 0 };EV_SET(&ev;, fd, EVFILT_VNODE,EV_ADD | EV_ENABLE | EV_CLEAR,NOTE_RENAME | NOTE_WRITE |NOTE_DELETE | NOTE_ATTRIB, 0, 0);kevent(kq, &ev;, 1, NULL, 0, &nullts;);for (;;) {n = kevent(kq, NULL, 0, &ev;, 1, NULL);if (n > 0) {printf("The file was");if (ev.fflags & NOTE_RENAME)printf(" renamed");if (ev.fflags & NOTE_WRITE)printf(" written");if (ev.fflags & NOTE_DELETE)printf(" deleted");if (ev.fflags & NOTE_ATTRIB)printf(" chmod/chowned");printf("n");}

signal

struct kevent ev;struct timespec nullts = { 0, 0 };EV_SET(&ev;, SIGHUP, EVFILT_SIGNAL,EV_ADD | EV_ENABLE, 0, 0, 0);kevent(kq, &ev;, 1, NULL, 0, &nullts;);signal(SIGHUP, SIG_IGN);for (;;) {n = kevent(kq, NULL, 0, &ev;, 1, NULL);if (n > 0)printf("signal %d delivered"" %d timesn",ev.ident, ev.data);}

udata

int i, n;struct timespec timeout ={ TMOUT_SEC, TMOUT_NSEC };void (* fcn)(struct kevent *);n = kevent(kq, ch, nchanges,ev, nevents, &timeout;);if (n <= 0)goto error_or_timeout;for (i = 0; i < n; i++) {if (evi.flags & EV_ERROR)/* error */fcn = evi.udata;fcn(&evi;);}

io 調用 線程 信號量

版權聲明:本文源自 網絡, 於,由 楠木軒 整理發佈,共 11416 字。

轉載請註明: 單線程性能秒殺多線程!多路複用IO實現高性能網絡服務 - 楠木軒