1 /*
2  * Kiss - A refined core library for D programming language.
3  *
4  * Copyright (C) 2015-2018  Shanghai Putao Technology Co., Ltd
5  *
6  * Developer: HuntLabs.cn
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11  
12 module kiss.event.socket.common;
13 
14 import kiss.core;
15 import kiss.event.EventLoop;
16 import kiss.event.core;
17 import kiss.exception;
18 import kiss.container.ByteBuffer;
19 
20 import std.socket;
21 import kiss.logger;
22 
23 
/// Delegate type that receives a single flag, `isSucceeded`.
/// NOTE(review): presumably invoked once a connection attempt completes — confirm against callers.
alias ConnectionHandler = void delegate(bool isSucceeded);

// dfmt off
/// Delegate type that receives a read-only byte slice together with an `Address`.
/// NOTE(review): presumably the datagram payload and the sender's address — confirm against callers.
alias UDPReadCallBack = void delegate(in ubyte[] data, Address addr);
/// Delegate type that receives a `Selector` and a `Socket`.
/// NOTE(review): presumably called for each newly accepted connection — confirm against callers.
alias AcceptCallBack = void delegate(Selector loop, Socket socket) ;
// dfmt on

/// Alternative name for `AbstractSocketChannel`.
alias SocketChannelBase = AbstractSocketChannel;
32 // alias AcceptorBase = AbstractListener;
33 // alias StreamSocketBase = AbstractStream;
34 // alias DatagramSocketBase = AbstractDatagramSocket;
35 
/**
 * Contract for acceptor-style objects: two parameterless notification hooks.
 *
 * NOTE(review): the names suggest these are called by the event loop when the
 * listening socket is closed / becomes readable — confirm against implementers.
 */
interface IAcceptor
{
    /// Close notification hook.
    void onClose();
    /// Read notification hook.
    void onRead();
}
43 
/**
 * Interface that declares no members in this module.
 *
 * NOTE(review): appears to serve as a marker/tag type for stream-like
 * channels — confirm against implementers.
 */
interface Stream
{

}
50 
51 // alias IStreamSocket = Stream;
52 
53 // dfmt off
/**
 * Mixin that equips a socket-backed channel with thin pass-throughs to the
 * `std.socket.Socket` option API, plus accessors for the cached remote and
 * local addresses.
 *
 * Host requirements: the type mixing this in must declare a `_socket` member
 * holding the underlying `Socket`. A host may additionally declare its own
 * `socket` property; by the template-mixin scoping rules that declaration
 * takes precedence over the getter supplied here.
 */
mixin template ChannelSocketOption() {
    import std.functional;
    import std.datetime;
    import core.stdc.stdint;
    import std.socket;

    version (Windows) import SOCKETOPTIONS = core.sys.windows.winsock2;

    version (Posix) import SOCKETOPTIONS = core.sys.posix.sys.socket;

    /// Get a socket option.
    /// Returns: The number of bytes written to $(D result).
    //returns the length, in bytes, of the actual result - very different from getsockopt()
    pragma(inline) final int getOption(SocketOptionLevel level, SocketOption option,
        void[] result) @trusted {
        return this.socket.getOption(level, option, result);
    }

    /// Common case of getting integer and boolean options.
    /// Returns: whatever the underlying `Socket.getOption` overload returns.
    pragma(inline) final int getOption(SocketOptionLevel level,
        SocketOption option, ref int32_t result) @trusted {
        return this.socket.getOption(level, option, result);
    }

    /// Get the linger option.
    /// Returns: whatever the underlying `Socket.getOption` overload returns.
    pragma(inline) final int getOption(SocketOptionLevel level, SocketOption option,
        ref Linger result) @trusted {
        return this.socket.getOption(level, option, result);
    }

    /// Get a timeout (duration) option; the value is stored in $(D result).
    pragma(inline) final void getOption(SocketOptionLevel level,
        SocketOption option, ref Duration result) @trusted {
        this.socket.getOption(level, option, result);
    }

    /// Set a socket option from a raw buffer.
    pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option,
        void[] value) @trusted {
        // Parameters are taken by value, so they are passed straight through.
        this.socket.setOption(level, option, value);
    }

    /// Common case for setting integer and boolean options.
    pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option,
        int32_t value) @trusted {
        this.socket.setOption(level, option, value);
    }

    /// Set the linger option.
    pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option,
        Linger value) @trusted {
        this.socket.setOption(level, option, value);
    }

    /// Set a timeout (duration) option.
    pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option,
        Duration value) @trusted {
        this.socket.setOption(level, option, value);
    }

    /// The cached remote address (`_remoteAddress`); this mixin never assigns it.
    final @property @trusted Address remoteAddress() {
        return _remoteAddress;
    }
    protected Address _remoteAddress;

    /// The underlying socket.
    final @property @trusted Socket socket(){
        // Read the host's backing field. Returning `this.socket` here would
        // call this very getter again whenever the host does not declare its
        // own `socket` property, recursing until the stack overflows.
        return _socket;
    }

    /// The cached local address (`_localAddress`); this mixin never assigns it.
    final @property @trusted Address localAddress() {
        return _localAddress;
    }
    protected Address _localAddress;
}
128 // dfmt on
129 
/**
 * Base class for channels backed by a `std.socket.Socket`.
 *
 * Assigning a socket through the protected `socket` setter records its
 * native handle and address family on the channel. Socket option helpers and
 * the address accessors come from `ChannelSocketOption`.
 */
