1 /*
2  * Kiss - A refined core library for D programming language.
3  *
4  * Copyright (C) 2015-2018  Shanghai Putao Technology Co., Ltd
5  *
6  * Developer: HuntLabs.cn
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11  
12 module kiss.event.selector.iocp;
13 
14 // dfmt off
15 version (Windows) : 
16 // dfmt on
17 
18 import kiss.event.socket;
19 
20 import kiss.event.core;
21 import kiss.event.socket.iocp;
22 import kiss.event.timer;
23 
24 import core.sys.windows.windows;
25 import std.conv;
26 import kiss.logger;
27 
/**
 * Selector implementation built on a Windows I/O completion port (IOCP).
 *
 * The constructor creates the completion port; TCP, Accept and UDP channels
 * are associated with it in register(), using the channel object's address
 * as the completion key. Timer channels are not associated with the port;
 * they are added to the time wheel owned by `_timer`, whose doWheel() result
 * is used as the wait timeout for each GetQueuedCompletionStatus call.
 */
class AbstractSelector : Selector
{
    /**
     * Creates the completion port, the internal event channel and the
     * wake-up context used by weakUp(), then initializes the timer.
     */
    this()
    {
        _iocpHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, 0);
        if (_iocpHandle is null)
            error("CreateIoCompletionPort failed, code=", GetLastError());

        _event = new EventChannel(this);

        // The wake-up packet must stay valid until the loop has dequeued it,
        // so its context is owned by the selector rather than by a stack frame.
        _wakeContext.watcher = _event;
        _wakeContext.operation = IocpOperation.event;

