1 /*
2  * Kiss - A refined core library for D programming language.
3  *
4  * Copyright (C) 2015-2018  Shanghai Putao Technology Co., Ltd
5  *
6  * Developer: HuntLabs.cn
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module kiss.event.socket.posix;
13 
14 // dfmt off
15 version(Posix):
16 
17 // dfmt on
18 
19 import kiss.event.socket.common;
20 import kiss.event.core;
21 import kiss.core;
22 import kiss.util.thread;
23 
24 import std.conv;
25 import std.exception;
26 import std.format;
27 import std.process;
28 import std.socket;
29 import std.string;
30 import kiss.logger;
31 
32 import core.stdc.errno;
33 import core.stdc.string;
34 import core.sys.posix.sys.socket : accept;
35 
36 /**
37 TCP Server
38 */
39 abstract class AbstractListener : AbstractSocketChannel
40 {
41     this(Selector loop, AddressFamily family = AddressFamily.INET)
42     {
43         super(loop, WatcherType.Accept);
44         setFlag(WatchFlag.Read, true);
45         this.socket = new TcpSocket(family);
46     }
47 
48     protected bool onAccept(scope AcceptHandler handler)
49     {
50         version (KissDebugMode)
51             trace("new connection coming...");
52         this.clearError();
53         socket_t clientFd = cast(socket_t)(accept(this.handle, null, null));
54         if (clientFd == socket_t.init)
55             return false;
56 
57         version (KissDebugMode)
58             infof("Listener fd=%d, client fd=%d", this.handle, clientFd);
59 
60         if (handler !is null)
61             handler(new Socket(clientFd, this._family));
62         return true;
63     }
64 
65     override void onWriteDone()
66     {
67         version (KissDebugMode)
68             tracef("a new connection created");
69     }
70 }
71 
72 /**
73 TCP Client
74 */
75 abstract class AbstractStream : AbstractSocketChannel, Stream
76 {
77     SimpleEventHandler disconnectionHandler;
78     // DataWrittenHandler sentHandler;
79 
80     protected bool _isConnected; //if server side always true.
81     // alias UbyteArrayObject = BaseTypeObject!(ubyte[]);
82 
83     this(Selector loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4096 * 2)
84     {
85         // _readBuffer = new UbyteArrayObject();
86         version (KissDebugMode)
87             trace("Buffer size for read: ", bufferSize);
88         _readBuffer = new ubyte[bufferSize];
89         super(loop, WatcherType.TCP);
90         setFlag(WatchFlag.Read, true);
91         setFlag(WatchFlag.Write, true);
92         setFlag(WatchFlag.ETMode, true);
93     }
94 
95     /**
96     */
97     protected bool tryRead()
98     {
99         bool isDone = true;
100         this.clearError();
101         ptrdiff_t len = this.socket.receive(cast(void[]) this._readBuffer);
102         version (KissDebugMode)
103             trace("read nbytes...", len);
104 
105         if (len > 0)
106         {
107             if (dataReceivedHandler !is null)
108                 dataReceivedHandler(this._readBuffer[0 .. len]);
109 
110             // It's prossible that more data are wainting for read in inner buffer.
111             if (len == _readBuffer.length)
112                 isDone = false;
113         }
114         else if (len < 0)
115         {
116             // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:06:13
117             // check more error status
118             if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK)
119             {
120                 this._error = true;
121                 this._erroString = cast(string)fromStringz(strerror(errno));
122             }
123 
124             version (KissDebugMode)
125                 warningf("read error: isDone=%s, errno=%d, message=%s", 
126                     isDone, errno, cast(string)fromStringz(strerror(errno)));
127         }
128         else
129         {
130             version (KissDebugMode)
131                 warningf("connection broken: %s", _remoteAddress.toString());
132             onDisconnected();
133             if (_isClosed)
134                 this.socket.close(); // release the sources
135             else
136                 this.close();
137         }
138 
139         return isDone;
140     }
141 
142     protected void onDisconnected()
143     {
144         _isConnected = false;
145         _isClosed = true;
146         if (disconnectionHandler !is null)
147             disconnectionHandler();
148     }
149 
150     protected bool canWriteAgain = true;
151     int writeRetryLimit = 5;
152     private int writeRetries = 0;
153 
154     /**
155     Warning: It will try the best to write all the data.   
156     // TODO: create a examlple for test
157     */
158     protected void tryWriteAll(in ubyte[] data)
159     {
160         const nBytes = this.socket.send(data);
161         // version (KissDebugMode)
162         tracef("actually sent bytes: %d / %d", nBytes, data.length);
163 
164         if (nBytes > 0)
165         {
166             if (canWriteAgain && nBytes < data.length) //  && writeRetries < writeRetryLimit
167             {
168                 // version (KissDebugMode)
169                 writeRetries++;
170                 tracef("[%d] rewrite: written %d, remaining: %d, total: %d",
171                         writeRetries, nBytes, data.length - nBytes, data.length);
172                 if (writeRetries > writeRetryLimit)
173                     warning("You are writting a Big block of data!!!");
174 
175                 tryWriteAll(data[nBytes .. $]);
176             }
177             else
178                 writeRetries = 0;
179 
180         }
181         else if (nBytes == Socket.ERROR)
182         {
183             if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK)
184             {
185                 string msg = lastSocketError();
186                 warningf("errno=%d, message: %s", errno, msg);
187                 this._error = true;
188                 this._erroString = msg;
189 
190                 errorOccurred(msg);
191             }
192             else
193             {
194                 // version (KissDebugMode)
195                 warningf("errno=%d, message: %s", errno, lastSocketError());
196                 if (canWriteAgain)
197                 {
198                     import core.thread;
199                     import core.time;
200 
201                     writeRetries++;
202                     tracef("[%d] rewrite: written %d, remaining: %d, total: %d",
203                             writeRetries, nBytes, data.length - nBytes, data.length);
204                     if (writeRetries > writeRetryLimit)
205                         warning("You are writting a Big block of data!!!");
206                     warning("Wait for a 100 msecs to try again");
207                     Thread.sleep(100.msecs);
208                     tryWriteAll(data);
209                 }
210             }
211         }
212         else
213         {
214             version (KissDebugMode)
215             {
216                 warningf("nBytes=%d, message: %s", nBytes, lastSocketError());
217                 assert(false, "Undefined behavior!");
218             }
219             else
220             {
221                 this._error = true;
222                 this._erroString = lastSocketError();
223             }
224         }
225     }
226 
227     /**
228     Try to write a block of data.
229     */
230     protected size_t tryWrite(in ubyte[] data)
231     {
232         const nBytes = this.socket.send(data);
233         version (KissDebugMode)
234             tracef("actually sent bytes: %d / %d", nBytes, data.length);
235 
236         if (nBytes > 0)
237         {
238             return nBytes;
239         }
240         else if (nBytes == Socket.ERROR)
241         {
242             version (KissDebugMode)
243                 warningf("errno=%d, message: %s", errno, lastSocketError());
244 
245             // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:07:38
246             // check more error status
247             if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK)
248             {
249                 this._error = true;
250                 this._erroString = lastSocketError();
251                 warningf("errno=%d, message: %s", errno, this._erroString);
252             }
253         }
254         else
255         {
256             version (KissDebugMode)
257             {
258                 warningf("nBytes=%d, message: %s", nBytes, lastSocketError());
259                 assert(false, "Undefined behavior!");
260             }
261             else
262             {
263                 this._error = true;
264                 this._erroString = lastSocketError();
265             }
266         }
267         return 0;
268     }
269 
270     protected void doConnect(Address addr)
271     {
272         this.socket.connect(addr);
273     }
274 
275     void cancelWrite()
276     {
277         isWriteCancelling = true;
278     }
279 
280     override void onWriteDone()
281     {
282         // notified by kqueue selector when data writing done
283         version (KissDebugMode)
284             tracef("done with data writing");
285     }
286 
287     // protected UbyteArrayObject _readBuffer;
288     private const(ubyte)[] _readBuffer;
289     protected WriteBufferQueue _writeQueue;
290     protected bool isWriteCancelling = false;
291 
292     /**
293     * Warning: The received data is stored a inner buffer. For a data safe, 
294     * you would make a copy of it. 
295     */
296     DataReceivedHandler dataReceivedHandler;
297 
298 }
299 
300 /**
301 UDP Socket
302 */
303 abstract class AbstractDatagramSocket : AbstractSocketChannel, IDatagramSocket
304 {
305     this(Selector loop, AddressFamily family = AddressFamily.INET, int bufferSize = 4096 * 2)
306     {
307         super(loop, WatcherType.UDP);
308         setFlag(WatchFlag.Read, true);
309         setFlag(WatchFlag.ETMode, false);
310 
311         this.socket = new UdpSocket(family);
312         // _socket.blocking = false;
313         _readBuffer = new UdpDataObject();
314         _readBuffer.data = new ubyte[bufferSize];
315 
316         if (family == AddressFamily.INET)
317             _bindAddress = new InternetAddress(InternetAddress.PORT_ANY);
318         else if (family == AddressFamily.INET6)
319             _bindAddress = new Internet6Address(Internet6Address.PORT_ANY);
320         else
321             _bindAddress = new UnknownAddress();
322     }
323 
324     final void bind(Address addr)
325     {
326         if (_binded)
327             return;
328         _bindAddress = addr;
329         socket.bind(_bindAddress);
330         _binded = true;
331     }
332 
333     final bool isBind()
334     {
335         return _binded;
336     }
337 
338     Address bindAddr()
339     {
340         return _bindAddress;
341     }
342 
343     protected UdpDataObject _readBuffer;
344     protected bool _binded = false;
345     protected Address _bindAddress;
346 
347     protected bool tryRead(scope ReadCallBack read)
348     {
349         scope Address createAddress()
350         {
351             enum ushort DPORT = 0;
352             if (AddressFamily.INET == this.socket.addressFamily)
353                 return new InternetAddress(DPORT);
354             else if (AddressFamily.INET6 == this.socket.addressFamily)
355                 return new Internet6Address(DPORT);
356             else
357                 throw new AddressException(
358                         "NOT SUPPORT addressFamily. It only can be AddressFamily.INET or AddressFamily.INET6");
359         }
360 
361         this._readBuffer.addr = createAddress();
362         auto data = this._readBuffer.data;
363         scope (exit)
364             this._readBuffer.data = data;
365         auto len = this.socket.receiveFrom(this._readBuffer.data, this._readBuffer.addr);
366         if (len > 0)
367         {
368             this._readBuffer.data = this._readBuffer.data[0 .. len];
369             read(this._readBuffer);
370         }
371         return false;
372     }
373 
374     override void onWriteDone()
375     {
376         // notified by kqueue selector when data writing done
377         version (KissDebugMode)
378             tracef("done with data writing");
379     }
380 }