1 /*
2  * Kiss - A refined core library for D programming language.
3  *
4  * Copyright (C) 2015-2018  Shanghai Putao Technology Co., Ltd
5  *
6  * Developer: HuntLabs.cn
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module kiss.net.TcpStream;
13 
14 import kiss.container.ByteBuffer;
15 import kiss.core;
16 import kiss.event;
17 import kiss.net.core;
18 import kiss.logger;
19 
20 import std.format;
21 import std.exception;
22 import std.socket;
23 import core.thread;
24 import core.time;
25 
26 
/**
 * A TCP stream built on top of `AbstractStream` and driven by a `Selector`.
 *
 * An instance is created either for the client side (it allocates its own
 * socket and is connected later through `connect`) or for the server side
 * (it wraps a socket that is already connected).
 *
 * Event callbacks are installed through the fluent `onXxx` setters, each of
 * which returns `this` so the calls can be chained.
 */
class TcpStream : AbstractStream
{
    /// Callback invoked at the end of `onClose`, after the socket has been closed.
    SimpleEventHandler closeHandler;

    /**
     * Creates a client-side stream with a fresh, unconnected TCP socket.
     *
     * Params:
     *   loop       = selector the stream is registered with by `start`
     *   family     = address family used for the new socket
     *   bufferSize = buffer size forwarded to the `AbstractStream` constructor
     */
    this(Selector loop, AddressFamily family = AddressFamily.INET, int bufferSize = 4096 * 2)
    {
        super(loop, family, bufferSize);
        this.socket = new Socket(family, SocketType.STREAM, ProtocolType.TCP);

        // This constructor is the client-side one, so the flag must be set;
        // it used to be false here, identical to the server-side constructor.
        _isClientSide = true;
        _isConnected = false;
    }

    /**
     * Creates a server-side stream around an already connected socket.
     *
     * The remote and local addresses are read from the socket immediately and
     * the stream is marked as connected.
     *
     * Params:
     *   loop       = selector the stream is registered with by `start`
     *   socket     = connected socket to wrap
     *   bufferSize = buffer size forwarded to the `AbstractStream` constructor
     */
    this(Selector loop, Socket socket, size_t bufferSize = 4096 * 2)
    {
        super(loop, socket.addressFamily, bufferSize);
        this.socket = socket;
        _remoteAddress = socket.remoteAddress();
        _localAddress = socket.localAddress();

        _isClientSide = false;
        _isConnected = true;
    }

    /**
     * Connects to the given host and port.
     *
     * Params:
     *   ip   = textual address, parsed with `parseAddress`
     *   port = remote port
     */
    void connect(string ip, ushort port)
    {
        connect(parseAddress(ip, port));
    }

    /**
     * Connects to `addr` unless the stream is already marked as connected.
     *
     * The socket is first bound to the address returned by
     * `createAddress(this.socket, 0)`, then `doConnect` is called and the
     * stream is registered through `start`. Any `Exception` raised along the
     * way is logged and swallowed. Finally the connection handler, if one was
     * installed with `onConnected`, is called with the resulting connection
     * state (so it receives `false` when the attempt failed).
     *
     * Params:
     *   addr = remote address to connect to
     */
    void connect(Address addr)
    {
        if (_isConnected)
            return;

        try
        {
            Address binded = createAddress(this.socket, 0);
            this.socket.bind(binded);
            this.doConnect(addr);
            start();
            _isConnected = true;
            _remoteAddress = addr;
            _localAddress = this.socket.localAddress();
        }
        catch (Exception ex)
        {
            // Failure is reported to the caller through the handler below.
            error(ex.message);
        }

        if (_connectionHandler !is null)
            _connectionHandler(_isConnected);
    }

    /**
     * Closes the current connection (if any), replaces the socket with a new
     * TCP socket of the same address family and connects to `addr`.
     *
     * The family falls back to `AddressFamily.INET` when there is no socket.
     *
     * Params:
     *   addr = remote address to connect to
     */
    void reconnect(Address addr)
    {
        if (_isConnected)
            this.close();
        _isConnected = false;
        AddressFamily family = AddressFamily.INET;
        if (this.socket !is null)
            family = this.socket.addressFamily;

        this.socket = new Socket(family, SocketType.STREAM, ProtocolType.TCP);
        connect(addr);
    }

    /// Installs the handler notified with the outcome of a connection attempt.
    TcpStream onConnected(ConnectionHandler cback)
    {
        _connectionHandler = cback;
        return this;
    }

    /// Installs the handler stored in `dataReceivedHandler`.
    TcpStream onDataReceived(DataReceivedHandler handler)
    {
        dataReceivedHandler = handler;
        return this;
    }

    // TcpStream onDataWritten(DataWrittenHandler handler)
    // {
    //     sentHandler = handler;
    //     return this;
    // }

    /// Installs the handler called at the end of `onClose`.
    TcpStream onClosed(SimpleEventHandler handler)
    {
        closeHandler = handler;
        return this;
    }

    /// Installs the handler stored in `disconnectionHandler`.
    TcpStream onDisconnected(SimpleEventHandler handler)
    {
        disconnectionHandler = handler;
        return this;
    }

