1 /*
2  * Kiss - A refined core library for D programming language.
3  *
4  * Copyright (C) 2015-2018  Shanghai Putao Technology Co., Ltd
5  *
6  * Developer: HuntLabs.cn
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module kiss.event.core;
13 
14 import kiss.core;
15 
16 import std.socket;
17 import std.exception;
18 import std.bitmanip;
19 import kiss.logger;
20 
/// Delegate taking a single `Object` argument and returning nothing.
/// NOTE(review): the name suggests it is fired on read events — confirm against callers.
alias ReadCallBack = void delegate(Object obj);

/// Delegate taking a read-only byte slice.
/// NOTE(review): presumably invoked with inbound data — confirm against callers.
alias DataReceivedHandler = void delegate(in ubyte[] data);
/// Delegate taking a read-only byte slice and a size.
/// NOTE(review): `size` is presumably the number of bytes written — confirm against callers.
alias DataWrittenHandler = void delegate(in ubyte[] data, size_t size);
/// Delegate taking a `Socket`.
/// NOTE(review): presumably the newly accepted connection — confirm against callers.
alias AcceptHandler = void delegate(Socket socket);
26 
/**
 * Read-side transport contract layered on top of `Channel`.
 *
 * Declares a `close` operation and two non-throwing notifications, each of
 * which is handed an `AbstractChannel`.
 */
@trusted interface ReadTransport : Channel
{
    /// Close operation; unlike the notifications below it is not `nothrow`.
    void close();

    // The two notifications share the nothrow guarantee.
    nothrow
    {
        /// Read notification for `watcher`.
        void onRead(AbstractChannel watcher);

        /// Close notification for `watcher`.
        void onClose(AbstractChannel watcher);
    }
}
36 
/**
 * Write-side transport contract layered on top of `Channel`.
 *
 * Both members are non-throwing notifications that receive an
 * `AbstractChannel`.
 */
@trusted interface WriteTransport : Channel
{
nothrow:
    /// Write notification for `watcher`.
    void onWrite(AbstractChannel watcher);

    /// Close notification for `watcher`.
    void onClose(AbstractChannel watcher);
}
44 
/// Combines `ReadTransport` and `WriteTransport` into a single interface; adds no members of its own.
@trusted interface Transport : ReadTransport, WriteTransport
{
}
48 
49 // dfmt on
50 
/**
 * A buffer of outgoing bytes that can be linked to a following buffer.
 *
 * NOTE(review): the member descriptions below are derived from the original
 * terse comments; confirm the exact contract against an implementation.
 */
interface StreamWriteBuffer
{
    // The data to be written.
    const(ubyte)[] sendData();

    // Advance the send offset by `size`; the result reports whether the
    // buffer is empty afterwards.
    bool popSize(size_t size);

    // Called when sending has finished.
    void doFinish();

    // Getter and setter for the buffer linked after this one.
    StreamWriteBuffer next();
    void next(StreamWriteBuffer);
}
65 
/// Alternative name for `AbstractChannel`.
alias ChannelBase = AbstractChannel;
67 
/**
 * Marker interface without members. In this module it is the common base of
 * `ReadTransport`, `WriteTransport` and `AbstractChannel`.
 */
interface Channel
{

}
74 
/**
 * Contract of the object that channels are registered with.
 *
 * `AbstractChannel` keeps a reference to its selector and calls
 * `deregister` on it while closing (on non-Windows builds).
 *
 * NOTE(review): the meaning of the `bool` results is not visible here —
 * presumably `true` on success; confirm against an implementation.
 */
interface Selector
{
    /// Register `channel` with this selector.
    bool register(AbstractChannel channel);

    /// Register `channel` again. NOTE(review): presumably to update its watched events — confirm.
    bool reregister(AbstractChannel channel);

    /// Remove `channel` from this selector.
    bool deregister(AbstractChannel channel);

