单Producer和单Consumer
生产者/消费者模型是常见的通信模型,通过共享内核缓冲区环形队列,实现异步的事件通知。双方只关注缓冲区内的数据,而不关注彼此,因此常常被用于网络通信。
信号量
为了避免消费者在缓存区未满时无意义的轮询,消费者block直到生产者通知。wait时线程设置信号量并且block,notify时内核通知所有等待信号的线程状态改为RUNNABLE。
事实上就是Linux的pthread_cond_wait和phread_cond_signal原语。consumer之所以要带锁wait,是因为在内部进行调度yield_wait前要放掉锁,否则其他线程无法进入临界区;唤醒之后重新获得锁。(这里指的锁是外部事务的锁)
wait和notify需要增加锁,防止notify先于wait进行。(这里的锁指的是内部事务的锁)
wait调用的yield_wait在调度时需要临时释放并随后获取内部事务锁,否则会阻塞其他的notify造成全员block。
send(bb, msg): acquire(bb.lock) while True: if bb.in - bb.out < N: bb.buf[bb.in mod N] bb.out: msg
Eventcount & Sequencer
这是1979年提出的算法,作为信号量的可替换实现。Sequencer的目的是处理多producer。
send(Buffer& buffer,Message msg) { t=TICKET(T); AWAIT(buffer.in, t); AWAIT(buffer.out, READ(buffer.in)-N); buffer[READ(buffer.in)%N]=msg; ADVANCE(in);}receive(Buffer& buffer) { AWAIT(buffer.in, READ(buffer.out)); msg = buffer[READ(buffer.out)%N]; ADVANCE(buffer.out); return msg;}
AWAIT(event*,val) - 比较event.count和val,如果大于则返回,否则存入线程TCB并yield
ADVANCE(event*) - 自增event.count并将所有同event且event.count>val的线程唤醒
TICKET(sequencer*) - 原子性自增序号,目的是处理并发的sender
READ(event*) - 原子性读event.count,因为可能读操作涉及多memory cell
send等待in超过ticket,相当于拿排队锁轮到自己。然后等待缓存区未满时写入数据。
receive等待缓冲区存在数据时读取数据。
Kqueue
https://people.freebsd.org/~jlemon/papers/kqueue.pdf
问题在于,上面提到的做法本质上都是监听着一个事件,如果我们想要处理多个监听事件,操作系统必须提供新的原语,例如每个socket都对应着一个file descriptor,需要同时监听所有socket的事件。BSD的Kqueue和Linux的epoll都是解决这种问题的方式,本质上它们就是IPC,但是单纯从IO的角度看叫做多路复用IO。目前epoll用于netty的底层,是单线程实现高性能网络的关键。
传统的select和poll仅仅适用于file descriptor,但是无法关注其他IPC机制,例如信号、文件系统变化、异步IO完成、进程存在;并且也不具备scalability。
第一个问题在于参数传递,每次都必须传递整个事件组,并且动态在内核中创建和销毁内存。第二个问题在于内核必须遍历整个fd列表去找活跃的fd。初始遍历一次确定没有active的fd才能沉睡,如果没有active还要再遍历一次设定回调来唤醒,最后唤醒时还要再遍历一次来看是哪个fd唤醒了。
问题出在这个syscall无状态上,无法利用之前的信息,每次都得重新计算。因此Kqueue的机制就在于内核中维持一个队列储存状态。
intkqueue(void);intkevent(int kq,const struct kevent *changelist, int nchanges,struct kevent *eventlist, int nevents,const struct timespec *timeout);struct kevent{ uintpt t ident; // 事件关注对象的ID,kq,ident,filter确定唯一的event // 事件类型,ident,fflags,data应该如何被解释?u short flags; // 输入: 增加/减少,使能/禁止, 执行后重置/删除;输出: 发生EOF或者ERRORu int fflags; // 活跃时应该怎么做,是否返回event?intptr t data; // filter和fflags规定的数据传输方式void *udata; // 自定义的数据传输方式__uint64_t ext[4]; //在末尾增加的额外信息Hint}EV_SET(&kev;, ident, filter, flags, fflags, data, udata);
kevent()用于创建kqueue并且返回对应的capability(权限控制的抽象)。
kevent()用于注册event,并设定超时,changelist是指kqueue注册的event如何变化,eventlist则是返回的event。当event触发时,会调用内核的回调函数,通知进程。
filter
EVFILT READ :poll近似的实现,当socket_buffer大于SO_LOWAT时触发将size写入data或者断连时触发EOF,帮助应用处理数据。
EVFILT WRITE: 类似READ
EVFILT AIO: aio_read/write请求后通过事件进行aio_error轮询,事件返回后aio_return
EVFILT SIGNAL: id为信号值,返回data为信号计数,通知后clear
EVFILT VNODE: 监听文件系统vnode,id为fd, fflags监听下列事件并返回所有发生事件
NOTE DELETENOTE WRITENOTE EXTENDNOTE ATTRIBNOTE LINKNOTE RENAME
EVFILT PROC:监听进程状态,id为PID,fflags监听下列事件
NOTE EXIT/FORK/EXEC 监听exit,fork,execve等原语NOTE TRACK 若父进程设定为Track则fork后子进程为CHILD输出:NOTE CHILD 子进程fork后设定child,并且父进程id存入dataNOTE TRACKERR 无法添加子进程事件,通常因为资源限制
sample
handle_events(){int i, n;struct timespec timeout ={ TMOUT_SEC, TMOUT_NSEC };n = kevent(kq, ch, nchanges,evi, nevents, &timeout;);if (n <= 0) goto error_or_timeout;for (i = 0; i < n; i++) {if (evi.flags & EV_ERROR)/* error */if (evi.filter == EVFILT_READ)readable_fd(evi.ident);if (evi.filter == EVFILT_WRITE)writeable_fd(evi.ident);}...}update_fd(int fd, int action,int filter){EV_SET(&chnchanges;, fd, filter,action == ADD ? EV_ADD : EV_DELETE,0, 0, 0);nchanges++;}
Kqueue实现
Knote
计算当前节点的活跃度
链接其他knote
存储自己所在的Kqueue的指针
struct knote { SLIST_ENTRY(knote) kn_link; /* for kq */ SLIST_ENTRY(knote) kn_selnext; /* for struct selinfo */ struct knlist *kn_knlist; /* f_attach populated */ TAILQ_ENTRY(knote) kn_tqe; struct kqueue *kn_kq; /* which queue we are on */ struct kevent kn_kevent; void *kn_hook; int kn_hookid; int kn_status; /* protected by kq lock */#define KN_ACTIVE 0x01 /* event has been triggered */#define KN_QUEUED 0x02 /* event is on queue */#define KN_DISABLED 0x04 /* event is disabled */#define KN_DETACHED 0x08 /* knote is detached */#define KN_MARKER 0x20 /* ignore this knote */#define KN_KQUEUE 0x40 /* this knote belongs to a kq */#define KN_SCAN 0x100 /* flux set in kqueue_scan() */ int kn_influx; int kn_sfflags; /* saved filter flags */ int64_t kn_sdata; /* saved data field */ union { struct file *p_fp; /* file data pointer */ struct proc *p_proc; /* proc pointer */ struct kaiocb *p_aio; /* AIO job pointer */ struct aioliojob *p_lio; /* LIO job pointer */ void *p_v; /* generic other pointer */ } kn_ptr; struct filterops *kn_fop;#define kn_id kn_kevent.ident#define kn_filter kn_kevent.filter#define kn_flags kn_kevent.flags#define kn_fflags kn_kevent.fflags#define kn_data kn_kevent.data#define kn_fp kn_ptr.p_fp};
Kqueue
kp_knlist存所有knode用于GC
kp_head存存储所有标记为active的knode
kq_knhash存储iden->descriptor的映射
kq_fdp fd索引的数组(同open file table)用于关闭fd时删除对应的knode
struct kqueue { struct mtx kq_lock; int kq_refcnt; TAILQ_ENTRY(kqueue) kq_list; TAILQ_HEAD(, knote) kq_head; /* list of pending event */ int kq_count; /* number of pending events */ struct selinfo kq_sel; struct sigio *kq_sigio; struct filedesc *kq_fdp; int kq_state;#define KQ_SEL 0x01#define KQ_SLEEP 0x02#define KQ_FLUXWAIT 0x04 /* waiting for a in flux kn */#define KQ_ASYNC 0x08#define KQ_CLOSING 0x10#define KQ_TASKSCHED 0x20 /* task scheduled */#define KQ_TASKDRAIN 0x40 /* waiting for task to drain */ int kq_knlistsize; /* size of knlist */ struct klist *kq_knlist; /* list of knotes */ u_long kq_knhashmask; /* size of knhash */ struct klist *kq_knhash; /* hash table for knotes */ struct task kq_task; struct ucred *kq_cred;};
Registration
kqueue
kqueue本身作为文件抽象看待,在OFT里注册entry创建内核对象并赋予descriptor索引。hash和内部的array并不分配。
kevent
intkevent(int kq, const struct kevent *changelist, int nchanges, struct kevent *eventlist, int nevents, const struct timespec *timeout){ return (((int (*)(int, const struct kevent *, int, struct kevent *, int, const struct timespec *)) __libc_interposing[INTERPOS_kevent])(kq, changelist, nchanges, eventlist, nevents, timeout));}
这里调用了kqueue_register来对changeList进行注册。首先根据线程和fd获取文件的FCB,kq对于fp引用计数++,然后调用实际的注册函数。注册的代码太长了,大体就是先根据寻找knote节点,找不到如果是EV_ADD则增加knote,否则把事件增加到knote上去。
int kqfd_register(int fd, struct kevent *kev, struct thread *td, int mflag){ struct kqueue *kq; struct file *fp; cap_rights_t rights; int error; error = fget(td, fd, cap_rights_init(&rights;, CAP_KQUEUE_CHANGE), &fp;); if (error != 0) return (error); if ((error = kqueue_acquire(fp, &kq;)) != 0) goto noacquire; error = kqueue_register(kq, kev, td, mflag); kqueue_release(kq, 0);noacquire: fdrop(fp, td); return (error);}
Filter
filter的作用就是对于事件源进行过滤,事件源所有的活动都会调用filter,但是只有符合filter规则的事件才会报告给应用,也就是返回布尔值,同时他也会修改fflags和data产生副作用(上面提到的输出参数)。filter封装了事件,kqueue只能询问他是否活跃,而对事件的细节一无所知。因此只需要增加filter,就能拓展事件的内容。
Activity
在所有触发这些活动的地方插入hook函数,调用knote()函数遍历自己维护的klist(注册的时候维护的),调用filter。
如果事件触发则激活,通过knote找到其所属的kqueue,并且将knote加入kqueue的active链末尾。如果已经在了,那么不用增加knote,但是filter还是会记录activity(e.g.上文提到的副作用)。
这里有些special case,例如fork需要看是不是TRACK,来判断是否报告子节点的PID
首先,激活父进程的knote,然后创建新的knote分配给子节点,并且设置CHILD flag和对应的父进程PID。同时这里还提到了可能存在事件可能改变data,因此为EXIT额外分配一个节点。
/* * Activate existing knote and register tracking knotes with * new process. * * First register a knote to get just the child notice. This * must be a separate note from a potential NOTE_EXIT * notification since both NOTE_CHILD and NOTE_EXIT are defined * to use the data field (in conflicting ways). */ kev.ident = pid; kev.filter = kn->kn_filter; kev.flags = kn->kn_flags | EV_ADD | EV_ENABLE | EV_ONESHOT | EV_FLAG2; kev.fflags = kn->kn_sfflags; kev.data = kn->kn_id; /* parent */ kev.udata = kn->kn_kevent.udata;/* preserve udata */ error = kqueue_register(kq, &kev;, NULL, M_NOWAIT); if (error) kn->kn_fflags |= NOTE_TRACKERR; /* * Then register another knote to track other potential events * from the new process. */ kev.ident = pid; kev.filter = kn->kn_filter; kev.flags = kn->kn_flags | EV_ADD | EV_ENABLE | EV_FLAG1; kev.fflags = kn->kn_sfflags; kev.data = kn->kn_id; /* parent */ kev.udata = kn->kn_kevent.udata;/* preserve udata */ error = kqueue_register(kq, &kev;, NULL, M_NOWAIT); if (error) kn->kn_fflags |= NOTE_TRACKERR; if (kn->kn_fop->f_event(kn, NOTE_FORK)) KNOTE_ACTIVATE(kn, 0); list->kl_lock(list->kl_lockarg); KQ_LOCK(kq); kn_leave_flux(kn); KQ_UNLOCK_FLUX(kq);
Delivery
kqueue_scan在active链末尾加入哨兵,如果scan时扔出了哨兵,那么遍历结束。
每次都从active移除一个节点(注意检查timeout,过期也要移除,DISABLE也是在这里移除),如果不是ONESHOP,那么filter带着query hint重新检查一遍是否激活,防止途中又被修改。
确认激活的knote的信息将会拷贝到kevnet通过eventlist返回给应用进行通知。如果ONESHOP则直接从kqueue中移除,否则如果filter看它仍然active,就把它重新放到active链末尾(上次扫描的哨兵之后)。直到哨兵被出列,scan完成。
Miscellaneous Notes
1.论文的版本fork的时候不复制kqueue的df除非vfork。如果复制的话需要在fork时进行整个kqueue复制或者标记为COW。(现在不知道是不是这么做的)
2.kqueue是通过维护klist来对整条链涉及的所有进程进行通知的,而不是像poll或者select那样在sellInfo持有pid。下面这段话看不懂了,没看过poll不知道啥叫collision。
3.考虑同一个klist有不同类型的filter,调用knode时应该给予额外信息通知他到底是什么事件触发的(例如PROC和SIGNAL容易混淆),因此利用hint确定activity和哪个相关
4. kevent要经历两次拷贝,增加了overhead。因此如果采用AIO更好,kernel直接修改user状态下的control block。那么为什么不这么做呢?根本原因在于如果内核不允许直接写用户态数据的话,bug会更好定位,同时应用也不需要考虑状态。
总结
精妙之处在于kqueue维持在内核中,因此socket如果满了可以直接将knote加入进程kqueue的活跃链,而不需要等到下次syscall的时候再检查。例如,即使我长期不kevent,knote()依然会将他们的activity存储在knote上并且插入active list,下次只需要遍历active list而不需要重头遍历整个queue。
同时因为kqueue有状态,进行修改也开销很小,只需要改变变化的那部分就行了。
看的时候还是有些地方比较难理解,加上源代码也很复杂,如果有纠错请指正。
附录
filechange
struct kevent ev;struct timespec nullts = { 0, 0 };EV_SET(&ev;, fd, EVFILT_VNODE,EV_ADD | EV_ENABLE | EV_CLEAR,NOTE_RENAME | NOTE_WRITE |NOTE_DELETE | NOTE_ATTRIB, 0, 0);kevent(kq, &ev;, 1, NULL, 0, &nullts;);for (;;) {n = kevent(kq, NULL, 0, &ev;, 1, NULL);if (n > 0) {printf("The file was");if (ev.fflags & NOTE_RENAME)printf(" renamed");if (ev.fflags & NOTE_WRITE)printf(" written");if (ev.fflags & NOTE_DELETE)printf(" deleted");if (ev.fflags & NOTE_ATTRIB)printf(" chmod/chowned");printf("n");}
signal
struct kevent ev;struct timespec nullts = { 0, 0 };EV_SET(&ev;, SIGHUP, EVFILT_SIGNAL,EV_ADD | EV_ENABLE, 0, 0, 0);kevent(kq, &ev;, 1, NULL, 0, &nullts;);signal(SIGHUP, SIG_IGN);for (;;) {n = kevent(kq, NULL, 0, &ev;, 1, NULL);if (n > 0)printf("signal %d delivered"" %d timesn",ev.ident, ev.data);}
udata
int i, n;struct timespec timeout ={ TMOUT_SEC, TMOUT_NSEC };void (* fcn)(struct kevent *);n = kevent(kq, ch, nchanges,ev, nevents, &timeout;);if (n <= 0)goto error_or_timeout;for (i = 0; i < n; i++) {if (evi.flags & EV_ERROR)/* error */fcn = evi.udata;fcn(&evi;);}