單Producer和單Consumer
生產者/消費者模型是常見的通信模型,通過共享內核緩衝區環形隊列,實現異步的事件通知。雙方只關注緩衝區內的數據,而不關注彼此,因此常常被用於網絡通信。
信號量
為了避免消費者在緩存區未滿時無意義的輪詢,消費者block直到生產者通知。wait時線程設置信號量並且block,notify時內核通知所有等待信號的線程狀態改為RUNNABLE。
事實上就是Linux的pthread_cond_wait和phread_cond_signal原語。consumer之所以要帶鎖wait,是因為在內部進行調度yield_wait前要放掉鎖,否則其他線程無法進入臨界區;喚醒之後重新獲得鎖。(這裏指的鎖是外部事務的鎖)
wait和notify需要增加鎖,防止notify先於wait進行。(這裏的鎖指的是內部事務的鎖)
wait調用的yield_wait在調度時需要臨時釋放並隨後獲取內部事務鎖,否則會阻塞其他的notify造成全員block。
send(bb, msg): acquire(bb.lock) while True: if bb.in - bb.out < N: bb.buf[bb.in mod N] bb.out: msg
Eventcount & Sequencer
這是1979年提出的算法,作為信號量的可替換實現。Sequencer的目的是處理多producer。
send(Buffer& buffer,Message msg) { t=TICKET(T); AWAIT(buffer.in, t); AWAIT(buffer.out, READ(buffer.in)-N); buffer[READ(buffer.in)%N]=msg; ADVANCE(in);}receive(Buffer& buffer) { AWAIT(buffer.in, READ(buffer.out)); msg = buffer[READ(buffer.out)%N]; ADVANCE(buffer.out); return msg;}
AWAIT(event*,val) - 比較event.count和val,如果大於則返回,否則存入線程TCB並yield
ADVANCE(event*) - 自增event.count並將所有同event且event.count>val的線程喚醒
TICKET(sequencer*) - 原子性自增序號,目的是處理併發的sender
READ(event*) - 原子性讀event.count,因為可能讀操作涉及多memory cell
send等待in超過ticket,相當於拿排隊鎖輪到自己。然後等待緩存區未滿時寫入數據。
receive等待緩衝區存在數據時讀取數據。
Kqueue
https://people.freebsd.org/~jlemon/papers/kqueue.pdf
問題在於,上面提到的做法本質上都是監聽着一個事件,如果我們想要處理多個監聽事件,操作系統必須提供新的原語,例如每個socket都對應着一個file descriptor,需要同時監聽所有socket的事件。BSD的Kqueue和Linux的epoll都是解決這種問題的方式,本質上它們就是IPC,但是單純從IO的角度看叫做多路複用IO。目前epoll用於netty的底層,是單線程實現高性能網絡的關鍵。
傳統的select和poll僅僅適用於file descriptor,但是無法關注其他IPC機制,例如信號、文件系統變化、異步IO完成、進程存在;並且也不具備scalability。
第一個問題在於參數傳遞,每次都必須傳遞整個事件組,並且動態在內核中創建和銷燬內存。第二個問題在於內核必須遍歷整個fd列表去找活躍的fd。初始遍歷一次確定沒有active的fd才能沉睡,如果沒有active還要再遍歷一次設定回調來喚醒,最後喚醒時還要再遍歷一次來看是哪個fd喚醒了。
問題出在這個syscall無狀態上,無法利用之前的信息,每次都得重新計算。因此Kqueue的機制就在於內核中維持一個隊列儲存狀態。
intkqueue(void);intkevent(int kq,const struct kevent *changelist, int nchanges,struct kevent *eventlist, int nevents,const struct timespec *timeout);struct kevent{ uintpt t ident; // 事件關注對象的ID,kq,ident,filter確定唯一的event // 事件類型,ident,fflags,data應該如何被解釋?u short flags; // 輸入: 增加/減少,使能/禁止, 執行後重置/刪除;輸出: 發生EOF或者ERRORu int fflags; // 活躍時應該怎麼做,是否返回event?intptr t data; // filter和fflags規定的數據傳輸方式void *udata; // 自定義的數據傳輸方式__uint64_t ext[4]; //在末尾增加的額外信息Hint}EV_SET(&kev;, ident, filter, flags, fflags, data, udata);
kevent()用於創建kqueue並且返回對應的capability(權限控制的抽象)。
kevent()用於註冊event,並設定超時,changelist是指kqueue註冊的event如何變化,eventlist則是返回的event。當event觸發時,會調用內核的回調函數,通知進程。
filter
EVFILT READ :poll近似的實現,當socket_buffer大於SO_LOWAT時觸發將size寫入data或者斷連時觸發EOF,幫助應用處理數據。
EVFILT WRITE: 類似READ
EVFILT AIO: aio_read/write請求後通過事件進行aio_error輪詢,事件返回後aio_return
EVFILT SIGNAL: id為信號值,返回data為信號計數,通知後clear
EVFILT VNODE: 監聽文件系統vnode,id為fd, fflags監聽下列事件並返回所有發生事件
NOTE DELETENOTE WRITENOTE EXTENDNOTE ATTRIBNOTE LINKNOTE RENAME
EVFILT PROC:監聽進程狀態,id為PID,fflags監聽下列事件
NOTE EXIT/FORK/EXEC 監聽exit,fork,execve等原語NOTE TRACK 若父進程設定為Track則fork後子進程為CHILD輸出:NOTE CHILD 子進程fork後設定child,並且父進程id存入dataNOTE TRACKERR 無法添加子進程事件,通常因為資源限制
sample
handle_events(){int i, n;struct timespec timeout ={ TMOUT_SEC, TMOUT_NSEC };n = kevent(kq, ch, nchanges,evi, nevents, &timeout;);if (n <= 0) goto error_or_timeout;for (i = 0; i < n; i++) {if (evi.flags & EV_ERROR)/* error */if (evi.filter == EVFILT_READ)readable_fd(evi.ident);if (evi.filter == EVFILT_WRITE)writeable_fd(evi.ident);}...}update_fd(int fd, int action,int filter){EV_SET(&chnchanges;, fd, filter,action == ADD ? EV_ADD : EV_DELETE,0, 0, 0);nchanges++;}
Kqueue實現
Knote
計算當前節點的活躍度
鏈接其他knote
存儲自己所在的Kqueue的指針
struct knote { SLIST_ENTRY(knote) kn_link; /* for kq */ SLIST_ENTRY(knote) kn_selnext; /* for struct selinfo */ struct knlist *kn_knlist; /* f_attach populated */ TAILQ_ENTRY(knote) kn_tqe; struct kqueue *kn_kq; /* which queue we are on */ struct kevent kn_kevent; void *kn_hook; int kn_hookid; int kn_status; /* protected by kq lock */#define KN_ACTIVE 0x01 /* event has been triggered */#define KN_QUEUED 0x02 /* event is on queue */#define KN_DISABLED 0x04 /* event is disabled */#define KN_DETACHED 0x08 /* knote is detached */#define KN_MARKER 0x20 /* ignore this knote */#define KN_KQUEUE 0x40 /* this knote belongs to a kq */#define KN_SCAN 0x100 /* flux set in kqueue_scan() */ int kn_influx; int kn_sfflags; /* saved filter flags */ int64_t kn_sdata; /* saved data field */ union { struct file *p_fp; /* file data pointer */ struct proc *p_proc; /* proc pointer */ struct kaiocb *p_aio; /* AIO job pointer */ struct aioliojob *p_lio; /* LIO job pointer */ void *p_v; /* generic other pointer */ } kn_ptr; struct filterops *kn_fop;#define kn_id kn_kevent.ident#define kn_filter kn_kevent.filter#define kn_flags kn_kevent.flags#define kn_fflags kn_kevent.fflags#define kn_data kn_kevent.data#define kn_fp kn_ptr.p_fp};
Kqueue
kp_knlist存所有knode用於GC
kp_head存存儲所有標記為active的knode
kq_knhash存儲iden->descriptor的映射
kq_fdp fd索引的數組(同open file table)用於關閉fd時刪除對應的knode
struct kqueue { struct mtx kq_lock; int kq_refcnt; TAILQ_ENTRY(kqueue) kq_list; TAILQ_HEAD(, knote) kq_head; /* list of pending event */ int kq_count; /* number of pending events */ struct selinfo kq_sel; struct sigio *kq_sigio; struct filedesc *kq_fdp; int kq_state;#define KQ_SEL 0x01#define KQ_SLEEP 0x02#define KQ_FLUXWAIT 0x04 /* waiting for a in flux kn */#define KQ_ASYNC 0x08#define KQ_CLOSING 0x10#define KQ_TASKSCHED 0x20 /* task scheduled */#define KQ_TASKDRAIN 0x40 /* waiting for task to drain */ int kq_knlistsize; /* size of knlist */ struct klist *kq_knlist; /* list of knotes */ u_long kq_knhashmask; /* size of knhash */ struct klist *kq_knhash; /* hash table for knotes */ struct task kq_task; struct ucred *kq_cred;};
Registration
kqueue
kqueue本身作為文件抽象看待,在OFT裏註冊entry創建內核對象並賦予descriptor索引。hash和內部的array並不分配。
kevent
intkevent(int kq, const struct kevent *changelist, int nchanges, struct kevent *eventlist, int nevents, const struct timespec *timeout){ return (((int (*)(int, const struct kevent *, int, struct kevent *, int, const struct timespec *)) __libc_interposing[INTERPOS_kevent])(kq, changelist, nchanges, eventlist, nevents, timeout));}
這裏調用了kqueue_register來對changeList進行註冊。首先根據線程和fd獲取文件的FCB,kq對於fp引用計數++,然後調用實際的註冊函數。註冊的代碼太長了,大體就是先根據尋找knote節點,找不到如果是EV_ADD則增加knote,否則把事件增加到knote上去。
int kqfd_register(int fd, struct kevent *kev, struct thread *td, int mflag){ struct kqueue *kq; struct file *fp; cap_rights_t rights; int error; error = fget(td, fd, cap_rights_init(&rights;, CAP_KQUEUE_CHANGE), &fp;); if (error != 0) return (error); if ((error = kqueue_acquire(fp, &kq;)) != 0) goto noacquire; error = kqueue_register(kq, kev, td, mflag); kqueue_release(kq, 0);noacquire: fdrop(fp, td); return (error);}
Filter
filter的作用就是對於事件源進行過濾,事件源所有的活動都會調用filter,但是隻有符合filter規則的事件才會報告給應用,也就是返回布爾值,同時他也會修改fflags和data產生副作用(上面提到的輸出參數)。filter封裝了事件,kqueue只能詢問他是否活躍,而對事件的細節一無所知。因此只需要增加filter,就能拓展事件的內容。
Activity
在所有觸發這些活動的地方插入hook函數,調用knote()函數遍歷自己維護的klist(註冊的時候維護的),調用filter。
如果事件觸發則激活,通過knote找到其所屬的kqueue,並且將knote加入kqueue的active鏈末尾。如果已經在了,那麼不用增加knote,但是filter還是會記錄activity(e.g.上文提到的副作用)。
這裏有些special case,例如fork需要看是不是TRACK,來判斷是否報告子節點的PID
首先,激活父進程的knote,然後創建新的knote分配給子節點,並且設置CHILD flag和對應的父進程PID。同時這裏還提到了可能存在事件可能改變data,因此為EXIT額外分配一個節點。
/* * Activate existing knote and register tracking knotes with * new process. * * First register a knote to get just the child notice. This * must be a separate note from a potential NOTE_EXIT * notification since both NOTE_CHILD and NOTE_EXIT are defined * to use the data field (in conflicting ways). */ kev.ident = pid; kev.filter = kn->kn_filter; kev.flags = kn->kn_flags | EV_ADD | EV_ENABLE | EV_ONESHOT | EV_FLAG2; kev.fflags = kn->kn_sfflags; kev.data = kn->kn_id; /* parent */ kev.udata = kn->kn_kevent.udata;/* preserve udata */ error = kqueue_register(kq, &kev;, NULL, M_NOWAIT); if (error) kn->kn_fflags |= NOTE_TRACKERR; /* * Then register another knote to track other potential events * from the new process. */ kev.ident = pid; kev.filter = kn->kn_filter; kev.flags = kn->kn_flags | EV_ADD | EV_ENABLE | EV_FLAG1; kev.fflags = kn->kn_sfflags; kev.data = kn->kn_id; /* parent */ kev.udata = kn->kn_kevent.udata;/* preserve udata */ error = kqueue_register(kq, &kev;, NULL, M_NOWAIT); if (error) kn->kn_fflags |= NOTE_TRACKERR; if (kn->kn_fop->f_event(kn, NOTE_FORK)) KNOTE_ACTIVATE(kn, 0); list->kl_lock(list->kl_lockarg); KQ_LOCK(kq); kn_leave_flux(kn); KQ_UNLOCK_FLUX(kq);
Delivery
kqueue_scan在active鏈末尾加入哨兵,如果scan時扔出了哨兵,那麼遍歷結束。
每次都從active移除一個節點(注意檢查timeout,過期也要移除,DISABLE也是在這裏移除),如果不是ONESHOP,那麼filter帶着query hint重新檢查一遍是否激活,防止途中又被修改。
確認激活的knote的信息將會拷貝到kevnet通過eventlist返回給應用進行通知。如果ONESHOP則直接從kqueue中移除,否則如果filter看它仍然active,就把它重新放到active鏈末尾(上次掃描的哨兵之後)。直到哨兵被出列,scan完成。
Miscellaneous Notes
1.論文的版本fork的時候不復制kqueue的df除非vfork。如果複製的話需要在fork時進行整個kqueue複製或者標記為COW。(現在不知道是不是這麼做的)
2.kqueue是通過維護klist來對整條鏈涉及的所有進程進行通知的,而不是像poll或者select那樣在sellInfo持有pid。下面這段話看不懂了,沒看過poll不知道啥叫collision。
3.考慮同一個klist有不同類型的filter,調用knode時應該給予額外信息通知他到底是什麼事件觸發的(例如PROC和SIGNAL容易混淆),因此利用hint確定activity和哪個相關
4. kevent要經歷兩次拷貝,增加了overhead。因此如果採用AIO更好,kernel直接修改user狀態下的control block。那麼為什麼不這麼做呢?根本原因在於如果內核不允許直接寫用户態數據的話,bug會更好定位,同時應用也不需要考慮狀態。
總結
精妙之處在於kqueue維持在內核中,因此socket如果滿了可以直接將knote加入進程kqueue的活躍鏈,而不需要等到下次syscall的時候再檢查。例如,即使我長期不kevent,knote()依然會將他們的activity存儲在knote上並且插入active list,下次只需要遍歷active list而不需要重頭遍歷整個queue。
同時因為kqueue有狀態,進行修改也開銷很小,只需要改變變化的那部分就行了。
看的時候還是有些地方比較難理解,加上源代碼也很複雜,如果有糾錯請指正。
附錄
filechange
struct kevent ev;struct timespec nullts = { 0, 0 };EV_SET(&ev;, fd, EVFILT_VNODE,EV_ADD | EV_ENABLE | EV_CLEAR,NOTE_RENAME | NOTE_WRITE |NOTE_DELETE | NOTE_ATTRIB, 0, 0);kevent(kq, &ev;, 1, NULL, 0, &nullts;);for (;;) {n = kevent(kq, NULL, 0, &ev;, 1, NULL);if (n > 0) {printf("The file was");if (ev.fflags & NOTE_RENAME)printf(" renamed");if (ev.fflags & NOTE_WRITE)printf(" written");if (ev.fflags & NOTE_DELETE)printf(" deleted");if (ev.fflags & NOTE_ATTRIB)printf(" chmod/chowned");printf("n");}
signal
struct kevent ev;struct timespec nullts = { 0, 0 };EV_SET(&ev;, SIGHUP, EVFILT_SIGNAL,EV_ADD | EV_ENABLE, 0, 0, 0);kevent(kq, &ev;, 1, NULL, 0, &nullts;);signal(SIGHUP, SIG_IGN);for (;;) {n = kevent(kq, NULL, 0, &ev;, 1, NULL);if (n > 0)printf("signal %d delivered"" %d timesn",ev.ident, ev.data);}
udata
int i, n;struct timespec timeout ={ TMOUT_SEC, TMOUT_NSEC };void (* fcn)(struct kevent *);n = kevent(kq, ch, nchanges,ev, nevents, &timeout;);if (n <= 0)goto error_or_timeout;for (i = 0; i < n; i++) {if (evi.flags & EV_ERROR)/* error */fcn = evi.udata;fcn(&evi;);}