1 /*
2  * Kiss - A refined core library for D programming language.
3  *
4  * Copyright (C) 2015-2018  Shanghai Putao Technology Co., Ltd
5  *
6  * Developer: HuntLabs.cn
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module kiss.event.socket.posix;
13 
14 // dfmt off
15 version(Posix):
16 
17 // dfmt on
18 
19 import kiss.event.socket.common;
20 import kiss.event.core;
21 import kiss.core;
22 import kiss.util.thread;
23 
24 import std.conv;
25 import std.exception;
26 import std.format;
27 import std.process;
28 import std.socket;
29 import std.string;
30 import kiss.logger;
31 
32 import core.stdc.errno;
33 import core.stdc.string;
34 import core.sys.posix.sys.socket : accept;
35 
/**
TCP Server

Listening channel registered with a selector for accept events. Concrete,
selector-specific listeners call onAccept() when the listening socket
becomes readable.
*/
abstract class AbstractListener : AbstractSocketChannel
{
    /**
    Creates the listening TCP socket and registers interest in read
    (incoming connection) events.

    Params:
        loop   = selector this channel is attached to
        family = address family of the listening socket
    */
    this(Selector loop, AddressFamily family = AddressFamily.INET)
    {
        super(loop, WatcherType.Accept);
        setFlag(WatchFlag.Read, true);
        this.socket = new TcpSocket(family);
    }

    /**
    Accepts one pending connection and hands it to `handler`.

    Params:
        handler = receives the accepted connection wrapped in a Socket;
                  may be null, in which case the connection is closed
                  immediately

    Returns: true if a connection was accepted, false if accept() failed.
    */
    protected bool onAccept(scope AcceptHandler handler)
    {
        version (KissDebugMode)
            trace("new connection coming...");
        this.clearError();
        socket_t clientFd = cast(socket_t)(accept(this.handle, null, null));
        if (clientFd == socket_t.init)
            return false;

        version (KissDebugMode)
            infof("Listener fd=%d, client fd=%d", this.handle, clientFd);

        if (handler is null)
        {
            // Nobody takes ownership of the accepted descriptor, so close it
            // here; otherwise every such connection leaks a file descriptor.
            // Renamed import: the bare name `close` would resolve to a class
            // member here (this.close() is used elsewhere in this module).
            import core.sys.posix.unistd : closeFd = close;

            closeFd(clientFd);
            return true;
        }

        handler(new Socket(clientFd, this._family));
        return true;
    }

    /// Write-completion hook; a listener never writes, so this only traces.
    override void onWriteDone()
    {
        version (KissDebugMode)
            tracef("a new connection created, thread: %s", getTid());
    }
}
71 
/**
TCP Client

Stream channel that owns a fixed-size receive buffer and provides the
read/write primitives used by the concrete, selector-specific streams.
*/
abstract class AbstractStream : AbstractSocketChannel, Stream
{
    /// Invoked by onDisconnected() after the connection is marked closed.
    SimpleEventHandler disconnectionHandler;
    // DataWrittenHandler sentHandler;

    protected bool _isConnected; //if server side always true.
    // alias UbyteArrayObject = BaseTypeObject!(ubyte[]);

    /**
    Allocates the receive buffer and registers for read and write events in
    edge-triggered mode. The socket itself is not created here.

    Params:
        loop       = selector this channel is attached to
        family     = address family (not used by this constructor)
        bufferSize = size in bytes of the receive buffer
    */
    this(Selector loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4096 * 2)
    {
        // _readBuffer = new UbyteArrayObject();
        version (KissDebugMode)
            trace("Buffer size for read: ", bufferSize);
        _readBuffer = new ubyte[bufferSize];
        super(loop, WatcherType.TCP);
        setFlag(WatchFlag.Read, true);
        setFlag(WatchFlag.Write, true);
        setFlag(WatchFlag.ETMode, true);
    }

    /// Text for an errno value that was captured right after a socket call.
    private static string errnoMessage(int err)
    {
        return fromStringz(strerror(err)).idup;
    }

    /**
    Performs one receive() into the internal buffer.

    Received bytes are passed to dataReceivedHandler. A zero-length read is
    treated as a closed connection: onDisconnected() is called and the
    socket is closed.

    Returns: false when the buffer was filled completely or the read failed
    with EINTR/EAGAIN/EWOULDBLOCK; true in every other case (including a
    recorded error and a disconnect).
    */
    protected bool tryRead()
    {
        bool isDone = true;
        this.clearError();
        ptrdiff_t len = this.socket.receive(this._readBuffer);
        // Capture errno immediately: the logging below may overwrite it.
        immutable int err = errno;
        version (KissDebugMode)
            trace("read nbytes...", len);

        if (len > 0)
        {
            if (dataReceivedHandler !is null)
                dataReceivedHandler(this._readBuffer[0 .. len]);

            // It's possible that more data is waiting to be read in the inner buffer.
            if (len == _readBuffer.length)
                isDone = false;
        }
        else if (len < 0)
        {
            // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:06:13
            // check more error status
            if (err != EINTR && err != EAGAIN && err != EWOULDBLOCK)
            {
                this._error = true;
                this._erroString = errnoMessage(err);
            }
            else
                isDone = false;
        }
        else
        {
            version (KissDebugMode)
                warningf("connection broken: %s", _remoteAddress.toString());
            onDisconnected();
            if (_isClosed)
                this.socket.close(); // release the sources
            else
                this.close();
        }

        return isDone;
    }

