1 /* 2 * Kiss - A refined core library for D programming language. 3 * 4 * Copyright (C) 2015-2018 Shanghai Putao Technology Co., Ltd 5 * 6 * Developer: HuntLabs.cn 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module kiss.event.core; 13 14 import kiss.core; 15 16 import std.socket; 17 import std.exception; 18 import std.bitmanip; 19 import kiss.logger; 20 21 alias ReadCallBack = void delegate(Object obj); 22 23 alias DataReceivedHandler = void delegate(in ubyte[] data); 24 alias DataWrittenHandler = void delegate(in ubyte[] data, size_t size); 25 alias AcceptHandler = void delegate(Socket socket); 26 27 @trusted interface ReadTransport : Channel 28 { 29 30 void close(); 31 32 void onRead(AbstractChannel watcher) nothrow; 33 34 void onClose(AbstractChannel watcher) nothrow; 35 } 36 37 @trusted interface WriteTransport : Channel 38 { 39 40 void onWrite(AbstractChannel watcher) nothrow; 41 42 void onClose(AbstractChannel watcher) nothrow; 43 } 44 45 @trusted interface Transport : ReadTransport, WriteTransport 46 { 47 } 48 49 // dfmt on 50 51 interface StreamWriteBuffer 52 { 53 // todo Write Data; 54 const(ubyte)[] sendData(); 55 56 // add send offiset and return is empty 57 bool popSize(size_t size); 58 59 // do send finish 60 void doFinish(); 61 62 StreamWriteBuffer next(); 63 void next(StreamWriteBuffer); 64 } 65 66 alias ChannelBase = AbstractChannel; 67 68 /** 69 */ 70 interface Channel 71 { 72 73 } 74 75 /** 76 */ 77 interface Selector 78 { 79 bool register(AbstractChannel channel); 80 81 bool reregister(AbstractChannel channel); 82 83 bool deregister(AbstractChannel channel); 84 85 void stop(); 86 87 void dispose(); 88 } 89 90 /** 91 */ 92 abstract class AbstractChannel : Channel 93 { 94 socket_t handle = socket_t.init; 95 ErrorEventHandler errorHandler; 96 97 protected bool _isRegistered = false; 98 99 this(Selector loop, WatcherType type) 100 { 101 this._inLoop = loop; 102 _type = type; 103 _flags = BitArray([false, false, false, false, false, false, false, 104 false, false, false, false, false, false, false, false, false]); 105 } 106 107 /** 108 */ 109 bool isRegistered() 110 { 111 return _isRegistered; 112 } 113 114 /** 115 */ 116 bool isClosed() 117 { 118 return _isClosed; 119 } 120 121 protected bool _isClosed = false; 122 123 protected void onClose() 124 { 125 _inLoop.deregister(this); 126 // _inLoop = null; 127 _isRegistered = false; 128 _isClosed = true; 129 clear(); 130 } 131 132 protected void errorOccurred(string msg) 133 { 134 if (errorHandler !is null) 135 errorHandler(msg); 136 } 137 138 void onRead() 139 { 140 assert(false, "not implemented"); 141 } 142 143 void onWrite() 144 { 145 assert(false, "not implemented"); 146 } 147 148 final bool flag(WatchFlag index) 149 { 150 return _flags[index]; 151 } 152 153 @property WatcherType type() 154 { 155 return _type; 156 } 157 158 @property Selector eventLoop() 159 { 160 return _inLoop; 161 } 162 163 void close() 164 { 165 if (!_isClosed) 166 { 167 version (KissDebugMode) 168 trace("channel closing..."); 169 onClose(); 170 version (KissDebugMode) 171 trace("channel closed..."); 172 } 173 else 174 { 175 debug warningf("The watcher(fd=%d) has already been closed", this.handle); 176 } 177 } 178 179 void setNext(AbstractChannel next) 180 { 181 if (next is this) 182 return; // Can't set to self 183 next._next = _next; 184 next._priv = this; 185 if (_next) 186 _next._priv = next; 187 this._next = next; 188 } 189 190 void clear() 191 { 192 if (_priv) 193 _priv._next = _next; 194 if (_next) 195 _next._priv = _priv; 196 _next = null; 197 _priv = null; 198 } 199 200 mixin OverrideErro; 201 202 protected: 203 final void setFlag(WatchFlag index, bool enable) 204 { 205 _flags[index] = enable; 206 } 207 208 Selector _inLoop; 209 210 private: 211 BitArray _flags; 212 WatcherType _type; 213 214 AbstractChannel _priv; 215 AbstractChannel _next; 216 } 217 218 /** 219 */ 220 class EventChannel : AbstractChannel 221 { 222 this(Selector loop) 223 { 224 super(loop, WatcherType.Event); 225 } 226 227 void call() 228 { 229 assert(false); 230 } 231 } 232 233 mixin template OverrideErro() 234 { 235 bool isError() 236 { 237 return _error; 238 } 239 240 string erroString() 241 { 242 return _erroString; 243 } 244 245 void clearError() 246 { 247 _error = false; 248 _erroString = ""; 249 } 250 251 bool _error = false; 252 string _erroString; 253 } 254 255 enum WatcherType : ubyte 256 { 257 Accept = 0, 258 TCP, 259 UDP, 260 Timer, 261 Event, 262 File, 263 None 264 } 265 266 enum WatchFlag : ushort 267 { 268 None = 0, 269 Read, 270 Write, 271 272 OneShot = 8, 273 ETMode = 16 274 } 275 276 final class UdpDataObject 277 { 278 Address addr; 279 ubyte[] data; 280 } 281 282 final class BaseTypeObject(T) 283 { 284 T data; 285 } 286 287 class LoopException : Exception 288 { 289 mixin basicExceptionCtors; 290 } 291 292 // dfmt off 293 version(linux): 294 // dfmt on 295 static if (isCompilerVersionBelow(2078)) 296 { 297 version (X86) 298 { 299 enum SO_REUSEPORT = 15; 300 } 301 else version (X86_64) 302 { 303 enum SO_REUSEPORT = 15; 304 } 305 else version (MIPS32) 306 { 307 enum SO_REUSEPORT = 0x0200; 308 } 309 else version (MIPS64) 310 { 311 enum SO_REUSEPORT = 0x0200; 312 } 313 else version (PPC) 314 { 315 enum SO_REUSEPORT = 15; 316 } 317 else version (PPC64) 318 { 319 enum SO_REUSEPORT = 15; 320 } 321 else version (ARM) 322 { 323 enum SO_REUSEPORT = 15; 324 } 325 }