/*
 * Kiss - A refined core library for D programming language.
 *
 * Copyright (C) 2015-2018 Shanghai Putao Technology Co., Ltd
 *
 * Developer: HuntLabs.cn
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module kiss.net.TcpListener;

import kiss.event;
import kiss.net.core;

import std.socket;
import std.exception;
import kiss.logger;
import core.thread;
import core.time;

import kiss.core;
import kiss.net.TcpStream;

/// Callback invoked by the listener for every accepted connection, after the
/// peer stream has been created and before the stream is started.
alias AcceptEventHandler = void delegate(TcpListener sender, TcpStream stream);

/// Factory callback that lets the user build the TcpStream for an accepted
/// socket instead of the default `new TcpStream(loop, socket, bufferSize)`.
alias PeerCreateHandler = TcpStream delegate(TcpListener sender, Socket socket, size_t bufferSize);

/**
 * TCP listening socket driven by an EventLoop.
 *
 * Typical usage is a fluent chain: configure handlers, `bind`, `listen`,
 * then `start` to register the listener with its event loop. Each accepted
 * socket is wrapped in a TcpStream and handed to `acceptHandler`.
 */
class TcpListener : AbstractListener
{
    private bool isSslEnabled = false;
    /// Buffer size passed on to every TcpStream created for an accepted peer.
    private size_t _bufferSize = 4 * 1024;
    protected EventHandler _shutdownHandler;

    /// event handlers
    AcceptEventHandler acceptHandler;
    SimpleEventHandler closeHandler;
    PeerCreateHandler peerCreateHandler;

    /**
     * Params:
     *   loop       = event loop the listener will be registered with
     *   family     = address family of the listening socket
     *   bufferSize = buffer size handed to peer streams (and, on Windows,
     *                to the base listener as well)
     */
    this(EventLoop loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4 * 1024)
    {
        _bufferSize = bufferSize;
        // The base-class constructor takes the buffer size only on Windows.
        version (Windows)
            super(loop, family, bufferSize);
        else
            super(loop, family);
    }

    /// Sets the handler called for each accepted connection. Returns `this` for chaining.
    TcpListener onConnectionAccepted(AcceptEventHandler handler)
    {
        acceptHandler = handler;
        return this;
    }

    /// Sets the factory used to create the peer TcpStream. Returns `this` for chaining.
    TcpListener onPeerCreating(PeerCreateHandler handler)
    {
        peerCreateHandler = handler;
        return this;
    }

    /// Sets the handler used by `close()` when no `closeHandler` is set. Returns `this` for chaining.
    TcpListener onShutdown(EventHandler handler)
    {
        _shutdownHandler = handler;
        return this;
    }

    /// Binds to the address parsed from `ip` and `port`. Returns `this` for chaining.
    TcpListener bind(string ip, ushort port)
    {
        bind(parseAddress(ip, port));
        return this;
    }

    /// Binds to `port` on an address built by `createAddress` for this socket.
    /// Returns `this` for chaining.
    TcpListener bind(ushort port)
    {
        bind(createAddress(this.socket, port));
        return this;
    }

    /// Binds the listening socket to `addr` and records the resulting local
    /// address. Returns `this` for chaining.
    TcpListener bind(Address addr)
    {
        this.socket.bind(addr);
        _localAddress = _socket.localAddress();
        return this;
    }

    /// The local address recorded by the last `bind` call.
    Address bindingAddress()
    {
        return _localAddress;
    }

    /**
     * Configures address/port reuse on the listening socket.
     *
     * Always sets SO_REUSEADDR to `use`. On Posix it additionally sets
     * SO_REUSEPORT to `use`. On Windows, when `use` is false, it sets
     * SO_EXCLUSIVEADDRUSE so the address cannot be shared.
     *
     * Returns: `this` for chaining.
     */
    TcpListener reusePort(bool use)
    {
        this.socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, use);

        version (Posix)
        {
            import core.sys.posix.sys.socket;

            this.socket.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_REUSEPORT, use);
        }
        else version (Windows)
        {
            // Version identifiers are case-sensitive: the predefined one is
            // `Windows`. The lowercase spelling is never defined, which left
            // this branch out of every build.
            import core.sys.windows.winsock2;

            if (!use)
                this.socket.setOption(SocketOptionLevel.SOCKET,
                        cast(SocketOption) SO_EXCLUSIVEADDRUSE, true);
        }

        return this;
    }

    /// Puts the socket into listening state with the given backlog.
    /// Returns `this` for chaining.
    TcpListener listen(int backlog)
    {
        this.socket.listen(backlog);
        return this;
    }

    /// Registers the listener with its event loop; on Windows it also issues
    /// the first accept request.
    override void start()
    {
        _inLoop.register(this);
        _isRegistered = true;
        version (Windows)
            this.doAccept();
    }

    /// Notifies the close/shutdown handler and then runs `onClose()`.
    /// Only one handler is invoked: `closeHandler` takes precedence, and
    /// `_shutdownHandler` is used only when `closeHandler` is not set.
    override void close()
    {
        if (closeHandler !is null)
            closeHandler();
        else if (_shutdownHandler !is null)
            _shutdownHandler(this, null);
        this.onClose();
    }

    /// Read-readiness callback: accepts a pending connection, wraps it in a
    /// TcpStream, passes it to `acceptHandler` and starts it. If the listener
    /// is in an error state afterwards, the error is logged and the listener
    /// is closed.
    protected override void onRead()
    {
        bool canRead = true;
        version (KissDebugMode)
            trace("start to listen");
        // The accept loop below was disabled by the original author; a single
        // accept is performed per readiness notification.
        // while(canRead && this.isRegistered) // why??
        {
            version (KissDebugMode)
                trace("listening...");
            canRead = onAccept((Socket socket) {

                version (KissDebugMode)
                    infof("new connection from %s, fd=%d",
                        socket.remoteAddress.toString(), socket.handle());

                // NOTE(review): when no acceptHandler is set the accepted
                // socket is neither wrapped nor explicitly closed here.
                if (acceptHandler !is null)
                {
                    TcpStream stream;
                    if (peerCreateHandler is null)
                        stream = new TcpStream(_inLoop, socket, _bufferSize);
                    else
                        stream = peerCreateHandler(this, socket, _bufferSize);

                    // The handler runs before start() so it can attach its
                    // own callbacks to the stream first.
                    acceptHandler(this, stream);
                    stream.start();
                }
            });

            if (this.isError)
            {
                canRead = false;
                error("listener error: ", this.erroString);
                this.close();
            }
        }
    }
}