1 /*
2  * Kiss - A refined core library for D programming language.
3  *
4  * Copyright (C) 2015-2018  Shanghai Putao Technology Co., Ltd
5  *
6  * Developer: HuntLabs.cn
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11  
12 module kiss.event.selector.kqueue;
13 
14 import kiss.core;
15 import kiss.event.core;
16 
17 // dfmt off
18 version(Kqueue):
19 // dfmt on
20 
21 import kiss.event.core;
22 import kiss.event.socket.common;
23 // import kiss.event.socket.posix;
24 import kiss.event.timer.kqueue;
25 
26 import std.exception;
27 import std.socket;
28 
29 import std.string;
30 
31 import core.time;
32 import core.stdc.string;
33 import core.stdc.errno;
34 import core.sys.posix.sys.types; // for ssize_t, size_t
35 import core.sys.posix.signal;
36 import core.sys.posix.netinet.tcp;
37 import core.sys.posix.netinet.in_;
38 import core.sys.posix.unistd;
39 import core.sys.posix.time;
40 
/**
 * Selector built on a kqueue descriptor.
 *
 * Channels are attached with register() and detached with deregister().
 * onLoop() waits for kernel events and dispatches each one to the channel
 * whose reference was stored in the event's `udata` field at registration.
 */
class AbstractSelector : Selector
{
    /// Creates the kqueue descriptor and registers the internal event channel.
    this()
    {
        _kqueueFD = kqueue();
        _event = new KqueueEventChannel(this);
        register(_event);
    }

    ~this()
    {
        dispose();
    }

    /**
     * Deregisters the internal event channel and closes the kqueue descriptor.
     * Only the first call does any work; later calls return immediately.
     */
    void dispose()
    {
        if(isDisposed)
            return;
        isDisposed = true;
        deregister(_event);
        core.sys.posix.unistd.close(_kqueueFD);
    }
    private bool isDisposed = false;

    /**
     * Adds a channel to the kqueue.
     *
     * Timer channels are added with EVFILT_TIMER; every other channel is added
     * with EVFILT_READ and/or EVFILT_WRITE according to its Read/Write flags.
     *
     * Params:
     *   watcher = the channel to add; must not be null.
     *
     * Returns: true when kevent() accepted the change; false when the handle
     *          is negative, the timer cast fails, neither Read nor Write is
     *          requested, or kevent() reports an error.
     */
    override bool register(AbstractChannel watcher)
    {
        assert(watcher !is null);

        int err = -1;
        if (watcher.type == WatcherType.Timer)
        {
            kevent_t ev;
            AbstractTimer watch = cast(AbstractTimer) watcher;
            if (watch is null)
                return false;
            // Periods below 20 ms are raised to 20 ms.
            size_t time = watch.time < 20 ? 20 : watch.time; // in millisecond
            EV_SET(&ev, watch.handle, EVFILT_TIMER, EV_ADD | EV_ENABLE | EV_CLEAR,
                    0, time, cast(void*) watcher);
            err = kevent(_kqueueFD, &ev, 1, null, 0, null);
        }
        else
        {
            const int fd = watcher.handle;
            if (fd < 0)
                return false;
            kevent_t[2] ev = void;
            short read = EV_ADD | EV_ENABLE;
            short write = EV_ADD | EV_ENABLE;
            if (watcher.flag(WatchFlag.ETMode))
            {
                // ETMode channels get EV_CLEAR on both filters.
                read |= EV_CLEAR;
                write |= EV_CLEAR;
            }
            EV_SET(&(ev[0]), fd, EVFILT_READ, read, 0, 0, cast(void*) watcher);
            EV_SET(&(ev[1]), fd, EVFILT_WRITE, write, 0, 0, cast(void*) watcher);
            // Submit only the filters the channel asked for; with neither flag
            // set, err stays -1 and the call reports failure below.
            if (watcher.flag(WatchFlag.Read) && watcher.flag(WatchFlag.Write))
                err = kevent(_kqueueFD, &(ev[0]), 2, null, 0, null);
            else if (watcher.flag(WatchFlag.Read))
                err = kevent(_kqueueFD, &(ev[0]), 1, null, 0, null);
            else if (watcher.flag(WatchFlag.Write))
                err = kevent(_kqueueFD, &(ev[1]), 1, null, 0, null);
        }
        if (err < 0)
        {
            return false;
        }
        // watcher.currtLoop = this;
        _event.setNext(watcher);
        return true;
    }