abstract class AbstractSocketChannel : AbstractChannel
{
    /// Address family of the most recently assigned socket.
    protected AddressFamily _family;

    /// Passes the selector and watcher type straight to `AbstractChannel`.
    this(Selector loop, WatcherType type)
    {
        super(loop, type);
    }

    /**
     * Attaches `s` to this channel.
     *
     * Copies the socket's handle and address family onto the channel, puts
     * the socket into non-blocking mode on Posix builds, and stores the
     * reference for later use.
     */
    protected @property void socket(Socket s)
    {
        this.handle = s.handle;
        _family = s.addressFamily;
        // _localAddress = s.localAddress();
        version (Posix)
        {
            s.blocking = false;
        }
        this._socket = s;
        version (KissDebugMode)
        {
            trace("new socket fd: ", this.handle);
        }
    }

    /// The socket currently attached to this channel (null until one is assigned).
    protected @property Socket socket()
    {
        return this._socket;
    }

    mixin ChannelSocketOption;

    version (Windows)
    {
        /// Stores `bytes` in `readLen`.
        void setRead(size_t bytes)
        {
            this.readLen = bytes;
        }

        /// Byte count last recorded by `setRead`.
        protected size_t readLen;
    }

    /// Declared without a body in this class.
    void start();

    /// Default implementation fails an assertion; this base class provides no behaviour.
    void onWriteDone()
    {
        assert(false, "not implemented");
    }

    /// Backing storage for the `socket` property.
    protected Socket _socket;
}
181 
/**
 * Interface that declares no members in this module.
 *
 * NOTE(review): appears to serve as a marker/tag type for datagram
 * sockets — confirm against implementers.
 */
interface IDatagramSocket
{

}
188 
/**
 * A `StreamWriteBuffer` over an immutable-view byte slice.
 *
 * Tracks how many bytes have already been consumed (`_site`), exposes the
 * unsent remainder, and optionally notifies a handler when finished. Buffers
 * can be chained through the `next` property.
 */
class SocketStreamBuffer : StreamWriteBuffer
{

    /**
     * Params:
     *   data    = bytes to be sent; the slice is stored, not copied.
     *   handler = optional callback invoked by `doFinish`.
     */
    this(const(ubyte)[] data, DataWrittenHandler handler = null)
    {
        this._data = data;
        this._site = 0;
        this._sentHandler = handler;
    }

    /// Returns the portion of the data starting at the current offset.
    const(ubyte)[] sendData()
    {
        const(ubyte)[] remaining = _data[_site .. $];
        return remaining;
    }

    /**
     * Advances the send offset by `size`.
     *
     * Returns: `true` once the offset has reached or passed the end of the
     * data, `false` while bytes remain.
     */
    bool popSize(size_t size)
    {
        _site += size;
        return _site >= _data.length;
    }

    /**
     * Completes this buffer: calls the handler (if one was supplied) with the
     * data and the current offset, then drops both the handler and the data.
     */
    void doFinish()
    {
        if (_sentHandler)
            _sentHandler(_data, _site);

        _sentHandler = null;
        _data = null;
    }

    /// The buffer linked after this one, or null.
    StreamWriteBuffer next()
    {
        return this._next;
    }

    /// Links `v` as the buffer following this one.
    void next(StreamWriteBuffer v)
    {
        this._next = v;
    }

private:
    StreamWriteBuffer _next;         // following buffer in the chain
    size_t _site = 0;                // offset of the first unsent byte
    const(ubyte)[] _data;            // payload
    DataWrittenHandler _sentHandler; // completion callback, may be null
}
242 
/**
 * Singly linked FIFO of `StreamWriteBuffer` nodes, chained through each
 * node's `next` property. Holds references to the first and last node.
 */
struct WriteBufferQueue
{
    /// The node at the head of the queue, or null when the queue is empty.
    StreamWriteBuffer front() nothrow @safe
    {
        return _first;
    }

    /// True when the queue holds no nodes.
    bool empty() nothrow @safe
    {
        return _first is null;
    }

    /// Detaches every node from the chain and leaves the queue empty.
    void clear()
    {
        // Pop from the head, severing each node's link as it is removed.
        while (_first !is null)
        {
            StreamWriteBuffer node = _first;
            _first = node.next;
            node.next = null;
        }

        _last = null;
    }

    /// Appends `wsite` to the tail of the queue; `wsite` must not be null.
    void enQueue(StreamWriteBuffer wsite)
    {
        assert(wsite);

        if (_last is null)
            _first = wsite;      // first node in an empty queue
        else
            _last.next = wsite;  // hook behind the current tail

        wsite.next = null;
        _last = wsite;
    }

    /**
     * Removes and returns the head node.
     *
     * Returns: the former head, or null when the queue was already empty.
     * The returned node's own `next` link is left untouched.
     */
    StreamWriteBuffer deQueue()
    {
        // assert(_first && _last);
        StreamWriteBuffer head = _first;
        if (head is null)
        {
            _last = null;
            return null;
        }

        _first = head.next;
        if (_first is null)
            _last = null;

        return head;
    }

private:
    StreamWriteBuffer _last = null;
    StreamWriteBuffer _first = null;
}