1 /* 2 * Kiss - A refined core library for D programming language. 3 * 4 * Copyright (C) 2015-2018 Shanghai Putao Technology Co., Ltd 5 * 6 * Developer: HuntLabs.cn 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module kiss.net.TcpStream; 13 14 import kiss.container.ByteBuffer; 15 import kiss.core; 16 import kiss.event; 17 import kiss.net.core; 18 import kiss.logger; 19 20 import std.format; 21 import std.exception; 22 import std.socket; 23 import core.thread; 24 import core.time; 25 26 27 /** 28 */ 29 class TcpStream : AbstractStream 30 { 31 SimpleEventHandler closeHandler; 32 33 // for client side 34 this(Selector loop, AddressFamily family = AddressFamily.INET, int bufferSize = 4096 * 2) 35 { 36 super(loop, family, bufferSize); 37 this.socket = new Socket(family, SocketType.STREAM, ProtocolType.TCP); 38 39 _isClientSide = false; 40 _isConnected = false; 41 } 42 43 // for server side 44 this(Selector loop, Socket socket, size_t bufferSize = 4096 * 2) 45 { 46 super(loop, socket.addressFamily, bufferSize); 47 this.socket = socket; 48 _remoteAddress = socket.remoteAddress(); 49 _localAddress = socket.localAddress(); 50 51 _isClientSide = false; 52 _isConnected = true; 53 } 54 55 void connect(string ip, ushort port) 56 { 57 connect(parseAddress(ip, port)); 58 } 59 60 void connect(Address addr) 61 { 62 if (_isConnected) 63 return; 64 65 try 66 { 67 Address binded = createAddress(this.socket, 0); 68 this.socket.bind(binded); 69 this.doConnect(addr); 70 start(); 71 _isConnected = true; 72 _remoteAddress = addr; 73 _localAddress = this.socket.localAddress(); 74 } 75 catch (Exception ex) 76 { 77 error(ex.message); 78 } 79 80 if (_connectionHandler !is null) 81 _connectionHandler(_isConnected); 82 } 83 84 void reconnect(Address addr) 85 { 86 if (_isConnected) 87 this.close(); 88 _isConnected = false; 89 AddressFamily family = AddressFamily.INET; 90 if (this.socket !is null) 91 family = this.socket.addressFamily; 92 93 this.socket = new Socket(family, SocketType.STREAM, ProtocolType.TCP); 94 connect(addr); 95 } 96 97 TcpStream onConnected(ConnectionHandler cback) 98 { 99 _connectionHandler = cback; 100 return this; 101 } 102 103 TcpStream onDataReceived(DataReceivedHandler handler) 104 { 105 dataReceivedHandler = handler; 106 return this; 107 } 108 109 // TcpStream onDataWritten(DataWrittenHandler handler) 110 // { 111 // sentHandler = handler; 112 // return this; 113 // } 114 115 TcpStream onClosed(SimpleEventHandler handler) 116 { 117 closeHandler = handler; 118 return this; 119 } 120 121 TcpStream onDisconnected(SimpleEventHandler handler) 122 { 123 disconnectionHandler = handler; 124 return this; 125 } 126 127 TcpStream onError(ErrorEventHandler handler) 128 { 129 errorHandler = handler; 130 return this; 131 } 132 133 bool isConnected() nothrow 134 { 135 return _isConnected; 136 } 137 138 override void start() 139 { 140 if (_isRegistered) 141 return; 142 _inLoop.register(this); 143 _isRegistered = true; 144 version (Windows) 145 this.beginRead(); 146 } 147 148 void write(StreamWriteBuffer buffer) 149 { 150 assert(buffer !is null); 151 152 if (!_isConnected) 153 { 154 warning("The connection has been closed!"); 155 return; 156 } 157 158 _writeQueue.enQueue(buffer); 159 160 version (Windows) 161 tryWrite(); 162 else 163 { 164 onWrite(); 165 } 166 } 167 168 /// safe for big data sending 169 void write(in ubyte[] data, DataWrittenHandler handler = null) 170 { 171 if (data.length == 0) 172 return; 173 174 write(new SocketStreamBuffer(data, handler)); 175 } 176 177 protected: 178 bool _isClientSide; 179 ConnectionHandler _connectionHandler; 180 181 override void onRead() 182 { 183 version (KissDebugMode) 184 trace("start to read"); 185 186 version (Posix) 187 { 188 while (_isRegistered && !tryRead()) 189 { 190 version (KissDebugMode) 191 { 192 trace("continue reading..."); 193 } 194 } 195 } 196 else 197 { 198 doRead(); 199 } 200 201 if (this.isError) 202 { 203 string msg = format("Socket error on read: fd=%d, message: %s", 204 this.handle, this.erroString); 205 errorf(msg); 206 errorOccurred(msg); 207 } 208 } 209 210 override void onClose() 211 { 212 version (KissDebugMode) 213 { 214 if (!_writeQueue.empty) 215 { 216 warning("Some data has not been sent yet."); 217 } 218 } 219 220 _writeQueue.clear(); 221 super.onClose(); 222 _isConnected = false; 223 this.socket.shutdown(SocketShutdown.BOTH); 224 this.socket.close(); 225 226 if (closeHandler) 227 closeHandler(); 228 } 229 230 override void onWrite() 231 { 232 if (!_isConnected) 233 { 234 _isConnected = true; 235 _remoteAddress = socket.remoteAddress(); 236 237 if (_connectionHandler) 238 _connectionHandler(true); 239 return; 240 } 241 242 // bool canWrite = true; 243 version (KissDebugMode) 244 trace("start to write"); 245 246 while (_isRegistered && !isWriteCancelling && !_writeQueue.empty) 247 { 248 version (KissDebugMode) 249 trace("writting..."); 250 251 StreamWriteBuffer writeBuffer = _writeQueue.front(); 252 const(ubyte[]) data = writeBuffer.sendData(); 253 if (data.length == 0) 254 { 255 _writeQueue.deQueue().doFinish(); 256 continue; 257 } 258 259 this.clearError(); 260 size_t nBytes = tryWrite(data); 261 if (nBytes > 0 && writeBuffer.popSize(nBytes)) 262 { 263 version (KissDebugMode) 264 trace("finishing data writing...nBytes", nBytes); 265 _writeQueue.deQueue().doFinish(); 266 } 267 268 if (this.isError) 269 { 270 string msg = format("Socket error on write: fd=%d, message=%s", 271 this.handle, this.erroString); 272 errorOccurred(msg); 273 errorf(msg); 274 break; 275 } 276 } 277 } 278 }