1 /*
2  * Kiss - A refined core library for D programming language.
3  *
4  * Copyright (C) 2015-2018  Shanghai Putao Technology Co., Ltd
5  *
6  * Developer: HuntLabs.cn
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11  
12 module kiss.event.selector.epoll;
13 
14 // dfmt off
15 version(linux):
16 
17 // dfmt on
18 
19 import std.exception;
20 import std.socket;
21 import std.string;
22 import kiss.logger;
23 
24 import core.time;
25 import core.stdc.string;
26 import core.stdc.errno;
27 import core.sys.posix.sys.types; // for ssize_t, size_t
28 import core.sys.posix.netinet.tcp;
29 import core.sys.posix.netinet.in_;
30 import core.sys.posix.unistd;
31 import core.sys.posix.time : itimerspec, CLOCK_MONOTONIC;
32 
33 import kiss.event.core;
34 import kiss.event.socket;
35 import kiss.event.timer;
36 import kiss.event.timer.epoll;
37 
/**
 * Selector implementation backed by a Linux epoll instance.
 *
 * The selector owns one epoll file descriptor plus an internal
 * EpollEventChannel that is registered during construction. Every
 * registered channel is stored in the `data.ptr` field of its epoll_event,
 * which lets handleEpollEvent dispatch straight to the channel object.
 */
class AbstractSelector : Selector
{
    /**
     * Creates the epoll instance and registers the internal event channel.
     *
     * Throws: ErrnoException when epoll_create1 fails; without a valid
     *         epoll descriptor no later epoll_ctl/epoll_wait call can work.
     */
    this()
    {
        _epollFD = epoll_create1(0);
        errnoEnforce(_epollFD >= 0, "epoll_create1 failed");
        _event = new EpollEventChannel(this);
        register(_event);
    }

    ~this()
    {
        dispose();
    }

    /**
     * Deregisters the internal event channel and closes the epoll
     * descriptor. Calling it more than once is harmless.
     *
     * NOTE(review): this also runs from the destructor, where the GC may
     * already have finalized `_event` — confirm that is acceptable.
     */
    void dispose()
    {
        if (isDisposed)
            return;
        isDisposed = true;

        // _event is still null when the constructor threw before creating it.
        if (_event !is null)
            deregister(_event);

        if (_epollFD >= 0)
        {
            core.sys.posix.unistd.close(_epollFD);
            // Forget the number so a reused descriptor is never touched by mistake.
            _epollFD = -1;
        }
    }
    private bool isDisposed = false;

    /**
     * Adds `watcher` to the epoll set.
     *
     * Timer watchers get `setTimer()` called first. A watcher that is
     * already present in the set (EEXIST) is treated as success.
     *
     * Params:
     *     watcher = channel to add; must not be null and must have a valid handle
     * Returns: true when the watcher is (or already was) in the epoll set,
     *          false when epoll_ctl failed for any other reason.
     */
    override bool register(AbstractChannel watcher)
    {
        assert(watcher !is null);

        if (watcher.type == WatcherType.Timer)
        {
            auto wt = cast(AbstractTimer) watcher;
            if (wt !is null)
                wt.setTimer();
        }

        const fd = watcher.handle;
        assert(fd >= 0, "The watcher.handle is not initilized!");

        epoll_event ev = buildEpollEvent(watcher);
        if ((epoll_ctl(_epollFD, EPOLL_CTL_ADD, fd, &ev)) != 0)
        {
            if (errno != EEXIST)
                return false;
        }

        _event.setNext(watcher);
        return true;
    }

    /**
     * Re-applies the watcher's current flags to its epoll registration.
     *
     * Returns: true when EPOLL_CTL_MOD succeeded, false when the handle is
     *          negative or epoll_ctl failed.
     */
    override bool reregister(AbstractChannel watcher)
    {
        assert(watcher !is null);
        const int fd = watcher.handle;
        if (fd < 0)
            return false;
        auto ev = buildEpollEvent(watcher);
        return epoll_ctl(_epollFD, EPOLL_CTL_MOD, fd, &ev) == 0;
    }

    /**
     * Removes `watcher` from the epoll set.
     *
     * Returns: true on success, false when the handle is negative or
     *          epoll_ctl failed (the failure is logged).
     */
    override bool deregister(AbstractChannel watcher)
    {
        assert(watcher !is null);

        const int fd = watcher.handle;
        if (fd < 0)
            return false;

        if ((epoll_ctl(_epollFD, EPOLL_CTL_DEL, fd, null)) != 0)
        {
            errorf("unregister failed, watcher.handle=%d", watcher.handle);
            return false;
        }
        // TODO: check this
        // watcher.currtLoop = null;
        // watcher.clear();
        return true;
    }