    /// Marks the stream as disconnected and closed, then notifies disconnectionHandler.
    protected void onDisconnected()
    {
        _isConnected = false;
        _isClosed = true;
        if (disconnectionHandler !is null)
            disconnectionHandler();
    }

    /// When false, tryWriteAll() gives up after the first send() attempt.
    protected bool canWriteAgain = true;
    /// Number of consecutive retries after which tryWriteAll() logs a warning.
    int writeRetryLimit = 5;
    private int writeRetries = 0;

    /**
    Warning: It will try the best to write all the data.

    Keeps calling send() until everything is written, a non-transient error
    occurs, or canWriteAgain is false. On EINTR/EAGAIN/EWOULDBLOCK it sleeps
    100 msecs on the calling thread before retrying; the number of retries
    is not bounded (writeRetryLimit only triggers a warning).

    A non-transient error is recorded in _error/_erroString and reported
    through errorOccurred(). Empty `data` is a no-op.

    // TODO: create an example for test
    */
    protected void tryWriteAll(in ubyte[] data)
    {
        import core.thread : Thread;
        import core.time : msecs;

        // Nothing to send; send() would return 0, which is otherwise
        // reported as an error below.
        if (data.length == 0)
            return;

        // Iterate instead of recursing so a large payload or a long run of
        // EAGAIN retries cannot grow the stack without bound.
        const(ubyte)[] pending = data;
        while (true)
        {
            const nBytes = this.socket.send(pending);
            // Capture errno before any logging call can overwrite it.
            immutable int err = errno;
            // version (KissDebugMode)
            tracef("actually sent bytes: %d / %d", nBytes, pending.length);

            if (nBytes > 0)
            {
                if (!canWriteAgain || nBytes >= pending.length) //  && writeRetries < writeRetryLimit
                {
                    writeRetries = 0;
                    return;
                }

                // version (KissDebugMode)
                writeRetries++;
                tracef("[%d] rewrite: written %d, remaining: %d, total: %d",
                        writeRetries, nBytes, pending.length - nBytes, pending.length);
                if (writeRetries > writeRetryLimit)
                    warning("You are writting a Big block of data!!!");

                pending = pending[nBytes .. $];
            }
            else if (nBytes == Socket.ERROR)
            {
                if (err != EINTR && err != EAGAIN && err != EWOULDBLOCK)
                {
                    string msg = errnoMessage(err);
                    warningf("errno=%d, message: %s", err, msg);
                    this._error = true;
                    this._erroString = msg;

                    errorOccurred(msg);
                    return;
                }

                // version (KissDebugMode)
                warningf("errno=%d, message: %s", err, errnoMessage(err));
                if (!canWriteAgain)
                    return;

                writeRetries++;
                // Nothing was written in this attempt, so the whole pending
                // slice is still outstanding.
                tracef("[%d] rewrite: written %d, remaining: %d, total: %d",
                        writeRetries, 0, pending.length, pending.length);
                if (writeRetries > writeRetryLimit)
                    warning("You are writting a Big block of data!!!");
                warning("Wait for a 100 msecs to try again");
                Thread.sleep(100.msecs);
            }
            else
            {
                version (KissDebugMode)
                {
                    warningf("nBytes=%d, message: %s", nBytes, errnoMessage(err));
                    assert(false, "Undefined behavior!");
                }
                else
                {
                    this._error = true;
                    this._erroString = errnoMessage(err);
                    return;
                }
            }
        }
    }

    /**
    Try to write a block of data.

    Performs a single send().

    Returns: the number of bytes written, or 0 if nothing was written.
    A failure other than EINTR/EAGAIN/EWOULDBLOCK is recorded in
    _error/_erroString.
    */
    protected size_t tryWrite(in ubyte[] data)
    {
        const nBytes = this.socket.send(data);
        // Capture errno before any logging call can overwrite it.
        immutable int err = errno;
        version (KissDebugMode)
            tracef("actually sent bytes: %d / %d", nBytes, data.length);

        if (nBytes > 0)
        {
            return nBytes;
        }
        else if (nBytes == Socket.ERROR)
        {
            version (KissDebugMode)
                warningf("errno=%d, message: %s", err, errnoMessage(err));

            // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:07:38
            // check more error status
            if (err != EINTR && err != EAGAIN && err != EWOULDBLOCK)
            {
                this._error = true;
                this._erroString = errnoMessage(err);
            }
        }
        else
        {
            version (KissDebugMode)
            {
                warningf("nBytes=%d, message: %s", nBytes, errnoMessage(err));
                assert(false, "Undefined behavior!");
            }
            else
            {
                this._error = true;
                this._erroString = errnoMessage(err);
            }
        }
        return 0;
    }

