1 /* 2 * Kiss - A refined core library for D programming language. 3 * 4 * Copyright (C) 2015-2018 Shanghai Putao Technology Co., Ltd 5 * 6 * Developer: HuntLabs.cn 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module kiss.event.selector.kqueue; 13 14 import kiss.core; 15 import kiss.event.core; 16 17 // dfmt off 18 version(Kqueue): 19 // dfmt on 20 21 import kiss.event.core; 22 import kiss.event.socket.common; 23 // import kiss.event.socket.posix; 24 import kiss.event.timer.kqueue; 25 26 import std.exception; 27 import std.socket; 28 29 import std.string; 30 31 import core.time; 32 import core.stdc.string; 33 import core.stdc.errno; 34 import core.sys.posix.sys.types; // for ssize_t, size_t 35 import core.sys.posix.signal; 36 import core.sys.posix.netinet.tcp; 37 import core.sys.posix.netinet.in_; 38 import core.sys.posix.unistd; 39 import core.sys.posix.time; 40 41 /** 42 */ 43 class AbstractSelector : Selector 44 { 45 this() 46 { 47 _kqueueFD = kqueue(); 48 _event = new KqueueEventChannel(this); 49 register(_event); 50 } 51 52 ~this() 53 { 54 dispose(); 55 } 56 57 void dispose() 58 { 59 if(isDisposed) 60 return; 61 isDisposed = true; 62 deregister(_event); 63 core.sys.posix.unistd.close(_kqueueFD); 64 } 65 private bool isDisposed = false; 66 67 override bool register(AbstractChannel watcher) 68 { 69 assert(watcher !is null); 70 71 int err = -1; 72 if (watcher.type == WatcherType.Timer) 73 { 74 kevent_t ev; 75 AbstractTimer watch = cast(AbstractTimer) watcher; 76 if (watch is null) 77 return false; 78 size_t time = watch.time < 20 ? 20 : watch.time; // in millisecond 79 EV_SET(&ev, watch.handle, EVFILT_TIMER, EV_ADD | EV_ENABLE | EV_CLEAR, 80 0, time, cast(void*) watcher); 81 err = kevent(_kqueueFD, &ev, 1, null, 0, null); 82 } 83 else 84 { 85 const int fd = watcher.handle; 86 if (fd < 0) 87 return false; 88 kevent_t[2] ev = void; 89 short read = EV_ADD | EV_ENABLE; 90 short write = EV_ADD | EV_ENABLE; 91 if (watcher.flag(WatchFlag.ETMode)) 92 { 93 read |= EV_CLEAR; 94 write |= EV_CLEAR; 95 } 96 EV_SET(&(ev[0]), fd, EVFILT_READ, read, 0, 0, cast(void*) watcher); 97 EV_SET(&(ev[1]), fd, EVFILT_WRITE, write, 0, 0, cast(void*) watcher); 98 if (watcher.flag(WatchFlag.Read) && watcher.flag(WatchFlag.Write)) 99 err = kevent(_kqueueFD, &(ev[0]), 2, null, 0, null); 100 else if (watcher.flag(WatchFlag.Read)) 101 err = kevent(_kqueueFD, &(ev[0]), 1, null, 0, null); 102 else if (watcher.flag(WatchFlag.Write)) 103 err = kevent(_kqueueFD, &(ev[1]), 1, null, 0, null); 104 } 105 if (err < 0) 106 { 107 return false; 108 } 109 // watcher.currtLoop = this; 110 _event.setNext(watcher); 111 return true; 112 } 113 114 override bool reregister(AbstractChannel watcher) 115 { 116 throw new LoopException("The Kqueue does not support reregister!"); 117 //return false; 118 } 119 120 override bool deregister(AbstractChannel watcher) 121 { 122 assert(watcher !is null); 123 const fd = watcher.handle; 124 if (fd < 0) 125 return false; 126 127 int err = -1; 128 if (watcher.type == WatcherType.Timer) 129 { 130 kevent_t ev; 131 AbstractTimer watch = cast(AbstractTimer) watcher; 132 if (watch is null) 133 return false; 134 EV_SET(&ev, fd, EVFILT_TIMER, EV_DELETE, 0, 0, cast(void*) watcher); 135 err = kevent(_kqueueFD, &ev, 1, null, 0, null); 136 } 137 else 138 { 139 kevent_t[2] ev = void; 140 EV_SET(&(ev[0]), fd, EVFILT_READ, EV_DELETE, 0, 0, cast(void*) watcher); 141 EV_SET(&(ev[1]), fd, EVFILT_WRITE, EV_DELETE, 0, 0, cast(void*) watcher); 142 if (watcher.flag(WatchFlag.Read) && watcher.flag(WatchFlag.Write)) 143 err = kevent(_kqueueFD, &(ev[0]), 2, null, 0, null); 144 else if (watcher.flag(WatchFlag.Read)) 145 err = kevent(_kqueueFD, &(ev[0]), 1, null, 0, null); 146 else if (watcher.flag(WatchFlag.Write)) 147 err = kevent(_kqueueFD, &(ev[1]), 1, null, 0, null); 148 } 149 if (err < 0) 150 { 151 return false; 152 } 153 // watcher.currtLoop = null; 154 watcher.clear(); 155 return true; 156 } 157 158 // override bool weakUp() 159 // { 160 // _event.call(); 161 // return true; 162 // } 163 164 // while(true) 165 void onLoop(scope void delegate() weak) 166 { 167 _runing = true; 168 auto tspec = timespec(1, 1000 * 10); 169 do 170 { 171 weak(); 172 kevent_t[64] events; 173 auto len = kevent(_kqueueFD, null, 0, events.ptr, events.length, &tspec); 174 if (len < 1) 175 continue; 176 foreach (i; 0 .. len) 177 { 178 AbstractChannel watch = cast(AbstractChannel)(events[i].udata); 179 if ((events[i].flags & EV_EOF) || (events[i].flags & EV_ERROR)) 180 { 181 watch.close(); 182 continue; 183 } 184 if (watch.type == WatcherType.Timer) 185 { 186 watch.onRead(); 187 continue; 188 } 189 if ((events[i].filter & EVFILT_WRITE) && watch.isRegistered) 190 { 191 // import kiss.logger; 192 // version(KissDebugMode) trace("The channel socket is: ", typeid(watch)); 193 AbstractSocketChannel wt = cast(AbstractSocketChannel) watch; 194 assert(wt !is null); 195 wt.onWriteDone(); 196 } 197 198 if ((events[i].filter & EVFILT_READ) && watch.isRegistered) 199 watch.onRead(); 200 } 201 } 202 while (_runing); 203 } 204 205 override void stop() 206 { 207 _runing = false; 208 } 209 210 private: 211 bool _runing; 212 int _kqueueFD; 213 EventChannel _event; 214 } 215 216 /** 217 */ 218 class KqueueEventChannel : EventChannel 219 { 220 this(Selector loop) 221 { 222 super(loop); 223 setFlag(WatchFlag.Read, true); 224 _pair = socketPair(); 225 _pair[0].blocking = false; 226 _pair[1].blocking = false; 227 this.handle = _pair[1].handle; 228 } 229 230 ~this() 231 { 232 close(); 233 } 234 235 override void call() 236 { 237 _pair[0].send("call"); 238 } 239 240 override void onRead() 241 { 242 ubyte[128] data; 243 while (true) 244 { 245 if (_pair[1].receive(data) <= 0) 246 break; 247 } 248 249 super.onRead(); 250 } 251 252 // mixin OverrideErro; 253 254 Socket[2] _pair; 255 } 256 257 enum : short 258 { 259 EVFILT_READ = -1, 260 EVFILT_WRITE = -2, 261 EVFILT_AIO = -3, /* attached to aio requests */ 262 EVFILT_VNODE = -4, /* attached to vnodes */ 263 EVFILT_PROC = -5, /* attached to struct proc */ 264 EVFILT_SIGNAL = -6, /* attached to struct proc */ 265 EVFILT_TIMER = -7, /* timers */ 266 EVFILT_MACHPORT = -8, /* Mach portsets */ 267 EVFILT_FS = -9, /* filesystem events */ 268 EVFILT_USER = -10, /* User events */ 269 EVFILT_VM = -12, /* virtual memory events */ 270 EVFILT_SYSCOUNT = 11 271 } 272 273 extern (D) void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args) @nogc nothrow 274 { 275 *kevp = kevent_t(args); 276 } 277 278 struct kevent_t 279 { 280 uintptr_t ident; /* identifier for this event */ 281 short filter; /* filter for event */ 282 ushort flags; 283 uint fflags; 284 intptr_t data; 285 void* udata; /* opaque user data identifier */ 286 } 287 288 enum 289 { 290 /* actions */ 291 EV_ADD = 0x0001, /* add event to kq (implies enable) */ 292 EV_DELETE = 0x0002, /* delete event from kq */ 293 EV_ENABLE = 0x0004, /* enable event */ 294 EV_DISABLE = 0x0008, /* disable event (not reported) */ 295 296 /* flags */ 297 EV_ONESHOT = 0x0010, /* only report one occurrence */ 298 EV_CLEAR = 0x0020, /* clear event state after reporting */ 299 EV_RECEIPT = 0x0040, /* force EV_ERROR on success, data=0 */ 300 EV_DISPATCH = 0x0080, /* disable event after reporting */ 301 302 EV_SYSFLAGS = 0xF000, /* reserved by system */ 303 EV_FLAG1 = 0x2000, /* filter-specific flag */ 304 305 /* returned values */ 306 EV_EOF = 0x8000, /* EOF detected */ 307 EV_ERROR = 0x4000, /* error, data contains errno */ 308 309 310 311 } 312 313 enum 314 { 315 /* 316 * data/hint flags/masks for EVFILT_USER, shared with userspace 317 * 318 * On input, the top two bits of fflags specifies how the lower twenty four 319 * bits should be applied to the stored value of fflags. 320 * 321 * On output, the top two bits will always be set to NOTE_FFNOP and the 322 * remaining twenty four bits will contain the stored fflags value. 323 */ 324 NOTE_FFNOP = 0x00000000, /* ignore input fflags */ 325 NOTE_FFAND = 0x40000000, /* AND fflags */ 326 NOTE_FFOR = 0x80000000, /* OR fflags */ 327 NOTE_FFCOPY = 0xc0000000, /* copy fflags */ 328 NOTE_FFCTRLMASK = 0xc0000000, /* masks for operations */ 329 NOTE_FFLAGSMASK = 0x00ffffff, 330 331 NOTE_TRIGGER = 0x01000000, /* Cause the event to be 332 triggered for output. */ 333 334 /* 335 * data/hint flags for EVFILT_{READ|WRITE}, shared with userspace 336 */ 337 NOTE_LOWAT = 0x0001, /* low water mark */ 338 339 /* 340 * data/hint flags for EVFILT_VNODE, shared with userspace 341 */ 342 NOTE_DELETE = 0x0001, /* vnode was removed */ 343 NOTE_WRITE = 0x0002, /* data contents changed */ 344 NOTE_EXTEND = 0x0004, /* size increased */ 345 NOTE_ATTRIB = 0x0008, /* attributes changed */ 346 NOTE_LINK = 0x0010, /* link count changed */ 347 NOTE_RENAME = 0x0020, /* vnode was renamed */ 348 NOTE_REVOKE = 0x0040, /* vnode access was revoked */ 349 350 /* 351 * data/hint flags for EVFILT_PROC, shared with userspace 352 */ 353 NOTE_EXIT = 0x80000000, /* process exited */ 354 NOTE_FORK = 0x40000000, /* process forked */ 355 NOTE_EXEC = 0x20000000, /* process exec'd */ 356 NOTE_PCTRLMASK = 0xf0000000, /* mask for hint bits */ 357 NOTE_PDATAMASK = 0x000fffff, /* mask for pid */ 358 359 /* additional flags for EVFILT_PROC */ 360 NOTE_TRACK = 0x00000001, /* follow across forks */ 361 NOTE_TRACKERR = 0x00000002, /* could not track child */ 362 NOTE_CHILD = 0x00000004, /* am a child process */ 363 364 365 366 } 367 368 extern (C) 369 { 370 int kqueue() @nogc nothrow; 371 int kevent(int kq, const kevent_t* changelist, int nchanges, 372 kevent_t* eventlist, int nevents, const timespec* timeout) @nogc nothrow; 373 } 374 375 enum SO_REUSEPORT = 0x0200;