    /**
     * Runs the event loop until stop() is called.
     *
     * Each iteration first invokes `weak` and then polls epoll once, so a
     * stop() request takes effect after the current poll returns.
     */
    void onLoop(scope void delegate() weak)
    {
        _runing = true;
        do
        {
            weak();
            handleEpollEvent();
        }
        while (_runing);
    }

    /// Waits up to 10 ms for events and dispatches each one to its channel.
    private void handleEpollEvent()
    {
        epoll_event[64] events;
        const int len = epoll_wait(_epollFD, events.ptr, events.length, 10);
        // 0 is a timeout, a negative value is an error (e.g. EINTR); either
        // way there is nothing to dispatch.
        if (len <= 0)
            return;

        foreach (i; 0 .. len)
        {
            AbstractChannel watch = cast(AbstractChannel)(events[i].data.ptr);
            if (watch is null)
            {
                // The watcher itself must not be touched here: it is null.
                warningf("watcher is null, events=0x%x", events[i].events);
                continue;
            }

            if (isErro(events[i].events))
            {
                version(KissDebugMode) warning("close event: ", watch.handle);
                watch.close();
                continue;
            }

            if (watch.isRegistered && isRead(events[i].events))
            {
                watch.onRead();
            }

            if (watch.isRegistered && isWrite(events[i].events))
            {
                AbstractSocketChannel wt = cast(AbstractSocketChannel) watch;
                assert(wt !is null);
                // Asserts vanish in release builds; never call through null.
                if (wt !is null)
                    wt.onWriteDone();
                // watch.onWrite();
            }
        }
    }

    /// Asks onLoop to return once its current iteration has finished.
    override void stop()
    {
        _runing = false;
    }

protected:
    /// True when the mask reports a hang-up, an error or a peer shutdown.
    bool isErro(uint events) nothrow
    {
        return (events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP)) != 0;
    }

    /// True when the mask contains EPOLLIN.
    bool isRead(uint events) nothrow
    {
        return (events & EPOLLIN) != 0;
    }

    /// True when the mask contains EPOLLOUT.
    bool isWrite(uint events) nothrow
    {
        return (events & EPOLLOUT) != 0;
    }

    /**
     * Builds the epoll_event for `watch`.
     *
     * Error, hang-up and peer-shutdown notifications are always requested;
     * read, write, one-shot and edge-triggered bits follow the watcher's
     * flags. The watcher reference is stored in `data.ptr`.
     */
    static epoll_event buildEpollEvent(AbstractChannel watch)
    {
        epoll_event ev;
        ev.data.ptr = cast(void*) watch;
        ev.events = EPOLLRDHUP | EPOLLERR | EPOLLHUP;
        if (watch.flag(WatchFlag.Read))
            ev.events |= EPOLLIN;
        if (watch.flag(WatchFlag.Write))
            ev.events |= EPOLLOUT;
        if (watch.flag(WatchFlag.OneShot))
            ev.events |= EPOLLONESHOT;
        if (watch.flag(WatchFlag.ETMode))
            ev.events |= EPOLLET;
        return ev;
    }

private:
    bool _runing;
    int _epollFD;
    EventChannel _event;
}
211 
/**
 * Event channel whose handle is a Linux eventfd.
 *
 * call() writes to the eventfd and onRead() reads it back before handing
 * over to the base class.
 */
class EpollEventChannel : EventChannel
{
    alias UlongObject = BaseTypeObject!ulong;

    /// Holds the value obtained by the most recent readEvent call.
    UlongObject _readBuffer;

    /// Creates the channel on `loop` with a non-blocking, close-on-exec eventfd.
    this(Selector loop)
    {
        super(loop);
        setFlag(WatchFlag.Read, true);
        _readBuffer = new UlongObject();

        enum int eventFlags = EFD_NONBLOCK | EFD_CLOEXEC;
        this.handle = eventfd(0, eventFlags);
    }

    ~this()
    {
        close();
    }

    /// Writes the value 1 to the eventfd.
    override void call()
    {
        ulong increment = 1;
        core.sys.posix.unistd.write(this.handle, &increment, ulong.sizeof);
    }

    /// Reads the eventfd (discarding the value) and then runs the base handler.
    override void onRead()
    {
        readEvent((Object unused) {  });
        super.onRead();
    }

