1
0
mirror of https://github.com/vlang/v.git synced 2023-08-10 21:13:21 +03:00

net.websocket: make thread safe/concurrent (#18179)

This commit is contained in:
kbkpbot 2023-05-18 17:27:00 +08:00 committed by GitHub
parent 1e88b1ab3e
commit c8d2098a14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 109 additions and 73 deletions

View File

@ -22,7 +22,7 @@ fn start_server() ! {
} }
} }
// Make that in execution test time give time to execute at least one time // Make that in execution test time give time to execute at least one time
s.ping_interval = 100 s.set_ping_interval(100)
s.on_connect(fn (mut s websocket.ServerClient) !bool { s.on_connect(fn (mut s websocket.ServerClient) !bool {
slog('s.on_connect') slog('s.on_connect')
// Here you can look att the client info and accept or not accept // Here you can look att the client info and accept or not accept
@ -37,9 +37,13 @@ fn start_server() ! {
s.on_message_ref(fn (mut ws websocket.Client, msg &websocket.Message, mut m websocket.Server) ! { s.on_message_ref(fn (mut ws websocket.Client, msg &websocket.Message, mut m websocket.Server) ! {
slog('s.on_message_ref') slog('s.on_message_ref')
// for _, cli in m.clients { // for _, cli in m.clients {
for i, _ in m.clients { for i, _ in rlock m.server_state {
mut c := m.clients[i] or { continue } m.server_state.clients
if c.client.state == .open && c.client.id != ws.id { } {
mut c := rlock m.server_state {
m.server_state.clients[i] or { continue }
}
if c.client.get_state() == .open && c.client.id != ws.id {
c.client.write(msg.payload, websocket.OPCode.text_frame) or { panic(err) } c.client.write(msg.payload, websocket.OPCode.text_frame) or { panic(err) }
} }
} }

View File

@ -32,7 +32,7 @@ fn start_server() ! {
} }
} }
// Make that in execution test time give time to execute at least one time // Make that in execution test time give time to execute at least one time
s.ping_interval = 100 s.set_ping_interval(100)
s.on_connect(fn (mut s websocket.ServerClient) !bool { s.on_connect(fn (mut s websocket.ServerClient) !bool {
slog('ws.on_connect, s.client_key: ${s.client_key}') slog('ws.on_connect, s.client_key: ${s.client_key}')
// Here you can look att the client info and accept or not accept // Here you can look att the client info and accept or not accept

View File

@ -5,7 +5,7 @@ import net
// socket_read reads from socket into the provided buffer // socket_read reads from socket into the provided buffer
fn (mut ws Client) socket_read(mut buffer []u8) !int { fn (mut ws Client) socket_read(mut buffer []u8) !int {
lock { lock {
if ws.state in [.closed, .closing] || ws.conn.sock.handle <= 1 { if ws.get_state() in [.closed, .closing] || ws.conn.sock.handle <= 1 {
return error('socket_read: trying to read a closed socket') return error('socket_read: trying to read a closed socket')
} }
if ws.is_ssl { if ws.is_ssl {
@ -22,7 +22,7 @@ fn (mut ws Client) socket_read(mut buffer []u8) !int {
// socket_read reads from socket into the provided byte pointer and length // socket_read reads from socket into the provided byte pointer and length
fn (mut ws Client) socket_read_ptr(buf_ptr &u8, len int) !int { fn (mut ws Client) socket_read_ptr(buf_ptr &u8, len int) !int {
lock { lock {
if ws.state in [.closed, .closing] || ws.conn.sock.handle <= 1 { if ws.get_state() in [.closed, .closing] || ws.conn.sock.handle <= 1 {
return error('socket_read_ptr: trying to read a closed socket') return error('socket_read_ptr: trying to read a closed socket')
} }
if ws.is_ssl { if ws.is_ssl {
@ -39,7 +39,7 @@ fn (mut ws Client) socket_read_ptr(buf_ptr &u8, len int) !int {
// socket_write writes the provided byte array to the socket // socket_write writes the provided byte array to the socket
fn (mut ws Client) socket_write(bytes []u8) !int { fn (mut ws Client) socket_write(bytes []u8) !int {
lock { lock {
if ws.state == .closed || ws.conn.sock.handle <= 1 { if ws.get_state() == .closed || ws.conn.sock.handle <= 1 {
ws.debug_log('socket_write: Socket already closed') ws.debug_log('socket_write: Socket already closed')
return error('socket_write: trying to write on a closed socket') return error('socket_write: trying to write on a closed socket')
} }

View File

@ -214,7 +214,7 @@ pub fn (mut ws Client) parse_frame_header() !Frame {
mut frame := Frame{} mut frame := Frame{}
mut rbuff := [1]u8{} mut rbuff := [1]u8{}
mut mask_end_byte := 0 mut mask_end_byte := 0
for ws.state == .open { for ws.get_state() == .open {
read_bytes := ws.socket_read_ptr(&rbuff[0], 1)! read_bytes := ws.socket_read_ptr(&rbuff[0], 1)!
if read_bytes == 0 { if read_bytes == 0 {
// this is probably a timeout or close // this is probably a timeout or close

View File

@ -3,27 +3,18 @@ module websocket
import rand import rand
import crypto.sha1 import crypto.sha1
import encoding.base64 import encoding.base64
import encoding.binary
// htonl64 converts payload length to header bits // htonl64 converts payload length to header bits
fn htonl64(payload_len u64) []u8 { fn htonl64(payload_len u64) []u8 {
mut ret := []u8{len: 8} mut ret := []u8{len: 8}
ret[0] = u8(((payload_len & (u64(0xff) << 56)) >> 56) & 0xff) binary.big_endian_put_u64(mut ret, payload_len)
ret[1] = u8(((payload_len & (u64(0xff) << 48)) >> 48) & 0xff)
ret[2] = u8(((payload_len & (u64(0xff) << 40)) >> 40) & 0xff)
ret[3] = u8(((payload_len & (u64(0xff) << 32)) >> 32) & 0xff)
ret[4] = u8(((payload_len & (u64(0xff) << 24)) >> 24) & 0xff)
ret[5] = u8(((payload_len & (u64(0xff) << 16)) >> 16) & 0xff)
ret[6] = u8(((payload_len & (u64(0xff) << 8)) >> 8) & 0xff)
ret[7] = u8(((payload_len & (u64(0xff) << 0)) >> 0) & 0xff)
return ret return ret
} }
// create_masking_key returns a new masking key to use when masking websocket messages // create_masking_key returns a new masking key to use when masking websocket messages
fn create_masking_key() []u8 { fn create_masking_key() []u8 {
mask_bit := rand.u8() return rand.bytes(4) or { [0, 0, 0, 0] }
buf := []u8{len: 4, init: `0`}
unsafe { C.memcpy(buf.data, &mask_bit, 4) }
return buf
} }
// create_key_challenge_response creates a key challenge response from security key // create_key_challenge_response creates a key challenge response from security key
@ -45,10 +36,6 @@ fn create_key_challenge_response(seckey string) !string {
// get_nonce creates a randomized array used in handshake process // get_nonce creates a randomized array used in handshake process
fn get_nonce(nonce_size int) string { fn get_nonce(nonce_size int) string {
mut nonce := []u8{len: nonce_size, cap: nonce_size} return rand.string_from_set('0123456789ABCDEFGHIJKLMNOPQRSTUVXYZabcdefghijklmnopqrstuvwxyz',
alphanum := '0123456789ABCDEFGHIJKLMNOPQRSTUVXYZabcdefghijklmnopqrstuvwxyz' nonce_size)
for i in 0 .. nonce_size {
nonce[i] = alphanum[rand.intn(alphanum.len) or { 0 }]
}
return unsafe { tos(nonce.data, nonce.len) }.clone()
} }

View File

@ -15,6 +15,11 @@ const (
empty_bytearr = []u8{} // used as empty response to avoid allocation empty_bytearr = []u8{} // used as empty response to avoid allocation
) )
pub struct ClientState {
mut:
state State = .closed // current state of connection
}
// Client represents websocket client // Client represents websocket client
pub struct Client { pub struct Client {
is_server bool is_server bool
@ -36,8 +41,8 @@ pub mut:
header http.Header // headers that will be passed when connecting header http.Header // headers that will be passed when connecting
conn &net.TcpConn = unsafe { nil } // underlying TCP socket connection conn &net.TcpConn = unsafe { nil } // underlying TCP socket connection
nonce_size int = 16 // size of nounce used for masking nonce_size int = 16 // size of nounce used for masking
panic_on_callback bool // set to true of callbacks can panic panic_on_callback bool // set to true of callbacks can panic
state State // current state of connection client_state shared ClientState // current state of connection
// logger used to log messages // logger used to log messages
logger &log.Logger = &log.Logger(&log.Log{ logger &log.Logger = &log.Logger(&log.Log{
level: .info level: .info
@ -97,7 +102,9 @@ pub fn new_client(address string, opt ClientOpt) !&Client {
is_ssl: address.starts_with('wss') is_ssl: address.starts_with('wss')
logger: opt.logger logger: opt.logger
uri: uri uri: uri
state: .closed client_state: ClientState{
state: .closed
}
id: rand.uuid_v4() id: rand.uuid_v4()
header: http.new_header() header: http.new_header()
read_timeout: opt.read_timeout read_timeout: opt.read_timeout
@ -124,20 +131,20 @@ pub fn (mut ws Client) listen() ! {
unsafe { log_msg.free() } unsafe { log_msg.free() }
defer { defer {
ws.logger.info('Quit client listener, server(${ws.is_server})...') ws.logger.info('Quit client listener, server(${ws.is_server})...')
if ws.state == .open { if ws.get_state() == .open {
ws.close(1000, 'closed by client') or {} ws.close(1000, 'closed by client') or {}
} }
} }
for ws.state == .open { for ws.get_state() == .open {
msg := ws.read_next_message() or { msg := ws.read_next_message() or {
if ws.state in [.closed, .closing] { if ws.get_state() in [.closed, .closing] {
return return
} }
ws.debug_log('failed to read next message: ${err}') ws.debug_log('failed to read next message: ${err}')
ws.send_error_event('failed to read next message: ${err}') ws.send_error_event('failed to read next message: ${err}')
return err return err
} }
if ws.state in [.closed, .closing] { if ws.get_state() in [.closed, .closing] {
return return
} }
ws.debug_log('got message: ${msg.opcode}') ws.debug_log('got message: ${msg.opcode}')
@ -197,7 +204,7 @@ pub fn (mut ws Client) listen() ! {
if reason.len > 0 { if reason.len > 0 {
ws.validate_utf_8(.close, reason)! ws.validate_utf_8(.close, reason)!
} }
if ws.state !in [.closing, .closed] { if ws.get_state() !in [.closing, .closed] {
// sending close back according to spec // sending close back according to spec
ws.debug_log('close with reason, code: ${code}, reason: ${reason}') ws.debug_log('close with reason, code: ${code}, reason: ${reason}')
r := reason.bytestr() r := reason.bytestr()
@ -205,7 +212,7 @@ pub fn (mut ws Client) listen() ! {
} }
unsafe { msg.free() } unsafe { msg.free() }
} else { } else {
if ws.state !in [.closing, .closed] { if ws.get_state() !in [.closing, .closed] {
ws.debug_log('close with reason, no code') ws.debug_log('close with reason, no code')
// sending close back according to spec // sending close back according to spec
ws.close(1000, 'normal')! ws.close(1000, 'normal')!
@ -242,7 +249,7 @@ pub fn (mut ws Client) pong() ! {
// write_ptr writes len bytes provided a byteptr with a websocket messagetype // write_ptr writes len bytes provided a byteptr with a websocket messagetype
pub fn (mut ws Client) write_ptr(bytes &u8, payload_len int, code OPCode) !int { pub fn (mut ws Client) write_ptr(bytes &u8, payload_len int, code OPCode) !int {
// ws.debug_log('write_ptr code: $code') // ws.debug_log('write_ptr code: $code')
if ws.state != .open || ws.conn.sock.handle < 1 { if ws.get_state() != .open || ws.conn.sock.handle < 1 {
// todo: send error here later // todo: send error here later
return error('trying to write on a closed socket!') return error('trying to write on a closed socket!')
} }
@ -329,8 +336,9 @@ pub fn (mut ws Client) write_string(str string) !int {
// close closes the websocket connection // close closes the websocket connection
pub fn (mut ws Client) close(code int, message string) ! { pub fn (mut ws Client) close(code int, message string) ! {
ws.debug_log('sending close, ${code}, ${message}') ws.debug_log('sending close, ${code}, ${message}')
if ws.state in [.closed, .closing] || ws.conn.sock.handle <= 1 { ws_state := ws.get_state()
ws.debug_log('close: Websocket already closed (${ws.state}), ${message}, ${code} handle(${ws.conn.sock.handle})') if ws_state in [.closed, .closing] || ws.conn.sock.handle <= 1 {
ws.debug_log('close: Websocket already closed (${ws_state}), ${message}, ${code} handle(${ws.conn.sock.handle})')
err_msg := 'Socket already closed: ${code}' err_msg := 'Socket already closed: ${code}'
return error(err_msg) return error(err_msg)
} }
@ -362,7 +370,7 @@ pub fn (mut ws Client) close(code int, message string) ! {
// send_control_frame sends a control frame to the server // send_control_frame sends a control frame to the server
fn (mut ws Client) send_control_frame(code OPCode, frame_typ string, payload []u8) ! { fn (mut ws Client) send_control_frame(code OPCode, frame_typ string, payload []u8) ! {
ws.debug_log('send control frame ${code}, frame_type: ${frame_typ}') ws.debug_log('send control frame ${code}, frame_type: ${frame_typ}')
if ws.state !in [.open, .closing] && ws.conn.sock.handle > 1 { if ws.get_state() !in [.open, .closing] && ws.conn.sock.handle > 1 {
return error('socket is not connected') return error('socket is not connected')
} }
header_len := if ws.is_server { 2 } else { 6 } header_len := if ws.is_server { 2 } else { 6 }
@ -446,15 +454,22 @@ fn parse_uri(url string) !&Uri {
} }
// set_state sets current state of the websocket connection // set_state sets current state of the websocket connection
fn (mut ws Client) set_state(state State) { pub fn (mut ws Client) set_state(state State) {
lock { lock ws.client_state {
ws.state = state ws.client_state.state = state
}
}
// get_state return the current state of the websocket connection
pub fn (ws Client) get_state() State {
return rlock ws.client_state {
ws.client_state.state
} }
} }
// assert_not_connected returns error if the connection is not connected // assert_not_connected returns error if the connection is not connected
fn (ws Client) assert_not_connected() ! { fn (ws Client) assert_not_connected() ! {
match ws.state { match ws.get_state() {
.connecting { return error('connect: websocket is connecting') } .connecting { return error('connect: websocket is connecting') }
.open { return error('connect: websocket already open') } .open { return error('connect: websocket already open') }
.closing { return error('connect: reconnect on closing websocket not supported, please use new client') } .closing { return error('connect: reconnect on closing websocket not supported, please use new client') }
@ -463,9 +478,9 @@ fn (ws Client) assert_not_connected() ! {
} }
// reset_state resets the websocket and initialize default settings // reset_state resets the websocket and initialize default settings
fn (mut ws Client) reset_state() ! { pub fn (mut ws Client) reset_state() ! {
lock { lock ws.client_state {
ws.state = .closed ws.client_state.state = .closed
ws.ssl_conn = ssl.new_ssl_conn()! ws.ssl_conn = ssl.new_ssl_conn()!
ws.flags = [] ws.flags = []
ws.fragments = [] ws.fragments = []

View File

@ -6,6 +6,14 @@ import log
import time import time
import rand import rand
pub struct ServerState {
mut:
ping_interval int = 30 // interval for sending ping to clients (seconds)
state State = .closed // current state of connection
pub mut:
clients map[string]&ServerClient // clients connected to this server
}
// Server represents a websocket server connection // Server represents a websocket server connection
pub struct Server { pub struct Server {
mut: mut:
@ -21,9 +29,7 @@ pub:
port int // port used as listen to incoming connections port int // port used as listen to incoming connections
is_ssl bool // true if secure connection (not supported yet on server) is_ssl bool // true if secure connection (not supported yet on server)
pub mut: pub mut:
clients map[string]&ServerClient // clients connected to this server server_state shared ServerState
ping_interval int = 30 // interval for sending ping to clients (seconds)
state State // current state of connection
} }
// ServerClient represents a connected client // ServerClient represents a connected client
@ -50,13 +56,21 @@ pub fn new_server(family net.AddrFamily, port int, route string, opt ServerOpt)
family: family family: family
port: port port: port
logger: opt.logger logger: opt.logger
state: .closed
} }
} }
// set_ping_interval sets the interval that the server will send ping messages to clients // set_ping_interval sets the interval that the server will send ping messages to clients
pub fn (mut s Server) set_ping_interval(seconds int) { pub fn (mut s Server) set_ping_interval(seconds int) {
s.ping_interval = seconds lock s.server_state {
s.server_state.ping_interval = seconds
}
}
// get_ping_interval return the interval that the server will send ping messages to clients
pub fn (mut s Server) get_ping_interval() int {
return rlock s.server_state {
s.server_state.ping_interval
}
} }
// listen start listen and process to incoming connections from websocket clients // listen start listen and process to incoming connections from websocket clients
@ -80,11 +94,15 @@ fn (mut s Server) close() {
// handle_ping sends ping to all clients every set interval // handle_ping sends ping to all clients every set interval
fn (mut s Server) handle_ping() { fn (mut s Server) handle_ping() {
mut clients_to_remove := []string{} mut clients_to_remove := []string{}
for s.state == .open { for s.get_state() == .open {
time.sleep(s.ping_interval * time.second) time.sleep(s.get_ping_interval() * time.second)
for i, _ in s.clients { for i, _ in rlock s.server_state {
mut c := s.clients[i] or { continue } s.server_state.clients
if c.client.state == .open { } {
mut c := rlock s.server_state {
s.server_state.clients[i] or { continue }
}
if c.client.get_state() == .open {
c.client.ping() or { c.client.ping() or {
s.logger.debug('server-> error sending ping to client') s.logger.debug('server-> error sending ping to client')
c.client.close(1002, 'Closing connection: ping send error') or { c.client.close(1002, 'Closing connection: ping send error') or {
@ -93,7 +111,7 @@ fn (mut s Server) handle_ping() {
} }
clients_to_remove << c.client.id clients_to_remove << c.client.id
} }
if (time.now().unix - c.client.last_pong_ut) > s.ping_interval * 2 { if (time.now().unix - c.client.last_pong_ut) > s.get_ping_interval() * 2 {
clients_to_remove << c.client.id clients_to_remove << c.client.id
c.client.close(1000, 'no pong received') or { continue } c.client.close(1000, 'no pong received') or { continue }
} }
@ -101,8 +119,8 @@ fn (mut s Server) handle_ping() {
} }
// TODO: replace for with s.clients.delete_all(clients_to_remove) if (https://github.com/vlang/v/pull/6020) merges // TODO: replace for with s.clients.delete_all(clients_to_remove) if (https://github.com/vlang/v/pull/6020) merges
for client in clients_to_remove { for client in clients_to_remove {
lock { lock s.server_state {
s.clients.delete(client) s.server_state.clients.delete(client)
} }
} }
clients_to_remove.clear() clients_to_remove.clear()
@ -124,10 +142,8 @@ fn (mut s Server) serve_client(mut c Client) ! {
} }
// the client is accepted // the client is accepted
c.socket_write(handshake_response.bytes())! c.socket_write(handshake_response.bytes())!
lock { lock s.server_state {
unsafe { s.server_state.clients[server_client.client.id] = server_client
s.clients[server_client.client.id] = server_client
}
} }
s.setup_callbacks(mut server_client) s.setup_callbacks(mut server_client)
c.listen() or { c.listen() or {
@ -159,8 +175,8 @@ fn (mut s Server) setup_callbacks(mut sc ServerClient) {
// set standard close so we can remove client if closed // set standard close so we can remove client if closed
sc.client.on_close_ref(fn (mut c Client, code int, reason string, mut sc ServerClient) ! { sc.client.on_close_ref(fn (mut c Client, code int, reason string, mut sc ServerClient) ! {
c.logger.debug('server-> Delete client') c.logger.debug('server-> Delete client')
lock { lock sc.server.server_state {
sc.server.clients.delete(sc.client.id) sc.server.server_state.clients.delete(sc.client.id)
} }
}, sc) }, sc)
} }
@ -173,7 +189,9 @@ fn (mut s Server) accept_new_client() !&Client {
conn: new_conn conn: new_conn
ssl_conn: ssl.new_ssl_conn()! ssl_conn: ssl.new_ssl_conn()!
logger: s.logger logger: s.logger
state: .open client_state: ClientState{
state: .open
}
last_pong_ut: time.now().unix last_pong_ut: time.now().unix
id: rand.uuid_v4() id: rand.uuid_v4()
} }
@ -181,16 +199,28 @@ fn (mut s Server) accept_new_client() !&Client {
} }
// set_state sets current state in a thread safe way // set_state sets current state in a thread safe way
fn (mut s Server) set_state(state State) { pub fn (mut s Server) set_state(state State) {
lock { lock s.server_state {
s.state = state s.server_state.state = state
}
}
// get_state return current state in a thread safe way
pub fn (s Server) get_state() State {
return rlock s.server_state {
s.server_state.state
} }
} }
// free manages manual free of memory for Server instance // free manages manual free of memory for Server instance
pub fn (mut s Server) free() { pub fn (mut s Server) free() {
lock s.server_state {
unsafe {
s.server_state.clients.free()
}
}
unsafe { unsafe {
s.clients.free()
s.accept_client_callbacks.free() s.accept_client_callbacks.free()
s.message_callbacks.free() s.message_callbacks.free()
s.close_callbacks.free() s.close_callbacks.free()

View File

@ -50,7 +50,7 @@ fn start_server(family net.AddrFamily, listen_port int) ! {
eprintln('> start_server family:${family} | listen_port: ${listen_port}') eprintln('> start_server family:${family} | listen_port: ${listen_port}')
mut s := websocket.new_server(family, listen_port, '') mut s := websocket.new_server(family, listen_port, '')
// make that in execution test time give time to execute at least one time // make that in execution test time give time to execute at least one time
s.ping_interval = 1 s.set_ping_interval(1)
s.on_connect(fn (mut s websocket.ServerClient) !bool { s.on_connect(fn (mut s websocket.ServerClient) !bool {
// here you can look att the client info and accept or not accept // here you can look att the client info and accept or not accept
@ -80,7 +80,7 @@ fn start_server_in_thread_and_wait_till_it_is_ready_to_accept_connections(mut ws
spawn fn [mut ws] () { spawn fn [mut ws] () {
ws.listen() or { panic('websocket server could not listen, err: ${err}') } ws.listen() or { panic('websocket server could not listen, err: ${err}') }
}() }()
for ws.state != .open { for ws.get_state() != .open {
time.sleep(10 * time.millisecond) time.sleep(10 * time.millisecond)
} }
eprintln('-----------------------------------------------------------------------------') eprintln('-----------------------------------------------------------------------------')