diff --git a/examples/websocket/client-server/server.v b/examples/websocket/client-server/server.v index 3d6e0ad6d5..4bf0fb2dbe 100644 --- a/examples/websocket/client-server/server.v +++ b/examples/websocket/client-server/server.v @@ -22,7 +22,7 @@ fn start_server() ! { } } // Make that in execution test time give time to execute at least one time - s.ping_interval = 100 + s.set_ping_interval(100) s.on_connect(fn (mut s websocket.ServerClient) !bool { slog('s.on_connect') // Here you can look att the client info and accept or not accept @@ -37,9 +37,13 @@ fn start_server() ! { s.on_message_ref(fn (mut ws websocket.Client, msg &websocket.Message, mut m websocket.Server) ! { slog('s.on_message_ref') // for _, cli in m.clients { - for i, _ in m.clients { - mut c := m.clients[i] or { continue } - if c.client.state == .open && c.client.id != ws.id { + for i, _ in rlock m.server_state { + m.server_state.clients + } { + mut c := rlock m.server_state { + m.server_state.clients[i] or { continue } + } + if c.client.get_state() == .open && c.client.id != ws.id { c.client.write(msg.payload, websocket.OPCode.text_frame) or { panic(err) } } } diff --git a/examples/websocket/ping.v b/examples/websocket/ping.v index 19871e2e72..138b3ee3a6 100644 --- a/examples/websocket/ping.v +++ b/examples/websocket/ping.v @@ -32,7 +32,7 @@ fn start_server() ! { } } // Make that in execution test time give time to execute at least one time - s.ping_interval = 100 + s.set_ping_interval(100) s.on_connect(fn (mut s websocket.ServerClient) !bool { slog('ws.on_connect, s.client_key: ${s.client_key}') // Here you can look att the client info and accept or not accept diff --git a/vlib/net/websocket/io.v b/vlib/net/websocket/io.v index ff9c46fffe..a9e35b37d4 100644 --- a/vlib/net/websocket/io.v +++ b/vlib/net/websocket/io.v @@ -5,7 +5,7 @@ import net // socket_read reads from socket into the provided buffer fn (mut ws Client) socket_read(mut buffer []u8) !int { lock { - if ws.state in [.closed, .closing] || ws.conn.sock.handle <= 1 { + if ws.get_state() in [.closed, .closing] || ws.conn.sock.handle <= 1 { return error('socket_read: trying to read a closed socket') } if ws.is_ssl { @@ -22,7 +22,7 @@ fn (mut ws Client) socket_read(mut buffer []u8) !int { // socket_read reads from socket into the provided byte pointer and length fn (mut ws Client) socket_read_ptr(buf_ptr &u8, len int) !int { lock { - if ws.state in [.closed, .closing] || ws.conn.sock.handle <= 1 { + if ws.get_state() in [.closed, .closing] || ws.conn.sock.handle <= 1 { return error('socket_read_ptr: trying to read a closed socket') } if ws.is_ssl { @@ -39,7 +39,7 @@ fn (mut ws Client) socket_read_ptr(buf_ptr &u8, len int) !int { // socket_write writes the provided byte array to the socket fn (mut ws Client) socket_write(bytes []u8) !int { lock { - if ws.state == .closed || ws.conn.sock.handle <= 1 { + if ws.get_state() == .closed || ws.conn.sock.handle <= 1 { ws.debug_log('socket_write: Socket already closed') return error('socket_write: trying to write on a closed socket') } diff --git a/vlib/net/websocket/message.v b/vlib/net/websocket/message.v index 6c34c05363..e8832264b7 100644 --- a/vlib/net/websocket/message.v +++ b/vlib/net/websocket/message.v @@ -214,7 +214,7 @@ pub fn (mut ws Client) parse_frame_header() !Frame { mut frame := Frame{} mut rbuff := [1]u8{} mut mask_end_byte := 0 - for ws.state == .open { + for ws.get_state() == .open { read_bytes := ws.socket_read_ptr(&rbuff[0], 1)! if read_bytes == 0 { // this is probably a timeout or close diff --git a/vlib/net/websocket/utils.v b/vlib/net/websocket/utils.v index 3181c1d26a..27218f2eb2 100644 --- a/vlib/net/websocket/utils.v +++ b/vlib/net/websocket/utils.v @@ -3,27 +3,18 @@ module websocket import rand import crypto.sha1 import encoding.base64 +import encoding.binary // htonl64 converts payload length to header bits fn htonl64(payload_len u64) []u8 { mut ret := []u8{len: 8} - ret[0] = u8(((payload_len & (u64(0xff) << 56)) >> 56) & 0xff) - ret[1] = u8(((payload_len & (u64(0xff) << 48)) >> 48) & 0xff) - ret[2] = u8(((payload_len & (u64(0xff) << 40)) >> 40) & 0xff) - ret[3] = u8(((payload_len & (u64(0xff) << 32)) >> 32) & 0xff) - ret[4] = u8(((payload_len & (u64(0xff) << 24)) >> 24) & 0xff) - ret[5] = u8(((payload_len & (u64(0xff) << 16)) >> 16) & 0xff) - ret[6] = u8(((payload_len & (u64(0xff) << 8)) >> 8) & 0xff) - ret[7] = u8(((payload_len & (u64(0xff) << 0)) >> 0) & 0xff) + binary.big_endian_put_u64(mut ret, payload_len) return ret } // create_masking_key returns a new masking key to use when masking websocket messages fn create_masking_key() []u8 { - mask_bit := rand.u8() - buf := []u8{len: 4, init: `0`} - unsafe { C.memcpy(buf.data, &mask_bit, 4) } - return buf + return rand.bytes(4) or { [0, 0, 0, 0] } } // create_key_challenge_response creates a key challenge response from security key @@ -45,10 +36,6 @@ fn create_key_challenge_response(seckey string) !string { // get_nonce creates a randomized array used in handshake process fn get_nonce(nonce_size int) string { - mut nonce := []u8{len: nonce_size, cap: nonce_size} - alphanum := '0123456789ABCDEFGHIJKLMNOPQRSTUVXYZabcdefghijklmnopqrstuvwxyz' - for i in 0 .. nonce_size { - nonce[i] = alphanum[rand.intn(alphanum.len) or { 0 }] - } - return unsafe { tos(nonce.data, nonce.len) }.clone() + return rand.string_from_set('0123456789ABCDEFGHIJKLMNOPQRSTUVXYZabcdefghijklmnopqrstuvwxyz', + nonce_size) } diff --git a/vlib/net/websocket/websocket_client.v b/vlib/net/websocket/websocket_client.v index 436e80d6f3..05b98fdda8 100644 --- a/vlib/net/websocket/websocket_client.v +++ b/vlib/net/websocket/websocket_client.v @@ -15,6 +15,11 @@ const ( empty_bytearr = []u8{} // used as empty response to avoid allocation ) +pub struct ClientState { +mut: + state State = .closed // current state of connection +} + // Client represents websocket client pub struct Client { is_server bool @@ -36,8 +41,8 @@ pub mut: header http.Header // headers that will be passed when connecting conn &net.TcpConn = unsafe { nil } // underlying TCP socket connection nonce_size int = 16 // size of nounce used for masking - panic_on_callback bool // set to true of callbacks can panic - state State // current state of connection + panic_on_callback bool // set to true of callbacks can panic + client_state shared ClientState // current state of connection // logger used to log messages logger &log.Logger = &log.Logger(&log.Log{ level: .info @@ -97,7 +102,9 @@ pub fn new_client(address string, opt ClientOpt) !&Client { is_ssl: address.starts_with('wss') logger: opt.logger uri: uri - state: .closed + client_state: ClientState{ + state: .closed + } id: rand.uuid_v4() header: http.new_header() read_timeout: opt.read_timeout @@ -124,20 +131,20 @@ pub fn (mut ws Client) listen() ! { unsafe { log_msg.free() } defer { ws.logger.info('Quit client listener, server(${ws.is_server})...') - if ws.state == .open { + if ws.get_state() == .open { ws.close(1000, 'closed by client') or {} } } - for ws.state == .open { + for ws.get_state() == .open { msg := ws.read_next_message() or { - if ws.state in [.closed, .closing] { + if ws.get_state() in [.closed, .closing] { return } ws.debug_log('failed to read next message: ${err}') ws.send_error_event('failed to read next message: ${err}') return err } - if ws.state in [.closed, .closing] { + if ws.get_state() in [.closed, .closing] { return } ws.debug_log('got message: ${msg.opcode}') @@ -197,7 +204,7 @@ pub fn (mut ws Client) listen() ! { if reason.len > 0 { ws.validate_utf_8(.close, reason)! } - if ws.state !in [.closing, .closed] { + if ws.get_state() !in [.closing, .closed] { // sending close back according to spec ws.debug_log('close with reason, code: ${code}, reason: ${reason}') r := reason.bytestr() @@ -205,7 +212,7 @@ pub fn (mut ws Client) listen() ! { } unsafe { msg.free() } } else { - if ws.state !in [.closing, .closed] { + if ws.get_state() !in [.closing, .closed] { ws.debug_log('close with reason, no code') // sending close back according to spec ws.close(1000, 'normal')! @@ -242,7 +249,7 @@ pub fn (mut ws Client) pong() ! { // write_ptr writes len bytes provided a byteptr with a websocket messagetype pub fn (mut ws Client) write_ptr(bytes &u8, payload_len int, code OPCode) !int { // ws.debug_log('write_ptr code: $code') - if ws.state != .open || ws.conn.sock.handle < 1 { + if ws.get_state() != .open || ws.conn.sock.handle < 1 { // todo: send error here later return error('trying to write on a closed socket!') } @@ -329,8 +336,9 @@ pub fn (mut ws Client) write_string(str string) !int { // close closes the websocket connection pub fn (mut ws Client) close(code int, message string) ! { ws.debug_log('sending close, ${code}, ${message}') - if ws.state in [.closed, .closing] || ws.conn.sock.handle <= 1 { - ws.debug_log('close: Websocket already closed (${ws.state}), ${message}, ${code} handle(${ws.conn.sock.handle})') + ws_state := ws.get_state() + if ws_state in [.closed, .closing] || ws.conn.sock.handle <= 1 { + ws.debug_log('close: Websocket already closed (${ws_state}), ${message}, ${code} handle(${ws.conn.sock.handle})') err_msg := 'Socket already closed: ${code}' return error(err_msg) } @@ -362,7 +370,7 @@ pub fn (mut ws Client) close(code int, message string) ! { // send_control_frame sends a control frame to the server fn (mut ws Client) send_control_frame(code OPCode, frame_typ string, payload []u8) ! { ws.debug_log('send control frame ${code}, frame_type: ${frame_typ}') - if ws.state !in [.open, .closing] && ws.conn.sock.handle > 1 { + if ws.get_state() !in [.open, .closing] && ws.conn.sock.handle > 1 { return error('socket is not connected') } header_len := if ws.is_server { 2 } else { 6 } @@ -446,15 +454,22 @@ fn parse_uri(url string) !&Uri { } // set_state sets current state of the websocket connection -fn (mut ws Client) set_state(state State) { - lock { - ws.state = state +pub fn (mut ws Client) set_state(state State) { + lock ws.client_state { + ws.client_state.state = state + } +} + +// get_state return the current state of the websocket connection +pub fn (ws Client) get_state() State { + return rlock ws.client_state { + ws.client_state.state } } // assert_not_connected returns error if the connection is not connected fn (ws Client) assert_not_connected() ! { - match ws.state { + match ws.get_state() { .connecting { return error('connect: websocket is connecting') } .open { return error('connect: websocket already open') } .closing { return error('connect: reconnect on closing websocket not supported, please use new client') } @@ -463,9 +478,9 @@ fn (ws Client) assert_not_connected() ! { } // reset_state resets the websocket and initialize default settings -fn (mut ws Client) reset_state() ! { - lock { - ws.state = .closed +pub fn (mut ws Client) reset_state() ! { + lock ws.client_state { + ws.client_state.state = .closed ws.ssl_conn = ssl.new_ssl_conn()! ws.flags = [] ws.fragments = [] diff --git a/vlib/net/websocket/websocket_server.v b/vlib/net/websocket/websocket_server.v index d8edb5ec2f..5083737740 100644 --- a/vlib/net/websocket/websocket_server.v +++ b/vlib/net/websocket/websocket_server.v @@ -6,6 +6,14 @@ import log import time import rand +pub struct ServerState { +mut: + ping_interval int = 30 // interval for sending ping to clients (seconds) + state State = .closed // current state of connection +pub mut: + clients map[string]&ServerClient // clients connected to this server +} + // Server represents a websocket server connection pub struct Server { mut: @@ -21,9 +29,7 @@ pub: port int // port used as listen to incoming connections is_ssl bool // true if secure connection (not supported yet on server) pub mut: - clients map[string]&ServerClient // clients connected to this server - ping_interval int = 30 // interval for sending ping to clients (seconds) - state State // current state of connection + server_state shared ServerState } // ServerClient represents a connected client @@ -50,13 +56,21 @@ pub fn new_server(family net.AddrFamily, port int, route string, opt ServerOpt) family: family port: port logger: opt.logger - state: .closed } } // set_ping_interval sets the interval that the server will send ping messages to clients pub fn (mut s Server) set_ping_interval(seconds int) { - s.ping_interval = seconds + lock s.server_state { + s.server_state.ping_interval = seconds + } +} + +// get_ping_interval return the interval that the server will send ping messages to clients +pub fn (mut s Server) get_ping_interval() int { + return rlock s.server_state { + s.server_state.ping_interval + } } // listen start listen and process to incoming connections from websocket clients @@ -80,11 +94,15 @@ fn (mut s Server) close() { // handle_ping sends ping to all clients every set interval fn (mut s Server) handle_ping() { mut clients_to_remove := []string{} - for s.state == .open { - time.sleep(s.ping_interval * time.second) - for i, _ in s.clients { - mut c := s.clients[i] or { continue } - if c.client.state == .open { + for s.get_state() == .open { + time.sleep(s.get_ping_interval() * time.second) + for i, _ in rlock s.server_state { + s.server_state.clients + } { + mut c := rlock s.server_state { + s.server_state.clients[i] or { continue } + } + if c.client.get_state() == .open { c.client.ping() or { s.logger.debug('server-> error sending ping to client') c.client.close(1002, 'Closing connection: ping send error') or { @@ -93,7 +111,7 @@ fn (mut s Server) handle_ping() { } clients_to_remove << c.client.id } - if (time.now().unix - c.client.last_pong_ut) > s.ping_interval * 2 { + if (time.now().unix - c.client.last_pong_ut) > s.get_ping_interval() * 2 { clients_to_remove << c.client.id c.client.close(1000, 'no pong received') or { continue } } @@ -101,8 +119,8 @@ fn (mut s Server) handle_ping() { } // TODO: replace for with s.clients.delete_all(clients_to_remove) if (https://github.com/vlang/v/pull/6020) merges for client in clients_to_remove { - lock { - s.clients.delete(client) + lock s.server_state { + s.server_state.clients.delete(client) } } clients_to_remove.clear() @@ -124,10 +142,8 @@ fn (mut s Server) serve_client(mut c Client) ! { } // the client is accepted c.socket_write(handshake_response.bytes())! - lock { - unsafe { - s.clients[server_client.client.id] = server_client - } + lock s.server_state { + s.server_state.clients[server_client.client.id] = server_client } s.setup_callbacks(mut server_client) c.listen() or { @@ -159,8 +175,8 @@ fn (mut s Server) setup_callbacks(mut sc ServerClient) { // set standard close so we can remove client if closed sc.client.on_close_ref(fn (mut c Client, code int, reason string, mut sc ServerClient) ! { c.logger.debug('server-> Delete client') - lock { - sc.server.clients.delete(sc.client.id) + lock sc.server.server_state { + sc.server.server_state.clients.delete(sc.client.id) } }, sc) } @@ -173,7 +189,9 @@ fn (mut s Server) accept_new_client() !&Client { conn: new_conn ssl_conn: ssl.new_ssl_conn()! logger: s.logger - state: .open + client_state: ClientState{ + state: .open + } last_pong_ut: time.now().unix id: rand.uuid_v4() } @@ -181,16 +199,28 @@ fn (mut s Server) accept_new_client() !&Client { } // set_state sets current state in a thread safe way -fn (mut s Server) set_state(state State) { - lock { - s.state = state +pub fn (mut s Server) set_state(state State) { + lock s.server_state { + s.server_state.state = state + } +} + +// get_state return current state in a thread safe way +pub fn (s Server) get_state() State { + return rlock s.server_state { + s.server_state.state } } // free manages manual free of memory for Server instance pub fn (mut s Server) free() { + lock s.server_state { + unsafe { + s.server_state.clients.free() + } + } + unsafe { - s.clients.free() s.accept_client_callbacks.free() s.message_callbacks.free() s.close_callbacks.free() diff --git a/vlib/net/websocket/websocket_test.v b/vlib/net/websocket/websocket_test.v index dc55918450..334505c7c9 100644 --- a/vlib/net/websocket/websocket_test.v +++ b/vlib/net/websocket/websocket_test.v @@ -50,7 +50,7 @@ fn start_server(family net.AddrFamily, listen_port int) ! { eprintln('> start_server family:${family} | listen_port: ${listen_port}') mut s := websocket.new_server(family, listen_port, '') // make that in execution test time give time to execute at least one time - s.ping_interval = 1 + s.set_ping_interval(1) s.on_connect(fn (mut s websocket.ServerClient) !bool { // here you can look att the client info and accept or not accept @@ -80,7 +80,7 @@ fn start_server_in_thread_and_wait_till_it_is_ready_to_accept_connections(mut ws spawn fn [mut ws] () { ws.listen() or { panic('websocket server could not listen, err: ${err}') } }() - for ws.state != .open { + for ws.get_state() != .open { time.sleep(10 * time.millisecond) } eprintln('-----------------------------------------------------------------------------')