    /**
     * Not supported by this selector.
     *
     * Throws: LoopException, always.
     */
    override bool reregister(AbstractChannel watcher)
    {
        throw new LoopException("The Kqueue does not support reregister!");
        //return false;
    }

    /**
     * Removes a channel from the kqueue and calls its clear() method.
     *
     * Params:
     *   watcher = the channel to remove; must not be null.
     *
     * Returns: true when kevent() accepted the EV_DELETE change; false when the
     *          handle is negative, the timer cast fails, neither Read nor Write
     *          is set, or kevent() reports an error (clear() is then skipped).
     */
    override bool deregister(AbstractChannel watcher)
    {
        assert(watcher !is null);
        const fd = watcher.handle;
        if (fd < 0)
            return false;

        int err = -1;
        if (watcher.type == WatcherType.Timer)
        {
            kevent_t ev;
            AbstractTimer watch = cast(AbstractTimer) watcher;
            if (watch is null)
                return false;
            EV_SET(&ev, fd, EVFILT_TIMER, EV_DELETE, 0, 0, cast(void*) watcher);
            err = kevent(_kqueueFD, &ev, 1, null, 0, null);
        }
        else
        {
            kevent_t[2] ev = void;
            EV_SET(&(ev[0]), fd, EVFILT_READ, EV_DELETE, 0, 0, cast(void*) watcher);
            EV_SET(&(ev[1]), fd, EVFILT_WRITE, EV_DELETE, 0, 0, cast(void*) watcher);
            if (watcher.flag(WatchFlag.Read) && watcher.flag(WatchFlag.Write))
                err = kevent(_kqueueFD, &(ev[0]), 2, null, 0, null);
            else if (watcher.flag(WatchFlag.Read))
                err = kevent(_kqueueFD, &(ev[0]), 1, null, 0, null);
            else if (watcher.flag(WatchFlag.Write))
                err = kevent(_kqueueFD, &(ev[1]), 1, null, 0, null);
        }
        if (err < 0)
        {
            return false;
        }
        // watcher.currtLoop = null;
        watcher.clear();
        return true;
    }

    // override bool weakUp()
    // {
    //     _event.call();
    //     return true;
    // }

    /**
     * Runs the event loop until stop() clears the running flag.
     *
     * Each iteration first invokes `weak`, then waits in kevent() for up to
     * 64 events (timeout: 1 s plus 10,000 ns) and dispatches them.
     *
     * Params:
     *   weak = delegate invoked at the start of every iteration.
     */
    void onLoop(scope void delegate() weak)
    {
        _runing = true;
        auto tspec = timespec(1, 1000 * 10);
        do
        {
            weak();
            kevent_t[64] events;
            auto len = kevent(_kqueueFD, null, 0, events.ptr, events.length, &tspec);
            if (len < 1)
                continue; // timeout or error: re-check the running flag
            foreach (i; 0 .. len)
            {
                AbstractChannel watch = cast(AbstractChannel)(events[i].udata);
                if (watch is null)
                    continue; // nothing to dispatch to

                if ((events[i].flags & EV_EOF) || (events[i].flags & EV_ERROR))
                {
                    watch.close();
                    continue;
                }
                if (watch.type == WatcherType.Timer)
                {
                    watch.onRead();
                    continue;
                }

                // `filter` holds a single filter identifier (EVFILT_READ == -1,
                // EVFILT_WRITE == -2), not a bit mask. A bitwise AND against
                // these constants is non-zero for both, so compare for equality.
                const short filter = events[i].filter;
                if (filter == EVFILT_WRITE && watch.isRegistered)
                {
                    // import kiss.logger;
                    // version(KissDebugMode) trace("The channel socket is: ", typeid(watch));
                    AbstractSocketChannel wt = cast(AbstractSocketChannel) watch;
                    assert(wt !is null);
                    wt.onWriteDone();
                }

                if (filter == EVFILT_READ && watch.isRegistered)
                    watch.onRead();
            }
        }
        while (_runing);
    }

