/*
 * Kiss - A refined core library for D programming language.
 *
 * Copyright (C) 2015-2018  Shanghai Putao Technology Co., Ltd
 *
 * Developer: HuntLabs.cn
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module kiss.event.socket.posix;

// dfmt off
version(Posix):

// dfmt on

import kiss.event.socket.common;
import kiss.event.core;
import kiss.core;
import kiss.util.thread;

import std.conv;
import std.exception;
import std.format;
import std.process;
import std.socket;
import std.string;
import kiss.logger;

import core.stdc.errno;
import core.stdc.string;
import core.sys.posix.sys.socket : accept;

/**
TCP Server

Owns a listening `TcpSocket` and turns each accepted descriptor into a
`Socket` that is handed to the accept handler.
*/
abstract class AbstractListener : AbstractSocketChannel
{
    /**
    Registers the channel as an accept watcher with the read flag set and
    creates the listening TCP socket for the given address family.
    */
    this(Selector loop, AddressFamily family = AddressFamily.INET)
    {
        super(loop, WatcherType.Accept);
        setFlag(WatchFlag.Read, true);
        this.socket = new TcpSocket(family);
    }

    /**
    Accepts one pending connection.

    Params:
        handler = receives the accepted connection wrapped in a `Socket`;
                  may be null.

    Returns: false when `accept` did not yield a valid descriptor,
             true otherwise.
    */
    protected bool onAccept(scope AcceptHandler handler)
    {
        version (KissDebugMode)
            trace("new connection coming...");
        this.clearError();
        socket_t clientFd = cast(socket_t)(accept(this.handle, null, null));
        if (clientFd == socket_t.init)
            return false;

        version (KissDebugMode)
            infof("Listener fd=%d, client fd=%d", this.handle, clientFd);

        // Always wrap the descriptor so that it has an owner. Without a
        // handler nobody would ever close it, so release it right here
        // instead of leaking the file descriptor.
        Socket client = new Socket(clientFd, this._family);
        if (handler !is null)
            handler(client);
        else
            client.close();
        return true;
    }

    override void onWriteDone()
    {
        version (KissDebugMode)
            tracef("a new connection created, thread: %s", getTid());
    }
}

/**
TCP Client

Stream channel with a fixed-size read buffer and helpers for reading from
and writing to the underlying socket.
*/
abstract class AbstractStream : AbstractSocketChannel, Stream
{
    /// Invoked from `onDisconnected` when it is not null.
    SimpleEventHandler disconnectionHandler;
    // DataWrittenHandler sentHandler;

    protected bool _isConnected; //if server side always true.
    // alias UbyteArrayObject = BaseTypeObject!(ubyte[]);

    /**
    Allocates the read buffer and registers the channel as a TCP watcher
    with the Read, Write and ETMode flags set.

    Note: this constructor does not assign `this.socket`, and `family` is
    not used here.

    Params:
        bufferSize = size in bytes of the internal read buffer.
    */
    this(Selector loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4096 * 2)
    {
        // _readBuffer = new UbyteArrayObject();
        version (KissDebugMode)
            trace("Buffer size for read: ", bufferSize);
        _readBuffer = new ubyte[bufferSize];
        super(loop, WatcherType.TCP);
        setFlag(WatchFlag.Read, true);
        setFlag(WatchFlag.Write, true);
        setFlag(WatchFlag.ETMode, true);
    }

    /**
    Performs a single `receive` into the internal buffer.

    Received bytes are passed to `dataReceivedHandler` as a slice of the
    internal buffer. A return of 0 bytes is treated as a disconnection.

    Returns: false when another read should be attempted (the buffer was
             filled completely, or the call failed with EINTR / EAGAIN /
             EWOULDBLOCK); true otherwise.
    */
    protected bool tryRead()
    {
        bool isDone = true;
        this.clearError();
        ptrdiff_t len = this.socket.receive(this._readBuffer);
        // Snapshot errno before anything else runs: the logging below may
        // perform system calls that overwrite it.
        const int readErrno = errno;
        version (KissDebugMode)
            trace("read nbytes...", len);

        if (len > 0)
        {
            if (dataReceivedHandler !is null)
                dataReceivedHandler(this._readBuffer[0 .. len]);

            // It's possible that more data is waiting to be read in the inner buffer.
            if (len == _readBuffer.length)
                isDone = false;
        }
        else if (len < 0)
        {
            // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:06:13
            // check more error status
            if (readErrno != EINTR && readErrno != EAGAIN && readErrno != EWOULDBLOCK)
            {
                this._error = true;
                this._erroString = fromStringz(strerror(readErrno)).idup;
            }
            else
                isDone = false;
        }
        else
        {
            version (KissDebugMode)
                warningf("connection broken: %s", _remoteAddress.toString());
            onDisconnected();
            if (_isClosed)
                this.socket.close(); // release the sources
            else
                this.close();
        }

        return isDone;
    }

