1
0
mirror of https://github.com/vlang/v.git synced 2023-08-10 21:13:21 +03:00

vweb: implement worker pool (#17298)

This commit is contained in:
Josh Montoya 2023-04-01 16:24:33 -07:00 committed by GitHub
parent 1471ba4678
commit 51ad565ed6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 177 additions and 62 deletions

View File

@ -3,9 +3,12 @@ module main
import vweb import vweb
import rand import rand
const ( const port = 8082
port = 8082
) struct State {
mut:
cnt int
}
struct App { struct App {
vweb.Context vweb.Context
@ -13,18 +16,10 @@ mut:
state shared State state shared State
} }
struct State { pub fn (app &App) before_request() {
mut: $if trace_before_request ? {
cnt int eprintln('[vweb] before_request: ${app.req.method} ${app.req.url}')
} }
pub fn (app App) before_request() {
println('[vweb] before_request: ${app.req.method} ${app.req.url}')
}
fn main() {
println('vweb example')
vweb.run(&App{}, port)
} }
['/users/:user'] ['/users/:user']
@ -36,11 +31,17 @@ pub fn (mut app App) user_endpoint(user string) vweb.Result {
} }
pub fn (mut app App) index() vweb.Result { pub fn (mut app App) index() vweb.Result {
mut c := 0
lock app.state { lock app.state {
app.state.cnt++ app.state.cnt++
c = app.state.cnt
//
$if trace_address_of_app_state_cnt ? {
dump(ptr_str(app.state.cnt))
}
} }
show := true show := true
hello := 'Hello world from vweb' hello := 'Hello world from vweb, request number: ${c}'
numbers := [1, 2, 3] numbers := [1, 2, 3]
return $vweb.html() return $vweb.html()
} }
@ -58,3 +59,8 @@ pub fn (mut app App) cookie() vweb.Result {
pub fn (mut app App) post() vweb.Result { pub fn (mut app App) post() vweb.Result {
return app.text('Post body: ${app.req.data}') return app.text('Post body: ${app.req.data}')
} }
fn main() {
println('vweb example')
vweb.run(&App{}, port)
}

View File

@ -14,6 +14,7 @@ pub struct TcpConn {
pub mut: pub mut:
sock TcpSocket sock TcpSocket
mut: mut:
handle int
write_deadline time.Time write_deadline time.Time
read_deadline time.Time read_deadline time.Time
read_timeout time.Duration read_timeout time.Duration
@ -242,6 +243,15 @@ pub fn (mut c TcpConn) wait_for_write() ! {
return wait_for_write(c.sock.handle, c.write_deadline, c.write_timeout) return wait_for_write(c.sock.handle, c.write_deadline, c.write_timeout)
} }
// set_sock initialises the c.sock field. It should be called after `.accept_only()!`.
// Note: just use `.accept()!`. In most cases it is simpler, and calls `.set_sock()!` for you.
pub fn (mut c TcpConn) set_sock() ! {
c.sock = tcp_socket_from_handle(c.handle)!
$if trace_tcp ? {
eprintln(' TcpListener.accept | << new_sock.handle: ${c.handle:6}')
}
}
pub fn (c &TcpConn) peer_addr() !Addr { pub fn (c &TcpConn) peer_addr() !Addr {
mut addr := Addr{ mut addr := Addr{
addr: AddrData{ addr: AddrData{
@ -295,10 +305,32 @@ pub fn listen_tcp(family AddrFamily, saddr string) !&TcpListener {
} }
} }
// accept a tcp connection from an external source to the listener `l`.
pub fn (mut l TcpListener) accept() !&TcpConn { pub fn (mut l TcpListener) accept() !&TcpConn {
mut res := l.accept_only()!
res.set_sock()!
return res
}
// accept_only accepts a tcp connection from an external source to the listener `l`.
// Unlike `accept`, `accept_only` *will not call* `.set_sock()!` on the result,
// and is thus faster.
//
// Note: you *need* to call `.set_sock()!` manually, before using the
// connection after calling `.accept_only()!`, but that does not have to happen
// in the same thread that called `.accept_only()!`.
// The intention of this API, is to have a more efficient way to accept
// connections, that are later processed by a thread pool, while the main
// thread remains active, so that it can accept other connections.
// See also vlib/vweb/vweb.v .
//
// If you do not need that, just call `.accept()!` instead, which will call
// `.set_sock()!` for you.
pub fn (mut l TcpListener) accept_only() !&TcpConn {
$if trace_tcp ? { $if trace_tcp ? {
eprintln(' TcpListener.accept | l.sock.handle: ${l.sock.handle:6}') eprintln(' TcpListener.accept | l.sock.handle: ${l.sock.handle:6}')
} }
mut new_handle := C.accept(l.sock.handle, 0, 0) mut new_handle := C.accept(l.sock.handle, 0, 0)
if new_handle <= 0 { if new_handle <= 0 {
l.wait_for_accept()! l.wait_for_accept()!
@ -307,12 +339,9 @@ pub fn (mut l TcpListener) accept() !&TcpConn {
return error('accept failed') return error('accept failed')
} }
} }
new_sock := tcp_socket_from_handle(new_handle)!
$if trace_tcp ? {
eprintln(' TcpListener.accept | << new_sock.handle: ${new_sock.handle:6}')
}
return &TcpConn{ return &TcpConn{
sock: new_sock handle: new_handle
read_timeout: net.tcp_default_read_timeout read_timeout: net.tcp_default_read_timeout
write_timeout: net.tcp_default_write_timeout write_timeout: net.tcp_default_write_timeout
} }

View File

@ -5,6 +5,7 @@ module vweb
import os import os
import io import io
import runtime
import net import net
import net.http import net.http
import net.urllib import net.urllib
@ -263,7 +264,7 @@ pub fn (mut ctx Context) file(f_path string) Result {
} }
content_type := vweb.mime_types[ext] content_type := vweb.mime_types[ext]
if content_type.len == 0 { if content_type.len == 0 {
eprintln('no MIME type found for extension ${ext}') eprintln('[vweb] no MIME type found for extension ${ext}')
ctx.server_error(500) ctx.server_error(500)
} else { } else {
ctx.send_response_to_client(content_type, data) ctx.send_response_to_client(content_type, data)
@ -318,7 +319,7 @@ pub fn (mut ctx Context) not_found() Result {
pub fn (mut ctx Context) set_cookie(cookie http.Cookie) { pub fn (mut ctx Context) set_cookie(cookie http.Cookie) {
cookie_raw := cookie.str() cookie_raw := cookie.str()
if cookie_raw == '' { if cookie_raw == '' {
eprintln('error setting cookie: name of cookie is invalid') eprintln('[vweb] error setting cookie: name of cookie is invalid')
return return
} }
ctx.add_header('Set-Cookie', cookie_raw) ctx.add_header('Set-Cookie', cookie_raw)
@ -387,9 +388,11 @@ pub fn run[T](global_app &T, port int) {
[params] [params]
pub struct RunParams { pub struct RunParams {
host string
port int = 8080
family net.AddrFamily = .ip6 // use `family: .ip, host: 'localhost'` when you want it to bind only to 127.0.0.1 family net.AddrFamily = .ip6 // use `family: .ip, host: 'localhost'` when you want it to bind only to 127.0.0.1
host string
port int = 8080
nr_workers int = runtime.nr_jobs()
pool_channel_slots int = 1000
show_startup_message bool = true show_startup_message bool = true
} }
@ -400,59 +403,96 @@ pub fn run_at[T](global_app &T, params RunParams) ! {
if params.port <= 0 || params.port > 65535 { if params.port <= 0 || params.port > 65535 {
return error('invalid port number `${params.port}`, it should be between 1 and 65535') return error('invalid port number `${params.port}`, it should be between 1 and 65535')
} }
if params.pool_channel_slots < 1 {
return error('invalid pool_channel_slots `${params.pool_channel_slots}`, it should be above 0, preferably higher than 10 x nr_workers')
}
if params.nr_workers < 1 {
return error('invalid nr_workers `${params.nr_workers}`, it should be above 0')
}
mut l := net.listen_tcp(params.family, '${params.host}:${params.port}') or { mut l := net.listen_tcp(params.family, '${params.host}:${params.port}') or {
ecode := err.code() ecode := err.code()
return error('failed to listen ${ecode} ${err}') return error('failed to listen ${ecode} ${err}')
} }
// Parsing methods attributes host := if params.host == '' { 'localhost' } else { params.host }
if params.show_startup_message {
println('[Vweb] Running app on http://${host}:${params.port}/')
}
ch := chan &RequestParams{cap: params.pool_channel_slots}
mut ws := []thread{cap: params.nr_workers}
for worker_number in 0 .. params.nr_workers {
ws << new_worker[T](ch, worker_number)
}
if params.show_startup_message {
println('[Vweb] We have ${ws.len} workers')
}
flush_stdout()
// Parse the attributes of vweb app methods:
mut routes := map[string]Route{} mut routes := map[string]Route{}
$for method in T.methods { $for method in T.methods {
http_methods, route_path, middleware := parse_attrs(method.name, method.attrs) or { http_methods, route_path, middleware := parse_attrs(method.name, method.attrs) or {
return error('error parsing method attributes: ${err}') return error('error parsing method attributes: ${err}')
} }
routes[method.name] = Route{ routes[method.name] = Route{
methods: http_methods methods: http_methods
path: route_path path: route_path
middleware: middleware middleware: middleware
} }
} }
host := if params.host == '' { 'localhost' } else { params.host } // Forever accept every connection that comes, and
if params.show_startup_message { // pass it through the channel, to the thread pool:
println('[Vweb] Running app on http://${host}:${params.port}/')
}
flush_stdout()
for { for {
// Create a new app object for each connection, copy global data like db connections mut connection := l.accept_only() or {
mut request_app := &T{}
$if T is MiddlewareInterface {
request_app = &T{
middlewares: global_app.middlewares.clone()
}
}
$if T is DbInterface {
request_app.db = global_app.db
} $else {
// println('vweb no db')
}
$for field in T.fields {
if 'vweb_global' in field.attrs || field.is_shared {
request_app.$(field.name) = global_app.$(field.name)
}
}
request_app.Context = global_app.Context // copy the context ref that contains static files map etc
mut conn := l.accept() or {
// failures should not panic // failures should not panic
eprintln('accept() failed with error: ${err.msg()}') eprintln('[vweb] accept() failed with error: ${err.msg()}')
continue continue
} }
spawn handle_conn[T](mut conn, mut request_app, routes) ch <- &RequestParams{
connection: connection
global_app: unsafe { global_app }
routes: &routes
}
} }
} }
fn new_request_app[T](global_app &T) &T {
// Create a new app object for each connection, copy global data like db connections
mut request_app := &T{}
$if T is MiddlewareInterface {
request_app = &T{
middlewares: global_app.middlewares.clone()
}
}
$if T is DbInterface {
request_app.db = global_app.db
}
$for field in T.fields {
if field.is_shared {
unsafe {
// TODO: remove this horrible hack, when copying a shared field at comptime works properly!!!
raptr := &voidptr(&request_app.$(field.name))
gaptr := &voidptr(&global_app.$(field.name))
*raptr = *gaptr
_ = raptr // TODO: v produces a warning that `raptr` is unused otherwise, even though it was on the previous line
}
} else {
if 'vweb_global' in field.attrs {
request_app.$(field.name) = global_app.$(field.name)
}
}
}
request_app.Context = global_app.Context // copy the context ref that contains static files map etc
return request_app
}
[manualfree] [manualfree]
fn handle_conn[T](mut conn net.TcpConn, mut app T, routes map[string]Route) { fn handle_conn[T](mut conn net.TcpConn, global_app &T, routes &map[string]Route, tid int) {
// Create a new app object for each connection, copy global data like db connections
mut app := new_request_app[T](global_app)
conn.set_read_timeout(30 * time.second) conn.set_read_timeout(30 * time.second)
conn.set_write_timeout(30 * time.second) conn.set_write_timeout(30 * time.second)
defer { defer {
@ -462,6 +502,11 @@ fn handle_conn[T](mut conn net.TcpConn, mut app T, routes map[string]Route) {
} }
} }
conn.set_sock() or {
eprintln('[vweb] tid: ${tid:03d}, error setting socket')
return
}
mut reader := io.new_buffered_reader(reader: conn) mut reader := io.new_buffered_reader(reader: conn)
defer { defer {
unsafe { unsafe {
@ -475,7 +520,7 @@ fn handle_conn[T](mut conn net.TcpConn, mut app T, routes map[string]Route) {
req := http.parse_request(mut reader) or { req := http.parse_request(mut reader) or {
// Prevents errors from being thrown when BufferedReader is empty // Prevents errors from being thrown when BufferedReader is empty
if '${err}' != 'none' { if '${err}' != 'none' {
eprintln('error parsing request: ${err}') eprintln('[vweb] tid: ${tid:03d}, error parsing request: ${err}')
} }
return return
} }
@ -487,7 +532,7 @@ fn handle_conn[T](mut conn net.TcpConn, mut app T, routes map[string]Route) {
} }
// URL Parse // URL Parse
url := urllib.parse(req.url) or { url := urllib.parse(req.url) or {
eprintln('error parsing path: ${err}') eprintln('[vweb] tid: ${tid:03d}, error parsing path: ${err}')
return return
} }
@ -538,8 +583,8 @@ fn handle_conn[T](mut conn net.TcpConn, mut app T, routes map[string]Route) {
// Route matching // Route matching
$for method in T.methods { $for method in T.methods {
$if method.return_type is Result { $if method.return_type is Result {
route := routes[method.name] or { route := (*routes)[method.name] or {
eprintln('parsed attributes for the `${method.name}` are not found, skipping...') eprintln('[vweb] tid: ${tid:03d}, parsed attributes for the `${method.name}` are not found, skipping...')
Route{} Route{}
} }
@ -598,7 +643,7 @@ fn handle_conn[T](mut conn net.TcpConn, mut app T, routes map[string]Route) {
if params := route_matches(url_words, route_words) { if params := route_matches(url_words, route_words) {
method_args := params.clone() method_args := params.clone()
if method_args.len != method.args.len { if method_args.len != method.args.len {
eprintln('warning: uneven parameters count (${method.args.len}) in `${method.name}`, compared to the vweb route `${method.attrs}` (${method_args.len})') eprintln('[vweb] tid: ${tid:03d}, warning: uneven parameters count (${method.args.len}) in `${method.name}`, compared to the vweb route `${method.attrs}` (${method_args.len})')
} }
$if T is MiddlewareInterface { $if T is MiddlewareInterface {
@ -641,7 +686,7 @@ fn validate_middleware[T](mut app T, full_path string) bool {
fn validate_app_middleware[T](mut app T, middleware string, method_name string) bool { fn validate_app_middleware[T](mut app T, middleware string, method_name string) bool {
// then the middleware that is defined for this route specifically // then the middleware that is defined for this route specifically
valid := fire_app_middleware(mut app, middleware) or { valid := fire_app_middleware(mut app, middleware) or {
eprintln('warning: middleware `${middleware}` for the `${method_name}` are not found') eprintln('[vweb] warning: middleware `${middleware}` for the `${method_name}` are not found')
true true
} }
return valid return valid
@ -654,7 +699,7 @@ fn fire_app_middleware[T](mut app T, method_name string) ?bool {
$if method.return_type is bool { $if method.return_type is bool {
return app.$method() return app.$method()
} $else { } $else {
eprintln('error in `${method.name}, middleware functions must return bool') eprintln('[vweb] error in `${method.name}, middleware functions must return bool')
return none return none
} }
} }
@ -809,7 +854,7 @@ pub fn (ctx &Context) ip() string {
// Set s to the form error // Set s to the form error
pub fn (mut ctx Context) error(s string) { pub fn (mut ctx Context) error(s string) {
eprintln('vweb error: ${s}') eprintln('[vweb] Context.error: ${s}')
ctx.form_error = s ctx.form_error = s
} }
@ -837,3 +882,38 @@ fn send_string(mut conn net.TcpConn, s string) ! {
fn filter(s string) string { fn filter(s string) string {
return html.escape(s) return html.escape(s)
} }
// Worker functions for the thread pool:
struct RequestParams {
global_app voidptr
routes &map[string]Route
mut:
connection &net.TcpConn
}
struct Worker[T] {
id int
ch chan &RequestParams
}
fn new_worker[T](ch chan &RequestParams, id int) thread {
mut w := &Worker[T]{
id: id
ch: ch
}
return spawn w.process_incomming_requests[T]()
}
fn (mut w Worker[T]) process_incomming_requests() {
sid := '[vweb] tid: ${w.id:03d} received request'
for {
mut params := <-w.ch or { break }
$if vweb_trace_worker_scan ? {
eprintln(sid)
}
handle_conn[T](mut params.connection, params.global_app, params.routes, w.id)
}
$if vweb_trace_worker_scan ? {
eprintln('[vweb] closing worker ${w.id}.')
}
}