1 /*
2  * Kiss - A refined core library for D programming language.
3  *
4  * Copyright (C) 2015-2018  Shanghai Putao Technology Co., Ltd
5  *
6  * Developer: HuntLabs.cn
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11  
12 module kiss.event.selector.epoll;
13 
14 // dfmt off
15 version(linux):
16 
17 // dfmt on
18 
19 import std.exception;
20 import std.socket;
21 import std.string;
22 import kiss.logger;
23 
24 import core.time;
25 import core.stdc.string;
26 import core.stdc.errno;
27 import core.sys.posix.sys.types; // for ssize_t, size_t
28 import core.sys.posix.netinet.tcp;
29 import core.sys.posix.netinet.in_;
30 import core.sys.posix.unistd;
31 import core.sys.posix.time : itimerspec, CLOCK_MONOTONIC;
32 
33 import kiss.event.core;
34 import kiss.event.socket;
35 import kiss.event.timer;
36 import kiss.event.timer.epoll;
37 
/**
 * Selector implementation built on the Linux epoll interface.
 *
 * The selector owns one epoll descriptor and one `EpollEventChannel`, which is
 * registered with the epoll instance as soon as the selector is constructed.
 * Channels are attached with `register`, updated with `reregister` and
 * detached with `deregister`; `onLoop` polls the epoll instance and dispatches
 * the reported events until `stop` is called.
 */
class AbstractSelector : Selector
{
    /**
     * Creates the epoll instance and registers the internal event channel.
     *
     * Throws: `ErrnoException` when `epoll_create1` fails, instead of leaving
     * behind a selector whose every operation would fail on an invalid
     * descriptor.
     */
    this()
    {
        _epollFD = epoll_create1(0);
        errnoEnforce(_epollFD >= 0, "epoll_create1 failed");
        _event = new EpollEventChannel(this);
        register(_event);
    }

    // NOTE(review): when this runs as a GC finalizer, `_event` may already have
    // been finalized (the D spec does not order finalizers) - prefer calling
    // dispose() explicitly before dropping the last reference.
    ~this()
    {
        dispose();
    }

    /**
     * Detaches the internal event channel and closes the epoll descriptor.
     *
     * Safe to call more than once, and safe on an instance whose constructor
     * threw part-way through.
     */
    void dispose()
    {
        if (isDisposed)
            return;
        isDisposed = true;

        // _event is still null when the constructor failed before creating it.
        if (_event !is null)
            deregister(_event);

        if (_epollFD >= 0)
        {
            core.sys.posix.unistd.close(_epollFD);
            // Forget the number: the process may hand it out again for an
            // unrelated descriptor, which later calls must not touch.
            _epollFD = -1;
        }
    }

    /**
     * Adds `watcher` to the epoll instance.
     *
     * Timer watchers get `setTimer()` called first. A watcher that is already
     * present in the epoll set (`EEXIST`) is treated as success.
     *
     * Params:
     *   watcher = channel to add; must not be null and must have a valid handle.
     * Returns: `true` when the watcher is (or already was) in the epoll set,
     *          `false` when `epoll_ctl` failed for any other reason.
     */
    override bool register(AbstractChannel watcher)
    {
        assert(watcher !is null);

        if (watcher.type == WatcherType.Timer)
        {
            auto wt = cast(AbstractTimer) watcher;
            if (wt !is null)
                wt.setTimer();
        }

        // version(KissDebugMode) infof("register, watcher(fd=%d)", watcher.handle);
        const fd = watcher.handle;
        assert(fd >= 0, "The watcher.handle is not initilized!");

        epoll_event ev = buildEpollEvent(watcher);
        if (epoll_ctl(_epollFD, EPOLL_CTL_ADD, fd, &ev) != 0)
        {
            if (errno != EEXIST)
                return false;
        }

        _event.setNext(watcher);
        return true;
    }

    /**
     * Re-applies the watcher's current flags to its existing epoll entry.
     *
     * Returns: `false` when the handle is negative or `epoll_ctl` fails.
     */
    override bool reregister(AbstractChannel watcher)
    {
        assert(watcher !is null);
        const int fd = watcher.handle;
        if (fd < 0)
            return false;
        auto ev = buildEpollEvent(watcher);
        return epoll_ctl(_epollFD, EPOLL_CTL_MOD, fd, &ev) == 0;
    }

    /**
     * Removes `watcher` from the epoll instance.
     *
     * Returns: `false` when the handle is negative or `epoll_ctl` fails (the
     *          failure is logged), `true` otherwise.
     */
    override bool deregister(AbstractChannel watcher)
    {
        assert(watcher !is null);
        // version(KissDebugMode) infof("unregister watcher(fd=%d)", watcher.handle);

        const int fd = watcher.handle;
        if (fd < 0)
            return false;

        if (epoll_ctl(_epollFD, EPOLL_CTL_DEL, fd, null) != 0)
        {
            errorf("unregister failed, watcher.handle=%d", watcher.handle);
            return false;
        }
        // TODO: check this
        // watcher.clear();
        return true;
    }

