1 /*
2  * Kiss - A refined core library for D programming language.
3  *
4  * Copyright (C) 2015-2018  Shanghai Putao Technology Co., Ltd
5  *
6  * Developer: HuntLabs.cn
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module kiss.net.TcpStream;
13 
14 import kiss.event;
15 import kiss.net.core;
16 
17 import std.format;
18 import std.socket;
19 import std.exception;
20 import kiss.logger;
21 import std.socket;
22 import core.thread;
23 import core.time;
24 
25 import kiss.container.ByteBuffer;
26 import kiss.core;
27 
/**
 * A TCP stream built on top of AbstractStream and driven by a Selector.
 *
 * An instance is created either for the client side (a fresh socket that is
 * later connected with connect()) or for the server side (wrapping a socket
 * that is already connected). Event callbacks are installed with the chainable
 * on*() methods, and outgoing data is queued with write().
 */
class TcpStream : AbstractStream
{
    /// Invoked at the end of onClose(), after the socket has been shut down.
    SimpleEventHandler closeHandler;

    /**
     * Creates a client-side stream with a new, unconnected TCP socket.
     *
     * Params:
     *   loop       = the selector this stream registers with in start()
     *   family     = address family of the socket to create
     *   bufferSize = buffer size forwarded to the AbstractStream constructor
     */
    this(Selector loop, AddressFamily family = AddressFamily.INET, int bufferSize = 4096 * 2)
    {
        super(loop, family, bufferSize);
        this.socket = new Socket(family, SocketType.STREAM, ProtocolType.TCP);

        // _localAddress = socket.localAddress();// TODO:

        // This constructor is the client-side one, so the flag has to be true;
        // it used to be false, which made it identical to the server side.
        _isClientSide = true;
        _isConnected = false;
    }

    /**
     * Creates a server-side stream around an existing socket.
     *
     * The stream is marked as connected immediately, and the remote and local
     * addresses are read from the given socket.
     *
     * Params:
     *   loop       = the selector this stream registers with in start()
     *   socket     = the socket to wrap
     *   bufferSize = buffer size forwarded to the AbstractStream constructor
     */
    this(Selector loop, Socket socket, size_t bufferSize = 4096 * 2)
    {
        super(loop, socket.addressFamily, bufferSize);
        this.socket = socket;
        _remoteAddress = socket.remoteAddress();
        _localAddress = socket.localAddress();

        _isClientSide = false;
        _isConnected = true;
    }

    /**
     * Connects to the given IP address and port.
     *
     * The pair is converted with parseAddress() and passed to connect(Address).
     */
    void connect(string ip, ushort port)
    {
        connect(parseAddress(ip, port));
    }

    /**
     * Connects the stream to addr.
     *
     * Returns immediately, without invoking the connection handler, when the
     * stream is already marked as connected. Otherwise the socket is bound,
     * doConnect() is called and the stream is registered through start().
     * An Exception raised along the way is logged and not rethrown. In both
     * the success and the failure case the connection handler (if any) is
     * then invoked with the resulting connection state.
     */
    void connect(Address addr)
    {
        if (_isConnected)
            return;

        try
        {
            Address binded = createAddress(this.socket, 0);
            this.socket.bind(binded);
            this.doConnect(addr);
            start();
            _isConnected = true;
            _remoteAddress = addr;
            // NOTE(review): this records the address that was requested for
            // bind(), which may differ from the one the OS actually assigned;
            // confirm whether socket.localAddress() should be used instead.
            _localAddress = binded;
        }
        catch (Exception ex)
        {
            error(ex.message);
        }

        if (_connectionHandler !is null)
            _connectionHandler(_isConnected);
    }

    /**
     * Closes the current connection (if connected), replaces the socket with
     * a new TCP socket and connects it to addr.
     *
     * The new socket uses the address family of the previous socket, or
     * AddressFamily.INET when there was no previous socket.
     */
    void reconnect(Address addr)
    {
        if (_isConnected)
            this.close();
        _isConnected = false;
        AddressFamily family = AddressFamily.INET;
        if (this.socket !is null)
            family = this.socket.addressFamily;

        this.socket = new Socket(family, SocketType.STREAM, ProtocolType.TCP);
        connect(addr);
    }

    /// Sets the handler that is called with the connection state; returns this for chaining.
    TcpStream onConnected(ConnectionHandler cback)
    {
        _connectionHandler = cback;
        return this;
    }

    /// Sets dataReceivedHandler; returns this for chaining.
    TcpStream onDataReceived(DataReceivedHandler handler)
    {
        dataReceivedHandler = handler;
        return this;
    }

    // TcpStream onDataWritten(DataWrittenHandler handler)
    // {
    //     sentHandler = handler;
    //     return this;
    // }

    /// Sets the handler invoked at the end of onClose(); returns this for chaining.
    TcpStream onClosed(SimpleEventHandler handler)
    {
        closeHandler = handler;
        return this;
    }

    /// Sets disconnectionHandler; returns this for chaining.
    TcpStream onDisconnected(SimpleEventHandler handler)
    {
        disconnectionHandler = handler;
        return this;
    }

    /// Sets errorHandler; returns this for chaining.
    TcpStream onError(ErrorEventHandler handler)
    {
        errorHandler = handler;
        return this;
    }

