/*
 * Kiss - A refined core library for D programming language.
 *
 * Copyright (C) 2015-2018  Shanghai Putao Technology Co., Ltd
 *
 * Developer: HuntLabs.cn
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module kiss.net.TcpStream;

import kiss.event;
import kiss.net.core;

import std.format;
import std.socket;
import std.exception;
import kiss.logger;
import std.socket;
import core.thread;
import core.time;

import kiss.container.ByteBuffer;
import kiss.core;

/**
 * A TCP stream built on `AbstractStream` and registered with a `Selector`.
 *
 * An instance is created either for the client side (it allocates its own
 * socket and is connected later through `connect`) or for the server side
 * (it wraps a socket that is already connected). Event callbacks are
 * installed through the chainable `on*` methods.
 */
class TcpStream : AbstractStream
{
    /// Callback invoked at the end of `onClose`, if one has been set.
    SimpleEventHandler closeHandler;

    /**
     * Creates a client-side stream with a freshly allocated TCP socket.
     * The stream starts out unconnected; call `connect` to establish it.
     *
     * Params:
     *   loop       = selector handed to the base class
     *   family     = address family of the new socket
     *   bufferSize = buffer size handed to the base class
     */
    this(Selector loop, AddressFamily family = AddressFamily.INET, int bufferSize = 4096 * 2)
    {
        super(loop, family, bufferSize);
        this.socket = new Socket(family, SocketType.STREAM, ProtocolType.TCP);

        // _localAddress = socket.localAddress();// TODO:

        // This is the client-side constructor, so the flag must be set;
        // it was previously left false, identical to the server-side path.
        _isClientSide = true;
        _isConnected = false;
    }

    /**
     * Creates a server-side stream around an existing socket and records
     * its local and remote addresses. The stream is marked as connected.
     *
     * Params:
     *   loop       = selector handed to the base class
     *   socket     = socket to wrap; its address family is passed to the base class
     *   bufferSize = buffer size handed to the base class
     */
    this(Selector loop, Socket socket, size_t bufferSize = 4096 * 2)
    {
        super(loop, socket.addressFamily, bufferSize);
        this.socket = socket;
        _remoteAddress = socket.remoteAddress();
        _localAddress = socket.localAddress();

        _isClientSide = false;
        _isConnected = true;
    }

    /**
     * Connects to the given IP address and port.
     * Convenience overload that parses the address and forwards to
     * `connect(Address)`.
     */
    void connect(string ip, ushort port)
    {
        connect(parseAddress(ip, port));
    }

    /**
     * Connects to `addr`.
     *
     * Does nothing (and does not invoke the connection handler) when the
     * stream is already marked as connected. Otherwise the socket is bound
     * to the address returned by `createAddress(this.socket, 0)`, the
     * connection is started and the stream is registered via `start`.
     * Any `Exception` raised on the way is logged and leaves the stream
     * unconnected. In both cases the connection handler, if set, is then
     * invoked with the resulting connection state.
     */
    void connect(Address addr)
    {
        if (_isConnected)
            return;

        try
        {
            Address binded = createAddress(this.socket, 0);
            this.socket.bind(binded);
            this.doConnect(addr);
            start();
            _isConnected = true;
            _remoteAddress = addr;
            _localAddress = binded;
        }
        catch (Exception ex)
        {
            error(ex.message);
        }

        if (_connectionHandler !is null)
            _connectionHandler(_isConnected);
    }

    /**
     * Closes the stream if it is connected, replaces the socket with a new
     * TCP socket and connects to `addr`.
     *
     * The new socket uses the address family of the previous socket, or
     * `AddressFamily.INET` when there is no previous socket.
     */
    void reconnect(Address addr)
    {
        if (_isConnected)
            this.close();
        _isConnected = false;

        AddressFamily family = AddressFamily.INET;
        if (this.socket !is null)
            family = this.socket.addressFamily;

        this.socket = new Socket(family, SocketType.STREAM, ProtocolType.TCP);
        connect(addr);
    }

    /// Sets the handler that receives the connection state. Returns `this` for chaining.
    TcpStream onConnected(ConnectionHandler cback)
    {
        _connectionHandler = cback;
        return this;
    }

    /// Sets `dataReceivedHandler`. Returns `this` for chaining.
    TcpStream onDataReceived(DataReceivedHandler handler)
    {
        dataReceivedHandler = handler;
        return this;
    }

    // TcpStream onDataWritten(DataWrittenHandler handler)
    // {
    //     sentHandler = handler;
    //     return this;
    // }

    /// Sets the handler invoked at the end of `onClose`. Returns `this` for chaining.
    TcpStream onClosed(SimpleEventHandler handler)
    {
        closeHandler = handler;
        return this;
    }

    /// Sets `disconnectionHandler`. Returns `this` for chaining.
    TcpStream onDisconnected(SimpleEventHandler handler)
    {
        disconnectionHandler = handler;
        return this;
    }

    /// Sets `errorHandler`. Returns `this` for chaining.
    TcpStream onError(ErrorEventHandler handler)
    {
        errorHandler = handler;
        return this;
    }