    /// Connects the underlying socket to `addr`.
    protected void doConnect(Address addr)
    {
        this.socket.connect(addr);
    }

    /// Sets the isWriteCancelling flag; nothing in this class reads it.
    void cancelWrite()
    {
        isWriteCancelling = true;
    }

    override void onWriteDone()
    {
        // notified by kqueue selector when data writing done
        version (KissDebugMode)
            tracef("done with data writing, thread: %s", getTid());
    }

    // protected UbyteArrayObject _readBuffer;
    // Mutable on purpose: receive() writes into this buffer.
    private ubyte[] _readBuffer;
    protected WriteBufferQueue _writeQueue;
    protected bool isWriteCancelling = false;

    /**
    * Warning: The received data is stored in an inner buffer that is reused
    * by the next read. To keep the data, make a copy of it.
    */
    DataReceivedHandler dataReceivedHandler;

}
296 
/**
UDP Socket

Datagram channel with a reusable receive buffer and a one-time bind.
*/
abstract class AbstractDatagramSocket : AbstractSocketChannel, IDatagramSocket
{
    /**
    Creates the UDP socket, allocates the receive buffer and picks a default
    bind address matching `family`.

    Params:
        loop       = selector this channel is attached to
        family     = address family of the socket
        bufferSize = size in bytes of the receive buffer
    */
    this(Selector loop, AddressFamily family = AddressFamily.INET, int bufferSize = 4096 * 2)
    {
        super(loop, WatcherType.UDP);
        setFlag(WatchFlag.Read, true);
        setFlag(WatchFlag.ETMode, false);

        this.socket = new UdpSocket(family);
        // _socket.blocking = false;
        _readBuffer = new UdpDataObject();
        _readBuffer.data = new ubyte[bufferSize];

        switch (family)
        {
        case AddressFamily.INET:
            _bindAddress = new InternetAddress(InternetAddress.PORT_ANY);
            break;
        case AddressFamily.INET6:
            _bindAddress = new Internet6Address(Internet6Address.PORT_ANY);
            break;
        default:
            _bindAddress = new UnknownAddress();
            break;
        }
    }

    /// Binds the socket to `addr`; calls after the first successful bind are ignored.
    final void bind(Address addr)
    {
        if (!_binded)
        {
            _bindAddress = addr;
            socket.bind(_bindAddress);
            _binded = true;
        }
    }

    /// Returns: true once bind() has succeeded.
    final bool isBind()
    {
        return _binded;
    }

    /// Returns: the address given to bind(), or the default chosen by the constructor.
    Address bindAddr()
    {
        return _bindAddress;
    }

    protected UdpDataObject _readBuffer;
    protected bool _binded = false;
    protected Address _bindAddress;

    /**
    Receives one datagram into the internal buffer and passes it to `read`.

    While `read` runs, the buffer's data slice is narrowed to the received
    bytes; the full-size slice is restored before returning.

    Params:
        read = callback receiving the buffer object (data and sender address)

    Returns: always false.

    Throws: AddressException if the socket's address family is neither INET
    nor INET6.
    */
    protected bool tryRead(scope ReadCallBack read)
    {
        enum ushort DPORT = 0;

        // A fresh address object per datagram; receiveFrom() fills it in.
        Address sender;
        const currentFamily = this.socket.addressFamily;
        if (currentFamily == AddressFamily.INET)
            sender = new InternetAddress(DPORT);
        else if (currentFamily == AddressFamily.INET6)
            sender = new Internet6Address(DPORT);
        else
            throw new AddressException(
                    "NOT SUPPORT addressFamily. It only can be AddressFamily.INET or AddressFamily.INET6");
        this._readBuffer.addr = sender;

        // Remember the full-size slice so it can be restored after the callback.
        auto wholeBuffer = this._readBuffer.data;
        scope (exit)
            this._readBuffer.data = wholeBuffer;

        auto received = this.socket.receiveFrom(this._readBuffer.data, this._readBuffer.addr);
        if (received > 0)
        {
            this._readBuffer.data = this._readBuffer.data[0 .. received];
            read(this._readBuffer);
        }
        return false;
    }

    override void onWriteDone()
    {
        // notified by kqueue selector when data writing done
        version (KissDebugMode)
            tracef("done with data writing, thread: %s", getTid());
    }
}