1 /* 2 * Kiss - A refined core library for D programming language. 3 * 4 * Copyright (C) 2015-2018 Shanghai Putao Technology Co., Ltd 5 * 6 * Developer: HuntLabs.cn 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module kiss.event.selector.epoll; 13 14 // dfmt off 15 version(linux): 16 17 // dfmt on 18 19 import std.exception; 20 import std.socket; 21 import std.string; 22 import kiss.logger; 23 24 import core.time; 25 import core.stdc.string; 26 import core.stdc.errno; 27 import core.sys.posix.sys.types; // for ssize_t, size_t 28 import core.sys.posix.netinet.tcp; 29 import core.sys.posix.netinet.in_; 30 import core.sys.posix.unistd; 31 import core.sys.posix.time : itimerspec, CLOCK_MONOTONIC; 32 33 import kiss.event.core; 34 import kiss.event.socket; 35 import kiss.event.timer; 36 import kiss.event.timer.epoll; 37 38 /** 39 */ 40 class AbstractSelector : Selector 41 { 42 this() 43 { 44 _epollFD = epoll_create1(0); 45 _event = new EpollEventChannel(this); 46 register(_event); 47 } 48 49 ~this() 50 { 51 dispose(); 52 } 53 54 void dispose() 55 { 56 if(isDisposed) 57 return; 58 isDisposed = true; 59 deregister(_event); 60 core.sys.posix.unistd.close(_epollFD); 61 } 62 private bool isDisposed = false; 63 64 65 override bool register(AbstractChannel watcher) 66 { 67 assert(watcher !is null); 68 69 if (watcher.type == WatcherType.Timer) 70 { 71 auto wt = cast(AbstractTimer) watcher; 72 if (wt !is null) 73 wt.setTimer(); 74 } 75 76 // version(KissDebugMode) infof("register, watcher(fd=%d)", watcher.handle); 77 const fd = watcher.handle; 78 assert(fd >= 0, "The watcher.handle is not initilized!"); 79 80 // if(fd < 0) return false; 81 epoll_event ev = buildEpollEvent(watcher); 82 if ((epoll_ctl(_epollFD, EPOLL_CTL_ADD, fd, &ev)) != 0) 83 { 84 if (errno != EEXIST) 85 return false; 86 } 87 88 _event.setNext(watcher); 89 return true; 90 } 91 92 override bool reregister(AbstractChannel watcher) 93 { 94 assert(watcher !is null); 95 const int fd = watcher.handle; 96 if (fd < 0) 97 return false; 98 auto ev = buildEpollEvent(watcher); 99 return epoll_ctl(_epollFD, EPOLL_CTL_MOD, fd, &ev) == 0; 100 } 101 102 override bool deregister(AbstractChannel watcher) 103 { 104 assert(watcher !is null); 105 // version(KissDebugMode) infof("unregister watcher(fd=%d)", watcher.handle); 106 107 const int fd = watcher.handle; 108 if (fd < 0) 109 return false; 110 111 if ((epoll_ctl(_epollFD, EPOLL_CTL_DEL, fd, null)) != 0) 112 { 113 errorf("unregister failed, watcher.handle=%d", watcher.handle); 114 return false; 115 } 116 // TODO: check this 117 // watcher.clear(); 118 return true; 119 } 120 121 122 void onLoop(scope void delegate() weak) 123 { 124 _runing = true; 125 do 126 { 127 weak(); 128 handleEpollEvent(); 129 } 130 while (_runing); 131 } 132 133 private void handleEpollEvent() 134 { 135 epoll_event[64] events; 136 const int len = epoll_wait(_epollFD, events.ptr, events.length, 10); 137 foreach (i; 0 .. len) 138 { 139 AbstractChannel watch = cast(AbstractChannel)(events[i].data.ptr); 140 if (watch is null) 141 { 142 version(KissDebugMode) warningf("watcher is null"); 143 continue; 144 } 145 146 if (isErro(events[i].events)) 147 { 148 version(KissDebugMode) warning("close event: ", watch.handle); 149 watch.close(); 150 continue; 151 } 152 153 if (watch.isRegistered && isRead(events[i].events)) 154 { 155 watch.onRead(); 156 } 157 158 if (watch.isRegistered && isWrite(events[i].events)) 159 { 160 AbstractSocketChannel wt = cast(AbstractSocketChannel) watch; 161 assert(wt !is null); 162 wt.onWriteDone(); 163 // watch.onWrite(); 164 } 165 } 166 } 167 168 override void stop() 169 { 170 _runing = false; 171 } 172 173 protected: 174 bool isErro(uint events) nothrow 175 { 176 return (events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP)) != 0; 177 } 178 179 bool isRead(uint events) nothrow 180 { 181 return (events & EPOLLIN) != 0; 182 } 183 184 bool isWrite(uint events) nothrow 185 { 186 return (events & EPOLLOUT) != 0; 187 } 188 189 static epoll_event buildEpollEvent(AbstractChannel watch) 190 { 191 epoll_event ev; 192 ev.data.ptr = cast(void*) watch; 193 ev.events = EPOLLRDHUP | EPOLLERR | EPOLLHUP; 194 if (watch.flag(WatchFlag.Read)) 195 ev.events |= EPOLLIN; 196 if (watch.flag(WatchFlag.Write)) 197 ev.events |= EPOLLOUT; 198 if (watch.flag(WatchFlag.OneShot)) 199 ev.events |= EPOLLONESHOT; 200 if (watch.flag(WatchFlag.ETMode)) 201 ev.events |= EPOLLET; 202 return ev; 203 } 204 205 private: 206 bool _runing; 207 int _epollFD; 208 EventChannel _event; 209 } 210 211 /** 212 */ 213 class EpollEventChannel : EventChannel 214 { 215 alias UlongObject = BaseTypeObject!ulong; 216 this(Selector loop) 217 { 218 super(loop); 219 setFlag(WatchFlag.Read, true); 220 _readBuffer = new UlongObject(); 221 this.handle = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 222 } 223 224 ~this() 225 { 226 close(); 227 } 228 229 override void call() 230 { 231 ulong value = 1; 232 core.sys.posix.unistd.write(this.handle, &value, value.sizeof); 233 } 234 235 override void onRead() 236 { 237 readEvent((Object obj) { }); 238 super.onRead(); 239 } 240 241 bool readEvent(scope ReadCallBack read) 242 { 243 this.clearError(); 244 ulong value; 245 core.sys.posix.unistd.read(this.handle, &value, value.sizeof); 246 this._readBuffer.data = value; 247 if (read) 248 read(this._readBuffer); 249 return false; 250 } 251 252 UlongObject _readBuffer; 253 } 254 255 enum 256 { 257 EFD_SEMAPHORE = 0x1, 258 EFD_CLOEXEC = 0x80000, 259 EFD_NONBLOCK = 0x800 260 }; 261 262 enum 263 { 264 EPOLL_CLOEXEC = 0x80000, 265 EPOLL_NONBLOCK = 0x800 266 } 267 268 enum 269 { 270 EPOLLIN = 0x001, 271 EPOLLPRI = 0x002, 272 EPOLLOUT = 0x004, 273 EPOLLRDNORM = 0x040, 274 EPOLLRDBAND = 0x080, 275 EPOLLWRNORM = 0x100, 276 EPOLLWRBAND = 0x200, 277 EPOLLMSG = 0x400, 278 EPOLLERR = 0x008, 279 EPOLLHUP = 0x010, 280 EPOLLRDHUP = 0x2000, // since Linux 2.6.17 281 EPOLLONESHOT = 1u << 30, 282 EPOLLET = 1u << 31 283 } 284 285 /* Valid opcodes ( "op" parameter ) to issue to epoll_ctl(). */ 286 enum 287 { 288 EPOLL_CTL_ADD = 1, // Add a file descriptor to the interface. 289 EPOLL_CTL_DEL = 2, // Remove a file descriptor from the interface. 290 EPOLL_CTL_MOD = 3, // Change file descriptor epoll_event structure. 291 } 292 293 294 295 // dfmt off 296 extern (C) : @system : nothrow : 297 // dfmt on 298 299 align(1) struct epoll_event 300 { 301 align(1): 302 uint events; 303 epoll_data_t data; 304 } 305 306 union epoll_data_t 307 { 308 void* ptr; 309 int fd; 310 uint u32; 311 ulong u64; 312 } 313 314 int epoll_create(int size); 315 int epoll_create1(int flags); 316 int epoll_ctl(int epfd, int op, int fd, epoll_event* event); 317 int epoll_wait(int epfd, epoll_event* events, int maxevents, int timeout); 318 319 socket_t eventfd(uint initval, int flags); 320