        _timer.init();
    }

    /// Releases the completion port handle if dispose() has not done so yet.
    ~this()
    {
        // No logging here: the destructor may run during a GC collection.
        closeIocpHandle();
    }

    /**
     * Registers a channel with this selector.
     *
     * Timer channels are armed via setTimerOut() and added to the time wheel.
     * TCP, Accept and UDP channels are associated with the completion port.
     * Every successfully registered channel is then passed to
     * `_event.setNext`.
     *
     * Params:
     *   watcher = the channel to register; must not be null.
     *
     * Returns: true on success; false if a timer could not be armed or the
     *          handle could not be associated with the completion port.
     */
    override bool register(AbstractChannel watcher)
    {
        assert(watcher !is null);

        if (watcher.type == WatcherType.Timer)
        {
            AbstractTimer wt = cast(AbstractTimer) watcher;
            assert(wt !is null);
            if (wt is null || !wt.setTimerOut())
                return false;
            _timer.timeWheel().addNewTimer(wt.timer, wt.wheelSize());
        }
        else if (watcher.type == WatcherType.TCP
                || watcher.type == WatcherType.Accept || watcher.type == WatcherType.UDP)
        {
            version (KissDebugMode)
                trace("Run CreateIoCompletionPort on socket: ", watcher.handle);

            // The channel's address is stored as the completion key.
            HANDLE port = CreateIoCompletionPort(cast(HANDLE) watcher.handle,
                    _iocpHandle, cast(size_t)(cast(void*) watcher), 0);
            if (port is null)
            {
                // Do not report success for a channel the port will never
                // deliver completions for.
                error("CreateIoCompletionPort failed on socket, code=", GetLastError());
                return false;
            }
        }

        version (KissDebugMode)
            infof("register, watcher(fd=%d, type=%s)", watcher.handle, watcher.type);
        _event.setNext(watcher);
        return true;
    }

    /**
     * Not supported by this selector.
     *
     * Throws: LoopException, always.
     */
    override bool reregister(AbstractChannel watcher)
    {
        throw new LoopException("The IOCP does not support reregister!");
    }

    /**
     * Currently a no-op that always reports success.
     *
     * Returns: true.
     */
    override bool deregister(AbstractChannel watcher)
    {

        // IocpContext _data;
        // _data.watcher = watcher;
        // _data.operation = IocpOperation.close;
        // PostQueuedCompletionStatus(_iocpHandle, 0, 0, &_data.overlapped);

        return true;
    }

    /**
     * Posts an `IocpOperation.event` completion packet for the internal
     * event channel so that a pending GetQueuedCompletionStatus call returns.
     *
     * The packet points at a context owned by this object, so it is still
     * valid when the loop dequeues it after this method has returned.
     */
    void weakUp()
    {
        if (!PostQueuedCompletionStatus(_iocpHandle, 0, 0, &_wakeContext.overlapped))
            warning("PostQueuedCompletionStatus failed, code=", GetLastError());
    }

    /**
     * Runs the event loop until stop() clears the running flag.
     *
     * Each iteration invokes `handler` and then waits for and dispatches at
     * most one completion packet.
     *
     * Params:
     *   handler = delegate invoked at the start of every iteration.
     */
    void onLoop(scope void delegate() handler)
    {
        _runing = true;
        // NOTE(review): the timer is also initialized in the constructor;
        // confirm that re-initializing it on every loop start is intended.
        _timer.init();
        do
        {
            handler();
            handleSocketEvent();
        }
        while (_runing);
    }

    /**
     * Advances the time wheel, waits for one completion packet (bounded by
     * the timeout returned from the time wheel) and dispatches it according
     * to the operation recorded in its IocpContext.
     */
    private void handleSocketEvent()
    {
        auto timeout = _timer.doWheel();
        OVERLAPPED* overlapped;
        ULONG_PTR key = 0;
        DWORD bytes = 0;

        const int ret = GetQueuedCompletionStatus(_iocpHandle, &bytes,
                &key, &overlapped, timeout);

        // The OVERLAPPED pointer handed back by the port is reinterpreted as
        // the IocpContext it was posted with.
        auto ev = cast(IocpContext*) overlapped;

        if (ret == 0)
        {
            const auto erro = GetLastError();
            if (erro == WAIT_TIMEOUT) // || erro == ERROR_OPERATION_ABORTED
                return;

            error("error occurred, code=", erro);
            // A non-null context means a failed operation was dequeued for
            // that channel; close it.
            if (ev && ev.watcher)
                ev.watcher.close();
            return;
        }

        if (ev is null || ev.watcher is null)
        {
            warning("ev is null: ", ev is null);
            return;
        }

        version (KissDebugMode)
            trace("ev.operation: ", ev.operation);

        switch (ev.operation)
        {
        case IocpOperation.accept:
            ev.watcher.onRead();
            break;
        case IocpOperation.connect:
            onSocketRead(ev.watcher, 0);
            break;
        case IocpOperation.read:
            onSocketRead(ev.watcher, bytes);
            break;
        case IocpOperation.write:
            onSocketWrite(ev.watcher, bytes);
            break;
        case IocpOperation.event:
            ev.watcher.onRead();
            break;
        case IocpOperation.close:
            warning("close: ");
            break;
        default:
            warning("unsupported operation type: ", ev.operation);
            break;
        }
    }

    /// Clears the running flag and wakes the loop so it can observe it.
    override void stop()
    {
        _runing = false;
        weakUp();
    }

    /// Placeholder; intentionally empty.
    void handleTimer()
    {

    }

    /**
     * Closes the completion port handle. Safe to call more than once.
     *
     * Intended to be called once the loop is no longer running: waits on a
     * closed port fail immediately.
     */
    void dispose()
    {
        closeIocpHandle();
    }

    /**
     * Records the number of bytes read on a socket channel and notifies it.
     *
     * Params:
     *   wt  = channel the completion belongs to.
     *   len = byte count reported by the completion port.
     */
    private void onSocketRead(AbstractChannel wt, size_t len)
    {
        AbstractSocketChannel io = cast(AbstractSocketChannel) wt;
        assert(io !is null, "The type of channel is: " ~ to!string(typeid(wt)));
        if (io is null)
        {
            warning("The channel socket is null: ", typeid(wt));
            return;
        }
        io.setRead(len);
        wt.onRead();
    }

    /**
     * Notifies a stream about how many bytes were actually sent.
     *
     * Params:
     *   wt  = channel the completion belongs to.
     *   len = byte count reported by the completion port.
     */
    private void onSocketWrite(AbstractChannel wt, size_t len)
    {
        AbstractStream client = cast(AbstractStream) wt;
        assert(client !is null, "The type of channel is: " ~ to!string(typeid(wt)));
        if (client is null)
        {
            // Asserts are compiled out in release builds; guard at runtime
            // as well, mirroring onSocketRead.
            warning("The channel is not a stream: ", typeid(wt));
            return;
        }

        client.onWriteDone(len); // Notify the client about how many bytes actually sent.
    }

    /// Closes the completion port handle exactly once; performs no logging.
    private final void closeIocpHandle()
    {
        if (_iocpHandle !is null)
        {
            CloseHandle(_iocpHandle);
            _iocpHandle = null;
        }
    }

private:
    bool _runing;
    HANDLE _iocpHandle;
    EventChannel _event;
    CustomTimer _timer;
    IocpContext _wakeContext; // context for the packet posted by weakUp()
}