    /// Returns whether the stream is currently marked as connected.
    bool isConnected() nothrow
    {
        return _isConnected;
    }

    /**
     * Registers the stream with its selector.
     *
     * Does nothing when the stream is already registered. On Windows a read
     * is additionally started with beginRead().
     */
    override void start()
    {
        if (_isRegistered)
            return;
        _inLoop.register(this);
        _isRegistered = true;
        version (Windows)
            this.beginRead();
    }

    /**
     * Queues buffer for sending and triggers a write attempt.
     *
     * When the stream is not connected, a warning is logged and the buffer is
     * dropped without being queued.
     *
     * Params:
     *   buffer = the data to send; must not be null
     */
    void write(StreamWriteBuffer buffer)
    {
        assert(buffer !is null);

        if (!_isConnected)
        {
            warning("The connection has been closed!");
            return;
        }

        _writeQueue.enQueue(buffer);

        version (Windows)
            tryWrite();
        else
        {
            onWrite();
        }
    }

    /**
     * Wraps data in a SocketStreamBuffer and queues it for sending.
     *
     * Empty input is ignored, and in that case handler is not called.
     *
     * Params:
     *   data    = the bytes to send
     *   handler = optional handler passed on to the SocketStreamBuffer
     */
    /// safe for big data sending
    void write(in ubyte[] data, DataWrittenHandler handler = null)
    {
        if (data.length == 0)
            return;

        write(new SocketStreamBuffer(data, handler));
    }

protected:
    /// True for streams created with the client-side constructor.
    bool _isClientSide;
    /// Handler installed through onConnected().
    ConnectionHandler _connectionHandler;

    /**
     * Read callback.
     *
     * On Posix, tryRead() is called repeatedly until it returns true or the
     * stream is no longer registered; on other platforms doRead() is called
     * once. If the stream is flagged as being in error afterwards, the error
     * is logged and reported through errorOccurred().
     */
    override void onRead()
    {
        version (KissDebugMode)
            trace("start to read");

        version (Posix)
        {
            while (_isRegistered && !tryRead())
            {
                version (KissDebugMode)
                {
                    trace("continue reading...");
                }
            }
        }
        else
        {
            doRead();
        }

        if (this.isError)
        {
            string msg = format("Socket error on read: fd=%d, message: %s",
                    this.handle, this.erroString);
            // msg is already formatted, so it must not be used as a format
            // string: a '%' inside the error text would be misinterpreted.
            error(msg);
            errorOccurred(msg);
        }
    }

    /**
     * Close callback.
     *
     * Discards all queued write buffers, runs the base class close handling,
     * marks the stream as disconnected, shuts the socket down in both
     * directions (and closes it on Posix), and finally calls closeHandler
     * when one is set.
     */
    override void onClose()
    {
        version (KissDebugMode)
        {
            if (!_writeQueue.empty)
            {
                warning("Some data has not been sent yet.");
            }
        }

        _writeQueue.clear();
        super.onClose();
        _isConnected = false;
        this.socket.shutdown(SocketShutdown.BOTH);
        version (Posix)
            this.socket.close();

        if (closeHandler)
            closeHandler();
    }

    /**
     * Write callback.
     *
     * When the stream is not yet marked as connected, the call is treated as
     * the completion of the connection: the stream is marked connected, the
     * remote address is read from the socket, the connection handler is
     * notified with true, and nothing is written.
     *
     * Otherwise queued buffers are sent from the front of the queue until the
     * queue is empty, the stream is unregistered, a write cancellation is in
     * progress, or an error is flagged. A buffer is dequeued and finished
     * once it has no data left.
     */
    override void onWrite()
    {
        if (!_isConnected)
        {
            _isConnected = true;
            _remoteAddress = socket.remoteAddress();

            if (_connectionHandler)
                _connectionHandler(true);
            return;
        }

        // bool canWrite = true;
        version (KissDebugMode)
            trace("start to write");

        while (_isRegistered && !isWriteCancelling && !_writeQueue.empty)
        {
            version (KissDebugMode)
                trace("writting...");

            StreamWriteBuffer writeBuffer = _writeQueue.front();
            const(ubyte[]) data = writeBuffer.sendData();
            if (data.length == 0)
            {
                // Nothing left in this buffer: finish it and move on.
                _writeQueue.deQueue().doFinish();
                continue;
            }

            // Reset the error state so that the check below only reflects
            // this write attempt.
            this.clearError();
            // NOTE(review): if tryWrite() returns 0 without flagging an error,
            // the same buffer is retried immediately on the next iteration;
            // confirm how tryWrite() handles a would-block condition.
            size_t nBytes = tryWrite(data);
            if (nBytes > 0 && writeBuffer.popSize(nBytes))
            {
                version (KissDebugMode)
                    trace("finishing data writing...nBytes", nBytes);
                _writeQueue.deQueue().doFinish();
            }

            if (this.isError)
            {
                string msg = format("Socket error on write: fd=%d, message=%s",
                        this.handle, this.erroString);
                // Log first (as onRead() does) and without treating the
                // already formatted message as a format string.
                error(msg);
                errorOccurred(msg);
                break;
            }
        }
    }
}