1 /*
2  * Kiss - A refined core library for D programming language.
3  *
4  * Copyright (C) 2015-2018  Shanghai Putao Technology Co., Ltd
5  *
6  * Developer: HuntLabs.cn
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11  
12 module kiss.event.selector.iocp;
13 
14 // dfmt off
15 version (Windows) : 
16 // dfmt on
17 
18 import kiss.event.socket;
19 
20 import kiss.event.core;
21 import kiss.event.socket.iocp;
22 import kiss.event.timer;
23 
24 import core.sys.windows.windows;
25 import std.conv;
26 import kiss.logger;
27 
/**
 * Selector implementation built on a Windows I/O completion port (IOCP).
 *
 * The selector owns one completion port. Socket channels (TCP, Accept, UDP)
 * are associated with that port in register(); timer channels are added to
 * the timing wheel held by `_timer`. Each call to handleSocketEvent()
 * dequeues at most one completion packet and dispatches it according to the
 * `IocpOperation` stored in the packet's `IocpContext`.
 */
class AbstractSelector : Selector
{
    /**
     * Creates the completion port, the internal event channel and
     * initializes the timer.
     *
     * Throws: LoopException if the completion port cannot be created.
     */
    this()
    {
        _iocpHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, 0);
        if (_iocpHandle is null)
            throw new LoopException("CreateIoCompletionPort failed, code=" ~ to!string(GetLastError()));

        _event = new EventChannel(this);
        _timer.init();