    /// Installs the handler stored in `errorHandler`.
    TcpStream onError(ErrorEventHandler handler)
    {
        errorHandler = handler;
        return this;
    }

    /// Returns: the current value of the connected flag.
    bool isConnected() nothrow
    {
        return _isConnected;
    }

    /**
     * Registers the stream with its selector; does nothing when it is already
     * registered. On Windows a read is additionally started with `beginRead`.
     */
    override void start()
    {
        if (_isRegistered)
            return;
        _inLoop.register(this);
        _isRegistered = true;
        version (Windows)
            this.beginRead();
    }

    /**
     * Queues `buffer` for sending and triggers a write.
     *
     * When the stream is not connected a warning is logged and the buffer is
     * dropped. On Windows the write is started with `tryWrite()`; on other
     * platforms `onWrite()` is called directly to drain the queue.
     *
     * Params:
     *   buffer = data to send; must not be null
     */
    void write(StreamWriteBuffer buffer)
    {
        assert(buffer !is null);

        if (!_isConnected)
        {
            warning("The connection has been closed!");
            return;
        }

        _writeQueue.enQueue(buffer);

        version (Windows)
            tryWrite();
        else
        {
            onWrite();
        }
    }

    /**
     * Wraps `data` in a `SocketStreamBuffer` and queues it for sending.
     * Empty input is ignored. Safe for big data sending.
     *
     * Params:
     *   data    = bytes to send
     *   handler = passed to the `SocketStreamBuffer` constructor
     *             (presumably invoked once the data has been written - confirm
     *             against `SocketStreamBuffer`)
     */
    void write(in ubyte[] data, DataWrittenHandler handler = null)
    {
        if (data.length == 0)
            return;

        write(new SocketStreamBuffer(data, handler));
    }

protected:
    /// True for streams created through the client-side constructor.
    bool _isClientSide;
    /// Handler notified with the outcome of a connection attempt.
    ConnectionHandler _connectionHandler;

    /**
     * Read-event callback.
     *
     * On Posix `tryRead` is called repeatedly while the stream stays
     * registered and `tryRead` keeps returning false; elsewhere `doRead` is
     * called once. If the stream is in an error state afterwards, the error
     * is logged and forwarded to `errorOccurred`.
     */
    override void onRead()
    {
        version (KissDebugMode)
            trace("start to read");

        version (Posix)
        {
            while (_isRegistered && !tryRead())
            {
                version (KissDebugMode)
                {
                    trace("continue reading...");
                }
            }
        }
        else
        {
            doRead();
        }

        if (this.isError)
        {
            string msg = format("Socket error on read: fd=%d, message: %s",
                    this.handle, this.erroString);
            // msg is already formatted: log it verbatim so that a '%' in the
            // socket error text is never interpreted as a format specifier.
            error(msg);
            errorOccurred(msg);
        }
    }

    /**
     * Close callback: discards pending writes, runs the base-class close
     * logic, marks the stream as disconnected, shuts down and closes the
     * socket, and finally calls `closeHandler` when one is set.
     */
    override void onClose()
    {
        version (KissDebugMode)
        {
            if (!_writeQueue.empty)
            {
                warning("Some data has not been sent yet.");
            }
        }

        _writeQueue.clear();
        super.onClose();
        _isConnected = false;
        this.socket.shutdown(SocketShutdown.BOTH);
        this.socket.close();

        if (closeHandler)
            closeHandler();
    }

    /**
     * Write-event callback.
     *
     * If the stream is not yet marked as connected, this call only marks it
     * connected, records the remote address, notifies the connection handler
     * and returns without writing anything.
     *
     * Otherwise the write queue is drained: the front buffer is written with
     * `tryWrite`, and dequeued and finished once `popSize` reports that it is
     * complete. The loop stops when the queue is empty, the stream is no
     * longer registered, a write cancellation is in progress, or a socket
     * error is detected.
     */
    override void onWrite()
    {
        if (!_isConnected)
        {
            _isConnected = true;
            _remoteAddress = socket.remoteAddress();

            if (_connectionHandler)
                _connectionHandler(true);
            return;
        }

        // bool canWrite = true;
        version (KissDebugMode)
            trace("start to write");

        while (_isRegistered && !isWriteCancelling && !_writeQueue.empty)
        {
            version (KissDebugMode)
                trace("writting...");

            StreamWriteBuffer writeBuffer = _writeQueue.front();
            const(ubyte[]) data = writeBuffer.sendData();
            if (data.length == 0)
            {
                // Nothing left in this buffer: complete it and move on.
                _writeQueue.deQueue().doFinish();
                continue;
            }

            // Reset the error state so the check below reflects this write only.
            this.clearError();
            size_t nBytes = tryWrite(data);
            if (nBytes > 0 && writeBuffer.popSize(nBytes))
            {
                version (KissDebugMode)
                    trace("finishing data writing...nBytes", nBytes);
                _writeQueue.deQueue().doFinish();
            }

            if (this.isError)
            {
                string msg = format("Socket error on write: fd=%d, message=%s",
                        this.handle, this.erroString);
                // Log verbatim (see onRead) and before notifying, so the log
                // entry is written even if the error handler throws.
                error(msg);
                errorOccurred(msg);
                break;
            }
        }
    }
}