/*
 * Kiss - A refined core library for D programming language.
 *
 * Copyright (C) 2015-2018  Shanghai Putao Technology Co., Ltd
 *
 * Developer: HuntLabs.cn
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module kiss.event.selector.iocp;

// dfmt off
version (Windows) :
// dfmt on

import kiss.event.socket;

import kiss.event.core;
import kiss.event.socket.iocp;
import kiss.event.timer;

import core.sys.windows.windows;
import std.conv;
import kiss.logger;

/**
 * Selector built on a Windows I/O completion port (IOCP).
 *
 * Socket channels are associated with the port in `register`; the loop
 * started by `onLoop` dequeues completion packets and dispatches them to
 * the channel stored in the packet's `IocpContext`.
 */
class AbstractSelector : Selector
{
    /**
     * Creates the completion port, the internal event channel and the timer,
     * and prepares the context that `weakUp` posts to the port.
     */
    this()
    {
        _iocpHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, 0);
        _event = new EventChannel(this);
        _timer.init();

        // The wake-up context must outlive every call to weakUp(), because the
        // posted packet is dequeued later by handleSocketEvent(). It is filled
        // in once here and never modified afterwards.
        _wakeupContext.watcher = _event;
        _wakeupContext.operation = IocpOperation.event;
    }

    ~this()
    {
        // .close(_iocpHandle);
    }

    /**
     * Registers a channel with this selector.
     *
     * Timer channels are added to the timer wheel; TCP, Accept and UDP
     * channels have their handle associated with the completion port, using
     * the channel reference as the completion key.
     *
     * Params:
     *   watcher = the channel to register; must not be null.
     *
     * Returns: false when a timer channel could not be set up, true otherwise.
     */
    override bool register(AbstractChannel watcher)
    {
        assert(watcher !is null);

        if (watcher.type == WatcherType.Timer)
        {
            AbstractTimer wt = cast(AbstractTimer) watcher;
            assert(wt !is null);
            if (wt is null || !wt.setTimerOut())
                return false;
            _timer.timeWheel().addNewTimer(wt.timer, wt.wheelSize());
        }
        else if (watcher.type == WatcherType.TCP
                || watcher.type == WatcherType.Accept
                || watcher.type == WatcherType.UDP)
        {
            version (KissDebugMode)
                trace("Run CreateIoCompletionPort on socket: ", watcher.handle);

            HANDLE port = CreateIoCompletionPort(cast(HANDLE) watcher.handle,
                    _iocpHandle, cast(size_t)(cast(void*) watcher), 0);

            // The association result used to be silently discarded. Report the
            // failure, but keep the previous return value so existing callers
            // are not affected.
            if (port is null)
                warning("CreateIoCompletionPort failed, code=", GetLastError());
        }

        version (KissDebugMode)
            infof("register, watcher(fd=%d, type=%s)", watcher.handle, watcher.type);
        _event.setNext(watcher);
        return true;
    }

    /**
     * Not supported by this selector.
     *
     * Throws: LoopException always.
     */
    override bool reregister(AbstractChannel watcher)
    {
        throw new LoopException("The IOCP does not support reregister!");
    }

    /**
     * Currently a no-op that always reports success; the handle is not
     * detached from the completion port.
     */
    override bool deregister(AbstractChannel watcher)
    {
        // FIXME: Needing refactor or cleanup -@Administrator at 8/28/2018, 3:28:18 PM
        // https://stackoverflow.com/questions/6573218/removing-a-handle-from-a-i-o-completion-port-and-other-questions-about-iocp
        //tracef("deregister (fd=%d)", watcher.handle);

        // IocpContext _data;
        // _data.watcher = watcher;
        // _data.operation = IocpOperation.close;
        // PostQueuedCompletionStatus(_iocpHandle, 0, 0, &_data.overlapped);

        return true;
    }

    /**
     * Posts an `IocpOperation.event` packet for the internal event channel to
     * the completion port, so that a pending wait in the loop returns.
     *
     * The posted context is a member of this object rather than a local
     * variable: the packet is consumed after this method has returned, so a
     * stack-allocated context would be dangling by the time it is read.
     */
    void weakUp()
    {
        PostQueuedCompletionStatus(_iocpHandle, 0, 0, &_wakeupContext.overlapped);
    }

    /**
     * Runs the selector loop until `stop` is called.
     *
     * Params:
     *   handler = delegate invoked at the start of every iteration, before
     *             the completion port is polled.
     */
    void onLoop(scope void delegate() handler)
    {
        _runing = true;
        _timer.init();
        do
        {
            handler();
            handleSocketEvent();
        }
        while (_runing);
    }

    /**
     * Advances the timer wheel, then waits for one completion packet (using
     * the value returned by the timer wheel as the wait timeout) and
     * dispatches it.
     */
    private void handleSocketEvent()
    {
        auto timeout = _timer.doWheel();
        OVERLAPPED* overlapped;
        ULONG_PTR key = 0;
        DWORD bytes = 0;

        const int ret = GetQueuedCompletionStatus(_iocpHandle, &bytes,
                &key, &overlapped, timeout);

        // The dequeued OVERLAPPED pointer is reinterpreted as the IocpContext
        // it was posted with.
        IocpContext* ev = cast(IocpContext*) overlapped;
        if (ret == 0)
        {
            const auto erro = GetLastError();
            if (erro == WAIT_TIMEOUT) // || erro == ERROR_OPERATION_ABORTED
                return;

            error("error occurred, code=", erro);
            // A failed operation that still carries a context belongs to a
            // channel: close that channel if it is still open.
            if (ev !is null)
            {
                AbstractChannel channel = ev.watcher;
                if (channel !is null && !channel.isClosed())
                    channel.close();
            }
            return;
        }

        if (ev is null || ev.watcher is null)
            warning("ev is null or ev.watche is null");
        else
            handleIocpOperation(ev.operation, ev.watcher, bytes);
    }

    /**
     * Dispatches a completed operation to the matching channel callback.
     *
     * Params:
     *   op      = the operation recorded in the completion context.
     *   channel = the channel recorded in the completion context.
     *   bytes   = number of bytes reported by GetQueuedCompletionStatus.
     */
    private void handleIocpOperation(IocpOperation op, AbstractChannel channel, DWORD bytes)
    {
        version (KissDebugMode)
            trace("ev.operation: ", op);

        switch (op)
        {
        case IocpOperation.accept:
            channel.onRead();
            break;
        case IocpOperation.connect:
            onSocketRead(channel, 0);
            break;
        case IocpOperation.read:
            onSocketRead(channel, bytes);
            break;
        case IocpOperation.write:
            onSocketWrite(channel, bytes);
            break;
        case IocpOperation.event:
            channel.onRead();
            break;
        case IocpOperation.close:
            warning("close: ");
            break;
        default:
            warning("unsupported operation type: ", op);
            break;
        }
    }

    /**
     * Requests the loop to terminate and wakes it up so the request is
     * noticed even if the loop is currently waiting on the completion port.
     */
    override void stop()
    {
        _runing = false;
        weakUp();
    }

    /// Currently empty.
    void handleTimer()
    {

    }

    /// Currently empty.
    void dispose()
    {

    }

    /**
     * Handles a completed read: records the byte count on the socket channel
     * and invokes the channel's read callback.
     *
     * Nothing happens when `len` is zero or the channel is already closed.
     *
     * Params:
     *   wt  = the channel the read completed on.
     *   len = number of bytes read.
     */
    private void onSocketRead(AbstractChannel wt, size_t len)
    {
        // Checked in every build, not only in debug builds, so that a null
        // channel never reaches the dereferences below.
        if (wt is null)
        {
            warning("channel is null");
            return;
        }

        if (len == 0 || wt.isClosed)
        {
            version (KissDebugMode) info("channel closed");
            return;
        }

        AbstractSocketChannel io = cast(AbstractSocketChannel) wt;
        // assert(io !is null, "The type of channel is: " ~ typeid(wt).name);
        if (io is null)
        {
            warning("The channel socket is null: ");
            return;
        }
        io.setRead(len);
        wt.onRead();
    }

    /**
     * Handles a completed write by notifying the stream how many bytes were
     * actually sent.
     *
     * Params:
     *   wt  = the channel the write completed on.
     *   len = number of bytes written.
     */
    private void onSocketWrite(AbstractChannel wt, size_t len)
    {
        // Checked in every build, not only in debug builds.
        if (wt is null)
        {
            warning("channel is null");
            return;
        }

        AbstractStream client = cast(AbstractStream) wt;
        // assert(client !is null, "The type of channel is: " ~ typeid(wt).name);
        if (client is null)
        {
            warning("The channel socket is null: ");
            return;
        }
        client.onWriteDone(len); // Notify the client about how many bytes actually sent.
    }

private:
    bool _runing;
    HANDLE _iocpHandle;
    EventChannel _event;
    CustomTimer _timer;
    // Context posted by weakUp(); kept as a member so the pointer handed to
    // the completion port stays valid until the packet is dequeued.
    IocpContext _wakeupContext;
}