        // The wake-up packet is posted by pointer and dequeued later (possibly
        // on another thread), so its context has to live as long as the
        // selector itself and must not be a stack local of weakUp().
        _wakeupContext.watcher = _event;
        _wakeupContext.operation = IocpOperation.event;
    }

    ~this()
    {
        // Only touches the raw handle stored in this object; no logging or
        // allocation happens here.
        closePort();
    }

    /**
     * Registers a channel with this selector.
     *
     * Timer channels are added to the timing wheel; TCP, Accept and UDP
     * channels are associated with the completion port, using the channel
     * reference as completion key. Every registered channel is also passed
     * to `_event.setNext`.
     *
     * Params:
     *   watcher = the channel to register; must not be null.
     *
     * Returns: false if a timer channel could not be set up, true otherwise.
     */
    override bool register(AbstractChannel watcher)
    {
        assert(watcher !is null);

        if (watcher.type == WatcherType.Timer)
        {
            AbstractTimer wt = cast(AbstractTimer) watcher;
            assert(wt !is null);
            if (wt is null || !wt.setTimerOut())
                return false;
            _timer.timeWheel().addNewTimer(wt.timer, wt.wheelSize());
        }
        else if (watcher.type == WatcherType.TCP
                || watcher.type == WatcherType.Accept 
                || watcher.type == WatcherType.UDP)
        {
            version (KissDebugMode)
                trace("Run CreateIoCompletionPort on socket: ", watcher.handle);
            HANDLE port = CreateIoCompletionPort(cast(HANDLE) watcher.handle, _iocpHandle,
                    cast(size_t)(cast(void*) watcher), 0);
            // The result used to be ignored silently. The failure is now
            // reported, but the return value is left as it was so existing
            // callers observe the same result.
            if (port is null)
                warning("CreateIoCompletionPort failed on socket, code=", GetLastError());
        }

        version (KissDebugMode)
            infof("register, watcher(fd=%d, type=%s)", watcher.handle, watcher.type);
        _event.setNext(watcher);
        return true;
    }

    /**
     * Not supported by this selector.
     *
     * Throws: LoopException, always.
     */
    override bool reregister(AbstractChannel watcher)
    {
        throw new LoopException("The IOCP does not support reregister!");
    }

    /**
     * Currently a no-op that always reports success.
     *
     * Returns: true.
     */
    override bool deregister(AbstractChannel watcher)
    {
        // FIXME: Needing refactor or cleanup -@Administrator at 8/28/2018, 3:28:18 PM
        // https://stackoverflow.com/questions/6573218/removing-a-handle-from-a-i-o-completion-port-and-other-questions-about-iocp
        //tracef("deregister (fd=%d)", watcher.handle);

        // IocpContext _data;
        // _data.watcher = watcher;
        // _data.operation = IocpOperation.close;
        // PostQueuedCompletionStatus(_iocpHandle, 0, 0, &_data.overlapped);

        return true;
    }

    /**
     * Posts a completion packet for the internal event channel so that a
     * pending GetQueuedCompletionStatus call on this port returns.
     */
    void weakUp()
    {
        // Use the long-lived member context: the pointer handed to
        // PostQueuedCompletionStatus is read back in handleSocketEvent()
        // after this method has already returned.
        PostQueuedCompletionStatus(_iocpHandle, 0, 0, &_wakeupContext.overlapped);
    }

    /**
     * Runs the loop until stop() clears the running flag.
     *
     * Each iteration first invokes `handler` and then processes at most one
     * completion packet.
     *
     * Params:
     *   handler = delegate invoked at the start of every iteration.
     */
    void onLoop(scope void delegate() handler)
    {
        _runing = true;
        _timer.init();
        do
        {
            handler();
            handleSocketEvent();
        }
        while (_runing);
    }

    /**
     * Advances the timing wheel, then waits for one completion packet using
     * the value returned by `_timer.doWheel()` as the wait timeout, and
     * dispatches the packet.
     */
    private void handleSocketEvent()
    {
        auto timeout = _timer.doWheel();
        OVERLAPPED* overlapped;
        ULONG_PTR key = 0;
        DWORD bytes = 0;

        const int ret = GetQueuedCompletionStatus(_iocpHandle, &bytes,
                &key, &overlapped, timeout);

        // The OVERLAPPED pointer is reinterpreted as the IocpContext that was
        // submitted with the operation.
        IocpContext* ev = cast(IocpContext*) overlapped;
        if (ret == 0)
        {
            const auto erro = GetLastError();
            if (erro == WAIT_TIMEOUT) // || erro == ERROR_OPERATION_ABORTED
                return;

            error("error occurred, code=", erro);
            // A failed operation that still carries a context belongs to a
            // channel; close that channel unless it is closed already.
            if (ev !is null) {
                AbstractChannel channel = ev.watcher;
                if(channel !is null && !channel.isClosed())
                    channel.close();
            } 
            return;
        }

        if (ev is null || ev.watcher is null)
            warning("ev is null or ev.watche is null");
        else
            handleIocpOperation(ev.operation, ev.watcher, bytes);
    }

    /**
     * Dispatches one completed operation to the matching channel callback.
     *
     * Params:
     *   op      = operation recorded in the completion context.
     *   channel = channel the operation belongs to.
     *   bytes   = byte count reported by GetQueuedCompletionStatus.
     */
    private void handleIocpOperation(IocpOperation op, AbstractChannel channel, DWORD bytes) {

        version (KissDebugMode)
            trace("ev.operation: ", op);

        switch (op)
        {
        case IocpOperation.accept:
            channel.onRead();
            break;
        case IocpOperation.connect:
            // NOTE(review): onSocketRead returns early when len == 0, so this
            // call does not reach channel.onRead() - confirm whether connect
            // completions are meant to be handled elsewhere.
            onSocketRead(channel, 0);
            break;
        case IocpOperation.read:
            onSocketRead(channel, bytes);
            break;
        case IocpOperation.write:
            onSocketWrite(channel, bytes);
            break;
        case IocpOperation.event:
            channel.onRead();
            break;
        case IocpOperation.close:
            warning("close: ");
            break;
        default:
            warning("unsupported operation type: ", op);
            break;
        }
    }

    /**
     * Clears the running flag and wakes the loop so onLoop() can observe it.
     */
    override void stop()
    {
        _runing = false;
        weakUp();
    }

    /// Intentionally empty.
    void handleTimer()
    {

    }

    /**
     * Clears the running flag and closes the completion port.
     * Calling it more than once is harmless.
     */
    void dispose()
    {
        _runing = false;
        closePort();
    }

    /**
     * Handles a completed read on a socket channel.
     *
     * Does nothing when `len` is 0 or the channel is already closed;
     * otherwise records the byte count on the socket channel and invokes its
     * onRead callback.
     *
     * Params:
     *   wt  = channel the read completed on.
     *   len = number of bytes reported for the operation.
     */
    private void onSocketRead(AbstractChannel wt, size_t len)
    {
        // Checked in every build type: the next statement dereferences wt.
        if (wt is null) {
            warning("channel is null");
            return;
        }

        if(len == 0 || wt.isClosed) {
            version (KissDebugMode) info("channel closed");
            return;
        }

        AbstractSocketChannel io = cast(AbstractSocketChannel) wt;
        // assert(io !is null, "The type of channel is: " ~ typeid(wt).name);
        if (io is null) {
            warning("The channel socket is null: ");
            return;
        }
        io.setRead(len);
        wt.onRead();
    }

    /**
     * Handles a completed write on a stream channel.
     *
     * Params:
     *   wt  = channel the write completed on.
     *   len = number of bytes reported for the operation.
     */
    private void onSocketWrite(AbstractChannel wt, size_t len)
    {
        // Checked in every build type, matching onSocketRead.
        if (wt is null) {
            warning("channel is null");
            return;
        }
        AbstractStream client = cast(AbstractStream) wt;
        // assert(client !is null, "The type of channel is: " ~ typeid(wt).name);
        if (client is null) {
            warning("The channel socket is null: ");
            return;
        }
        client.onWriteDone(len); // Notify the client about how many bytes actually sent.
    }

    /// Closes the completion port handle once and forgets it.
    private void closePort()
    {
        if (_iocpHandle !is null)
        {
            CloseHandle(_iocpHandle);
            _iocpHandle = null;
        }
    }

private:
    bool _runing;
    HANDLE _iocpHandle;
    EventChannel _event;
    CustomTimer _timer;
    // Context posted by weakUp(); kept as a field so the pointer queued on
    // the completion port stays valid until the packet is dequeued.
    IocpContext _wakeupContext;
}