    /// Returns whether the stream is currently marked as connected.
    bool isConnected() nothrow
    {
        return _isConnected;
    }

    /**
     * Registers the stream with its selector. Repeated calls are no-ops
     * while the stream is registered. On Windows the first read is also
     * started here.
     */
    override void start()
    {
        if (_isRegistered)
            return;

        _inLoop.register(this);
        _isRegistered = true;

        version (Windows)
            this.beginRead();
    }

    /**
     * Queues `buffer` for sending and triggers a write attempt.
     *
     * When the stream is not connected a warning is logged and the buffer
     * is dropped.
     *
     * Params:
     *   buffer = data to send; must not be null
     */
    void write(StreamWriteBuffer buffer)
    {
        assert(buffer !is null);

        if (!_isConnected)
        {
            warning("The connection has been closed!");
            return;
        }

        _writeQueue.enQueue(buffer);

        version (Windows)
            tryWrite();
        else
        {
            onWrite();
        }
    }

    /**
     * Wraps `data` in a `SocketStreamBuffer` and queues it for sending.
     * Empty input is ignored.
     *
     * Params:
     *   data    = bytes to send
     *   handler = optional handler passed on to the `SocketStreamBuffer`
     */
    /// safe for big data sending
    void write(in ubyte[] data, DataWrittenHandler handler = null)
    {
        if (data.length == 0)
            return;

        write(new SocketStreamBuffer(data, handler));
    }

protected:
    /// True for streams created through the client-side constructor.
    bool _isClientSide;

    /// Handler installed by `onConnected`; receives the connection state.
    ConnectionHandler _connectionHandler;

    /**
     * Read-event callback.
     *
     * On Posix, `tryRead` is called repeatedly while the stream is
     * registered and `tryRead` returns false; elsewhere `doRead` is called
     * once. If the stream is in an error state afterwards, the error is
     * logged and reported through `errorOccurred`.
     */
    override void onRead()
    {
        version (KissDebugMode)
            trace("start to read");

        version (Posix)
        {
            while (_isRegistered && !tryRead())
            {
                version (KissDebugMode)
                {
                    trace("continue reading...");
                }
            }
        }
        else
        {
            doRead();
        }

        if (this.isError)
        {
            string msg = format("Socket error on read: fd=%d, message: %s",
                    this.handle, this.erroString);
            // msg is already formatted: log it as plain text so that a '%'
            // inside the socket error text is not treated as a format spec.
            error(msg);
            errorOccurred(msg);
        }
    }

    /**
     * Close callback.
     *
     * Discards any queued outgoing data, runs the base-class close logic,
     * marks the stream as disconnected and shuts the socket down in both
     * directions (on Posix the socket is closed as well). Finally the
     * close handler is invoked, if one is set.
     */
    override void onClose()
    {
        version (KissDebugMode)
        {
            if (!_writeQueue.empty)
            {
                warning("Some data has not been sent yet.");
            }
        }

        _writeQueue.clear();
        super.onClose();
        _isConnected = false;
        this.socket.shutdown(SocketShutdown.BOTH);

        version (Posix)
            this.socket.close();

        if (closeHandler)
            closeHandler();
    }

    /**
     * Write-event callback.
     *
     * If the stream is not yet marked as connected, it is marked as
     * connected, the remote address is recorded, the connection handler is
     * invoked with `true`, and nothing is written in this call.
     * NOTE(review): presumably this is the first writable event after a
     * non-blocking connect -- confirm against the selector implementation.
     *
     * Otherwise queued buffers are sent front to back until the queue is
     * empty, the stream is unregistered, a write cancellation is pending,
     * or a socket error is detected. An error is logged and reported
     * through `errorOccurred`.
     */
    override void onWrite()
    {
        if (!_isConnected)
        {
            _isConnected = true;
            _remoteAddress = socket.remoteAddress();

            if (_connectionHandler)
                _connectionHandler(true);
            return;
        }

        // bool canWrite = true;
        version (KissDebugMode)
            trace("start to write");

        while (_isRegistered && !isWriteCancelling && !_writeQueue.empty)
        {
            version (KissDebugMode)
                trace("writting...");

            StreamWriteBuffer writeBuffer = _writeQueue.front();
            const(ubyte[]) data = writeBuffer.sendData();
            if (data.length == 0)
            {
                // Nothing left in this buffer: complete it and move on.
                _writeQueue.deQueue().doFinish();
                continue;
            }

            this.clearError();
            // NOTE(review): if tryWrite can return 0 without raising the
            // error state, this loop retries the same buffer immediately --
            // confirm tryWrite's contract.
            size_t nBytes = tryWrite(data);
            if (nBytes > 0 && writeBuffer.popSize(nBytes))
            {
                version (KissDebugMode)
                    trace("finishing data writing...nBytes", nBytes);
                _writeQueue.deQueue().doFinish();
            }

            if (this.isError)
            {
                string msg = format("Socket error on write: fd=%d, message=%s",
                        this.handle, this.erroString);
                // Log as plain text (msg is already formatted), then notify,
                // in the same order as onRead.
                error(msg);
                errorOccurred(msg);
                break;
            }
        }
    }
}