    /// Clears the running flag so that onLoop() exits after its current iteration.
    override void stop()
    {
        _runing = false;
    }

private:
    bool _runing;
    int _kqueueFD;
    EventChannel _event;
}
215 
/**
 * Event channel backed by a non-blocking socket pair.
 *
 * call() writes to the first socket of the pair; the second socket's handle is
 * the one exposed as this channel's handle, and onRead() drains it.
 */
class KqueueEventChannel : EventChannel
{
    /**
     * Params:
     *   loop = the selector passed on to the base class constructor.
     */
    this(Selector loop)
    {
        super(loop);
        setFlag(WatchFlag.Read, true);

        _pair = socketPair();
        foreach (endpoint; _pair)
            endpoint.blocking = false;

        this.handle = _pair[1].handle;
    }

    ~this()
    {
        close();
    }

    /// Sends the bytes "call" on the first socket of the pair.
    override void call()
    {
        _pair[0].send("call");
    }

    /// Reads from the second socket until receive() returns a value <= 0,
    /// then forwards to the base class onRead().
    override void onRead()
    {
        ubyte[128] scratch;
        ptrdiff_t received;
        do
        {
            received = _pair[1].receive(scratch);
        }
        while (received > 0);

        super.onRead();
    }

    // mixin OverrideErro;

    Socket[2] _pair;
}
256 
/// kqueue filter identifiers. Each value names one filter; they are not bit flags.
enum : short
{
    /// Read filter.
    EVFILT_READ     = -1,
    /// Write filter.
    EVFILT_WRITE    = -2,
    /// Attached to aio requests.
    EVFILT_AIO      = -3,
    /// Attached to vnodes.
    EVFILT_VNODE    = -4,
    /// Attached to struct proc.
    EVFILT_PROC     = -5,
    /// Attached to struct proc.
    EVFILT_SIGNAL   = -6,
    /// Timers.
    EVFILT_TIMER    = -7,
    /// Mach portsets.
    EVFILT_MACHPORT = -8,
    /// Filesystem events.
    EVFILT_FS       = -9,
    /// User events.
    EVFILT_USER     = -10,
    /// Virtual memory events.
    EVFILT_VM       = -12,
    EVFILT_SYSCOUNT = 11
}
272 
/**
 * Fills every field of the event structure pointed to by `kevp`.
 *
 * Params:
 *   kevp = destination structure.
 *   args = one value per kevent_t field, in declaration order
 *          (ident, filter, flags, fflags, data, udata).
 */
extern (D) void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args) @nogc nothrow
{
    kevp.ident = args[0];
    kevp.filter = args[1];
    kevp.flags = args[2];
    kevp.fflags = args[3];
    kevp.data = args[4];
    kevp.udata = args[5];
}
277 
/// Event record passed to and returned from kevent().
struct kevent_t
{
    /// Identifier for this event.
    uintptr_t ident;
    /// Filter for event.
    short filter;
    /// Action and status flags (EV_* values).
    ushort flags;
    /// Filter-specific flags.
    uint fflags;
    /// Filter-specific data value.
    intptr_t data;
    /// Opaque user data identifier.
    void* udata;
}
287 
/// Action, flag and result bits for the `flags` field of kevent_t.
enum
{
    // ---- actions ----
    /// Add event to kq (implies enable).
    EV_ADD      = 0x0001,
    /// Delete event from kq.
    EV_DELETE   = 0x0002,
    /// Enable event.
    EV_ENABLE   = 0x0004,
    /// Disable event (not reported).
    EV_DISABLE  = 0x0008,