    /// Stop the selector. NOTE(review): exact semantics are implementation-defined.
    void stop();

    /// Dispose of the selector. NOTE(review): presumably releases its resources — confirm.
    void dispose();
}
89 
/**
 * Base class for channels that are watched by a `Selector`.
 *
 * It stores the native handle, the registered/closed state, one bit per
 * `WatchFlag` value, and the links of an intrusive doubly linked list of
 * channels.
 */
abstract class AbstractChannel : Channel
{
    /// Native handle of this channel; `socket_t.init` until something assigns it.
    socket_t handle = socket_t.init;

    /// Optional callback that `errorOccurred` forwards its message to.
    ErrorEventHandler errorHandler;

    protected bool _isRegistered = false;

    /**
     * Params:
     *   loop = selector this channel belongs to
     *   type = kind of watcher this channel represents
     */
    this(Selector loop, WatcherType type)
    {
        this._inLoop = loop;
        _type = type;
        // WatchFlag members are used directly as bit indices by flag() and
        // setFlag(), so the set must hold WatchFlag.max + 1 bits. A fixed
        // size of 16 leaves index 16 (WatchFlag.ETMode) out of range.
        _flags = BitArray(new bool[](WatchFlag.max + 1));
    }

    /**
     * Returns: the current value of the registered marker.
     */
    bool isRegistered()
    {
        return _isRegistered;
    }

    /**
     * Returns: `true` once `onClose` has run for this channel.
     */
    bool isClosed()
    {
        return _isClosed;
    }

    protected bool _isClosed = false;

    /**
     * Performs the actual close: marks the channel as unregistered and
     * closed, deregisters it from the selector (skipped on Windows builds)
     * and unlinks it from the channel list.
     */
    protected void onClose()
    {
        _isRegistered = false;
        _isClosed = true;
        version(Windows) {} else {
            _inLoop.deregister(this);
        }
        //  _inLoop = null;
        clear();
    }

    /**
     * Forwards `msg` to `errorHandler` when one is installed; otherwise
     * does nothing.
     */
    protected void errorOccurred(string msg)
    {
        if (errorHandler !is null)
            errorHandler(msg);
    }

    /// Read hook; the base implementation always fails an assertion.
    void onRead()
    {
        assert(false, "not implemented");
    }

    /// Write hook; the base implementation always fails an assertion.
    void onWrite()
    {
        assert(false, "not implemented");
    }

    /// Returns: whether the bit for `index` is currently set.
    final bool flag(WatchFlag index)
    {
        return _flags[index];
    }

    /// Returns: the watcher type given at construction.
    @property WatcherType type()
    {
        return _type;
    }

    /// Returns: the selector given at construction.
    @property Selector eventLoop()
    {
        return _inLoop;
    }

    /**
     * Closes the channel once. A second call only emits a warning, and
     * only in debug builds.
     */
    void close()
    {
        if (!_isClosed)
        {
            version (KissDebugMode)
                trace("channel closing...", this.handle);
            onClose();
            version (KissDebugMode)
                trace("channel closed...", this.handle);
        }
        else
        {
            debug warningf("The watcher(fd=%d) has already been closed", this.handle);
        }
    }

    /**
     * Inserts `next` directly after this channel in the linked list.
     *
     * A `null` argument or the channel itself is ignored. If `next` is
     * already linked somewhere it is unlinked first, so its former
     * neighbours are not left pointing at it and re-inserting the current
     * successor cannot create a self-loop.
     */
    void setNext(AbstractChannel next)
    {
        if (next is null || next is this)
            return; // Nothing to insert, or can't set to self
        next.unlink();
        next._next = _next;
        next._priv = this;
        if (_next)
            _next._priv = next;
        this._next = next;
    }

    /**
     * Removes this channel from the linked list, joining its former
     * neighbours to each other.
     */
    void clear()
    {
        unlink();
    }