    /**
    Marks the stream as disconnected and closed, then notifies
    `disconnectionHandler` if one is set.
    */
    protected void onDisconnected()
    {
        _isConnected = false;
        _isClosed = true;
        if (disconnectionHandler !is null)
            disconnectionHandler();
    }

    /// When false, `tryWriteAll` gives up after the first `send`.
    protected bool canWriteAgain = true;
    /// Number of retries after which `tryWriteAll` starts logging a warning.
    int writeRetryLimit = 5;
    private int writeRetries = 0;

    /**
    Warning: It will try the best to write all the data.

    Keeps calling `send` until everything was written, a non-transient
    error occurred, or `canWriteAgain` is false. On EINTR / EAGAIN /
    EWOULDBLOCK the calling thread sleeps for 100 msecs before retrying.
    Empty input is a no-op.

    The retry logic is a loop rather than recursion so that a long series
    of partial writes or retries cannot grow the call stack.

    // TODO: create an example for test
    */
    protected void tryWriteAll(in ubyte[] data)
    {
        // Nothing to send; a zero-length send() would return 0 and be
        // mistaken for a failure below.
        if (data.length == 0)
            return;

        const(ubyte)[] pending = data;
        while (true)
        {
            const nBytes = this.socket.send(pending);
            // Capture errno and its message immediately: the logging calls
            // below may clobber errno.
            const int sendErrno = errno;
            string sendError = (nBytes > 0) ? null : lastSocketError();
            // version (KissDebugMode)
            tracef("actually sent bytes: %d / %d", nBytes, pending.length);

            if (nBytes > 0)
            {
                if (canWriteAgain && nBytes < pending.length) // && writeRetries < writeRetryLimit
                {
                    // version (KissDebugMode)
                    writeRetries++;
                    tracef("[%d] rewrite: written %d, remaining: %d, total: %d",
                            writeRetries, nBytes, pending.length - nBytes, pending.length);
                    if (writeRetries > writeRetryLimit)
                        warning("You are writting a Big block of data!!!");

                    pending = pending[nBytes .. $];
                    continue;
                }

                writeRetries = 0;
                return;
            }

            if (nBytes == Socket.ERROR)
            {
                if (sendErrno != EINTR && sendErrno != EAGAIN && sendErrno != EWOULDBLOCK)
                {
                    warningf("errno=%d, message: %s", sendErrno, sendError);
                    this._error = true;
                    this._erroString = sendError;

                    errorOccurred(sendError);
                    return;
                }

                // version (KissDebugMode)
                warningf("errno=%d, message: %s", sendErrno, sendError);
                if (!canWriteAgain)
                    return;

                import core.thread;
                import core.time;

                writeRetries++;
                // Nothing was written in this attempt, so the whole pending
                // slice is still outstanding.
                tracef("[%d] rewrite: written %d, remaining: %d, total: %d",
                        writeRetries, 0, pending.length, pending.length);
                if (writeRetries > writeRetryLimit)
                    warning("You are writting a Big block of data!!!");
                warning("Wait for a 100 msecs to try again");
                Thread.sleep(100.msecs);
                continue;
            }

            // send() returned neither a positive count nor Socket.ERROR.
            version (KissDebugMode)
            {
                warningf("nBytes=%d, message: %s", nBytes, sendError);
                assert(false, "Undefined behavior!");
            }
            else
            {
                this._error = true;
                this._erroString = sendError;
                return;
            }
        }
    }

    /**
    Try to write a block of data.

    Performs a single `send`. Empty input is a no-op.

    Returns: the number of bytes written, or 0 when nothing was written.
             `_error` / `_erroString` are set for failures other than
             EINTR / EAGAIN / EWOULDBLOCK.
    */
    protected size_t tryWrite(in ubyte[] data)
    {
        // Nothing to send; a zero-length send() would return 0 and be
        // mistaken for a failure below.
        if (data.length == 0)
            return 0;

        const nBytes = this.socket.send(data);
        // Capture errno and its message immediately: the logging calls
        // below may clobber errno.
        const int sendErrno = errno;
        string sendError = (nBytes > 0) ? null : lastSocketError();
        version (KissDebugMode)
            tracef("actually sent bytes: %d / %d", nBytes, data.length);

        if (nBytes > 0)
        {
            return nBytes;
        }
        else if (nBytes == Socket.ERROR)
        {
            version (KissDebugMode)
                warningf("errno=%d, message: %s", sendErrno, sendError);

            // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:07:38
            // check more error status
            if (sendErrno != EINTR && sendErrno != EAGAIN && sendErrno != EWOULDBLOCK)
            {
                this._error = true;
                this._erroString = sendError;
            }
        }
        else
        {
            version (KissDebugMode)
            {
                warningf("nBytes=%d, message: %s", nBytes, sendError);
                assert(false, "Undefined behavior!");
            }
            else
            {
                this._error = true;
                this._erroString = sendError;
            }
        }
        return 0;
    }