    /**
     * Clears the error state, reads one ulong from the eventfd into
     * `_readBuffer` and passes the buffer to `read` when one is supplied.
     *
     * Returns: always false.
     */
    bool readEvent(scope ReadCallBack read)
    {
        this.clearError();

        ulong counter;
        core.sys.posix.unistd.read(this.handle, &counter, ulong.sizeof);
        _readBuffer.data = counter;

        if (read)
            read(_readBuffer);
        return false;
    }
}
255 
/// Flags accepted by eventfd(), written as bit positions.
enum
{
    EFD_SEMAPHORE = 1 << 0,
    EFD_NONBLOCK = 1 << 11,
    EFD_CLOEXEC = 1 << 19,
}
262 
/// Flags accepted by epoll_create1(), written as bit positions.
enum
{
    EPOLL_NONBLOCK = 1 << 11,
    EPOLL_CLOEXEC = 1 << 19,
}
268 
/// Event mask bits used in epoll_event.events.
enum
{
    // Readiness.
    EPOLLIN = 1 << 0,
    EPOLLPRI = 1 << 1,
    EPOLLOUT = 1 << 2,

    // Error conditions.
    EPOLLERR = 1 << 3,
    EPOLLHUP = 1 << 4,
    EPOLLRDHUP = 1 << 13, // since Linux 2.6.17

    // Normal / priority-band variants and message bit.
    EPOLLRDNORM = 1 << 6,
    EPOLLRDBAND = 1 << 7,
    EPOLLWRNORM = 1 << 8,
    EPOLLWRBAND = 1 << 9,
    EPOLLMSG = 1 << 10,

    // Behaviour modifiers (unsigned because they occupy the top bits).
    EPOLLONESHOT = 1u << 30,
    EPOLLET = 1u << 31,
}
285 
/* Operation codes for the "op" argument of epoll_ctl(). */
enum
{
    EPOLL_CTL_ADD = 1, // register a file descriptor with the epoll instance
    EPOLL_CTL_DEL, // 2: remove a file descriptor from the epoll instance
    EPOLL_CTL_MOD, // 3: replace the epoll_event attached to a file descriptor
}
293 
294 
295 
296 // dfmt off
297 extern (C) : @system : nothrow :
298 // dfmt on
299 
// The C `struct epoll_event` is declared packed only on x86 / x86-64 (12
// bytes on x86-64). Other architectures use natural alignment, which puts
// `data` at offset 8. Packing unconditionally would hand the kernel a
// mis-laid-out structure on those targets.
version (X86_64)
    private enum bool epollEventIsPacked = true;
else version (X86)
    private enum bool epollEventIsPacked = true;
else
    private enum bool epollEventIsPacked = false;

static if (epollEventIsPacked)
{
    /// Event record exchanged with epoll_ctl() and epoll_wait().
    align(1) struct epoll_event
    {
    align(1):
        uint events; /// bit mask of EPOLL* flags
        epoll_data_t data; /// user data returned unchanged with the event
    }
}
else
{
    /// Event record exchanged with epoll_ctl() and epoll_wait().
    struct epoll_event
    {
        uint events; /// bit mask of EPOLL* flags
        epoll_data_t data; /// user data returned unchanged with the event
    }
}

/// User data attached to an epoll registration; all members share storage.
union epoll_data_t
{
    void* ptr;
    int fd;
    uint u32;
    ulong u64;
}
314 
// Bindings to the Linux epoll system calls; see epoll_create(2),
// epoll_ctl(2) and epoll_wait(2) for the full contracts.

// Creates an epoll instance; returns its file descriptor or -1 on error.
int epoll_create(int size);
// Like epoll_create, but takes EPOLL_* creation flags instead of a size hint.
int epoll_create1(int flags);
// Applies one EPOLL_CTL_* operation for `fd` on the epoll instance `epfd`;
// returns 0 on success and -1 on error (errno is set).
int epoll_ctl(int epfd, int op, int fd, epoll_event* event);
// Waits up to `timeout` milliseconds for events on `epfd`; returns the number
// of entries written to `events`, 0 on timeout or -1 on error.
int epoll_wait(int epfd, epoll_event* events, int maxevents, int timeout);

// Creates an eventfd object with the given initial counter and EFD_* flags.
// NOTE(review): the C function returns int; it is declared as socket_t here,
// presumably so the result can be assigned to a channel handle — confirm.
socket_t eventfd(uint initval, int flags);
321