1 /*
2  * Kiss - A refined core library for D programming language.
3  *
4  * Copyright (C) 2015-2018  Shanghai Putao Technology Co., Ltd
5  *
6  * Developer: HuntLabs.cn
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module kiss.event.core;
13 
14 import kiss.core;
15 
16 import std.socket;
17 import std.exception;
18 import std.bitmanip;
19 import kiss.logger;
20 
21 alias ReadCallBack = void delegate(Object obj);
22 
23 alias DataReceivedHandler = void delegate(in ubyte[] data);
24 alias DataWrittenHandler = void delegate(in ubyte[] data, size_t size);
25 alias AcceptHandler = void delegate(Socket socket);
26 
27 @trusted interface ReadTransport : Channel
28 {
29 
30     void close();
31 
32     void onRead(AbstractChannel watcher) nothrow;
33 
34     void onClose(AbstractChannel watcher) nothrow;
35 }
36 
37 @trusted interface WriteTransport : Channel
38 {
39 
40     void onWrite(AbstractChannel watcher) nothrow;
41 
42     void onClose(AbstractChannel watcher) nothrow;
43 }
44 
45 @trusted interface Transport : ReadTransport, WriteTransport
46 {
47 }
48 
49 // dfmt on
50 
51 interface StreamWriteBuffer
52 {
53     // todo Write Data;
54     const(ubyte)[] sendData();
55 
56     // add send offiset and return is empty
57     bool popSize(size_t size);
58 
59     // do send finish
60     void doFinish();
61 
62     StreamWriteBuffer next();
63     void next(StreamWriteBuffer);
64 }
65 
66 alias ChannelBase = AbstractChannel;
67 
68 /**
69 */
70 interface Channel
71 {
72 
73 }
74 
75 /**
76 */
77 interface Selector
78 {
79     bool register(AbstractChannel channel);
80 
81     bool reregister(AbstractChannel channel);
82 
83     bool deregister(AbstractChannel channel);
84 
85     void stop();
86 
87     void dispose();
88 }
89 
90 /**
91 */
92 abstract class AbstractChannel : Channel
93 {
94     socket_t handle = socket_t.init;
95     ErrorEventHandler errorHandler;
96 
97     protected bool _isRegistered = false;
98 
99     this(Selector loop, WatcherType type)
100     {
101         this._inLoop = loop;
102         _type = type;
103         _flags = BitArray([false, false, false, false, false, false, false,
104                 false, false, false, false, false, false, false, false, false]);
105     }
106 
107     /**
108     */
109     bool isRegistered()
110     {
111         return _isRegistered;
112     }
113 
114     /**
115     */
116     bool isClosed()
117     {
118         return _isClosed;
119     }
120 
121     protected bool _isClosed = false;
122 
123     protected void onClose()
124     {
125         _inLoop.deregister(this);
126         //  _inLoop = null;
127         _isRegistered = false;
128         _isClosed = true;
129         clear();
130     }
131 
132     protected void errorOccurred(string msg)
133     {
134         if (errorHandler !is null)
135             errorHandler(msg);
136     }
137 
138     void onRead()
139     {
140         assert(false, "not implemented");
141     }
142 
143     void onWrite()
144     {
145         assert(false, "not implemented");
146     }
147 
148     final bool flag(WatchFlag index)
149     {
150         return _flags[index];
151     }
152 
153     @property WatcherType type()
154     {
155         return _type;
156     }
157 
158     @property Selector eventLoop()
159     {
160         return _inLoop;
161     }
162 
163     void close()
164     {
165         if (!_isClosed)
166         {
167             version (KissDebugMode)
168                 trace("channel closing...");
169             onClose();
170             version (KissDebugMode)
171                 trace("channel closed...");
172         }
173         else
174         {
175             debug warningf("The watcher(fd=%d) has already been closed", this.handle);
176         }
177     }
178 
179     void setNext(AbstractChannel next)
180     {
181         if (next is this)
182             return; // Can't set to self
183         next._next = _next;
184         next._priv = this;
185         if (_next)
186             _next._priv = next;
187         this._next = next;
188     }
189 
190     void clear()
191     {
192         if (_priv)
193             _priv._next = _next;
194         if (_next)
195             _next._priv = _priv;
196         _next = null;
197         _priv = null;
198     }
199 
200     mixin OverrideErro;
201 
202 protected:
203     final void setFlag(WatchFlag index, bool enable)
204     {
205         _flags[index] = enable;
206     }
207 
208     Selector _inLoop;
209 
210 private:
211     BitArray _flags;
212     WatcherType _type;
213 
214     AbstractChannel _priv;
215     AbstractChannel _next;
216 }
217 
218 /**
219 */
220 class EventChannel : AbstractChannel
221 {
222     this(Selector loop)
223     {
224         super(loop, WatcherType.Event);
225     }
226 
227     void call()
228     {
229         assert(false);
230     }
231 }
232 
233 mixin template OverrideErro()
234 {
235     bool isError()
236     {
237         return _error;
238     }
239 
240     string erroString()
241     {
242         return _erroString;
243     }
244 
245     void clearError()
246     {
247         _error = false;
248         _erroString = "";
249     }
250 
251     bool _error = false;
252     string _erroString;
253 }
254 
255 enum WatcherType : ubyte
256 {
257     Accept = 0,
258     TCP,
259     UDP,
260     Timer,
261     Event,
262     File,
263     None
264 }
265 
266 enum WatchFlag : ushort
267 {
268     None = 0,
269     Read,
270     Write,
271 
272     OneShot = 8,
273     ETMode = 16
274 }
275 
276 final class UdpDataObject
277 {
278     Address addr;
279     ubyte[] data;
280 }
281 
282 final class BaseTypeObject(T)
283 {
284     T data;
285 }
286 
287 class LoopException : Exception
288 {
289     mixin basicExceptionCtors;
290 }
291 
292 // dfmt off
293 version(linux):
294 // dfmt on
295 static if (isCompilerVersionBelow(2078))
296 {
297     version (X86)
298     {
299         enum SO_REUSEPORT = 15;
300     }
301     else version (X86_64)
302     {
303         enum SO_REUSEPORT = 15;
304     }
305     else version (MIPS32)
306     {
307         enum SO_REUSEPORT = 0x0200;
308     }
309     else version (MIPS64)
310     {
311         enum SO_REUSEPORT = 0x0200;
312     }
313     else version (PPC)
314     {
315         enum SO_REUSEPORT = 15;
316     }
317     else version (PPC64)
318     {
319         enum SO_REUSEPORT = 15;
320     }
321     else version (ARM)
322     {
323         enum SO_REUSEPORT = 15;
324     }
325 }