    /**
     * Runs the event loop until `stop` clears the running flag.
     *
     * Params:
     *   weak = callback invoked once per iteration before polling; a null
     *          delegate is skipped.
     */
    void onLoop(scope void delegate() weak)
    {
        _runing = true;
        do
        {
            if (weak !is null)
                weak();
            handleEpollEvent();
        }
        while (_runing);
    }

    /// Polls once (at most 64 events, timeout argument 10) and dispatches the results.
    private void handleEpollEvent()
    {
        epoll_event[64] events;
        const int len = epoll_wait(_epollFD, events.ptr, events.length, 10);
        if (len <= 0)
        {
            // Nothing to dispatch. A negative result is an error; EINTR is
            // routine, anything else is reported in debug builds.
            version(KissDebugMode)
            {
                if (len < 0 && errno != EINTR)
                    warningf("epoll_wait failed, errno=%d", errno);
            }
            return;
        }

        foreach (i; 0 .. len)
        {
            AbstractChannel watch = cast(AbstractChannel)(events[i].data.ptr);
            if (watch is null)
            {
                version(KissDebugMode) warningf("watcher is null");
                continue;
            }

            if (isErro(events[i].events))
            {
                version(KissDebugMode) warning("close event: ", watch.handle);
                watch.close();
                continue;
            }

            if (watch.isRegistered && isRead(events[i].events))
            {
                watch.onRead();
            }

            // Re-check isRegistered: onRead() may have changed it.
            if (watch.isRegistered && isWrite(events[i].events))
            {
                AbstractSocketChannel wt = cast(AbstractSocketChannel) watch;
                assert(wt !is null);
                // The assert vanishes in release builds, so guard the call too.
                if (wt !is null)
                    wt.onWriteDone();
                // watch.onWrite();
            }
        }
    }

    /// Asks `onLoop` to return after its current iteration.
    override void stop()
    {
        _runing = false;
    }

protected:
    /// True when `events` carries EPOLLHUP, EPOLLERR or EPOLLRDHUP.
    bool isErro(uint events) nothrow
    {
        return (events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP)) != 0;
    }

    /// True when `events` carries EPOLLIN.
    bool isRead(uint events) nothrow
    {
        return (events & EPOLLIN) != 0;
    }

    /// True when `events` carries EPOLLOUT.
    bool isWrite(uint events) nothrow
    {
        return (events & EPOLLOUT) != 0;
    }

    /**
     * Builds the `epoll_event` for `watch`.
     *
     * The watcher reference is stored in `data.ptr`. EPOLLRDHUP, EPOLLERR and
     * EPOLLHUP are always requested; EPOLLIN, EPOLLOUT, EPOLLONESHOT and
     * EPOLLET follow the watcher's Read, Write, OneShot and ETMode flags.
     */
    static epoll_event buildEpollEvent(AbstractChannel watch)
    {
        epoll_event ev;
        ev.data.ptr = cast(void*) watch;
        ev.events = EPOLLRDHUP | EPOLLERR | EPOLLHUP;
        if (watch.flag(WatchFlag.Read))
            ev.events |= EPOLLIN;
        if (watch.flag(WatchFlag.Write))
            ev.events |= EPOLLOUT;
        if (watch.flag(WatchFlag.OneShot))
            ev.events |= EPOLLONESHOT;
        if (watch.flag(WatchFlag.ETMode))
            ev.events |= EPOLLET;
        return ev;
    }

private:
    bool isDisposed = false;
    bool _runing;
    int _epollFD = -1; // -1 while no epoll descriptor is open
    EventChannel _event;
}
210 
/**
 * Event channel whose handle comes from `eventfd`.
 *
 * `call` writes an 8-byte value of 1 to the handle; `onRead` reads an 8-byte
 * value back before delegating to the base class.
 */
class EpollEventChannel : EventChannel
{
    alias UlongObject = BaseTypeObject!ulong;

    /// Sets the Read flag, allocates the read buffer and creates the handle.
    this(Selector loop)
    {
        super(loop);
        setFlag(WatchFlag.Read, true);
        _readBuffer = new UlongObject();

        enum int creationFlags = EFD_NONBLOCK | EFD_CLOEXEC;
        this.handle = eventfd(0, creationFlags);
    }

    ~this()
    {
        close();
    }

    /// Writes the value 1 to the handle.
    override void call()
    {
        ulong increment = 1;
        core.sys.posix.unistd.write(this.handle, &increment, ulong.sizeof);
    }

