From 8426db7fe5bfd5c72703c4e3995da7a48207e327 Mon Sep 17 00:00:00 2001 From: Alexander Medvednikov Date: Wed, 8 Apr 2020 14:22:31 +0200 Subject: [PATCH] copy thecodrr/vws to vlib/net/websocket --- vlib/net/websocket/LICENSE | 21 + vlib/net/websocket/README.md | 28 ++ vlib/net/websocket/logger/logger.v | 56 +++ vlib/net/websocket/main.v | 124 ++++++ vlib/net/websocket/utf8.h | 166 ++++++++ vlib/net/websocket/ws/events.v | 40 ++ vlib/net/websocket/ws/handshake.v | 72 ++++ vlib/net/websocket/ws/io.v | 23 + vlib/net/websocket/ws/ssl.v | 46 ++ vlib/net/websocket/ws/utf8.v | 75 ++++ vlib/net/websocket/ws/utils.v | 55 +++ vlib/net/websocket/ws/ws.v | 652 +++++++++++++++++++++++++++++ 12 files changed, 1358 insertions(+) create mode 100644 vlib/net/websocket/LICENSE create mode 100644 vlib/net/websocket/README.md create mode 100644 vlib/net/websocket/logger/logger.v create mode 100644 vlib/net/websocket/main.v create mode 100644 vlib/net/websocket/utf8.h create mode 100644 vlib/net/websocket/ws/events.v create mode 100644 vlib/net/websocket/ws/handshake.v create mode 100644 vlib/net/websocket/ws/io.v create mode 100644 vlib/net/websocket/ws/ssl.v create mode 100644 vlib/net/websocket/ws/utf8.v create mode 100644 vlib/net/websocket/ws/utils.v create mode 100644 vlib/net/websocket/ws/ws.v diff --git a/vlib/net/websocket/LICENSE b/vlib/net/websocket/LICENSE new file mode 100644 index 0000000000..8d6d593bcb --- /dev/null +++ b/vlib/net/websocket/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2020 Abdullah Atta + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vlib/net/websocket/README.md b/vlib/net/websocket/README.md new file mode 100644 index 0000000000..283647a258 --- /dev/null +++ b/vlib/net/websocket/README.md @@ -0,0 +1,28 @@ +# WebSockets Library for V + +**This is still work-in-progress!** + +Heavily inspired (and used **very** liberally) from [cwebsockets](https://github.com/jeremyhahn/cwebsocket). + +The websockets library itself is ready and working (passes all tests of AutoBahn). What's left: + +1. It needs to be updated and made to run with latest V. +2. No Windows Support (SSL issues) +3. No proper AutoBahn test client (a prototype is in the main.v but nothing clean and neat). +4. No Websocket Server. + +## What's needed for Windows support: + +1. SSL (either make the VSChannel work or OpenSSL) + +General code cleanup etc. is also needed. + +## Contributors + +Anyone and everyone is welcome to contribute. I don't have time for working on this completely but I will review and merge Pull Requests ASAP. So if anyone is interested, know that I am interested too. + +If anyone has any questions regarding design etc. please open an Issue or contact me on Discord. + +## Future Planning: + +This is supposed to be merged into V stdlib but it's not ready for that yet. As soon as it is, I will open a PR. diff --git a/vlib/net/websocket/logger/logger.v b/vlib/net/websocket/logger/logger.v new file mode 100644 index 0000000000..ad3768074b --- /dev/null +++ b/vlib/net/websocket/logger/logger.v @@ -0,0 +1,56 @@ +module logger + +const ( + colors = { + "success": "\e[32", + "debug": "\e[36", + "error": "\e[91", + "warn": "\e[33", + "critical": "\e[31", + "fatal": "\e[31", + "info": "\e[37" + } +) + +struct Logger { + mod string +} + +pub fn new(mod string) &Logger { + return &Logger{mod: mod} +} + +pub fn (l &Logger) d(message string){ + $if debug { + l.print("debug", message) + } +} + +pub fn (l &Logger) i(message string){ + l.print('info', message) +} + +pub fn (l &Logger) e(message string){ + l.print('error', message) +} + +pub fn (l &Logger) c(message string){ + l.print('critical', message) +} + +pub fn (l &Logger) f(message string){ + l.print('fatal', message) + exit(-1) +} + +pub fn (l &Logger) w(message string){ + l.print('warn', message) +} + +pub fn (l &Logger) s(message string) { + l.print('success', message) +} + +fn (l &Logger) print(mod, message string) { + println('${colors[mod]};7m[${mod}]\e[0m \e[1m${l.mod}\e[0m: ${message}') +} \ No newline at end of file diff --git a/vlib/net/websocket/main.v b/vlib/net/websocket/main.v new file mode 100644 index 0000000000..5a30e05129 --- /dev/null +++ b/vlib/net/websocket/main.v @@ -0,0 +1,124 @@ +module main + +import ( + ws + eventbus + time + readline + term + benchmark +) + +const ( + eb = eventbus.new() +) +#flag -I $PWD +#include "utf8.h" + +fn C.utf8_validate_str() bool +fn main(){ + //println(sss) + /* for sss in 0..10 { + mut bm := benchmark.new_benchmark() + for i in 0..10000 { + for a, t in tests { + ss := ws.utf8_validate(t.str, t.len) + if !ss { + panic("failed") + } + //println("${a}:${ss}") + } + } + bm.stop() + println( bm.total_message('remarks about the benchmark') ) + } */ + mut websocket := ws.new("ws://localhost:9001/getCaseCount") + //defer { } + websocket.subscriber.subscribe("on_open", on_open) + websocket.subscriber.subscribe("on_message", on_message) + websocket.subscriber.subscribe("on_error", on_error) + websocket.subscriber.subscribe("on_close", on_close) + //go + websocket.connect() + websocket.read() + //time.usleep(2000000) + //go websocket.listen() + //term.erase_clear() + /* text := read_line("[client]:") + if text == "close" { + ws.close(1005, "done") + time.usleep(1000000) + exit(0) + } + ws.write(text, .text_frame) */ + /* time.usleep(1000000) + ws.read() */ + //ws.close(1005, "done") + /* + */ + //websocket.close(1005, "done") + //read_line("wait") +} + +fn read_line(text string) string { + mut r := readline.Readline{} + mut output := r.read_line(text + " ") or { + panic(err) + } + output = output.replace("\n","") + if output.len <= 0 { + return "" + } + return output +} + +fn on_open(params eventbus.Params){ + println("Websocket opened.") +} + +fn on_message(params eventbus.Params){ + println("Message recieved. Sending it back.") + typ := params.get_string("type") + len := params.get_int("len") + mut ws := params.get_caller(ws.Client{}) + if typ == "string" { + message := params.get_string("payload") + if ws.uri.ends_with("getCaseCount") { + num := message.int() + ws.close(1005, "done") + start_tests(mut ws, num) + return + } + //println("Message: " + message) + ws.write(message.str, len, .text_frame) + } else { + println("Binary message.") + message := params.get_raw("payload") + ws.write(message, len, .binary_frame) + } +} + +fn start_tests(websocket mut ws.Client, num int) { + for i := 1; i < num; i++ { + println("Running test: " + i.str()) + websocket.uri = "ws://localhost:9001/runCase?case=${i.str()}&agent=vws/1.0a" + if websocket.connect() >= 0 { + websocket.listen() + } + } + println("Done!") + websocket.uri = "ws://localhost:9001/updateReports?agent=vws/1.0a" + if websocket.connect() >= 0 { + websocket.read() + websocket.close(1000, "disconnecting...") + } + exit(0) +} + +fn on_close(params eventbus.Params){ + println("Websocket closed.") +} + +fn on_error(params eventbus.Params){ + println("we have an error.") +} diff --git a/vlib/net/websocket/utf8.h b/vlib/net/websocket/utf8.h new file mode 100644 index 0000000000..c2745fc56e --- /dev/null +++ b/vlib/net/websocket/utf8.h @@ -0,0 +1,166 @@ +/* + Copyright (c) 2015, Andreas Fett + All rights reserved. + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#include + +typedef int utf8_state; + +static utf8_state next_state(utf8_state, unsigned char); + +// Public API see utf8-validate.h for docs of the following function + +bool utf8_validate(utf8_state *const state, int c) +{ + assert(state); + return (*state = next_state(*state, c)) != -1; +} + +bool utf8_validate_some(utf8_state *const state, const void * const src, size_t len) +{ + assert(state); + assert(src); + for (size_t i = 0; i < len; ++i) { + *state = next_state(*state, *((unsigned char *)src + i)); + if (*state == -1) { + return false; + } + } + return true; +} + +bool utf8_validate_mem(const void * const src, size_t len) +{ + assert(src); + utf8_state state = 0; + for (size_t i = 0; i < len; ++i) { + state = next_state(state, *((unsigned char *)src + i)); + if (state == -1) { + return false; + } + } + + // detect unterminated sequence + return state == 0; +} + +bool utf8_validate_str(const char * const str) +{ + assert(str); + utf8_state state = 0; + for (size_t i = 0; str[i] != 0; ++i) { + state = next_state(state, str[i]); + if (state == -1) { + return false; + } + } + // detect unterminated sequence + return state == 0; +} + +/* Private state engine + * + * The macros below assemble the cases for a switch statement + * matching the language of the ABNF grammar given in rfc3629. + * + * Each SEQ# macro adds the states to match a # char long sequence. + * + * The SEQ#_HELPERs all have a 'fall through' to the next sequence. + * for # > 1 this is an explicit goto + */ + +#define SEQ_END(n) SEQ_ ## n ## _END + +#define SEQ1_HELPER(s, r0) \ +case (s * 4) + 0: if (r0) return 0; goto SEQ_END(s); \ +SEQ_END(s): + +#define SEQ2_HELPER(s, r0, r1) \ +case (s * 4) + 0: if (r0) { printf("ehe"); return (s * 4) + 1; } goto SEQ_END(s); \ +case (s * 4) + 1: if (r1) return 0; return -1; \ +SEQ_END(s): + +#define SEQ3_HELPER(s, r0, r1, r2) \ +case (s * 4) + 0: if (r0) return (s * 4) + 1; goto SEQ_END(s); \ +case (s * 4) + 1: if (r1) return (s * 4) + 2; return -1; \ +case (s * 4) + 2: if (r2) return 0; return -1; \ +SEQ_END(s): + +#define SEQ4_HELPER(s, r0, r1, r2, r3) \ +case (s * 4) + 0: if (r0) return (s * 4) + 1; goto SEQ_END(s); \ +case (s * 4) + 1: if (r1) return (s * 4) + 2; return -1; \ +case (s * 4) + 2: if (r2) return (s * 4) + 3; return -1; \ +case (s * 4) + 3: if (r3) return 0; return -1; \ +SEQ_END(s): + +#define SEQ1(s, r0) SEQ1_HELPER(s, r0) +#define SEQ2(s, r0, r1) SEQ2_HELPER(s, r0, r1) +#define SEQ3(s, r0, r1, r2) SEQ3_HELPER(s, r0, r1, r2) +#define SEQ4(s, r0, r1, r2, r3) SEQ4_HELPER(s, r0, r1, r2, r3) + +// Matcher macros + +#define VALUE(v) (c == v) +#define RANGE(s, e) (c >= s && c <= e) +/* workaround for "-Wtype-limits" as c >= s is allways true for + * the unsigned char in the case of c == 0 */ +#define EGNAR(s, e) ((c >= s + 1 && c <= e) || c == s) + +/* from rfc3629 + * + * UTF8-octets = *( UTF8-char ) + * UTF8-char = UTF8-1 / UTF8-2 / UTF8-3 / UTF8-4 + * UTF8-1 = %x00-7F + * UTF8-2 = %xC2-DF UTF8-tail + * UTF8-3 = %xE0 %xA0-BF UTF8-tail / %xE1-EC 2( UTF8-tail ) / + * %xED %x80-9F UTF8-tail / %xEE-EF 2( UTF8-tail ) + * UTF8-4 = %xF0 %x90-BF 2( UTF8-tail ) / %xF1-F3 3( UTF8-tail ) / + * %xF4 %x80-8F 2( UTF8-tail ) + * UTF8-tail = %x80-BF + */ + +#define TAIL RANGE(0x80, 0xBF) + +static inline utf8_state next_state(utf8_state state, unsigned char c) +{ + printf("C: %d\n", c); + switch (state) { + SEQ1(0, EGNAR(0x00, 0x7F)) + SEQ2(1, RANGE(0xC2, 0xDF), TAIL) + SEQ3(2, VALUE(0xE0), RANGE(0xA0, 0xBF), TAIL) + SEQ3(3, RANGE(0xE1, 0xEC), TAIL, TAIL) + SEQ3(4, VALUE(0xED), RANGE(0x80, 0x9F), TAIL) + SEQ3(5, RANGE(0xEE, 0xEF), TAIL, TAIL) + SEQ4(6, VALUE(0xF0), RANGE(0x90, 0xBF), TAIL, TAIL) + SEQ4(7, RANGE(0xF1, 0xF3), TAIL, TAIL, TAIL) + SEQ4(8, VALUE(0xF4), RANGE(0x80, 0x8F), TAIL, TAIL) + // no sequence start matched + break; + default: + /* + * This should not happen, unless you feed an error + * state or an uninitialized utf8_state to this function. + */ + assert(false && "invalid utf8 state"); + } + + return -1; +} \ No newline at end of file diff --git a/vlib/net/websocket/ws/events.v b/vlib/net/websocket/ws/events.v new file mode 100644 index 0000000000..d4b0fa521d --- /dev/null +++ b/vlib/net/websocket/ws/events.v @@ -0,0 +1,40 @@ +module ws + +import ( + eventbus +) + +fn (ws &Client) send_message_event(msg Message){ + mut params := eventbus.Params{} + mut typ := "" + if msg.opcode == .text_frame { + params.put_string("payload", string(byteptr(msg.payload))) + typ = 'string' + } else if msg.opcode == .binary_frame { + params.put_custom("payload", "binary", msg.payload) + typ = 'binary' + } + params.put_string("type", typ) + params.put_int("len", msg.payload_len) + ws.eb.publish("on_message", params, ws) + l.d("sending on_message event") +} + +fn (ws &Client) send_error_event(err string) { + mut params := eventbus.Params{} + params.put_string("error", err) + ws.eb.publish("on_error", params, ws) + l.d("sending on_error event") +} + +fn (ws &Client) send_close_event() { + params := eventbus.Params{} + ws.eb.publish("on_close", params, ws) + l.d("sending on_close event") +} + +fn (ws &Client) send_open_event() { + params := eventbus.Params{} + ws.eb.publish("on_open", params, ws) + l.d("sending on_open event") +} \ No newline at end of file diff --git a/vlib/net/websocket/ws/handshake.v b/vlib/net/websocket/ws/handshake.v new file mode 100644 index 0000000000..6bfdd12c70 --- /dev/null +++ b/vlib/net/websocket/ws/handshake.v @@ -0,0 +1,72 @@ +module ws + +fn (ws mut Client) read_handshake(seckey string){ + l.d("reading handshake...") + mut bytes_read := 0 + max_buffer := 256 + buffer_size := 1 + mut buffer := malloc(max_buffer) + + for bytes_read <= max_buffer { + res := ws.read_from_server(buffer + bytes_read, buffer_size) + if res == 0 || res == -1 { + l.f("read_handshake: Failed to read handshake.") + } + if buffer[bytes_read] == `\n` && buffer[bytes_read-1] == `\r` && buffer[bytes_read-2] == `\n` && buffer[bytes_read-3] == `\r` { + break + } + bytes_read += buffer_size + } + buffer[max_buffer+1] = `\0` + ws.handshake_handler(string(byteptr(buffer)), seckey) +} + +fn (ws mut Client) handshake_handler(handshake_response, seckey string){ + l.d("handshake_handler:\r\n${handshake_response}") + lines := handshake_response.split_into_lines() + + header := lines[0] + if !header.starts_with("HTTP/1.1 101") && !header.starts_with("HTTP/1.0 101") { + l.f("handshake_handler: invalid HTTP status response code") + } + + for i in 1..lines.len { + if lines[i].len <= 0 || lines[i] == "\r\n" { + continue + } + keys := lines[i].split(":") + + match keys[0] { + "Upgrade" { + ws.flags << Flag.has_upgrade + } + "Connection" { + ws.flags << Flag.has_connection + } + "Sec-WebSocket-Accept" { + l.d("comparing hashes") + response := create_key_challenge_response(seckey) + if keys[1].trim_space() != response { + l.e("handshake_handler: Sec-WebSocket-Accept header does not match computed sha1/base64 response.") + } + ws.flags << Flag.has_accept + unsafe { + response.free() + } + } else {} + } + unsafe { + keys.free() + } + } + if ws.flags.len < 3 { + ws.close(1002, "invalid websocket HTTP headers") + l.e("invalid websocket HTTP headers") + } + l.i("handshake successful!") + unsafe { + handshake_response.free() + lines.free() + header.free() + } +} \ No newline at end of file diff --git a/vlib/net/websocket/ws/io.v b/vlib/net/websocket/ws/io.v new file mode 100644 index 0000000000..e344c2e918 --- /dev/null +++ b/vlib/net/websocket/ws/io.v @@ -0,0 +1,23 @@ +module ws + +fn C.write() int + +fn (ws mut Client) write_to_server(buf voidptr, len int) int { + mut bytes_written := 0 + ws.write_lock.lock() + bytes_written = if ws.is_ssl { + C.SSL_write(ws.ssl, buf, len) + } else { + C.write(ws.socket.sockfd, buf, len) + } + ws.write_lock.unlock() + return bytes_written +} + +fn (ws &Client) read_from_server(buffer byteptr, buffer_size int) int { + return if ws.is_ssl { + C.SSL_read(ws.ssl, buffer, buffer_size) + } else { + C.read(ws.socket.sockfd, buffer, buffer_size) + } +} \ No newline at end of file diff --git a/vlib/net/websocket/ws/ssl.v b/vlib/net/websocket/ws/ssl.v new file mode 100644 index 0000000000..23da7c8382 --- /dev/null +++ b/vlib/net/websocket/ws/ssl.v @@ -0,0 +1,46 @@ +module ws + +#flag -lssl +#include +#include +#include + +struct C.SSL_CTX +struct C.SSL +struct C.SSL_METHOD +fn C.SSL_load_error_strings() +fn C.SSL_library_init() +fn C.SSLv23_client_method() &SSL_METHOD +fn C.SSL_CTX_new() &SSL_CTX +fn C.SSL_new() &SSL +fn C.SSL_set_fd() int +fn C.SSL_connect() int +fn C.SSL_shutdown() +fn C.SSL_free() +fn C.SSL_CTX_free() +fn C.SSL_write() int +fn C.SSL_read() int + +fn (ws mut Client) connect_ssl(){ + l.i("Using secure SSL connection") + C.SSL_load_error_strings() + C.SSL_library_init() + + ws.sslctx = SSL_CTX_new(SSLv23_client_method()) + if ws.sslctx == C.NULL { + l.f("Couldn't get ssl context") + } + + ws.ssl = SSL_new(ws.sslctx) + if ws.ssl == C.NULL { + l.f("Couldn't create OpenSSL instance.") + } + + if SSL_set_fd(ws.ssl, ws.socket.sockfd) != 1 { + l.f("Couldn't assign ssl to socket.") + } + + if SSL_connect(ws.ssl) != 1 { + l.f("Couldn't connect using SSL.") + } +} \ No newline at end of file diff --git a/vlib/net/websocket/ws/utf8.v b/vlib/net/websocket/ws/utf8.v new file mode 100644 index 0000000000..9c4d0cc6d0 --- /dev/null +++ b/vlib/net/websocket/ws/utf8.v @@ -0,0 +1,75 @@ +module ws + +pub fn utf8_validate_str(str string) bool { + return utf8_validate(str.str, str.len) +} +struct Utf8State { + mut: + index int + subindex int + failed bool +} +pub fn utf8_validate(data byteptr, len int) bool { + mut state := Utf8State{} + for i := 0; i < len; i++ { + s := data[i] + if s == 0 {break} + state.next_state(s) + if state.failed { + return false + } + //i++ //fast forward + } + return !state.failed && state.subindex <= 0 +} + +fn (s mut Utf8State) seq(r0 bool, r1 bool, is_tail bool) bool { + if s.subindex == 0 || (s.index > 1 && s.subindex == 1) || (s.index >= 6 && s.subindex == 2) { + if (s.subindex == 0 && r0) || (s.subindex == 1 && r1) || (s.subindex == 2 && is_tail) { + s.subindex++ + return true + } + goto next + } + else { + s.failed = true + if is_tail { + s.index = 0 + s.subindex = 0 + s.failed = false + } + return true + } + next: + s.index++ + s.subindex = 0 + return false +} + +fn (s mut Utf8State) next_state (c byte) { + //sequence 1 + if s.index == 0 { + if ((c >= 0x00 + 1 && c <= 0x7F) || c == 0x00) { + return + } + s.index++ + s.subindex = 0 + } + is_tail := c >= 0x80 && c <= 0xBF + //sequence 2 + if s.index == 1 && s.seq(c >= 0xC2 && c <= 0xDF, false, is_tail) {return} + + //sequence 3 + if s.index == 2 && s.seq(c == 0xE0, c >= 0xA0 && c <= 0xBF, is_tail) {return} + if s.index == 3 && s.seq(c >= 0xE1 && c <= 0xEC, c >= 0x80 && c <= 0xBF, is_tail) {return} + if s.index == 4 && s.seq(c == 0xED, c >= 0x80 && c <= 0x9F, is_tail) {return} + if s.index == 5 && s.seq(c >= 0xEE && c <= 0xEF, c >= 0x80 && c <= 0xBF, is_tail) {return} + + //sequence 4 + if s.index == 6 && s.seq(c == 0xF0, c >= 0x90 && c <= 0xBF, is_tail) {return} + if s.index == 7 && s.seq(c >= 0xF1 && c <= 0xF3, c >= 0x80 && c <= 0xBF, is_tail) {return} + if s.index == 8 && s.seq(c == 0xF4, c >= 0x80 && c <= 0x8F, is_tail) {return} + + //we should never reach here + s.failed = true +} \ No newline at end of file diff --git a/vlib/net/websocket/ws/utils.v b/vlib/net/websocket/ws/utils.v new file mode 100644 index 0000000000..2c1d9718e7 --- /dev/null +++ b/vlib/net/websocket/ws/utils.v @@ -0,0 +1,55 @@ +module ws + +import ( + time + rand + math + crypto.sha1 + encoding.base64 +) + +fn htonl64(payload_len u64) byteptr { + mut ret := malloc(8) + + ret[0] = byte(((payload_len & (u64(0xff) << 56)) >> 56) & 0xff) + ret[1] = byte(((payload_len & (u64(0xff) << 48)) >> 48) & 0xff) + ret[2] = byte(((payload_len & (u64(0xff) << 40)) >> 40) & 0xff) + ret[3] = byte(((payload_len & (u64(0xff) << 32)) >> 32) & 0xff) + ret[4] = byte(((payload_len & (u64(0xff) << 24)) >> 24) & 0xff) + ret[5] = byte(((payload_len & (u64(0xff) << 16)) >> 16) & 0xff) + ret[6] = byte(((payload_len & (u64(0xff) << 8)) >> 8) & 0xff) + ret[7] = byte(((payload_len & (u64(0xff) << 0)) >> 0) & 0xff) + return ret +} + +fn create_masking_key() []byte { + t := time.ticks() + tseq := t % 23237671 + mut rnd := rand.new_pcg32(u64(t), u64(tseq) ) + mask_bit := byte(rnd.bounded_next(u32(math.max_i32))) + buf := [`0`].repeat(4) + C.memcpy(buf.data, &mask_bit, 4) + return buf +} + +fn create_key_challenge_response(seckey string) string { + guid := "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" + sha1buf := seckey + guid + hash := sha1.sum(sha1buf.bytes()) + hashstr := string(byteptr(hash.data)) + b64 := base64.encode(hashstr) + unsafe { + sha1buf.free() + hash.free() + } + return b64 +} + +fn get_nonce() string { + mut nonce := []byte + alphanum := "0123456789ABCDEFGHIJKLMNOPQRSTUVXYZabcdefghijklmnopqrstuvwxyz" + for i in 0..16 { + nonce << alphanum[rand.next(61)] + } + return string(byteptr(nonce.data)) +} \ No newline at end of file diff --git a/vlib/net/websocket/ws/ws.v b/vlib/net/websocket/ws/ws.v new file mode 100644 index 0000000000..619204bfb9 --- /dev/null +++ b/vlib/net/websocket/ws/ws.v @@ -0,0 +1,652 @@ +module ws + +import ( + net + net.urllib + encoding.base64 + eventbus + sync + logger +) + +const ( + l = logger.new("ws") +) + +pub struct Client { + retry int + eb &eventbus.EventBus + is_ssl bool + lock sync.Mutex = sync.new_mutex() + write_lock sync.Mutex = sync.new_mutex() + //subprotocol_len int + //cwebsocket_subprotocol *subprotocol; + //cwebsocket_subprotocol *subprotocols[]; + mut: + state State + socket net.Socket + flags []Flag + sslctx &C.SSL_CTX + ssl &C.SSL + fragments []Fragment + pub mut: + uri string + subscriber &eventbus.Subscriber +} + +struct Fragment { + data voidptr + len u64 + code OPCode +} + +struct Message { + opcode OPCode + payload voidptr + payload_len int +} + +pub enum OPCode { + continuation = 0x00, + text_frame = 0x01, + binary_frame = 0x02, + close = 0x08, + ping = 0x09, + pong = 0x0A +} + +enum State { + connecting = 0, + connected, + open, + closing, + closed +} + +struct Uri { + mut: + hostname string + port string + resource string + querystring string +} + +enum Flag { + has_accept, + has_connection, + has_upgrade +} + +struct Frame { + mut: + fin bool + rsv1 bool + rsv2 bool + rsv3 bool + opcode OPCode + mask bool + payload_len u64 + masking_key [4]byte +} + +pub fn new(uri string) &Client { + eb := eventbus.new() + ws := &Client{ + uri: uri + state: .closed + eb: eb, + subscriber: eb.subscriber + is_ssl: uri.starts_with("wss") + ssl: C.NULL + sslctx: C.NULL + } + return ws +} +fn C.sscanf() int + +fn (ws &Client) parse_uri() &Uri { + u := urllib.parse(ws.uri) or {panic(err)} + v := u.request_uri().split("?") + querystring := if v.len > 1 {"?" + v[1]} else {""} + return &Uri { + hostname: u.hostname() + port: u.port() + resource: v[0] + querystring: querystring + } +} + +pub fn (ws mut Client) connect() int { + if ws.state == .connected { + l.f("connect: websocket already connected") + } else if ws.state == .connecting { + l.f("connect: websocket already connecting") + } else if ws.state == .open { + l.f("connect: websocket already open") + } + + ws.lock.lock() + ws.state = .connecting + ws.lock.unlock() + + uri := ws.parse_uri() + + nonce := get_nonce() + seckey := base64.encode(nonce) + + ai_family := C.AF_INET + ai_socktype := C.SOCK_STREAM + + handshake := "GET ${uri.resource}${uri.querystring} HTTP/1.1\r\nHost: ${uri.hostname}:${uri.port}\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Key: ${seckey}\r\nSec-WebSocket-Version: 13\r\n\r\n" + + socket := net.new_socket(ai_family, ai_socktype, 0) or { + l.f(err) + return -1 + } + ws.socket = socket + ws.socket.connect(uri.hostname, uri.port.int()) or { + l.f(err) + return -1 + } + + optval := 1 + ws.socket.setsockopt(C.SOL_SOCKET, C.SO_KEEPALIVE, &optval) or { + l.f(err) + return -1 + } + + if ws.is_ssl { + ws.connect_ssl() + } + + ws.lock.lock() + ws.state = .connected + ws.lock.unlock() + + res := ws.write_to_server(handshake.str, handshake.len) + if res <= 0 { + l.f("Handshake failed.") + } + + ws.read_handshake(seckey) + + ws.lock.lock() + ws.state = .open + ws.lock.unlock() + + ws.send_open_event() + + unsafe { + handshake.free() + nonce.free() + free(uri) + } + return 0 +} + +pub fn (ws mut Client) close(code int, message string){ + if ws.state != .closed && ws.socket.sockfd > 1 { + + ws.lock.lock() + ws.state = .closing + ws.lock.unlock() + + mut code32 := 0 + if code > 0 { + _code := C.htons(code) + message_len := message.len + 2 + mut close_frame := [`0`].repeat(message_len) + close_frame[0] = _code & 0xFF + close_frame[1] = (_code >> 8) + code32 = (close_frame[0] << 8) + close_frame[1] + for i in 0..message.len { + close_frame[i+2] = message[i] + } + ws.send_control_frame(.close, "CLOSE", close_frame) + } else { + ws.send_control_frame(.close, "CLOSE", []) + } + + if ws.ssl != C.NULL { + SSL_shutdown(ws.ssl) + SSL_free(ws.ssl) + if ws.sslctx != C.NULL { + SSL_CTX_free(ws.sslctx) + } + } else { + if C.shutdown(ws.socket.sockfd, C.SHUT_WR) == -1 { + l.e("Unabled to shutdown websocket.") + } + mut buf := [`0`] + for ws.read_from_server(buf.data, 1) > 0 { + buf[0] = `\0` + } + unsafe { + buf.free() + } + if C.close(ws.socket.sockfd) == -1 { + //ws.send_close_event()(websocket, 1011, strerror(errno)); + } + } + ws.fragments = [] + ws.send_close_event() + + ws.lock.lock() + ws.state = .closed + ws.lock.unlock() + unsafe { + + } + //TODO impl autoreconnect + } +} + +pub fn (ws mut Client) write(payload byteptr, payload_len int, code OPCode) int { + if ws.state != .open { + ws.send_error_event("WebSocket closed. Cannot write.") + goto free_data + return -1 + } + + header_len := 6 + if payload_len > 125 {2} else {0} + if payload_len > 0xffff {6} else {0} + masking_key := create_masking_key() + mut header := [`0`].repeat(header_len) + mut bytes_written := -1 + + header[0] = (int(code) | 0x80) + if payload_len <= 125 { + header[1] = (payload_len | 0x80) + header[2] = masking_key[0] + header[3] = masking_key[1] + header[4] = masking_key[2] + header[5] = masking_key[3] + } else if payload_len > 125 && payload_len <= 0xffff { + len16 := C.htons(payload_len) + header[1] = (126 | 0x80) + C.memcpy(header.data+2, &len16, 2) + header[4] = masking_key[0] + header[5] = masking_key[1] + header[6] = masking_key[2] + header[7] = masking_key[3] + } else if payload_len > 0xffff && payload_len <= 0xffffffffffffffff { // 65535 && 18446744073709551615 + len64 := htonl64(u64(payload_len)) + header[1] = (127 | 0x80) + C.memcpy(header.data+2, len64, 8) + header[10] = masking_key[0] + header[11] = masking_key[1] + header[12] = masking_key[2] + header[13] = masking_key[3] + } else { + l.c("write: frame too large") + ws.close(1009, "frame too large") + goto free_data + return -1 + } + + frame_len := header_len + payload_len + mut frame_buf := [`0`].repeat(frame_len) + + C.memcpy(frame_buf.data, header.data, header_len) + C.memcpy(&frame_buf.data[header_len], payload, payload_len) + + for i in 0..payload_len { + frame_buf[header_len+i] ^= masking_key[i % 4] & 0xff + } + + bytes_written = ws.write_to_server(frame_buf.data, frame_len) + if bytes_written == -1 { + err := string(byteptr(C.strerror(C.errno))) + l.e("write: there was an error writing data: ${err}") + ws.send_error_event("Error writing data") + goto free_data + return -1 + } + l.d("write: ${bytes_written} bytes written.") + free_data: + unsafe { + free(payload) + frame_buf.free() + header.free() + masking_key.free() + } + return bytes_written +} + +pub fn (ws mut Client) listen() { + l.i("Starting listener...") + for ws.state == .open { + ws.read() + } + l.i("Listener stopped as websocket was closed.") +} + +pub fn (ws mut Client) read() int { + mut bytes_read := u64(0) + + initial_buffer := u64(256) + mut header_len := 2 + header_len_offset := 2 + extended_payload16_end_byte := 4 + extended_payload64_end_byte := 10 + + mut payload_len := u64(0) + mut data := C.calloc(initial_buffer, 1)//[`0`].repeat(int(max_buffer)) + + mut frame := Frame{} + mut frame_size := u64(header_len) + + for bytes_read < frame_size && ws.state == .open { + byt := ws.read_from_server(data + int(bytes_read), 1) + match byt { + 0 { + error := "server closed the connection." + l.e("read: ${error}") + ws.send_error_event(error) + ws.close(1006, error) + goto free_data + return -1 + } + -1 { + err := string(byteptr(C.strerror(C.errno))) + l.e("read: error reading frame. ${err}") + ws.send_error_event("error reading frame") + goto free_data + return -1 + } else { + bytes_read++ + } + } + if bytes_read == u64(header_len_offset) { + frame.fin = (data[0] & 0x80) == 0x80 + frame.rsv1 = (data[0] & 0x40) == 0x40 + frame.rsv2 = (data[0] & 0x20) == 0x20 + frame.rsv3 = (data[0] & 0x10) == 0x10 + frame.opcode = OPCode(int(data[0] & 0x7F)) + frame.mask = (data[1] & 0x80) == 0x80 + frame.payload_len = u64(data[1] & 0x7F) + + //masking key + if frame.mask { + frame.masking_key[0] = data[2] + frame.masking_key[1] = data[3] + frame.masking_key[2] = data[4] + frame.masking_key[3] = data[5] + } + + payload_len = frame.payload_len + frame_size = u64(header_len) + payload_len + } + + if frame.payload_len == u64(126) && bytes_read == u64(extended_payload16_end_byte) { + header_len += 2 + + mut extended_payload_len := 0 + extended_payload_len |= data[2] << 8 + extended_payload_len |= data[3] << 0 + + //masking key + if frame.mask { + frame.masking_key[0] = data[4] + frame.masking_key[1] = data[5] + frame.masking_key[2] = data[6] + frame.masking_key[3] = data[7] + } + + payload_len = u64(extended_payload_len) + frame_size = u64(header_len) + payload_len + if frame_size > initial_buffer { + l.d("reallocating: ${frame_size}") + data = C.realloc(data, frame_size) + } + } else if frame.payload_len == u64(127) && bytes_read == u64(extended_payload64_end_byte) { + header_len += 8 //TODO Not sure... + + mut extended_payload_len := u64(0) + extended_payload_len |= u64(data[2]) << 56 + extended_payload_len |= u64(data[3]) << 48 + extended_payload_len |= u64(data[4]) << 40 + extended_payload_len |= u64(data[5]) << 32 + extended_payload_len |= u64(data[6]) << 24 + extended_payload_len |= u64(data[7]) << 16 + extended_payload_len |= u64(data[8]) << 8 + extended_payload_len |= u64(data[9]) << 0 + + //masking key + if frame.mask { + frame.masking_key[0] = data[10] + frame.masking_key[1] = data[11] + frame.masking_key[2] = data[12] + frame.masking_key[3] = data[13] + } + + payload_len = extended_payload_len + frame_size = u64(header_len) + payload_len + if frame_size > initial_buffer { + l.d("reallocating: ${frame_size}") + data = C.realloc(data, frame_size) + } + } + } + + // unmask the payload + if frame.mask { + for i in 0..payload_len { + data[header_len+i] ^= frame.masking_key[i % 4] & 0xff + } + } + + if ws.fragments.len > 0 && frame.opcode in [.text_frame, .binary_frame] { + ws.close(0, "") + goto free_data + return -1 + } else if frame.opcode in [.text_frame, .binary_frame] { + data_node: + l.d("read: recieved text_frame or binary_frame") + mut payload := malloc(sizeof(byte) * int(payload_len) + 1) + if payload == C.NULL { + l.f("out of memory") + } + C.memcpy(payload, &data[header_len], payload_len) + if frame.fin { + if ws.fragments.len > 0 { + //join fragments + ws.fragments << Fragment{ + data: payload + len: payload_len + } + mut frags := []Fragment + mut size := u64(0) + for f in ws.fragments { + if f.len > 0 { + frags << f + size += f.len + } + } + mut pl := malloc(sizeof(byte) * int(size)) + if pl == C.NULL { + l.f("out of memory") + } + mut by := 0 + for i, f in frags { + C.memcpy(pl + by, f.data, f.len) + by += int(f.len) + unsafe {free(f.data)} + } + payload = pl + frame.opcode = ws.fragments[0].code + payload_len = size + //clear the fragments + unsafe { + ws.fragments.free() + } + ws.fragments = [] + } + payload[payload_len] = `\0` + if frame.opcode == .text_frame && payload_len > 0 { + if !utf8_validate(payload, int(payload_len)) { + l.e("malformed utf8 payload") + ws.send_error_event("Recieved malformed utf8.") + ws.close(1007, "malformed utf8 payload") + goto free_data + return -1 + } + } + message := Message { + opcode: frame.opcode + payload: payload + payload_len: int(payload_len) + } + ws.send_message_event(message) + } else { + //fragment start. + ws.fragments << Fragment{ + data: payload + len: payload_len + code: frame.opcode + } + } + unsafe { + free(data) + } + return int(bytes_read) + } + else if frame.opcode == .continuation { + l.d("read: continuation") + if ws.fragments.len <= 0 { + l.e("Nothing to continue.") + ws.close(1002, "nothing to continue") + goto free_data + return -1 + } + goto data_node + return 0 + } + else if frame.opcode == .ping { + l.d("read: ping") + if !frame.fin { + ws.close(1002, "control message must not be fragmented") + goto free_data + return -1 + } + if frame.payload_len > 125 { + ws.close(1002, "control frames must not exceed 125 bytes") + goto free_data + return -1 + } + mut payload := []byte + if payload_len > 0 { + payload = [`0`].repeat(int(payload_len)) + C.memcpy(payload.data, &data[header_len], payload_len) + } + unsafe { + free(data) + } + return ws.send_control_frame(.pong, "PONG", payload) + } + else if frame.opcode == .pong { + if !frame.fin { + ws.close(1002, "control message must not be fragmented") + goto free_data + return -1 + } + unsafe { + free(data) + } + //got pong + return 0 + } + else if frame.opcode == .close { + l.d("read: close") + if frame.payload_len > 125 { + ws.close(1002, "control frames must not exceed 125 bytes") + goto free_data + return -1 + } + mut code := 0 + mut reason := "" + if payload_len > 2 { + code = (int(data[header_len]) << 8) + int(data[header_len+1]) + header_len += 2 + payload_len -= 2 + reason = string(&data[header_len]) + l.i("Closing with reason: ${reason} & code: ${code}") + if reason.len > 1 && !utf8_validate(reason.str, reason.len) { + l.e("malformed utf8 payload") + ws.send_error_event("Recieved malformed utf8.") + ws.close(1007, "malformed utf8 payload") + goto free_data + return -1 + } + } + unsafe{ + free(data) + } + ws.close(code, reason) + return 0 + } + l.e("read: Recieved unsupported opcode: ${frame.opcode} fin: ${frame.fin} uri: ${ws.uri}") + ws.send_error_event("Recieved unsupported opcode: ${frame.opcode}") + ws.close(1002, "Unsupported opcode") + free_data: + unsafe { + free(data) + } + return -1 +} + +fn (ws mut Client) send_control_frame(code OPCode, frame_typ string, payload []byte) int { + if ws.socket.sockfd <= 0 { + l.e("No socket opened.") + goto free_data + return -1 + } + header_len := 6 + frame_len := header_len + payload.len + mut control_frame := [`0`].repeat(frame_len) + masking_key := create_masking_key() + control_frame[0] = (int(code) | 0x80) + control_frame[1] = (payload.len | 0x80) + control_frame[2] = masking_key[0] + control_frame[3] = masking_key[1] + control_frame[4] = masking_key[2] + control_frame[5] = masking_key[3] + if code == .close { + close_code := 1000 + if payload.len > 2 { + mut parsed_payload := [`0`].repeat(payload.len + 1) + C.memcpy(parsed_payload.data, &payload[0], payload.len) + parsed_payload[payload.len] = `\0` + for i in 0..payload.len { + control_frame[6+i] = (parsed_payload[i] ^ masking_key[i % 4]) & 0xff + } + unsafe { + parsed_payload.free() + } + } + } else { + for i in 0..payload.len { + control_frame[header_len + i] = (payload[i] ^ masking_key[i % 4]) & 0xff + } + } + mut bytes_written := -1 + bytes_written = ws.write_to_server(control_frame.data, frame_len) + free_data: + unsafe { + control_frame.free() + payload.free() + masking_key.free() + } + match bytes_written { + 0 { + l.d("send_control_frame: remote host closed the connection.") + return 0 + } + -1 { + l.c("send_control_frame: error sending ${frame_typ} control frame.") + return -1 + } else { + l.d("send_control_frame: wrote ${bytes_written} byte ${frame_typ} frame.") + return bytes_written + } + } +} \ No newline at end of file