/*
 * Kiss - A refined core library for D programming language.
 *
 * Copyright (C) 2015-2018  Shanghai Putao Technology Co., Ltd
 *
 * Developer: HuntLabs.cn
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module kiss.event.selector.iocp;

// dfmt off
version (Windows) :
// dfmt on

import kiss.event.socket;

import kiss.event.core;
import kiss.event.socket.iocp;
import kiss.event.timer;

import core.sys.windows.windows;
import std.conv;
import kiss.logger;

/**
 * Selector implementation backed by a Windows I/O completion port (IOCP).
 *
 * Socket channels are associated with the completion port in `register`,
 * timer channels are added to the internal timer wheel, and `onLoop`
 * repeatedly drains one completion packet at a time and dispatches it to
 * the channel stored in the packet's `IocpContext`.
 */
class AbstractSelector : Selector
{
    /**
     * Creates the completion port, the internal event channel used for
     * wake-ups, and initializes the timer.
     */
    this()
    {
        _iocpHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, 0);
        _event = new EventChannel(this);
        _timer.init();

        // The wake-up packet must outlive the call that posts it, because it
        // is dequeued later by the loop; keep it in the object rather than on
        // the stack of weakUp().
        _wakeContext.watcher = _event;
        _wakeContext.operation = IocpOperation.event;
    }

    /// Releases the completion port handle if it was created successfully.
    ~this()
    {
        // CreateIoCompletionPort returns null (not INVALID_HANDLE_VALUE) on failure.
        if (_iocpHandle !is null)
        {
            CloseHandle(_iocpHandle);
            _iocpHandle = null;
        }
    }

    /**
     * Registers a channel with this selector.
     *
     * Timer channels are armed and added to the timer wheel; TCP, Accept and
     * UDP channels are associated with the completion port, using the channel
     * reference as the completion key.
     *
     * Params:
     *   watcher = the channel to register; must not be null.
     *
     * Returns: false if a timer channel could not be armed, true otherwise.
     */
    override bool register(AbstractChannel watcher)
    {
        assert(watcher !is null);

        if (watcher.type == WatcherType.Timer)
        {
            AbstractTimer wt = cast(AbstractTimer) watcher;
            assert(wt !is null);
            if (wt is null || !wt.setTimerOut())
                return false;
            _timer.timeWheel().addNewTimer(wt.timer, wt.wheelSize());
        }
        else if (watcher.type == WatcherType.TCP
                || watcher.type == WatcherType.Accept || watcher.type == WatcherType.UDP)
        {
            version (KissDebugMode)
                trace("Run CreateIoCompletionPort on socket: ", watcher.handle);

            auto port = CreateIoCompletionPort(cast(HANDLE) watcher.handle,
                    _iocpHandle, cast(size_t)(cast(void*) watcher), 0);

            // Surface a failed association instead of dropping it silently.
            // The return value is left unchanged so existing callers keep
            // their current behavior.
            if (port is null)
                warning("CreateIoCompletionPort failed on socket: ",
                        watcher.handle, ", code=", GetLastError());
        }

        version (KissDebugMode)
            infof("register, watcher(fd=%d, type=%s)", watcher.handle, watcher.type);
        _event.setNext(watcher);
        return true;
    }

    /**
     * Not supported by this selector.
     *
     * Throws: LoopException always.
     */
    override bool reregister(AbstractChannel watcher)
    {
        throw new LoopException("The IOCP does not support reregister!");
    }

    /**
     * Currently a no-op that always reports success; nothing is removed from
     * the completion port here.
     */
    override bool deregister(AbstractChannel watcher)
    {
        // IocpContext _data;
        // _data.watcher = watcher;
        // _data.operation = IocpOperation.close;
        // PostQueuedCompletionStatus(_iocpHandle, 0, 0, &_data.overlapped);

        return true;
    }

    /**
     * Wakes the loop by posting an `IocpOperation.event` packet that targets
     * the internal event channel.
     */
    void weakUp()
    {
        // Post the long-lived member context: the packet is consumed
        // asynchronously in handleSocketEvent(), after this function returns.
        PostQueuedCompletionStatus(_iocpHandle, 0, 0, &_wakeContext.overlapped);
    }

    /**
     * Runs the event loop until `stop` is called.
     *
     * Params:
     *   handler = callback invoked once per iteration, before the completion
     *             port is polled.
     */
    void onLoop(scope void delegate() handler)
    {
        _runing = true;
        _timer.init();
        do
        {
            handler();
            handleSocketEvent();
        }
        while (_runing);
    }

    /**
     * Advances the timer wheel, waits for at most one completion packet and
     * dispatches it according to the operation stored in its context.
     */
    private void handleSocketEvent()
    {
        // The value returned by the timer wheel is used as the wait timeout.
        auto timeout = _timer.doWheel();
        OVERLAPPED* overlapped;
        ULONG_PTR key = 0;
        DWORD bytes = 0;

        const int ret = GetQueuedCompletionStatus(_iocpHandle, &bytes, &key,
                &overlapped, timeout);

        if (ret == 0)
        {
            const erro = GetLastError();
            if (erro == WAIT_TIMEOUT) // || erro == ERROR_OPERATION_ABORTED
                return;

            error("error occurred, code=", erro);
            // A non-null overlapped here means a packet was dequeued for a
            // failed operation; close the channel it belongs to.
            auto failed = cast(IocpContext*) overlapped;
            if (failed && failed.watcher)
                failed.watcher.close();
            return;
        }

        auto ev = cast(IocpContext*) overlapped;
        if (ev is null || ev.watcher is null)
        {
            warning("ev is null: ", ev is null);
            return;
        }

        version (KissDebugMode)
            trace("ev.operation: ", ev.operation);

        switch (ev.operation)
        {
        case IocpOperation.accept:
            ev.watcher.onRead();
            break;
        case IocpOperation.connect:
            onSocketRead(ev.watcher, 0);
            break;
        case IocpOperation.read:
            onSocketRead(ev.watcher, bytes);
            break;
        case IocpOperation.write:
            onSocketWrite(ev.watcher, bytes);
            break;
        case IocpOperation.event:
            ev.watcher.onRead();
            break;
        case IocpOperation.close:
            warning("close: ");
            break;
        default:
            warning("unsupported operation type: ", ev.operation);
            break;
        }
    }

    /// Requests loop termination and wakes the loop so it can observe the flag.
    override void stop()
    {
        _runing = false;
        weakUp();
    }

    /// Placeholder; intentionally empty.
    void handleTimer()
    {

    }

    /// Placeholder; intentionally empty.
    void dispose()
    {

    }

    /**
     * Records the number of bytes read on a socket channel and then invokes
     * the channel's read callback.
     *
     * Params:
     *   wt  = channel taken from the completion packet.
     *   len = byte count reported for the completed operation.
     */
    private void onSocketRead(AbstractChannel wt, size_t len)
    {
        AbstractSocketChannel io = cast(AbstractSocketChannel) wt;
        assert(io !is null, "The type of channel is: " ~ to!string(typeid(wt)));
        if (io is null)
        {
            warning("The channel socket is null: ", typeid(wt));
            return;
        }
        io.setRead(len);
        wt.onRead();
    }

    /**
     * Notifies a stream channel about how many bytes were actually sent.
     *
     * Params:
     *   wt  = channel taken from the completion packet.
     *   len = byte count reported for the completed write.
     */
    private void onSocketWrite(AbstractChannel wt, size_t len)
    {
        AbstractStream client = cast(AbstractStream) wt;
        assert(client !is null, "The type of channel is: " ~ to!string(typeid(wt)));
        // Asserts are compiled out in release builds; guard the dereference
        // at runtime as onSocketRead does.
        if (client is null)
        {
            warning("The channel is not a stream: ", typeid(wt));
            return;
        }

        client.onWriteDone(len); // Notify the client about how many bytes actually sent.
    }

private:
    bool _runing;
    HANDLE _iocpHandle;
    EventChannel _event;
    CustomTimer _timer;
    // Context posted by weakUp(); stored here so its address stays valid
    // until the loop has dequeued the packet.
    IocpContext _wakeContext;
}