    /// Reads the pending value off the handle, then runs the base handler.
    override void onRead()
    {
        // The value itself is not needed here, hence the empty callback.
        readEvent((Object unused) {  });
        super.onRead();
    }

    /**
     * Clears the error state, reads one `ulong` from the handle into
     * `_readBuffer` and passes that buffer to `read` when one is supplied.
     *
     * Returns: always `false`.
     */
    bool readEvent(scope ReadCallBack read)
    {
        this.clearError();

        ulong counter;
        core.sys.posix.unistd.read(this.handle, &counter, ulong.sizeof);
        this._readBuffer.data = counter;

        if (read)
            read(this._readBuffer);
        return false;
    }

    /// Holds the value most recently stored by `readEvent`.
    UlongObject _readBuffer;
}
254 
/// Flag bits for the `flags` argument of `eventfd`.
enum
{
    EFD_SEMAPHORE = 1 << 0,  // 0x1
    EFD_NONBLOCK = 1 << 11,  // 0x800
    EFD_CLOEXEC = 1 << 19    // 0x80000
}
261 
/// Flag bits for the `flags` argument of `epoll_create1`.
enum
{
    EPOLL_NONBLOCK = 1 << 11, // 0x800
    EPOLL_CLOEXEC = 1 << 19   // 0x80000
}
267 
/// Event bits stored in `epoll_event.events`, listed by bit position.
enum
{
    EPOLLIN = 1 << 0,        // 0x001
    EPOLLPRI = 1 << 1,       // 0x002
    EPOLLOUT = 1 << 2,       // 0x004
    EPOLLERR = 1 << 3,       // 0x008
    EPOLLHUP = 1 << 4,       // 0x010
    EPOLLRDNORM = 1 << 6,    // 0x040
    EPOLLRDBAND = 1 << 7,    // 0x080
    EPOLLWRNORM = 1 << 8,    // 0x100
    EPOLLWRBAND = 1 << 9,    // 0x200
    EPOLLMSG = 1 << 10,      // 0x400
    EPOLLRDHUP = 1 << 13,    // 0x2000, since Linux 2.6.17
    EPOLLONESHOT = 1u << 30, // unsigned, like EPOLLET
    EPOLLET = 1u << 31       // unsigned: bit 31 does not fit a positive int
}
284 
285 /* Valid opcodes ( "op" parameter ) to issue to epoll_ctl().  */
enum
{
    /// Add a file descriptor to the interface.
    EPOLL_CTL_ADD = 1,
    /// Remove a file descriptor from the interface.
    EPOLL_CTL_DEL,
    /// Change file descriptor epoll_event structure.
    EPOLL_CTL_MOD
}
292 
293 
294 
295 // dfmt off
296 extern (C) : @system : nothrow :
297 // dfmt on
298 
// The C `struct epoll_event` is declared packed only on x86 and x86_64; every
// other Linux target uses natural alignment. Packing unconditionally would put
// `data` at the wrong offset on those targets.
version (X86_64) version = KissEpollEventPacked;
version (X86) version = KissEpollEventPacked;

/**
 * D mirror of the C `struct epoll_event`: the event mask plus the user data
 * that is handed back when the event is reported.
 */
version (KissEpollEventPacked)
{
    align(1) struct epoll_event
    {
    align(1):
        uint events;
        epoll_data_t data;
    }
}
else
{
    struct epoll_event
    {
        uint events;
        epoll_data_t data;
    }
}

/// D mirror of the C `epoll_data_t` union; all members share the same storage.
union epoll_data_t
{
    void* ptr;
    int fd;
    uint u32;
    ulong u64;
}
313 
// Prototypes for the C library's epoll and eventfd functions. They take their
// C linkage, @system and nothrow from the attribute line that precedes the
// struct declarations above; see the epoll_create(2), epoll_ctl(2),
// epoll_wait(2) and eventfd(2) manual pages for the full contracts.

// Not called anywhere in this file; epoll_create1 is used instead.
int epoll_create(int size);
// `flags` takes the EPOLL_* creation flags (this file passes 0).
int epoll_create1(int flags);
// `op` is one of the EPOLL_CTL_* opcodes; this file passes null for `event` on EPOLL_CTL_DEL.
int epoll_ctl(int epfd, int op, int fd, epoll_event* event);
// `events` points to a caller-provided array with room for `maxevents` entries.
int epoll_wait(int epfd, epoll_event* events, int maxevents, int timeout);

// `flags` takes the EFD_* flags. NOTE(review): declared as returning socket_t,
// presumably so the result can be assigned to a channel handle directly - confirm.
socket_t eventfd(uint initval, int flags);
320