1
0
mirror of https://github.com/vlang/v.git synced 2023-08-10 21:13:21 +03:00
v/vlib/net/websocket/websocket_server.v

229 lines
6.1 KiB
V

module websocket
import net
import net.ssl
import log
import time
import rand
pub struct ServerState {
mut:
ping_interval int = 30 // interval for sending ping to clients (seconds)
state State = .closed // current state of connection
pub mut:
clients map[string]&ServerClient // clients connected to this server
}
// Server represents a websocket server connection
pub struct Server {
mut:
logger &log.Logger = &log.Logger(&log.Log{
level: .info
})
ls &net.TcpListener = unsafe { nil } // listener used to get incoming connection to socket
accept_client_callbacks []AcceptClientFn // accept client callback functions
message_callbacks []MessageEventHandler // new message callback functions
close_callbacks []CloseEventHandler // close message callback functions
pub:
family net.AddrFamily = .ip
port int // port used as listen to incoming connections
is_ssl bool // true if secure connection (not supported yet on server)
pub mut:
server_state shared ServerState
}
// ServerClient represents a connected client
pub struct ServerClient {
pub:
resource_name string // resource that the client access
client_key string // unique key of client
pub mut:
server &Server = unsafe { nil }
client &Client = unsafe { nil }
}
[params]
pub struct ServerOpt {
logger &log.Logger = &log.Logger(&log.Log{
level: .info
})
}
// new_server instance a new websocket server on provided port and route
pub fn new_server(family net.AddrFamily, port int, route string, opt ServerOpt) &Server {
return &Server{
ls: 0
family: family
port: port
logger: opt.logger
}
}
// set_ping_interval sets the interval that the server will send ping messages to clients
pub fn (mut s Server) set_ping_interval(seconds int) {
lock s.server_state {
s.server_state.ping_interval = seconds
}
}
// get_ping_interval return the interval that the server will send ping messages to clients
pub fn (mut s Server) get_ping_interval() int {
return rlock s.server_state {
s.server_state.ping_interval
}
}
// listen start listen and process to incoming connections from websocket clients
pub fn (mut s Server) listen() ! {
s.logger.info('websocket server: start listen on port ${s.port}')
s.ls = net.listen_tcp(s.family, ':${s.port}')!
s.set_state(.open)
spawn s.handle_ping()
for {
mut c := s.accept_new_client() or { continue }
spawn s.serve_client(mut c)
}
s.logger.info('websocket server: end listen on port ${s.port}')
}
// Close closes server (not implemented yet)
fn (mut s Server) close() {
// TODO: implement close when moving to net from x.net
}
// handle_ping sends ping to all clients every set interval
fn (mut s Server) handle_ping() {
mut clients_to_remove := []string{}
for s.get_state() == .open {
time.sleep(s.get_ping_interval() * time.second)
for i, _ in rlock s.server_state {
s.server_state.clients
} {
mut c := rlock s.server_state {
s.server_state.clients[i] or { continue }
}
if c.client.get_state() == .open {
c.client.ping() or {
s.logger.debug('server-> error sending ping to client')
c.client.close(1002, 'Closing connection: ping send error') or {
// we want to continue even if error
continue
}
clients_to_remove << c.client.id
}
if (time.now().unix - c.client.last_pong_ut) > s.get_ping_interval() * 2 {
clients_to_remove << c.client.id
c.client.close(1000, 'no pong received') or { continue }
}
}
}
// TODO: replace for with s.clients.delete_all(clients_to_remove) if (https://github.com/vlang/v/pull/6020) merges
for client in clients_to_remove {
lock s.server_state {
s.server_state.clients.delete(client)
}
}
clients_to_remove.clear()
}
}
// serve_client accepts incoming connection and sets up the callbacks
fn (mut s Server) serve_client(mut c Client) ! {
c.logger.debug('server-> Start serve client (${c.id})')
defer {
c.logger.debug('server-> End serve client (${c.id})')
}
mut handshake_response, mut server_client := s.handle_server_handshake(mut c)!
accept := s.send_connect_event(mut server_client)!
if !accept {
s.logger.debug('server-> client not accepted')
c.shutdown_socket()!
return
}
// the client is accepted
c.socket_write(handshake_response.bytes())!
lock s.server_state {
s.server_state.clients[server_client.client.id] = server_client
}
s.setup_callbacks(mut server_client)
c.listen() or {
s.logger.error(err.msg())
return err
}
}
// setup_callbacks initialize all callback functions
fn (mut s Server) setup_callbacks(mut sc ServerClient) {
if s.message_callbacks.len > 0 {
for cb in s.message_callbacks {
if cb.is_ref {
sc.client.on_message_ref(cb.handler2, cb.ref)
} else {
sc.client.on_message(cb.handler)
}
}
}
if s.close_callbacks.len > 0 {
for cb in s.close_callbacks {
if cb.is_ref {
sc.client.on_close_ref(cb.handler2, cb.ref)
} else {
sc.client.on_close(cb.handler)
}
}
}
// set standard close so we can remove client if closed
sc.client.on_close_ref(fn (mut c Client, code int, reason string, mut sc ServerClient) ! {
c.logger.debug('server-> Delete client')
lock sc.server.server_state {
sc.server.server_state.clients.delete(sc.client.id)
}
}, sc)
}
// accept_new_client creates a new client instance for client that connects to the socket
fn (mut s Server) accept_new_client() !&Client {
mut new_conn := s.ls.accept()!
c := &Client{
is_server: true
conn: new_conn
ssl_conn: ssl.new_ssl_conn()!
logger: s.logger
client_state: ClientState{
state: .open
}
last_pong_ut: time.now().unix
id: rand.uuid_v4()
}
return c
}
// set_state sets current state in a thread safe way
pub fn (mut s Server) set_state(state State) {
lock s.server_state {
s.server_state.state = state
}
}
// get_state return current state in a thread safe way
pub fn (s Server) get_state() State {
return rlock s.server_state {
s.server_state.state
}
}
// free manages manual free of memory for Server instance
pub fn (mut s Server) free() {
lock s.server_state {
unsafe {
s.server_state.clients.free()
}
}
unsafe {
s.accept_client_callbacks.free()
s.message_callbacks.free()
s.close_callbacks.free()
}
}