diff --git a/vlib/net/http/server.v b/vlib/net/http/server.v index 89e92566c5..d115cc13d2 100644 --- a/vlib/net/http/server.v +++ b/vlib/net/http/server.v @@ -6,11 +6,12 @@ module http import io import net import time - +import runtime // ServerStatus is the current status of the server. // .running means that the server is active and serving. // .stopped means that the server is not active but still listening. // .closed means that the server is completely inactive. + pub enum ServerStatus { running stopped @@ -27,11 +28,13 @@ mut: state ServerStatus = .closed listener net.TcpListener pub mut: - port int = 8080 - handler Handler = DebugHandler{} - read_timeout time.Duration = 30 * time.second - write_timeout time.Duration = 30 * time.second - accept_timeout time.Duration = 30 * time.second + port int = 8080 + handler Handler = DebugHandler{} + read_timeout time.Duration = 30 * time.second + write_timeout time.Duration = 30 * time.second + accept_timeout time.Duration = 30 * time.second + pool_channel_slots int = 1024 + worker_num int = runtime.nr_jobs() } // listen_and_serve listens on the server port `s.port` over TCP network and @@ -45,6 +48,16 @@ pub fn (mut s Server) listen_and_serve() { return } s.listener.set_accept_timeout(s.accept_timeout) + + // Create tcp connection channel + ch := chan &net.TcpConn{cap: s.pool_channel_slots} + + // Create workers + mut ws := []thread{cap: s.worker_num} + for wid in 0 .. s.worker_num { + ws << new_handler_worker(wid, ch, s.handler) + } + eprintln('Listening on :${s.port}') s.state = .running for { @@ -62,8 +75,7 @@ pub fn (mut s Server) listen_and_serve() { } conn.set_read_timeout(s.read_timeout) conn.set_write_timeout(s.write_timeout) - // TODO: make concurrent - s.parse_and_respond(mut conn) + ch <- conn } if s.state == .stopped { s.close() @@ -89,7 +101,30 @@ pub fn (s &Server) status() ServerStatus { return s.state } -fn (mut s Server) parse_and_respond(mut conn net.TcpConn) { +struct HandlerWorker { + id int + ch chan &net.TcpConn +pub mut: + handler Handler +} + +fn new_handler_worker(wid int, ch chan &net.TcpConn, handler Handler) thread { + mut w := &HandlerWorker{ + id: wid + ch: ch + handler: handler + } + return spawn w.process_requests() +} + +fn (mut w HandlerWorker) process_requests() { + for { + mut conn := <-w.ch or { break } + w.handle_conn(mut conn) + } +} + +fn (mut w HandlerWorker) handle_conn(mut conn net.TcpConn) { defer { conn.close() or { eprintln('close() failed: ${err}') } } @@ -111,10 +146,16 @@ fn (mut s Server) parse_and_respond(mut conn net.TcpConn) { remote_ip := conn.peer_ip() or { '' } req.header.add_custom('Remote-Addr', remote_ip) or {} - mut resp := s.handler.handle(req) + mut resp := w.handler.handle(req) if resp.version() == .unknown { resp.set_version(req.version) } + + // Implemented by developers? + if !resp.header.contains(.content_length) { + resp.header.set(.content_length, '${resp.body.len}') + } + conn.write(resp.bytes()) or { eprintln('error sending response: ${err}') } }