1 /* 2 * Kiss - A refined core library for D programming language. 3 * 4 * Copyright (C) 2015-2018 Shanghai Putao Technology Co., Ltd 5 * 6 * Developer: HuntLabs.cn 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module kiss.event.socket.common; 13 14 import kiss.core; 15 import kiss.event.EventLoop; 16 import kiss.event.core; 17 import kiss.exception; 18 import kiss.container.ByteBuffer; 19 20 import std.socket; 21 import kiss.logger; 22 23 24 alias ConnectionHandler = void delegate(bool isSucceeded); 25 26 // dfmt off 27 alias UDPReadCallBack = void delegate(in ubyte[] data, Address addr); 28 alias AcceptCallBack = void delegate(Selector loop, Socket socket) ; 29 // dfmt on 30 31 alias SocketChannelBase = AbstractSocketChannel; 32 // alias AcceptorBase = AbstractListener; 33 // alias StreamSocketBase = AbstractStream; 34 // alias DatagramSocketBase = AbstractDatagramSocket; 35 36 /** 37 */ 38 interface IAcceptor 39 { 40 void onClose(); 41 void onRead(); 42 } 43 44 /** 45 */ 46 interface Stream 47 { 48 49 } 50 51 // alias IStreamSocket = Stream; 52 53 // dfmt off 54 mixin template ChannelSocketOption() { 55 import std.functional; 56 import std.datetime; 57 import core.stdc.stdint; 58 import std.socket; 59 60 version (Windows) import SOCKETOPTIONS = core.sys.windows.winsock2; 61 62 version (Posix) import SOCKETOPTIONS = core.sys.posix.sys.socket; 63 64 /// Get a socket option. 65 /// Returns: The number of bytes written to $(D result). 66 //returns the length, in bytes, of the actual result - very different from getsockopt() 67 pragma(inline) final int getOption(SocketOptionLevel level, SocketOption option, 68 void[] result) @trusted { 69 70 return this.socket.getOption(level, option, result); 71 } 72 73 /// Common case of getting integer and boolean options. 74 pragma(inline) final int getOption(SocketOptionLevel level, 75 SocketOption option, ref int32_t result) @trusted { 76 return this.socket.getOption(level, option, result); 77 } 78 79 /// Get the linger option. 80 pragma(inline) final int getOption(SocketOptionLevel level, SocketOption option, 81 ref Linger result) @trusted { 82 return this.socket.getOption(level, option, result); 83 } 84 85 /// Get a timeout (duration) option. 86 pragma(inline) final void getOption(SocketOptionLevel level, 87 SocketOption option, ref Duration result) @trusted { 88 this.socket.getOption(level, option, result); 89 } 90 91 /// Set a socket option. 92 pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, 93 void[] value) @trusted { 94 return this.socket.setOption(forward!(level, option, value)); 95 } 96 97 /// Common case for setting integer and boolean options. 98 pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, 99 int32_t value) @trusted { 100 return this.socket.setOption(forward!(level, option, value)); 101 } 102 103 /// Set the linger option. 104 pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, 105 Linger value) @trusted { 106 return this.socket.setOption(forward!(level, option, value)); 107 } 108 109 pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, 110 Duration value) @trusted { 111 return this.socket.setOption(forward!(level, option, value)); 112 } 113 114 final @property @trusted Address remoteAddress() { 115 return _remoteAddress; 116 } 117 protected Address _remoteAddress; 118 119 final @property @trusted Socket socket(){ 120 return this.socket; 121 } 122 123 final @property @trusted Address localAddress() { 124 return _localAddress; 125 } 126 protected Address _localAddress; 127 } 128 // dfmt on 129 130 /** 131 */ 132 abstract class AbstractSocketChannel : AbstractChannel 133 { 134 protected AddressFamily _family; 135 136 this(Selector loop, WatcherType type) 137 { 138 super(loop, type); 139 } 140 141 protected @property void socket(Socket s) 142 { 143 this.handle = s.handle(); 144 this._family = s.addressFamily; 145 // _localAddress = s.localAddress(); 146 version (Posix) 147 s.blocking = false; 148 _socket = s; 149 version (KissDebugMode) 150 trace("new socket fd: ", this.handle); 151 } 152 153 protected @property Socket socket() 154 { 155 return _socket; 156 } 157 158 mixin ChannelSocketOption; 159 160 version (Windows) 161 { 162 163 void setRead(size_t bytes) 164 { 165 readLen = bytes; 166 } 167 168 protected size_t readLen; 169 } 170 171 void start(); 172 173 void onWriteDone() 174 { 175 assert(false, "not implemented"); 176 } 177 178 protected: 179 Socket _socket; 180 } 181 182 /** 183 */ 184 interface IDatagramSocket 185 { 186 187 } 188 189 /** 190 */ 191 class SocketStreamBuffer : StreamWriteBuffer 192 { 193 194 this(const(ubyte)[] data, DataWrittenHandler handler = null) 195 { 196 _data = data; 197 _site = 0; 198 _sentHandler = handler; 199 } 200 201 const(ubyte)[] sendData() 202 { 203 return _data[_site .. $]; 204 } 205 206 // add send offiset and return is empty 207 bool popSize(size_t size) 208 { 209 _site += size; 210 if (_site >= _data.length) 211 return true; 212 else 213 return false; 214 } 215 // do send finish 216 void doFinish() 217 { 218 if (_sentHandler) 219 { 220 _sentHandler(_data, _site); 221 } 222 _sentHandler = null; 223 _data = null; 224 } 225 226 StreamWriteBuffer next() 227 { 228 return _next; 229 } 230 231 void next(StreamWriteBuffer v) 232 { 233 _next = v; 234 } 235 236 private: 237 StreamWriteBuffer _next; 238 size_t _site = 0; 239 const(ubyte)[] _data; 240 DataWrittenHandler _sentHandler; 241 } 242 243 /** 244 */ 245 struct WriteBufferQueue 246 { 247 StreamWriteBuffer front() nothrow @safe 248 { 249 return _first; 250 } 251 252 bool empty() nothrow @safe 253 { 254 return _first is null; 255 } 256 257 void clear() 258 { 259 StreamWriteBuffer current = _first; 260 while (current !is null) 261 { 262 _first = current.next; 263 current.next = null; 264 current = _first; 265 } 266 267 _first = null; 268 _last = null; 269 } 270 271 void enQueue(StreamWriteBuffer wsite) 272 { 273 assert(wsite); 274 if (_last) 275 { 276 _last.next = wsite; 277 } 278 else 279 { 280 _first = wsite; 281 } 282 wsite.next = null; 283 _last = wsite; 284 } 285 286 StreamWriteBuffer deQueue() 287 { 288 // assert(_first && _last); 289 StreamWriteBuffer wsite = _first; 290 if (_first !is null) 291 _first = _first.next; 292 293 if (_first is null) 294 _last = null; 295 296 return wsite; 297 } 298 299 private: 300 StreamWriteBuffer _last = null; 301 StreamWriteBuffer _first = null; 302 }