1 /* 2 * Kiss - A refined core library for D programming language. 3 * 4 * Copyright (C) 2015-2018 Shanghai Putao Technology Co., Ltd 5 * 6 * Developer: HuntLabs.cn 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module kiss.event.socket.posix; 13 14 // dfmt off 15 version(Posix): 16 17 // dfmt on 18 19 import kiss.event.socket.common; 20 import kiss.event.core; 21 import kiss.core; 22 import kiss.util.thread; 23 24 import std.conv; 25 import std.exception; 26 import std.format; 27 import std.process; 28 import std.socket; 29 import std.string; 30 import kiss.logger; 31 32 import core.stdc.errno; 33 import core.stdc.string; 34 import core.sys.posix.sys.socket : accept; 35 36 /** 37 TCP Server 38 */ 39 abstract class AbstractListener : AbstractSocketChannel 40 { 41 this(Selector loop, AddressFamily family = AddressFamily.INET) 42 { 43 super(loop, WatcherType.Accept); 44 setFlag(WatchFlag.Read, true); 45 this.socket = new TcpSocket(family); 46 } 47 48 protected bool onAccept(scope AcceptHandler handler) 49 { 50 version (KissDebugMode) 51 trace("new connection coming..."); 52 this.clearError(); 53 socket_t clientFd = cast(socket_t)(accept(this.handle, null, null)); 54 if (clientFd == socket_t.init) 55 return false; 56 57 version (KissDebugMode) 58 infof("Listener fd=%d, client fd=%d", this.handle, clientFd); 59 60 if (handler !is null) 61 handler(new Socket(clientFd, this._family)); 62 return true; 63 } 64 65 override void onWriteDone() 66 { 67 version (KissDebugMode) 68 tracef("a new connection created"); 69 } 70 } 71 72 /** 73 TCP Client 74 */ 75 abstract class AbstractStream : AbstractSocketChannel, Stream 76 { 77 SimpleEventHandler disconnectionHandler; 78 // DataWrittenHandler sentHandler; 79 80 protected bool _isConnected; //if server side always true. 81 // alias UbyteArrayObject = BaseTypeObject!(ubyte[]); 82 83 this(Selector loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4096 * 2) 84 { 85 // _readBuffer = new UbyteArrayObject(); 86 version (KissDebugMode) 87 trace("Buffer size for read: ", bufferSize); 88 _readBuffer = new ubyte[bufferSize]; 89 super(loop, WatcherType.TCP); 90 setFlag(WatchFlag.Read, true); 91 setFlag(WatchFlag.Write, true); 92 setFlag(WatchFlag.ETMode, true); 93 } 94 95 /** 96 */ 97 protected bool tryRead() 98 { 99 bool isDone = true; 100 this.clearError(); 101 ptrdiff_t len = this.socket.receive(cast(void[]) this._readBuffer); 102 version (KissDebugMode) 103 trace("read nbytes...", len); 104 105 if (len > 0) 106 { 107 if (dataReceivedHandler !is null) 108 dataReceivedHandler(this._readBuffer[0 .. len]); 109 110 // It's prossible that more data are wainting for read in inner buffer. 111 if (len == _readBuffer.length) 112 isDone = false; 113 } 114 else if (len < 0) 115 { 116 // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:06:13 117 // check more error status 118 if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) 119 { 120 this._error = true; 121 this._erroString = cast(string)fromStringz(strerror(errno)); 122 } 123 124 version (KissDebugMode) 125 warningf("read error: isDone=%s, errno=%d, message=%s", 126 isDone, errno, cast(string)fromStringz(strerror(errno))); 127 } 128 else 129 { 130 version (KissDebugMode) 131 warningf("connection broken: %s", _remoteAddress.toString()); 132 onDisconnected(); 133 if (_isClosed) 134 this.socket.close(); // release the sources 135 else 136 this.close(); 137 } 138 139 return isDone; 140 } 141 142 protected void onDisconnected() 143 { 144 _isConnected = false; 145 _isClosed = true; 146 if (disconnectionHandler !is null) 147 disconnectionHandler(); 148 } 149 150 protected bool canWriteAgain = true; 151 int writeRetryLimit = 5; 152 private int writeRetries = 0; 153 154 /** 155 Warning: It will try the best to write all the data. 156 // TODO: create a examlple for test 157 */ 158 protected void tryWriteAll(in ubyte[] data) 159 { 160 const nBytes = this.socket.send(data); 161 // version (KissDebugMode) 162 tracef("actually sent bytes: %d / %d", nBytes, data.length); 163 164 if (nBytes > 0) 165 { 166 if (canWriteAgain && nBytes < data.length) // && writeRetries < writeRetryLimit 167 { 168 // version (KissDebugMode) 169 writeRetries++; 170 tracef("[%d] rewrite: written %d, remaining: %d, total: %d", 171 writeRetries, nBytes, data.length - nBytes, data.length); 172 if (writeRetries > writeRetryLimit) 173 warning("You are writting a Big block of data!!!"); 174 175 tryWriteAll(data[nBytes .. $]); 176 } 177 else 178 writeRetries = 0; 179 180 } 181 else if (nBytes == Socket.ERROR) 182 { 183 if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) 184 { 185 string msg = lastSocketError(); 186 warningf("errno=%d, message: %s", errno, msg); 187 this._error = true; 188 this._erroString = msg; 189 190 errorOccurred(msg); 191 } 192 else 193 { 194 // version (KissDebugMode) 195 warningf("errno=%d, message: %s", errno, lastSocketError()); 196 if (canWriteAgain) 197 { 198 import core.thread; 199 import core.time; 200 201 writeRetries++; 202 tracef("[%d] rewrite: written %d, remaining: %d, total: %d", 203 writeRetries, nBytes, data.length - nBytes, data.length); 204 if (writeRetries > writeRetryLimit) 205 warning("You are writting a Big block of data!!!"); 206 warning("Wait for a 100 msecs to try again"); 207 Thread.sleep(100.msecs); 208 tryWriteAll(data); 209 } 210 } 211 } 212 else 213 { 214 version (KissDebugMode) 215 { 216 warningf("nBytes=%d, message: %s", nBytes, lastSocketError()); 217 assert(false, "Undefined behavior!"); 218 } 219 else 220 { 221 this._error = true; 222 this._erroString = lastSocketError(); 223 } 224 } 225 } 226 227 /** 228 Try to write a block of data. 229 */ 230 protected size_t tryWrite(in ubyte[] data) 231 { 232 const nBytes = this.socket.send(data); 233 version (KissDebugMode) 234 tracef("actually sent bytes: %d / %d", nBytes, data.length); 235 236 if (nBytes > 0) 237 { 238 return nBytes; 239 } 240 else if (nBytes == Socket.ERROR) 241 { 242 version (KissDebugMode) 243 warningf("errno=%d, message: %s", errno, lastSocketError()); 244 245 // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:07:38 246 // check more error status 247 if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) 248 { 249 this._error = true; 250 this._erroString = lastSocketError(); 251 warningf("errno=%d, message: %s", errno, this._erroString); 252 } 253 } 254 else 255 { 256 version (KissDebugMode) 257 { 258 warningf("nBytes=%d, message: %s", nBytes, lastSocketError()); 259 assert(false, "Undefined behavior!"); 260 } 261 else 262 { 263 this._error = true; 264 this._erroString = lastSocketError(); 265 } 266 } 267 return 0; 268 } 269 270 protected void doConnect(Address addr) 271 { 272 this.socket.connect(addr); 273 } 274 275 void cancelWrite() 276 { 277 isWriteCancelling = true; 278 } 279 280 override void onWriteDone() 281 { 282 // notified by kqueue selector when data writing done 283 version (KissDebugMode) 284 tracef("done with data writing"); 285 } 286 287 // protected UbyteArrayObject _readBuffer; 288 private const(ubyte)[] _readBuffer; 289 protected WriteBufferQueue _writeQueue; 290 protected bool isWriteCancelling = false; 291 292 /** 293 * Warning: The received data is stored a inner buffer. For a data safe, 294 * you would make a copy of it. 295 */ 296 DataReceivedHandler dataReceivedHandler; 297 298 } 299 300 /** 301 UDP Socket 302 */ 303 abstract class AbstractDatagramSocket : AbstractSocketChannel, IDatagramSocket 304 { 305 this(Selector loop, AddressFamily family = AddressFamily.INET, int bufferSize = 4096 * 2) 306 { 307 super(loop, WatcherType.UDP); 308 setFlag(WatchFlag.Read, true); 309 setFlag(WatchFlag.ETMode, false); 310 311 this.socket = new UdpSocket(family); 312 // _socket.blocking = false; 313 _readBuffer = new UdpDataObject(); 314 _readBuffer.data = new ubyte[bufferSize]; 315 316 if (family == AddressFamily.INET) 317 _bindAddress = new InternetAddress(InternetAddress.PORT_ANY); 318 else if (family == AddressFamily.INET6) 319 _bindAddress = new Internet6Address(Internet6Address.PORT_ANY); 320 else 321 _bindAddress = new UnknownAddress(); 322 } 323 324 final void bind(Address addr) 325 { 326 if (_binded) 327 return; 328 _bindAddress = addr; 329 socket.bind(_bindAddress); 330 _binded = true; 331 } 332 333 final bool isBind() 334 { 335 return _binded; 336 } 337 338 Address bindAddr() 339 { 340 return _bindAddress; 341 } 342 343 protected UdpDataObject _readBuffer; 344 protected bool _binded = false; 345 protected Address _bindAddress; 346 347 protected bool tryRead(scope ReadCallBack read) 348 { 349 scope Address createAddress() 350 { 351 enum ushort DPORT = 0; 352 if (AddressFamily.INET == this.socket.addressFamily) 353 return new InternetAddress(DPORT); 354 else if (AddressFamily.INET6 == this.socket.addressFamily) 355 return new Internet6Address(DPORT); 356 else 357 throw new AddressException( 358 "NOT SUPPORT addressFamily. It only can be AddressFamily.INET or AddressFamily.INET6"); 359 } 360 361 this._readBuffer.addr = createAddress(); 362 auto data = this._readBuffer.data; 363 scope (exit) 364 this._readBuffer.data = data; 365 auto len = this.socket.receiveFrom(this._readBuffer.data, this._readBuffer.addr); 366 if (len > 0) 367 { 368 this._readBuffer.data = this._readBuffer.data[0 .. len]; 369 read(this._readBuffer); 370 } 371 return false; 372 } 373 374 override void onWriteDone() 375 { 376 // notified by kqueue selector when data writing done 377 version (KissDebugMode) 378 tracef("done with data writing"); 379 } 380 }