/*
 * Kiss - A refined core library for D programming language.
 *
 * Copyright (C) 2015-2018  Shanghai Putao Technology Co., Ltd
 *
 * Developer: HuntLabs.cn
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module kiss.event.selector.epoll;

// dfmt off
version(linux):

// dfmt on

import std.exception;
import std.socket;
import std.string;
import kiss.logger;

import core.time;
import core.stdc.string;
import core.stdc.errno;
import core.sys.posix.sys.types; // for ssize_t, size_t
import core.sys.posix.netinet.tcp;
import core.sys.posix.netinet.in_;
import core.sys.posix.unistd;
import core.sys.posix.time : itimerspec, CLOCK_MONOTONIC;

import kiss.event.core;
import kiss.event.socket;
import kiss.event.timer;
import kiss.event.timer.epoll;

/**
 * Selector implementation built on the Linux epoll API.
 *
 * The constructor creates an epoll instance and registers an internal
 * EpollEventChannel with it. Channels are added, modified and removed through
 * register / reregister / deregister, and onLoop polls the epoll instance
 * until stop() is called.
 */
class AbstractSelector : Selector
{
    /// Creates the epoll instance and registers the internal event channel.
    this()
    {
        _epollFD = epoll_create1(0);
        _event = new EpollEventChannel(this);
        register(_event);
    }

    // NOTE(review): dispose() touches _event from the destructor; confirm this
    // is safe when the object is finalized by the GC rather than destroyed
    // explicitly.
    ~this()
    {
        dispose();
    }

    /**
     * Deregisters the internal event channel and closes the epoll descriptor.
     *
     * Only the first call has an effect; later calls return immediately.
     */
    void dispose()
    {
        if (isDisposed)
            return;
        isDisposed = true;
        deregister(_event);
        core.sys.posix.unistd.close(_epollFD);
    }

    private bool isDisposed = false;

    /**
     * Adds a channel to the epoll interest list.
     *
     * Timer channels that cast to AbstractTimer get setTimer() called first.
     *
     * Params:
     *   watcher = channel to add; must be non-null and have a handle >= 0
     *             (both enforced by assert).
     *
     * Returns: true when EPOLL_CTL_ADD succeeds or fails with EEXIST,
     *          false on any other epoll_ctl failure.
     */
    override bool register(AbstractChannel watcher)
    {
        assert(watcher !is null);

        if (watcher.type == WatcherType.Timer)
        {
            auto wt = cast(AbstractTimer) watcher;
            if (wt !is null)
                wt.setTimer();
        }

        // version(KissDebugMode) infof("register, watcher(fd=%d)", watcher.handle);
        const fd = watcher.handle;
        assert(fd >= 0, "The watcher.handle is not initilized!");

        // if(fd < 0) return false;
        epoll_event ev = buildEpollEvent(watcher);
        if ((epoll_ctl(_epollFD, EPOLL_CTL_ADD, fd, &ev)) != 0)
        {
            // A descriptor that is already in the interest list is not an error here.
            if (errno != EEXIST)
                return false;
        }

        _event.setNext(watcher);
        return true;
    }

    /**
     * Re-applies the channel's current watch flags with EPOLL_CTL_MOD.
     *
     * Params:
     *   watcher = channel to update; must be non-null.
     *
     * Returns: false when the handle is negative or epoll_ctl fails,
     *          true otherwise.
     */
    override bool reregister(AbstractChannel watcher)
    {
        assert(watcher !is null);
        const int fd = watcher.handle;
        if (fd < 0)
            return false;
        auto ev = buildEpollEvent(watcher);
        return epoll_ctl(_epollFD, EPOLL_CTL_MOD, fd, &ev) == 0;
    }

