1 /*
2  * Kiss - A refined core library for D programming language.
3  *
4  * Copyright (C) 2015-2018  Shanghai Putao Technology Co., Ltd
5  *
6  * Developer: HuntLabs.cn
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11  
12 module kiss.event.socket.common;
13 
14 import kiss.core;
15 import kiss.event.EventLoop;
16 import kiss.event.core;
17 import kiss.exception;
18 import kiss.container.ByteBuffer;
19 
20 import std.socket;
21 import kiss.logger;
22 
23 
24 alias ConnectionHandler = void delegate(bool isSucceeded);
25 
26 // dfmt off
27 alias UDPReadCallBack = void delegate(in ubyte[] data, Address addr);
28 alias AcceptCallBack = void delegate(Selector loop, Socket socket) ;
29 // dfmt on
30 
31 alias SocketChannelBase = AbstractSocketChannel;
32 // alias AcceptorBase = AbstractListener;
33 // alias StreamSocketBase = AbstractStream;
34 // alias DatagramSocketBase = AbstractDatagramSocket;
35 
36 /**
37 */
38 interface IAcceptor
39 {
40     void onClose();
41     void onRead();
42 }
43 
44 /**
45 */
46 interface Stream
47 {
48 
49 }
50 
51 // alias IStreamSocket = Stream;
52 
53 // dfmt off
54 mixin template ChannelSocketOption() {
55     import std.functional;
56     import std.datetime;
57     import core.stdc.stdint;
58     import std.socket;
59 
60     version (Windows) import SOCKETOPTIONS = core.sys.windows.winsock2;
61 
62     version (Posix) import SOCKETOPTIONS = core.sys.posix.sys.socket;
63 
64     /// Get a socket option.
65     /// Returns: The number of bytes written to $(D result).
66     //returns the length, in bytes, of the actual result - very different from getsockopt()
67     pragma(inline) final int getOption(SocketOptionLevel level, SocketOption option,
68         void[] result) @trusted {
69 
70         return  this.socket.getOption(level, option, result);
71     }
72 
73     /// Common case of getting integer and boolean options.
74     pragma(inline) final int getOption(SocketOptionLevel level,
75         SocketOption option, ref int32_t result) @trusted {
76         return  this.socket.getOption(level, option, result);
77     }
78 
79     /// Get the linger option.
80     pragma(inline) final int getOption(SocketOptionLevel level, SocketOption option,
81         ref Linger result) @trusted {
82         return  this.socket.getOption(level, option, result);
83     }
84 
85     /// Get a timeout (duration) option.
86     pragma(inline) final void getOption(SocketOptionLevel level,
87         SocketOption option, ref Duration result) @trusted {
88          this.socket.getOption(level, option, result);
89     }
90 
91     /// Set a socket option.
92     pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option,
93         void[] value) @trusted {
94         return  this.socket.setOption(forward!(level, option, value));
95     }
96 
97     /// Common case for setting integer and boolean options.
98     pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option,
99         int32_t value) @trusted {
100         return  this.socket.setOption(forward!(level, option, value));
101     }
102 
103     /// Set the linger option.
104     pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option,
105         Linger value) @trusted {
106         return  this.socket.setOption(forward!(level, option, value));
107     }
108 
109     pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option,
110         Duration value) @trusted {
111         return  this.socket.setOption(forward!(level, option, value));
112     }
113 
114     final @property @trusted Address remoteAddress() {
115         return _remoteAddress;
116     }
117     protected Address _remoteAddress;
118 
119     final @property @trusted Socket socket(){
120         return this.socket;
121     }
122 
123     final @property @trusted Address localAddress() {
124         return _localAddress;
125     }
126     protected Address _localAddress;
127 }
128 // dfmt on
129 
130 /**
131 */
132 abstract class AbstractSocketChannel : AbstractChannel
133 {
134     protected AddressFamily _family;
135 
136     this(Selector loop, WatcherType type)
137     {
138         super(loop, type);
139     }
140 
141     protected @property void socket(Socket s)
142     {
143         this.handle = s.handle();
144         this._family = s.addressFamily;
145         version (Posix)
146             s.blocking = false;
147         _socket = s;
148         version (KissDebugMode)
149             trace("new socket fd: ", this.handle);
150     }
151 
152     protected @property Socket socket()
153     {
154         return _socket;
155     }
156 
157     mixin ChannelSocketOption;
158 
159     version (Windows)
160     {
161 
162         void setRead(size_t bytes)
163         {
164             readLen = bytes;
165         }
166 
167         protected size_t readLen;
168     }
169 
170     void start();
171 
172     void onWriteDone() 
173     {
174         assert(false, "unimplemented");
175     }
176 
177 protected:
178     Socket _socket;
179 }
180 
181 /**
182 */
183 interface IDatagramSocket
184 {
185 
186 }
187 
188 /**
189 */
190 class SocketStreamBuffer : StreamWriteBuffer
191 {
192 
193     this(const(ubyte)[] data, DataWrittenHandler handler = null)
194     {
195         _data = data;
196         _site = 0;
197         _sentHandler = handler;
198     }
199 
200     const(ubyte)[] sendData()
201     {
202         return _data[_site .. $];
203     }
204 
205     // add send offiset and return is empty
206     bool popSize(size_t size)
207     {
208         _site += size;
209         if (_site >= _data.length)
210             return true;
211         else
212             return false;
213     }
214     // do send finish
215     void doFinish()
216     {
217         if (_sentHandler)
218         {
219             _sentHandler(_data, _site);
220         }
221         _sentHandler = null;
222         _data = null;
223     }
224 
225     StreamWriteBuffer next()
226     {
227         return _next;
228     }
229 
230     void next(StreamWriteBuffer v)
231     {
232         _next = v;
233     }
234 
235 private:
236     StreamWriteBuffer _next;
237     size_t _site = 0;
238     const(ubyte)[] _data;
239     DataWrittenHandler _sentHandler;
240 }
241 
242 /**
243 */
244 struct WriteBufferQueue
245 {
246     StreamWriteBuffer front() nothrow @safe
247     {
248         return _first;
249     }
250 
251     bool empty() nothrow @safe
252     {
253         return _first is null;
254     }
255 
256     void clear()
257     {
258         StreamWriteBuffer current = _first;
259         while (current !is null)
260         {
261             _first = current.next;
262             current.next = null;
263             current = _first;
264         }
265 
266         _first = null;
267         _last = null;
268     }
269 
270     void enQueue(StreamWriteBuffer wsite)
271     {
272         assert(wsite);
273         if (_last)
274         {
275             _last.next = wsite;
276         }
277         else
278         {
279             _first = wsite;
280         }
281         wsite.next = null;
282         _last = wsite;
283     }
284 
285     StreamWriteBuffer deQueue()
286     {
287         // assert(_first && _last);
288         StreamWriteBuffer wsite = _first;
289         if (_first !is null)
290             _first = _first.next;
291 
292         if (_first is null)
293             _last = null;
294 
295         return wsite;
296     }
297 
298 private:
299     StreamWriteBuffer _last = null;
300     StreamWriteBuffer _first = null;
301 }