    /// Connects the underlying socket to `addr`.
    protected void doConnect(Address addr)
    {
        this.socket.connect(addr);
    }

    /// Sets the `isWriteCancelling` flag.
    void cancelWrite()
    {
        isWriteCancelling = true;
    }

    override void onWriteDone()
    {
        // notified by kqueue selector when data writing done
        version (KissDebugMode)
            tracef("done with data writing, thread: %s", getTid());
    }

    // protected UbyteArrayObject _readBuffer;
    // Mutable on purpose: receive() writes into this buffer.
    private ubyte[] _readBuffer;
    protected WriteBufferQueue _writeQueue;
    protected bool isWriteCancelling = false;

    /**
    * Warning: The received data is stored in an inner buffer. For data safety,
    * you should make a copy of it.
    */
    DataReceivedHandler dataReceivedHandler;

}

/**
UDP Socket
*/
abstract class AbstractDatagramSocket : AbstractSocketChannel, IDatagramSocket
{
    /**
    Registers the channel as a UDP watcher (Read flag set, ETMode flag
    cleared), creates the UDP socket and read buffer, and prepares a default
    bind address for the given family.

    Params:
        bufferSize = size in bytes of the internal read buffer.
    */
    this(Selector loop, AddressFamily family = AddressFamily.INET, int bufferSize = 4096 * 2)
    {
        super(loop, WatcherType.UDP);
        setFlag(WatchFlag.Read, true);
        setFlag(WatchFlag.ETMode, false);

        this.socket = new UdpSocket(family);
        // _socket.blocking = false;
        _readBuffer = new UdpDataObject();
        _readBuffer.data = new ubyte[bufferSize];

        if (family == AddressFamily.INET)
            _bindAddress = new InternetAddress(InternetAddress.PORT_ANY);
        else if (family == AddressFamily.INET6)
            _bindAddress = new Internet6Address(Internet6Address.PORT_ANY);
        else
            _bindAddress = new UnknownAddress();
    }

    /// Binds the socket to `addr`; does nothing if it is already bound.
    final void bind(Address addr)
    {
        if (_binded)
            return;
        _bindAddress = addr;
        socket.bind(_bindAddress);
        _binded = true;
    }

    /// Returns: true once `bind` has succeeded.
    final bool isBind()
    {
        return _binded;
    }

    /// Returns: the address passed to `bind`, or the default chosen by the constructor.
    Address bindAddr()
    {
        return _bindAddress;
    }

    protected UdpDataObject _readBuffer;
    protected bool _binded = false;
    protected Address _bindAddress;

    /**
    Receives one datagram into the internal buffer and, when it contains
    data, passes the buffer object (payload slice plus sender address) to
    `read`.

    Returns: always false.

    Throws: AddressException when the socket's address family is neither
            INET nor INET6.
    */
    protected bool tryRead(scope ReadCallBack read)
    {
        scope Address createAddress()
        {
            enum ushort DPORT = 0;
            if (AddressFamily.INET == this.socket.addressFamily)
                return new InternetAddress(DPORT);
            else if (AddressFamily.INET6 == this.socket.addressFamily)
                return new Internet6Address(DPORT);
            else
                throw new AddressException(
                        "NOT SUPPORT addressFamily. It only can be AddressFamily.INET or AddressFamily.INET6");
        }

        this._readBuffer.addr = createAddress();
        // Remember the full-size buffer: `data` is temporarily narrowed to
        // the received length for the callback and restored on exit.
        auto data = this._readBuffer.data;
        scope (exit)
            this._readBuffer.data = data;
        auto len = this.socket.receiveFrom(this._readBuffer.data, this._readBuffer.addr);
        if (len > 0)
        {
            this._readBuffer.data = this._readBuffer.data[0 .. len];
            read(this._readBuffer);
        }
        return false;
    }

    override void onWriteDone()
    {
        // notified by kqueue selector when data writing done
        version (KissDebugMode)
            tracef("done with data writing, thread: %s", getTid());
    }
}