    /**
     * Removes a channel from the epoll interest list.
     *
     * Params:
     *   watcher = channel to remove; must be non-null.
     *
     * Returns: false when the handle is negative or EPOLL_CTL_DEL fails
     *          (the failure is logged), true otherwise.
     */
    override bool deregister(AbstractChannel watcher)
    {
        assert(watcher !is null);
        // version(KissDebugMode) infof("unregister watcher(fd=%d)", watcher.handle);

        const int fd = watcher.handle;
        if (fd < 0)
            return false;

        if ((epoll_ctl(_epollFD, EPOLL_CTL_DEL, fd, null)) != 0)
        {
            errorf("unregister failed, watcher.handle=%d", watcher.handle);
            return false;
        }
        // TODO: check this
        // watcher.currtLoop = null;
        // watcher.clear();
        return true;
    }

    /**
     * Runs the polling loop until stop() clears the running flag.
     *
     * Each iteration calls `weak` first and then processes one batch of epoll
     * events. The body executes at least once, even if stop() was called
     * earlier, because the flag is set on entry and tested after the body.
     *
     * Params:
     *   weak = delegate invoked at the start of every iteration.
     */
    void onLoop(scope void delegate() weak)
    {
        _runing = true;
        do
        {
            weak();
            handleEpollEvent();
        }
        while (_runing);
    }

    /**
     * Waits up to 10 ms for epoll events and dispatches up to 64 of them.
     *
     * For each event: error/hang-up conditions close the channel; otherwise
     * readable events call onRead() and writable events call onWriteDone(),
     * each only while the channel still reports isRegistered.
     */
    private void handleEpollEvent()
    {
        epoll_event[64] events;
        // A negative result (epoll_wait failure) yields an empty range below,
        // so no events are dispatched in that case.
        const int len = epoll_wait(_epollFD, events.ptr, events.length, 10);
        foreach (i; 0 .. len)
        {
            AbstractChannel watch = cast(AbstractChannel)(events[i].data.ptr);
            if (watch is null)
            {
                // `watch` is null here, so it must not be dereferenced to
                // build the message; report the raw event mask instead.
                warningf("watcher is null (events=0x%x)", events[i].events);
                continue;
            }

            if (isErro(events[i].events))
            {
                version(KissDebugMode) warning("close event: ", watch.handle);
                watch.close();
                continue;
            }

            if (watch.isRegistered && isRead(events[i].events))
            {
                watch.onRead();
            }

            // Re-check isRegistered: the read handler above may have changed it.
            if (watch.isRegistered && isWrite(events[i].events))
            {
                AbstractSocketChannel wt = cast(AbstractSocketChannel) watch;
                assert(wt !is null);
                wt.onWriteDone();
                // watch.onWrite();
            }
        }
    }

    /// Requests that onLoop exit after the current iteration.
    override void stop()
    {
        _runing = false;
    }

protected:
    /// True when the mask contains EPOLLHUP, EPOLLERR or EPOLLRDHUP.
    bool isErro(uint events) nothrow
    {
        return (events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP)) != 0;
    }

    /// True when the mask contains EPOLLIN.
    bool isRead(uint events) nothrow
    {
        return (events & EPOLLIN) != 0;
    }

    /// True when the mask contains EPOLLOUT.
    bool isWrite(uint events) nothrow
    {
        return (events & EPOLLOUT) != 0;
    }

    /**
     * Builds the epoll_event describing a channel.
     *
     * The channel reference is stored in `data.ptr` so that it can be
     * recovered when the event fires. EPOLLRDHUP, EPOLLERR and EPOLLHUP are
     * always requested; EPOLLIN, EPOLLOUT, EPOLLONESHOT and EPOLLET are added
     * according to the channel's Read, Write, OneShot and ETMode flags.
     *
     * Params:
     *   watch = channel whose flags are translated.
     *
     * Returns: the populated epoll_event.
     */
    static epoll_event buildEpollEvent(AbstractChannel watch)
    {
        epoll_event ev;
        ev.data.ptr = cast(void*) watch;
        ev.events = EPOLLRDHUP | EPOLLERR | EPOLLHUP;
        if (watch.flag(WatchFlag.Read))
            ev.events |= EPOLLIN;
        if (watch.flag(WatchFlag.Write))
            ev.events |= EPOLLOUT;
        if (watch.flag(WatchFlag.OneShot))
            ev.events |= EPOLLONESHOT;
        if (watch.flag(WatchFlag.ETMode))
            ev.events |= EPOLLET;
        return ev;
    }

