1
0
mirror of https://github.com/vlang/v.git synced 2023-08-10 21:13:21 +03:00

http: add workers to improve the server's concurrent capacity. (#18271)

This commit is contained in:
xiusin 2023-05-27 06:57:32 +08:00 committed by GitHub
parent f9efbdff10
commit 43bc85d3bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -6,11 +6,12 @@ module http
import io import io
import net import net
import time import time
import runtime
// ServerStatus is the current status of the server. // ServerStatus is the current status of the server.
// .running means that the server is active and serving. // .running means that the server is active and serving.
// .stopped means that the server is not active but still listening. // .stopped means that the server is not active but still listening.
// .closed means that the server is completely inactive. // .closed means that the server is completely inactive.
pub enum ServerStatus { pub enum ServerStatus {
running running
stopped stopped
@ -32,6 +33,8 @@ pub mut:
read_timeout time.Duration = 30 * time.second read_timeout time.Duration = 30 * time.second
write_timeout time.Duration = 30 * time.second write_timeout time.Duration = 30 * time.second
accept_timeout time.Duration = 30 * time.second accept_timeout time.Duration = 30 * time.second
pool_channel_slots int = 1024
worker_num int = runtime.nr_jobs()
} }
// listen_and_serve listens on the server port `s.port` over TCP network and // listen_and_serve listens on the server port `s.port` over TCP network and
@ -45,6 +48,16 @@ pub fn (mut s Server) listen_and_serve() {
return return
} }
s.listener.set_accept_timeout(s.accept_timeout) s.listener.set_accept_timeout(s.accept_timeout)
// Create tcp connection channel
ch := chan &net.TcpConn{cap: s.pool_channel_slots}
// Create workers
mut ws := []thread{cap: s.worker_num}
for wid in 0 .. s.worker_num {
ws << new_handler_worker(wid, ch, s.handler)
}
eprintln('Listening on :${s.port}') eprintln('Listening on :${s.port}')
s.state = .running s.state = .running
for { for {
@ -62,8 +75,7 @@ pub fn (mut s Server) listen_and_serve() {
} }
conn.set_read_timeout(s.read_timeout) conn.set_read_timeout(s.read_timeout)
conn.set_write_timeout(s.write_timeout) conn.set_write_timeout(s.write_timeout)
// TODO: make concurrent ch <- conn
s.parse_and_respond(mut conn)
} }
if s.state == .stopped { if s.state == .stopped {
s.close() s.close()
@ -89,7 +101,30 @@ pub fn (s &Server) status() ServerStatus {
return s.state return s.state
} }
fn (mut s Server) parse_and_respond(mut conn net.TcpConn) { struct HandlerWorker {
id int
ch chan &net.TcpConn
pub mut:
handler Handler
}
fn new_handler_worker(wid int, ch chan &net.TcpConn, handler Handler) thread {
mut w := &HandlerWorker{
id: wid
ch: ch
handler: handler
}
return spawn w.process_requests()
}
fn (mut w HandlerWorker) process_requests() {
for {
mut conn := <-w.ch or { break }
w.handle_conn(mut conn)
}
}
fn (mut w HandlerWorker) handle_conn(mut conn net.TcpConn) {
defer { defer {
conn.close() or { eprintln('close() failed: ${err}') } conn.close() or { eprintln('close() failed: ${err}') }
} }
@ -111,10 +146,16 @@ fn (mut s Server) parse_and_respond(mut conn net.TcpConn) {
remote_ip := conn.peer_ip() or { '' } remote_ip := conn.peer_ip() or { '' }
req.header.add_custom('Remote-Addr', remote_ip) or {} req.header.add_custom('Remote-Addr', remote_ip) or {}
mut resp := s.handler.handle(req) mut resp := w.handler.handle(req)
if resp.version() == .unknown { if resp.version() == .unknown {
resp.set_version(req.version) resp.set_version(req.version)
} }
// Implemented by developers?
if !resp.header.contains(.content_length) {
resp.header.set(.content_length, '${resp.body.len}')
}
conn.write(resp.bytes()) or { eprintln('error sending response: ${err}') } conn.write(resp.bytes()) or { eprintln('error sending response: ${err}') }
} }