    mixin OverrideErro;

protected:
    /// Sets or clears the bit for `index`.
    final void setFlag(WatchFlag index, bool enable)
    {
        _flags[index] = enable;
    }

    Selector _inLoop;

private:
    // Detaches this node from its neighbours and resets both links.
    void unlink()
    {
        if (_priv)
            _priv._next = _next;
        if (_next)
            _next._priv = _priv;
        _next = null;
        _priv = null;
    }

    BitArray _flags;
    WatcherType _type;

    AbstractChannel _priv;
    AbstractChannel _next;
}
219 
/**
 * Channel whose watcher type is fixed to `WatcherType.Event`.
 */
class EventChannel : AbstractChannel
{
    /// Creates the channel on `loop`.
    this(Selector loop)
    {
        super(loop, WatcherType.Event);
    }

    /// The base implementation unconditionally fails an assertion.
    void call()
    {
        assert(0);
    }
}
234 
/**
 * Mixes an error flag and an error message, plus their accessors, into the
 * host type.
 */
mixin template OverrideErro()
{
    // State comes first; the accessors below only read or reset it.
    bool _error = false;
    string _erroString;

    /// Returns: the current error flag.
    bool isError() { return _error; }

    /// Returns: the stored error message.
    string erroString() { return _erroString; }

    /// Resets the flag to `false` and the message to the empty string.
    void clearError()
    {
        _erroString = "";
        _error = false;
    }
}
256 
/// Kind of watcher a channel represents; stored in a `ubyte`.
enum WatcherType : ubyte
{
    Accept = 0,
    TCP    = 1,
    UDP    = 2,
    Timer  = 3,
    Event  = 4,
    File   = 5,
    None   = 6
}
267 
/**
 * Per-channel watch flags; stored in a `ushort`.
 *
 * `AbstractChannel.flag` and `AbstractChannel.setFlag` use these values as
 * bit positions, not as bit masks.
 */
enum WatchFlag : ushort
{
    None    = 0,
    Read    = 1,
    Write   = 2,

    OneShot = 8,
    ETMode  = 16
}
277 
/// Plain holder pairing an `Address` with a byte buffer.
/// NOTE(review): presumably one UDP datagram and its peer address — confirm against callers.
final class UdpDataObject
{
    Address addr;
    ubyte[] data;
}
283 
/// Class wrapper around a single value of type `T`, letting it be passed where an `Object` is expected.
final class BaseTypeObject(T)
{
    T data;
}
288 
/**
 * Exception type of this module, offering the two conventional exception
 * constructors (message first, optionally followed by a chained throwable).
 */
class LoopException : Exception
{
    /// Message, then source location, then an optional chained throwable.
    this(string msg, string file = __FILE__, size_t line = __LINE__,
            Throwable next = null) @nogc @safe pure nothrow
    {
        super(msg, file, line, next);
    }

    /// Message and chained throwable, then source location.
    this(string msg, Throwable next, string file = __FILE__,
            size_t line = __LINE__) @nogc @safe pure nothrow
    {
        super(msg, file, line, next);
    }
}
293 
// dfmt off
version(linux):
// dfmt on
// Fallback definition of SO_REUSEPORT, compiled only when
// isCompilerVersionBelow(2078) holds.
// NOTE(review): presumably older runtimes do not provide the constant — confirm.
// Architectures not listed below get no definition from this block.
static if (isCompilerVersionBelow(2078))
{
    // MIPS uses its own option number ...
    version (MIPS32)      enum SO_REUSEPORT = 0x0200;
    else version (MIPS64) enum SO_REUSEPORT = 0x0200;
    // ... every other listed architecture uses 15.
    else version (X86)    enum SO_REUSEPORT = 15;
    else version (X86_64) enum SO_REUSEPORT = 15;
    else version (PPC)    enum SO_REUSEPORT = 15;
    else version (PPC64)  enum SO_REUSEPORT = 15;
    else version (ARM)    enum SO_REUSEPORT = 15;
}