    // ---- flags ----
    /// Only report one occurrence.
    EV_ONESHOT  = 0x0010,
    /// Clear event state after reporting.
    EV_CLEAR    = 0x0020,
    /// Force EV_ERROR on success, data=0.
    EV_RECEIPT  = 0x0040,
    /// Disable event after reporting.
    EV_DISPATCH = 0x0080,

    /// Reserved by system.
    EV_SYSFLAGS = 0xF000,
    /// Filter-specific flag.
    EV_FLAG1    = 0x2000,

    // ---- returned values ----
    /// EOF detected.
    EV_EOF      = 0x8000,
    /// Error, data contains errno.
    EV_ERROR    = 0x4000,
}
312 
/// Filter-specific data/hint flags for the `fflags` field of kevent_t.
enum
{
    /*
     * EVFILT_USER (shared with userspace).
     *
     * On input, the top two bits of fflags specify how the lower twenty four
     * bits should be applied to the stored value of fflags.
     *
     * On output, the top two bits will always be set to NOTE_FFNOP and the
     * remaining twenty four bits will contain the stored fflags value.
     */
    /// Ignore input fflags.
    NOTE_FFNOP      = 0x00000000,
    /// AND fflags.
    NOTE_FFAND      = 0x40000000,
    /// OR fflags.
    NOTE_FFOR       = 0x80000000,
    /// Copy fflags.
    NOTE_FFCOPY     = 0xc0000000,
    /// Masks for operations.
    NOTE_FFCTRLMASK = 0xc0000000,
    NOTE_FFLAGSMASK = 0x00ffffff,

    /// Cause the event to be triggered for output.
    NOTE_TRIGGER    = 0x01000000,

    /* EVFILT_{READ|WRITE} (shared with userspace). */
    /// Low water mark.
    NOTE_LOWAT      = 0x0001,

    /* EVFILT_VNODE (shared with userspace). */
    /// Vnode was removed.
    NOTE_DELETE     = 0x0001,
    /// Data contents changed.
    NOTE_WRITE      = 0x0002,
    /// Size increased.
    NOTE_EXTEND     = 0x0004,
    /// Attributes changed.
    NOTE_ATTRIB     = 0x0008,
    /// Link count changed.
    NOTE_LINK       = 0x0010,
    /// Vnode was renamed.
    NOTE_RENAME     = 0x0020,
    /// Vnode access was revoked.
    NOTE_REVOKE     = 0x0040,

    /* EVFILT_PROC (shared with userspace). */
    /// Process exited.
    NOTE_EXIT       = 0x80000000,
    /// Process forked.
    NOTE_FORK       = 0x40000000,
    /// Process exec'd.
    NOTE_EXEC       = 0x20000000,
    /// Mask for hint bits.
    NOTE_PCTRLMASK  = 0xf0000000,
    /// Mask for pid.
    NOTE_PDATAMASK  = 0x000fffff,

    /* Additional flags for EVFILT_PROC. */
    /// Follow across forks.
    NOTE_TRACK      = 0x00000001,
    /// Could not track child.
    NOTE_TRACKERR   = 0x00000002,
    /// Am a child process.
    NOTE_CHILD      = 0x00000004,
}
367 
// C bindings for the kqueue system interface.
extern (C) @nogc nothrow
{
    /// Binding for the C function `kqueue`.
    int kqueue();

    /// Binding for the C function `kevent`.
    int kevent(int kq,
            const kevent_t* changelist, int nchanges,
            kevent_t* eventlist, int nevents,
            const timespec* timeout);
}
374 
/// Numeric value of the SO_REUSEPORT socket option as used by this module.
enum int SO_REUSEPORT = 0x0200;