private:
    bool _runing;        // loop flag: set by onLoop, cleared by stop
    int _epollFD;        // descriptor returned by epoll_create1
    EventChannel _event; // internal eventfd-backed channel
}

/**
 * EventChannel backed by a non-blocking, close-on-exec eventfd.
 *
 * call() writes the value 1 to the eventfd; onRead() reads the counter back
 * and then forwards to the base class handler.
 */
class EpollEventChannel : EventChannel
{
    alias UlongObject = BaseTypeObject!ulong;

    /// Creates the eventfd and marks the channel as interested in reads.
    this(Selector loop)
    {
        super(loop);
        setFlag(WatchFlag.Read, true);
        _readBuffer = new UlongObject();
        this.handle = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    }

    ~this()
    {
        close();
    }

    /// Writes the 8-byte value 1 to the eventfd. The write result is not checked.
    override void call()
    {
        ulong value = 1;
        core.sys.posix.unistd.write(this.handle, &value, value.sizeof);
    }

    /// Reads the eventfd counter, then forwards to the base class onRead.
    override void onRead()
    {
        readEvent((Object obj) { });
        super.onRead();
    }

    /**
     * Reads 8 bytes from the eventfd into the internal buffer.
     *
     * The read result is not checked; if the read fails, `value` keeps its
     * default of 0 and that is what gets stored.
     *
     * Params:
     *   read = optional callback that receives the buffer object.
     *
     * Returns: always false.
     */
    bool readEvent(scope ReadCallBack read)
    {
        this.clearError();
        ulong value;
        core.sys.posix.unistd.read(this.handle, &value, value.sizeof);
        this._readBuffer.data = value;
        if (read)
            read(this._readBuffer);
        return false;
    }

    UlongObject _readBuffer;
}

/* Flags accepted by eventfd(). */
enum
{
    EFD_SEMAPHORE = 0x1,
    EFD_CLOEXEC = 0x80000,
    EFD_NONBLOCK = 0x800
}

/* Flags accepted by epoll_create1(). */
enum
{
    EPOLL_CLOEXEC = 0x80000,
    EPOLL_NONBLOCK = 0x800
}

/* Event bits for epoll_event.events. */
enum
{
    EPOLLIN = 0x001,
    EPOLLPRI = 0x002,
    EPOLLOUT = 0x004,
    EPOLLRDNORM = 0x040,
    EPOLLRDBAND = 0x080,
    EPOLLWRNORM = 0x100,
    EPOLLWRBAND = 0x200,
    EPOLLMSG = 0x400,
    EPOLLERR = 0x008,
    EPOLLHUP = 0x010,
    EPOLLRDHUP = 0x2000, // since Linux 2.6.17
    EPOLLONESHOT = 1u << 30,
    EPOLLET = 1u << 31
}

/* Valid opcodes ( "op" parameter ) to issue to epoll_ctl().  */
enum
{
    EPOLL_CTL_ADD = 1, // Add a file descriptor to the interface.
    EPOLL_CTL_DEL = 2, // Remove a file descriptor from the interface.
    EPOLL_CTL_MOD = 3, // Change file descriptor epoll_event structure.
}

// dfmt off
extern (C) : @system : nothrow :
// dfmt on

// NOTE(review): declared with byte alignment (packed); confirm this matches
// the kernel's struct layout on every target architecture.
align(1) struct epoll_event
{
align(1):
    uint events;
    epoll_data_t data;
}

union epoll_data_t
{
    void* ptr;
    int fd;
    uint u32;
    ulong u64;
}

int epoll_create(int size);
int epoll_create1(int flags);
int epoll_ctl(int epfd, int op, int fd, epoll_event* event);
int epoll_wait(int epfd, epoll_event* events, int maxevents, int timeout);

socket_t eventfd(uint initval, int flags);