1 /* 2 * Kiss - A refined core library for D programming language. 3 * 4 * Copyright (C) 2015-2018 Shanghai Putao Technology Co., Ltd 5 * 6 * Developer: HuntLabs.cn 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module kiss.event.core; 13 14 import kiss.core; 15 16 import std.socket; 17 import std.exception; 18 import std.bitmanip; 19 import kiss.logger; 20 21 alias ReadCallBack = void delegate(Object obj); 22 23 alias DataReceivedHandler = void delegate(in ubyte[] data); 24 alias DataWrittenHandler = void delegate(in ubyte[] data, size_t size); 25 alias AcceptHandler = void delegate(Socket socket); 26 27 @trusted interface ReadTransport : Channel 28 { 29 30 void close(); 31 32 void onRead(AbstractChannel watcher) nothrow; 33 34 void onClose(AbstractChannel watcher) nothrow; 35 } 36 37 @trusted interface WriteTransport : Channel 38 { 39 40 void onWrite(AbstractChannel watcher) nothrow; 41 42 void onClose(AbstractChannel watcher) nothrow; 43 } 44 45 @trusted interface Transport : ReadTransport, WriteTransport 46 { 47 } 48 49 // dfmt on 50 51 interface StreamWriteBuffer 52 { 53 // todo Write Data; 54 const(ubyte)[] sendData(); 55 56 // add send offiset and return is empty 57 bool popSize(size_t size); 58 59 // do send finish 60 void doFinish(); 61 62 StreamWriteBuffer next(); 63 void next(StreamWriteBuffer); 64 } 65 66 alias ChannelBase = AbstractChannel; 67 68 /** 69 */ 70 interface Channel 71 { 72 73 } 74 75 /** 76 */ 77 interface Selector 78 { 79 bool register(AbstractChannel channel); 80 81 bool reregister(AbstractChannel channel); 82 83 bool deregister(AbstractChannel channel); 84 85 void stop(); 86 87 void dispose(); 88 } 89 90 /** 91 */ 92 abstract class AbstractChannel : Channel 93 { 94 socket_t handle = socket_t.init; 95 ErrorEventHandler errorHandler; 96 97 protected bool _isRegistered = false; 98 99 this(Selector loop, WatcherType type) 100 { 101 this._inLoop = loop; 102 _type = type; 103 _flags = BitArray([false, false, false, false, false, false, false, 104 false, false, false, false, false, false, false, false, false]); 105 } 106 107 /** 108 */ 109 bool isRegistered() 110 { 111 return _isRegistered; 112 } 113 114 /** 115 */ 116 bool isClosed() 117 { 118 return _isClosed; 119 } 120 121 protected bool _isClosed = false; 122 123 protected void onClose() 124 { 125 _isRegistered = false; 126 _isClosed = true; 127 version(Windows) {} else { 128 _inLoop.deregister(this); 129 } 130 // _inLoop = null; 131 clear(); 132 } 133 134 protected void errorOccurred(string msg) 135 { 136 if (errorHandler !is null) 137 errorHandler(msg); 138 } 139 140 void onRead() 141 { 142 assert(false, "not implemented"); 143 } 144 145 void onWrite() 146 { 147 assert(false, "not implemented"); 148 } 149 150 final bool flag(WatchFlag index) 151 { 152 return _flags[index]; 153 } 154 155 @property WatcherType type() 156 { 157 return _type; 158 } 159 160 @property Selector eventLoop() 161 { 162 return _inLoop; 163 } 164 165 void close() 166 { 167 if (!_isClosed) 168 { 169 version (KissDebugMode) 170 trace("channel closing...", this.handle); 171 onClose(); 172 version (KissDebugMode) 173 trace("channel closed...", this.handle); 174 } 175 else 176 { 177 debug warningf("The watcher(fd=%d) has already been closed", this.handle); 178 } 179 } 180 181 void setNext(AbstractChannel next) 182 { 183 if (next is this) 184 return; // Can't set to self 185 next._next = _next; 186 next._priv = this; 187 if (_next) 188 _next._priv = next; 189 this._next = next; 190 } 191 192 void clear() 193 { 194 if (_priv) 195 _priv._next = _next; 196 if (_next) 197 _next._priv = _priv; 198 _next = null; 199 _priv = null; 200 } 201 202 mixin OverrideErro; 203 204 protected: 205 final void setFlag(WatchFlag index, bool enable) 206 { 207 _flags[index] = enable; 208 } 209 210 Selector _inLoop; 211 212 private: 213 BitArray _flags; 214 WatcherType _type; 215 216 AbstractChannel _priv; 217 AbstractChannel _next; 218 } 219 220 /** 221 */ 222 class EventChannel : AbstractChannel 223 { 224 this(Selector loop) 225 { 226 super(loop, WatcherType.Event); 227 } 228 229 void call() 230 { 231 assert(false); 232 } 233 } 234 235 mixin template OverrideErro() 236 { 237 bool isError() 238 { 239 return _error; 240 } 241 242 string erroString() 243 { 244 return _erroString; 245 } 246 247 void clearError() 248 { 249 _error = false; 250 _erroString = ""; 251 } 252 253 bool _error = false; 254 string _erroString; 255 } 256 257 enum WatcherType : ubyte 258 { 259 Accept = 0, 260 TCP, 261 UDP, 262 Timer, 263 Event, 264 File, 265 None 266 } 267 268 enum WatchFlag : ushort 269 { 270 None = 0, 271 Read, 272 Write, 273 274 OneShot = 8, 275 ETMode = 16 276 } 277 278 final class UdpDataObject 279 { 280 Address addr; 281 ubyte[] data; 282 } 283 284 final class BaseTypeObject(T) 285 { 286 T data; 287 } 288 289 class LoopException : Exception 290 { 291 mixin basicExceptionCtors; 292 } 293 294 // dfmt off 295 version(linux): 296 // dfmt on 297 static if (isCompilerVersionBelow(2078)) 298 { 299 version (X86) 300 { 301 enum SO_REUSEPORT = 15; 302 } 303 else version (X86_64) 304 { 305 enum SO_REUSEPORT = 15; 306 } 307 else version (MIPS32) 308 { 309 enum SO_REUSEPORT = 0x0200; 310 } 311 else version (MIPS64) 312 { 313 enum SO_REUSEPORT = 0x0200; 314 } 315 else version (PPC) 316 { 317 enum SO_REUSEPORT = 15; 318 } 319 else version (PPC64) 320 { 321 enum SO_REUSEPORT = 15; 322 } 323 else version (ARM) 324 { 325 enum SO_REUSEPORT = 15; 326 } 327 }