1 /*
2  * Kiss - A refined core library for D programming language.
3  *
4  * Copyright (C) 2015-2018  Shanghai Putao Technology Co., Ltd
5  *
6  * Developer: HuntLabs.cn
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module kiss.net.TcpListener;
13 
14 import kiss.event;
15 import kiss.net.core;
16 
17 import std.socket;
18 import std.exception;
19 import kiss.logger;
20 import core.thread;
21 import core.time;
22 
23 import kiss.core;
24 import kiss.net.TcpStream;
25 
26 alias AcceptEventHandler = void delegate(TcpListener sender, TcpStream stream);
27 alias PeerCreateHandler = TcpStream delegate(TcpListener sender, Socket socket, size_t bufferSize);
28 
29 /**
30 */
31 class TcpListener : AbstractListener
32 {
33     private bool isSslEnabled = false;
34     private size_t _bufferSize = 4 * 1024;
35     protected EventHandler _shutdownHandler;
36 
37     /// event handlers
38     AcceptEventHandler acceptHandler;
39     SimpleEventHandler closeHandler;
40     PeerCreateHandler peerCreateHandler;
41 
42     this(EventLoop loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4 * 1024)
43     {
44         _bufferSize = bufferSize;
45         version (Windows)
46             super(loop, family, bufferSize);
47         else
48             super(loop, family);
49     }
50 
51     TcpListener onConnectionAccepted(AcceptEventHandler handler)
52     {
53         acceptHandler = handler;
54         return this;
55     }
56 
57     TcpListener onPeerCreating(PeerCreateHandler handler)
58     {
59         peerCreateHandler = handler;
60         return this;
61     }
62 
63     TcpListener onShutdown(EventHandler handler)
64     {
65         _shutdownHandler = handler;
66         return this;
67     }
68 
69     TcpListener bind(string ip, ushort port)
70     {
71         bind(parseAddress(ip, port));
72         return this;
73     }
74 
75     TcpListener bind(ushort port)
76     {
77         bind(createAddress(this.socket, port));
78         return this;
79     }
80 
81     TcpListener bind(Address addr)
82     {
83         this.socket.bind(addr);
84         _localAddress = _socket.localAddress();
85         return this;
86     }
87 
88     Address bindingAddress()
89     {
90         return _localAddress;
91     }
92 
93     TcpListener reusePort(bool use)
94     {
95         this.socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, use);
96 
97         version (Posix)
98         {
99             import core.sys.posix.sys.socket;
100 
101             this.socket.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_REUSEPORT, use);
102         }
103         else version (windows)
104         {
105             import core.sys.windows.winsock2;
106 
107             if (!use)
108                 this.socket.setOption(SocketOptionLevel.SOCKET,
109                         cast(SocketOption) SO_EXCLUSIVEADDRUSE, true);
110         }
111 
112         return this;
113     }
114 
115     TcpListener listen(int backlog)
116     {
117         this.socket.listen(backlog);
118         return this;
119     }
120 
121     override void start()
122     {
123         _inLoop.register(this);
124         _isRegistered = true;
125         version (Windows)
126             this.doAccept();
127     }
128 
129     override void close()
130     {
131         if (closeHandler !is null)
132             closeHandler();
133         else if (_shutdownHandler !is null)
134             _shutdownHandler(this, null);
135         this.onClose();
136     }
137 
138     protected override void onRead()
139     {
140         bool canRead = true;
141         version (KissDebugMode)
142             trace("start to listen");
143         // while(canRead && this.isRegistered) // why??
144         {
145             version (KissDebugMode)
146                 trace("listening...");
147             canRead = onAccept((Socket socket) {
148 
149                 version (KissDebugMode)
150                     infof("new connection from %s, fd=%d",
151                         socket.remoteAddress.toString(), socket.handle());
152 
153                 if (acceptHandler !is null)
154                 {
155                     TcpStream stream;
156                     if (peerCreateHandler is null)
157                         stream = new TcpStream(_inLoop, socket, _bufferSize);
158                     else
159                         stream = peerCreateHandler(this, socket, _bufferSize);
160 
161                     acceptHandler(this, stream);
162                     stream.start();
163                 }
164             });
165 
166             if (this.isError)
167             {
168                 canRead = false;
169                 error("listener error: ", this.erroString);
170                 this.close();
171             }
172         }
173     }
174 }