From 863cf8af60e37c6145f64aa48419fd43c2b3f95d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uwe=20Kr=C3=BCger?= <45282134+UweKrueger@users.noreply.github.com> Date: Thu, 6 Aug 2020 15:28:19 +0200 Subject: [PATCH] sync: channel implementation (#6074) --- doc/upcoming.md | 71 ++++ thirdparty/stdatomic/nix/atomic.h | 431 +++++++++++++++++++-- thirdparty/stdatomic/win/atomic.h | 167 +++++++- vlib/builtin/cfns.c.v | 6 +- vlib/sync/atomic2/atomic.v | 13 +- vlib/sync/channels.v | 505 +++++++++++++++++++++++++ vlib/sync/sync_nix.c.v | 43 ++- vlib/sync/sync_windows.c.v | 11 +- vlib/v/tests/bench/channel_bench_go.go | 68 ++++ vlib/v/tests/bench/channel_bench_v.v | 70 ++++ vlib/v/tests/channel_1_test.v | 23 ++ vlib/v/tests/channel_2_test.v | 23 ++ vlib/v/tests/channel_3_test.v | 38 ++ vlib/v/tests/channel_4_test.v | 38 ++ vlib/v/tests/channel_select_test.v | 76 ++++ 15 files changed, 1532 insertions(+), 51 deletions(-) create mode 100644 vlib/sync/channels.v create mode 100644 vlib/v/tests/bench/channel_bench_go.go create mode 100644 vlib/v/tests/bench/channel_bench_v.v create mode 100644 vlib/v/tests/channel_1_test.v create mode 100644 vlib/v/tests/channel_2_test.v create mode 100644 vlib/v/tests/channel_3_test.v create mode 100644 vlib/v/tests/channel_4_test.v create mode 100644 vlib/v/tests/channel_select_test.v diff --git a/doc/upcoming.md b/doc/upcoming.md index db41c71000..867da4eeca 100644 --- a/doc/upcoming.md +++ b/doc/upcoming.md @@ -12,6 +12,7 @@ for the current state of V*** * [Weaknesses](#weaknesses) * [Compatibility](#compatibility) * [Automatic Lock](#automatic-lock) + * [Channels](#channels) ## Concurrency @@ -190,3 +191,73 @@ are sometimes surprising. Each statement should be seen as a single transaction that is unrelated to the previous or following statement. Therefore - but also for performance reasons - it's often better to group consecutive coherent statements in an explicit `lock` block. + +### Channels +Channels in V work basically like those in Go. You can `push()` objects into +a channel and `pop()` objects from a channel. They can be buffered or unbuffered +and it is possible to `select` from multiple channels. + +#### Syntax and Usage +There is no support for channels in the core language (yet), so all functions +are in the `sync` library. Channels must be created as `mut` objects. + +```v +mut ch := sync.new_channel<int>(0) // unbuffered +mut ch2 := sync.new_channel<f64>(100) // buffer length 100 +``` + +Channels can be passed to coroutines like normal `mut` variables: + +```v +fn f(mut ch sync.Channel) { + ... +} + +fn main() { + ... + go f(mut ch) + ... +} +``` + +The routines `push()` and `pop()` both use *references* to objects. This way +unnecessary copies of large objects are avoided and the call to `channel_select()` +(see below) is simpler: + +```v +n := 5 +x := 7.3 +ch.push(&n) +ch2.push(&x) + +mut m := int(0) +mut y := f64(0.0) +ch.pop(&m) +ch2.pop(&y) +``` + +The select call is somewhat tricky. The `channel_select()` function needs three arrays that +contain the channels, the directions (pop/push) and the object references, as well as +a timeout of type `time.Duration` (or `0` to wait indefinitely) as parameters. It returns the +index of the object that was pushed or popped, or `-1` if a timeout occurred.
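+For example, the code below waits until a value can be popped from either `ch` or `ch2` (as set up in the snippets above) and reports which one it was: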
+ +```v +mut chans := [ch, ch2] // the channels to monitor +directions := [false, false] // `true` means push, `false` means pop +mut objs := [voidptr(&m), &y] // the objects to push or pop + +// idx contains the index of the object that was pushed or popped, -1 means a timeout occurred +idx := sync.channel_select(mut chans, directions, mut objs, 0) // wait indefinitely +match idx { + 0 { + println('got $m') + } + 1 { + println('got $y') + } + else { + // idx = -1 + println('Timeout') + } +} +``` diff --git a/thirdparty/stdatomic/nix/atomic.h b/thirdparty/stdatomic/nix/atomic.h index c34a8237fb..886d21aded 100644 --- a/thirdparty/stdatomic/nix/atomic.h +++ b/thirdparty/stdatomic/nix/atomic.h @@ -1,8 +1,11 @@ /* Compability header for stdatomic.h that works for all compilers supported - by V. For TCC the atomic features missing are implemented using mutex locks + by V. For TCC, libatomic from the operating system is used */ +#ifndef __ATOMIC_H +#define __ATOMIC_H + #ifndef __cplusplus // If C just use stdatomic.h #ifndef __TINYC__ @@ -14,41 +17,419 @@ #endif #ifdef __TINYC__ -#include -typedef intptr_t atomic_llong; -typedef intptr_t atomic_ullong; +typedef volatile long long atomic_llong; +typedef volatile unsigned long long atomic_ullong; +typedef volatile uintptr_t atomic_uintptr_t; -pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; +// use functions for 64, 32, 16 and 8 bit from libatomic directly +// since tcc is not capable of using "generic" C functions +// there is no header file for libatomic, so we provide the function declarations here -/* - Wrapper for TCC to use mutex locks since it lacks the atomic functions -*/ -static inline intptr_t atomic_fetch_add_explicit(intptr_t *x, size_t offset, int mo) -{ - pthread_mutex_lock(&lock); +extern unsigned long long __atomic_load_8(unsigned long long* x, int mo); +extern void __atomic_store_8(unsigned long long* x, unsigned long long y, int mo); +extern _Bool __atomic_compare_exchange_8(unsigned long long* x, unsigned long long* expected, unsigned long long y, int mo, int mo2); +extern _Bool __atomic_compare_exchange_8(unsigned long long* x, unsigned long long* expected, unsigned long long y, int mo, int mo2); +extern unsigned long long __atomic_exchange_8(unsigned long long* x, unsigned long long y, int mo); +extern unsigned long long __atomic_fetch_add_8(unsigned long long* x, unsigned long long y, int mo); +extern unsigned long long __atomic_fetch_sub_8(unsigned long long* x, unsigned long long y, int mo); +extern unsigned long long __atomic_fetch_and_8(unsigned long long* x, unsigned long long y, int mo); +extern unsigned long long __atomic_fetch_or_8(unsigned long long* x, unsigned long long y, int mo); +extern unsigned long long __atomic_fetch_xor_8(unsigned long long* x, unsigned long long y, int mo); - intptr_t old_value = *x; - *x = *x + offset; +extern unsigned int __atomic_load_4(unsigned int* x, int mo); +extern void __atomic_store_4(unsigned int* x, unsigned int y, int mo); +extern _Bool __atomic_compare_exchange_4(unsigned int* x, unsigned int* expected, unsigned int y, int mo, int mo2); +extern _Bool __atomic_compare_exchange_4(unsigned int* x, unsigned int* expected, unsigned int y, int mo, int mo2); +extern unsigned int __atomic_exchange_4(unsigned int* x, unsigned int y, int mo); +extern unsigned int __atomic_fetch_add_4(unsigned int* x, unsigned int y, int mo); +extern unsigned int __atomic_fetch_sub_4(unsigned int* x, unsigned int y, int mo); +extern unsigned int __atomic_fetch_and_4(unsigned int* x, unsigned int y, int mo); +extern 
unsigned int __atomic_fetch_or_4(unsigned int* x, unsigned int y, int mo); +extern unsigned int __atomic_fetch_xor_4(unsigned int* x, unsigned int y, int mo); - pthread_mutex_unlock(&lock); +extern unsigned short __atomic_load_2(unsigned short* x, int mo); +extern void __atomic_store_2(unsigned short* x, unsigned short y, int mo); +extern _Bool __atomic_compare_exchange_2(unsigned short* x, unsigned short* expected, unsigned short y, int mo, int mo2); +extern _Bool __atomic_compare_exchange_2(unsigned short* x, unsigned short* expected, unsigned short y, int mo, int mo2); +extern unsigned short __atomic_exchange_2(unsigned short* x, unsigned short y, int mo); +extern unsigned short __atomic_fetch_add_2(unsigned short* x, unsigned short y, int mo); +extern unsigned short __atomic_fetch_sub_2(unsigned short* x, unsigned short y, int mo); +extern unsigned short __atomic_fetch_and_2(unsigned short* x, unsigned short y, int mo); +extern unsigned short __atomic_fetch_or_2(unsigned short* x, unsigned short y, int mo); +extern unsigned short __atomic_fetch_xor_2(unsigned short* x, unsigned short y, int mo); - return old_value; +extern unsigned char __atomic_load_1(unsigned char* x, int mo); +extern void __atomic_store_1(unsigned char* x, unsigned char y, int mo); +extern _Bool __atomic_compare_exchange_1(unsigned char* x, unsigned char* expected, unsigned char y, int mo, int mo2); +extern _Bool __atomic_compare_exchange_1(unsigned char* x, unsigned char* expected, unsigned char y, int mo, int mo2); +extern unsigned char __atomic_exchange_1(unsigned char* x, unsigned char y, int mo); +extern unsigned char __atomic_fetch_add_1(unsigned char* x, unsigned char y, int mo); +extern unsigned char __atomic_fetch_sub_1(unsigned char* x, unsigned char y, int mo); +extern unsigned char __atomic_fetch_and_1(unsigned char* x, unsigned char y, int mo); +extern unsigned char __atomic_fetch_or_1(unsigned char* x, unsigned char y, int mo); +extern unsigned char __atomic_fetch_xor_1(unsigned char* x, unsigned char y, int mo); + +// The default functions should work with pointers so we have to decide based on pointer size +#if UINTPTR_MAX == 0xFFFFFFFF + +#define atomic_load_explicit __atomic_load_4 +#define atomic_store_explicit __atomic_store_4 +#define atomic_compare_exchange_weak_explicit __atomic_compare_exchange_4 +#define atomic_compare_exchange_strong_explicit __atomic_compare_exchange_4 +#define atomic_exchange_explicit __atomic_exchange_4 +#define atomic_fetch_add_explicit __atomic_fetch_add_4 +#define atomic_fetch_sub_explicit __atomic_sub_fetch_4 + +#else + +#define atomic_load_explicit __atomic_load_8 +#define atomic_store_explicit __atomic_store_8 +#define atomic_compare_exchange_weak_explicit __atomic_compare_exchange_8 +#define atomic_compare_exchange_strong_explicit __atomic_compare_exchange_8 +#define atomic_exchange_explicit __atomic_exchange_8 +#define atomic_fetch_add_explicit __atomic_fetch_add_8 +#define atomic_fetch_sub_explicit __atomic_sub_fetch_8 + +#endif + +// memory order policies - we use "sequentially consistent" by default + +#define memory_order_relaxed 0 +#define memory_order_consume 1 +#define memory_order_acquire 2 +#define memory_order_release 3 +#define memory_order_acq_rel 4 +#define memory_order_seq_cst 5 + +static inline uintptr_t atomic_load(uintptr_t* x) { + return atomic_load_explicit(x, memory_order_seq_cst); +} +static inline void atomic_store(uintptr_t* x, uintptr_t y) { + atomic_store_explicit(x, y, memory_order_seq_cst); +} +static inline int 
atomic_compare_exchange_weak(uintptr_t* x, uintptr_t* expected, uintptr_t y) { + return (int)atomic_compare_exchange_weak_explicit(x, expected, y, memory_order_seq_cst, memory_order_seq_cst); +} +static inline int atomic_compare_exchange_strong(uintptr_t* x, uintptr_t* expected, uintptr_t y) { + return (int)atomic_compare_exchange_strong_explicit(x, expected, y, memory_order_seq_cst, memory_order_seq_cst); +} +static inline uintptr_t atomic_exchange(uintptr_t* x, uintptr_t y) { + return atomic_exchange_explicit(x, y, memory_order_seq_cst); +} +static inline uintptr_t atomic_fetch_add(uintptr_t* x, uintptr_t y) { + return atomic_fetch_add_explicit(x, y, memory_order_seq_cst); +} +static inline uintptr_t atomic_fetch_sub(uintptr_t* x, uintptr_t y) { + return atomic_fetch_sub_explicit(x, y, memory_order_seq_cst); +} +static inline uintptr_t atomic_fetch_and(uintptr_t* x, uintptr_t y) { + return atomic_fetch_and_explicit(x, y, memory_order_seq_cst); +} +static inline uintptr_t atomic_fetch_or(uintptr_t* x, uintptr_t y) { + return atomic_fetch_or_explicit(x, y, memory_order_seq_cst); +} +static inline uintptr_t atomic_fetch_xor(uintptr_t* x, uintptr_t y) { + return atomic_fetch_xor_explicit(x, y, memory_order_seq_cst); } -/* - Wrapper for TCC to use mutex locks since it lacks the atomic functions -*/ -static inline intptr_t atomic_fetch_sub_explicit(intptr_t *x, size_t offset, int mo) -{ - pthread_mutex_lock(&lock); +#define atomic_load_ptr atomic_load +#define atomic_store_ptr atomic_store +#define atomic_compare_exchange_weak_ptr atomic_compare_exchange_weak +#define atomic_compare_exchange_strong_ptr atomic_compare_exchange_strong +#define atomic_exchange_ptr atomic_exchange +#define atomic_fetch_add_ptr atomic_fetch_add +#define atomic_fetch_sub_ptr atomic_fetch_sub +#define atomic_fetch_and_ptr atomic_fetch_and +#define atomic_fetch_or_ptr atomic_fetch_or +#define atomic_fetch_xor_ptr atomic_fetch_xor - intptr_t old_value = *x; - *x = *x - offset; +// specialized versions for 64 bit - pthread_mutex_unlock(&lock); +static inline unsigned long long atomic_load_u64(unsigned long long* x) { + return __atomic_load_8(x, memory_order_seq_cst); +} +static inline void atomic_store_u64(unsigned long long* x, unsigned long long y) { + __atomic_store_8(x, y, memory_order_seq_cst); +} +static inline int atomic_compare_exchange_weak_u64(unsigned long long* x, unsigned long long* expected, unsigned long long y) { + return (int)__atomic_compare_exchange_8(x, expected, y, memory_order_seq_cst, memory_order_seq_cst); +} +static inline int atomic_compare_exchange_strong_u64(unsigned long long* x, unsigned long long* expected, unsigned long long y) { + return (int)__atomic_compare_exchange_8(x, expected, y, memory_order_seq_cst, memory_order_seq_cst); +} +static inline unsigned long long atomic_exchange_u64(unsigned long long* x, unsigned long long y) { + return __atomic_exchange_8(x, y, memory_order_seq_cst); +} +static inline unsigned long long atomic_fetch_add_u64(unsigned long long* x, unsigned long long y) { + return __atomic_fetch_add_8(x, y, memory_order_seq_cst); +} +static inline unsigned long long atomic_fetch_sub_u64(unsigned long long* x, unsigned long long y) { + return __atomic_fetch_sub_8(x, y, memory_order_seq_cst); +} +static inline unsigned long long atomic_fetch_and_u64(unsigned long long* x, unsigned long long y) { + return __atomic_fetch_and_8(x, y, memory_order_seq_cst); +} +static inline unsigned long long atomic_fetch_or_u64(unsigned long long* x, unsigned long long y) { + return 
__atomic_fetch_or_8(x, y, memory_order_seq_cst); +} +static inline unsigned long long atomic_fetch_xor_u64(unsigned long long* x, unsigned long long y) { + return __atomic_fetch_xor_8(x, y, memory_order_seq_cst); +} - return old_value; +static inline unsigned atomic_load_u32(unsigned* x) { + return __atomic_load_4(x, memory_order_seq_cst); +} +static inline void atomic_store_u32(unsigned* x, unsigned y) { + __atomic_store_4(x, y, memory_order_seq_cst); +} +static inline int atomic_compare_exchange_weak_u32(unsigned* x, unsigned* expected, unsigned y) { + return (int)__atomic_compare_exchange_4(x, expected, y, memory_order_seq_cst, memory_order_seq_cst); +} +static inline int atomic_compare_exchange_strong_u32(unsigned* x, unsigned* expected, unsigned y) { + return (int)__atomic_compare_exchange_4(x, expected, y, memory_order_seq_cst, memory_order_seq_cst); +} +static inline unsigned atomic_exchange_u32(unsigned* x, unsigned y) { + return __atomic_exchange_4(x, y, memory_order_seq_cst); +} +static inline unsigned atomic_fetch_add_u32(unsigned* x, unsigned y) { + return __atomic_fetch_add_4(x, y, memory_order_seq_cst); +} +static inline unsigned atomic_fetch_sub_u32(unsigned* x, unsigned y) { + return __atomic_fetch_sub_4(x, y, memory_order_seq_cst); +} +static inline unsigned atomic_fetch_and_u32(unsigned* x, unsigned y) { + return __atomic_fetch_and_4(x, y, memory_order_seq_cst); +} +static inline unsigned atomic_fetch_or_u32(unsigned* x, unsigned y) { + return __atomic_fetch_or_4(x, y, memory_order_seq_cst); +} +static inline unsigned atomic_fetch_xor_u32(unsigned* x, unsigned y) { + return __atomic_fetch_xor_4(x, y, memory_order_seq_cst); +} + +static inline unsigned short atomic_load_u16(unsigned short* x) { + return __atomic_load_2(x, memory_order_seq_cst); +} +static inline void atomic_store_u16(unsigned short* x, unsigned short y) { + __atomic_store_2(x, y, memory_order_seq_cst); +} +static inline int atomic_compare_exchange_weak_u16(unsigned short* x, unsigned short* expected, unsigned short y) { + return (int)__atomic_compare_exchange_2(x, expected, y, memory_order_seq_cst, memory_order_seq_cst); +} +static inline int atomic_compare_exchange_strong_u16(unsigned short* x, unsigned short* expected, unsigned short y) { + return (int)__atomic_compare_exchange_2(x, expected, y, memory_order_seq_cst, memory_order_seq_cst); +} +static inline unsigned short atomic_exchange_u16(unsigned short* x, unsigned short y) { + return __atomic_exchange_2(x, y, memory_order_seq_cst); +} +static inline unsigned short atomic_fetch_add_u16(unsigned short* x, unsigned short y) { + return __atomic_fetch_add_2(x, y, memory_order_seq_cst); +} +static inline unsigned short atomic_fetch_sub_u16(unsigned short* x, unsigned short y) { + return __atomic_fetch_sub_2(x, y, memory_order_seq_cst); +} +static inline unsigned short atomic_fetch_and_u16(unsigned short* x, unsigned short y) { + return __atomic_fetch_and_2(x, y, memory_order_seq_cst); +} +static inline unsigned short atomic_fetch_or_u16(unsigned short* x, unsigned short y) { + return __atomic_fetch_or_2(x, y, memory_order_seq_cst); +} +static inline unsigned short atomic_fetch_xor_u16(unsigned short* x, unsigned short y) { + return __atomic_fetch_xor_2(x, y, memory_order_seq_cst); +} + +static inline unsigned char atomic_load_byte(unsigned char* x) { + return __atomic_load_1(x, memory_order_seq_cst); +} +static inline void atomic_store_byte(unsigned char* x, unsigned char y) { + __atomic_store_1(x, y, memory_order_seq_cst); +} +static inline int 
atomic_compare_exchange_weak_byte(unsigned char* x, unsigned char* expected, unsigned char y) { + return __atomic_compare_exchange_1(x, expected, y, memory_order_seq_cst, memory_order_seq_cst); +} +static inline int atomic_compare_exchange_strong_byte(unsigned char* x, unsigned char* expected, unsigned char y) { + return __atomic_compare_exchange_1(x, expected, y, memory_order_seq_cst, memory_order_seq_cst); +} +static inline unsigned char atomic_exchange_byte(unsigned char* x, unsigned char y) { + return __atomic_exchange_1(x, y, memory_order_seq_cst); +} +static inline unsigned char atomic_fetch_add_byte(unsigned char* x, unsigned char y) { + return __atomic_fetch_add_1(x, y, memory_order_seq_cst); +} +static inline unsigned char atomic_fetch_sub_byte(unsigned char* x, unsigned char y) { + return __atomic_fetch_sub_1(x, y, memory_order_seq_cst); +} +static inline unsigned char atomic_fetch_and_byte(unsigned char* x, unsigned char y) { + return __atomic_fetch_and_1(x, y, memory_order_seq_cst); +} +static inline unsigned char atomic_fetch_or_byte(unsigned char* x, unsigned char y) { + return __atomic_fetch_or_1(x, y, memory_order_seq_cst); +} +static inline unsigned char atomic_fetch_xor_byte(unsigned char* x, unsigned char y) { + return __atomic_fetch_xor_1(x, y, memory_order_seq_cst); +} + +#else + +// Since V might be confused with "generic" C functions either we provide special versions +// for gcc/clang, too +static inline unsigned long long atomic_load_u64(unsigned long long* x) { + return atomic_load_explicit((_Atomic unsigned long long*)x, memory_order_seq_cst); +} +static inline void atomic_store_u64(unsigned long long* x, unsigned long long y) { + atomic_store_explicit((_Atomic unsigned long long*)x, y, memory_order_seq_cst); +} +static inline int atomic_compare_exchange_weak_u64(unsigned long long* x, unsigned long long* expected, unsigned long long y) { + return (int)atomic_compare_exchange_weak_explicit((_Atomic unsigned long long*)x, expected, y, memory_order_seq_cst, memory_order_seq_cst); +} +static inline int atomic_compare_exchange_strong_u64(unsigned long long* x, unsigned long long* expected, unsigned long long y) { + return (int)atomic_compare_exchange_strong_explicit((_Atomic unsigned long long*)x, expected, y, memory_order_seq_cst, memory_order_seq_cst); +} +static inline unsigned long long atomic_exchange_u64(unsigned long long* x, unsigned long long y) { + return atomic_exchange_explicit((_Atomic unsigned long long*)x, y, memory_order_seq_cst); +} +static inline unsigned long long atomic_fetch_add_u64(unsigned long long* x, unsigned long long y) { + return atomic_fetch_add_explicit((_Atomic unsigned long long*)x, y, memory_order_seq_cst); +} +static inline unsigned long long atomic_fetch_sub_u64(unsigned long long* x, unsigned long long y) { + return atomic_fetch_sub_explicit((_Atomic unsigned long long*)x, y, memory_order_seq_cst); +} +static inline unsigned long long atomic_fetch_and_u64(unsigned long long* x, unsigned long long y) { + return atomic_fetch_and_explicit((_Atomic unsigned long long*)x, y, memory_order_seq_cst); +} +static inline unsigned long long atomic_fetch_or_u64(unsigned long long* x, unsigned long long y) { + return atomic_fetch_or_explicit((_Atomic unsigned long long*)x, y, memory_order_seq_cst); +} +static inline unsigned long long atomic_fetch_xor_u64(unsigned long long* x, unsigned long long y) { + return atomic_fetch_xor_explicit((_Atomic unsigned long long*)x, y, memory_order_seq_cst); +} + + +static inline void* atomic_load_ptr(void** 
x) { + return atomic_load_explicit((_Atomic uintptr_t*)x, memory_order_seq_cst); +} +static inline void atomic_store_ptr(void** x, void* y) { + atomic_store_explicit((_Atomic uintptr_t*)x, y, memory_order_seq_cst); +} +static inline int atomic_compare_exchange_weak_ptr(void** x, void** expected, void* y) { + return (int)atomic_compare_exchange_weak_explicit((_Atomic uintptr_t*)x, expected, y, memory_order_seq_cst, memory_order_seq_cst); +} +static inline int atomic_compare_exchange_strong_ptr(void** x, void** expected, void* y) { + return (int)atomic_compare_exchange_strong_explicit((_Atomic uintptr_t*)x, expected, y, memory_order_seq_cst, memory_order_seq_cst); +} +static inline void* atomic_exchange_ptr(void** x, void* y) { + return atomic_exchange_explicit((_Atomic uintptr_t*)x, y, memory_order_seq_cst); +} +static inline void* atomic_fetch_add_ptr(void** x, void* y) { + return atomic_fetch_add_explicit((_Atomic uintptr_t*)x, y, memory_order_seq_cst); +} +static inline void* atomic_fetch_sub_ptr(void** x, void* y) { + return atomic_fetch_sub_explicit((_Atomic uintptr_t*)x, y, memory_order_seq_cst); +} +static inline void* atomic_fetch_and_ptr(void** x, void* y) { + return atomic_fetch_and_explicit((_Atomic uintptr_t*)x, y, memory_order_seq_cst); +} +static inline void* atomic_fetch_or_ptr(void** x, void* y) { + return atomic_fetch_or_explicit((_Atomic uintptr_t*)x, y, memory_order_seq_cst); +} +static inline void* atomic_fetch_xor_ptr(void** x, void* y) { + return atomic_fetch_xor_explicit((_Atomic uintptr_t*)x, y, memory_order_seq_cst); +} + + +static inline unsigned atomic_load_u32(unsigned* x) { + return atomic_load_explicit((_Atomic unsigned*)x, memory_order_seq_cst); +} +static inline void atomic_store_u32(unsigned* x, unsigned y) { + atomic_store_explicit((_Atomic unsigned*)x, y, memory_order_seq_cst); +} +static inline int atomic_compare_exchange_weak_u32(unsigned* x, unsigned* expected, unsigned y) { + return (int)atomic_compare_exchange_weak_explicit((_Atomic unsigned*)x, expected, y, memory_order_seq_cst, memory_order_seq_cst); +} +static inline int atomic_compare_exchange_strong_u32(unsigned* x, unsigned* expected, unsigned y) { + return (int)atomic_compare_exchange_strong_explicit((_Atomic unsigned*)x, expected, y, memory_order_seq_cst, memory_order_seq_cst); +} +static inline unsigned atomic_exchange_u32(unsigned* x, unsigned y) { + return atomic_exchange_explicit((_Atomic unsigned*)x, y, memory_order_seq_cst); +} +static inline unsigned atomic_fetch_add_u32(unsigned* x, unsigned y) { + return atomic_fetch_add_explicit((_Atomic unsigned*)x, y, memory_order_seq_cst); +} +static inline unsigned atomic_fetch_sub_u32(unsigned* x, unsigned y) { + return atomic_fetch_sub_explicit((_Atomic unsigned*)x, y, memory_order_seq_cst); +} +static inline unsigned atomic_fetch_and_u32(unsigned* x, unsigned y) { + return atomic_fetch_and_explicit((_Atomic unsigned*)x, y, memory_order_seq_cst); +} +static inline unsigned atomic_fetch_or_u32(unsigned* x, unsigned y) { + return atomic_fetch_or_explicit((_Atomic unsigned*)x, y, memory_order_seq_cst); +} +static inline unsigned atomic_fetch_xor_u32(unsigned* x, unsigned y) { + return atomic_fetch_xor_explicit((_Atomic unsigned*)x, y, memory_order_seq_cst); +} + +static inline unsigned short atomic_load_u16(unsigned short* x) { + return atomic_load_explicit((_Atomic unsigned short*)x, memory_order_seq_cst); +} +static inline void atomic_store_u16(unsigned short* x, unsigned short y) { + atomic_store_explicit((_Atomic unsigned short*)x, y, 
memory_order_seq_cst); +} +static inline int atomic_compare_exchange_weak_u16(unsigned short* x, unsigned short* expected, unsigned short y) { + return (int)atomic_compare_exchange_weak_explicit((_Atomic unsigned short*)x, expected, y, memory_order_seq_cst, memory_order_seq_cst); +} +static inline int atomic_compare_exchange_strong_u16(unsigned short* x, unsigned short* expected, unsigned short y) { + return (int)atomic_compare_exchange_strong_explicit((_Atomic unsigned short*)x, expected, y, memory_order_seq_cst, memory_order_seq_cst); +} +static inline unsigned short atomic_exchange_u16(unsigned short* x, unsigned short y) { + return atomic_exchange_explicit((_Atomic unsigned short*)x, y, memory_order_seq_cst); +} +static inline unsigned short atomic_fetch_add_u16(unsigned short* x, unsigned short y) { + return atomic_fetch_add_explicit((_Atomic unsigned short*)x, y, memory_order_seq_cst); +} +static inline unsigned short atomic_fetch_sub_u16(unsigned short* x, unsigned short y) { + return atomic_fetch_sub_explicit((_Atomic unsigned short*)x, y, memory_order_seq_cst); +} +static inline unsigned short atomic_fetch_and_u16(unsigned short* x, unsigned short y) { + return atomic_fetch_and_explicit((_Atomic unsigned short*)x, y, memory_order_seq_cst); +} +static inline unsigned short atomic_fetch_or_u16(unsigned short* x, unsigned short y) { + return atomic_fetch_or_explicit((_Atomic unsigned short*)x, y, memory_order_seq_cst); +} +static inline unsigned short atomic_fetch_xor_u16(unsigned short* x, unsigned short y) { + return atomic_fetch_xor_explicit((_Atomic unsigned short*)x, y, memory_order_seq_cst); +} + +static inline unsigned char atomic_load_byte(unsigned char* x) { + return atomic_load_explicit((_Atomic unsigned char*)x, memory_order_seq_cst); +} +static inline void atomic_store_byte(unsigned char* x, unsigned char y) { + atomic_store_explicit((_Atomic unsigned char*)x, y, memory_order_seq_cst); +} +static inline int atomic_compare_exchange_weak_byte(unsigned char* x, unsigned char* expected, unsigned char y) { + return (int)atomic_compare_exchange_weak_explicit((_Atomic unsigned char*)x, expected, y, memory_order_seq_cst, memory_order_seq_cst); +} +static inline int atomic_compare_exchange_strong_byte(unsigned char* x, unsigned char* expected, unsigned char y) { + return (int)atomic_compare_exchange_strong_explicit((_Atomic unsigned char*)x, expected, y, memory_order_seq_cst, memory_order_seq_cst); +} +static inline unsigned char atomic_exchange_byte(unsigned char* x, unsigned char y) { + return atomic_exchange_explicit((_Atomic unsigned char*)x, y, memory_order_seq_cst); +} +static inline unsigned char atomic_fetch_add_byte(unsigned char* x, unsigned char y) { + return atomic_fetch_add_explicit((_Atomic unsigned char*)x, y, memory_order_seq_cst); +} +static inline unsigned char atomic_fetch_sub_byte(unsigned char* x, unsigned char y) { + return atomic_fetch_sub_explicit((_Atomic unsigned char*)x, y, memory_order_seq_cst); +} +static inline unsigned char atomic_fetch_and_byte(unsigned char* x, unsigned char y) { + return atomic_fetch_and_explicit((_Atomic unsigned char*)x, y, memory_order_seq_cst); +} +static inline unsigned char atomic_fetch_or_byte(unsigned char* x, unsigned char y) { + return atomic_fetch_or_explicit((_Atomic unsigned char*)x, y, memory_order_seq_cst); +} +static inline unsigned char atomic_fetch_xor_byte(unsigned char* x, unsigned char y) { + return atomic_fetch_xor_explicit((_Atomic unsigned char*)x, y, memory_order_seq_cst); } #endif +#endif diff --git 
a/thirdparty/stdatomic/win/atomic.h b/thirdparty/stdatomic/win/atomic.h index 0a1a283f40..698a51c495 100644 --- a/thirdparty/stdatomic/win/atomic.h +++ b/thirdparty/stdatomic/win/atomic.h @@ -100,8 +100,32 @@ __CRT_INLINE LONGLONG _InterlockedExchangeAdd64(LONGLONG volatile *Addend, LONGL return Old; } +__CRT_INLINE LONG _InterlockedExchangeAdd(LONG volatile *Addend, LONG Value) +{ + LONG Old; + do + { + Old = *Addend; + } while (InterlockedCompareExchange(Addend, Old + Value, Old) != Old); + return Old; +} + +__CRT_INLINE SHORT _InterlockedExchangeAdd16(SHORT volatile *Addend, SHORT Value) +{ + SHORT Old; + do + { + Old = *Addend; + } while (InterlockedCompareExchange16(Addend, Old + Value, Old) != Old); + return Old; +} + #define InterlockedIncrement64 _InterlockedExchangeAdd64 +__CRT_INLINE VOID __faststorefence() { + __asm__ __volatile__ ("sfence"); +} + #endif #define atomic_store(object, desired) \ @@ -121,7 +145,7 @@ __CRT_INLINE LONGLONG _InterlockedExchangeAdd64(LONGLONG volatile *Addend, LONGL atomic_load(object) #define atomic_exchange(object, desired) \ - InterlockedExchangePointer(object, desired); + InterlockedExchangePointer(object, desired) #define atomic_exchange_explicit(object, desired, order) \ atomic_exchange(object, desired) @@ -177,6 +201,145 @@ static inline int atomic_compare_exchange_strong(intptr_t *object, intptr_t *exp InterlockedAnd(object, operand) #endif /* _WIN64 */ +/* specialized versions with explicit object size */ + +#define atomic_load_ptr atomic_load +#define atomic_store_ptr atomic_store +#define atomic_compare_exchange_weak_ptr atomic_compare_exchange_weak +#define atomic_compare_exchange_strong_ptr atomic_compare_exchange_strong +#define atomic_exchange_ptr atomic_exchange +#define atomic_fetch_add_ptr atomic_fetch_add +#define atomic_fetch_sub_ptr atomic_fetch_sub +#define atomic_fetch_and_ptr atomic_fetch_and +#define atomic_fetch_or_ptr atomic_fetch_or +#define atomic_fetch_xor_ptr atomic_fetch_xor + +static inline void atomic_store_u64(unsigned long long* object, unsigned long long desired) { + do { + *(object) = (desired); + MemoryBarrier(); + } while (0); +} + +static inline unsigned long long atomic_load_u64(unsigned long long* object) { + return (MemoryBarrier(), *(object)); +} + +#define atomic_exchange_u64(object, desired) \ + InterlockedExchange64(object, desired) + +static inline int atomic_compare_exchange_strong_u64(unsigned long long* object, unsigned long long* expected, + unsigned long long desired) +{ + unsigned long long old = *expected; + *expected = InterlockedCompareExchange64(object, desired, old); + return *expected == old; +} + +#define atomic_compare_exchange_weak_u64(object, expected, desired) \ + atomic_compare_exchange_strong_u64(object, expected, desired) + +#define atomic_fetch_add_u64(object, operand) \ + InterlockedExchangeAdd64(object, operand) + +#define atomic_fetch_sub_u64(object, operand) \ + InterlockedExchangeAdd64(object, -(operand)) + +#define atomic_fetch_or_u64(object, operand) \ + InterlockedOr64(object, operand) + +#define atomic_fetch_xor_u64(object, operand) \ + InterlockedXor64(object, operand) + +#define atomic_fetch_and_u64(object, operand) \ + InterlockedAnd64(object, operand) + + + +static inline void atomic_store_u32(unsigned* object, unsigned desired) { + do { + *(object) = (desired); + MemoryBarrier(); + } while (0); +} + +static inline unsigned atomic_load_u32(unsigned* object) { + return (MemoryBarrier(), *(object)); +} + +#define atomic_exchange_u32(object, desired) \ + 
InterlockedExchange(object, desired) + +static inline int atomic_compare_exchange_strong_u32(unsigned* object, unsigned* expected, + unsigned desired) +{ + unsigned old = *expected; + *expected = InterlockedCompareExchange(object, desired, old); + return *expected == old; +} + +#define atomic_compare_exchange_weak_u32(object, expected, desired) \ + atomic_compare_exchange_strong_u32(object, expected, desired) + +#define atomic_fetch_add_u32(object, operand) \ + InterlockedExchangeAdd(object, operand) + +#define atomic_fetch_sub_u32(object, operand) \ + InterlockedExchangeAdd(object, -(operand)) + +#define atomic_fetch_or_u32(object, operand) \ + InterlockedOr(object, operand) + +#define atomic_fetch_xor_u32(object, operand) \ + InterlockedXor(object, operand) + +#define atomic_fetch_and_u32(object, operand) \ + InterlockedAnd(object, operand) + + + +static inline void atomic_store_u16(unsigned short* object, unsigned short desired) { + do { + *(object) = (desired); + MemoryBarrier(); + } while (0); +} + +static inline unsigned short atomic_load_u16(unsigned short* object) { + return (MemoryBarrier(), *(object)); +} + +#define atomic_exchange_u16(object, desired) \ + InterlockedExchange16(object, desired) + +static inline int atomic_compare_exchange_strong_u16(unsigned short* object, unsigned short* expected, + unsigned short desired) +{ + unsigned short old = *expected; + *expected = InterlockedCompareExchange16(object, desired, old); + return *expected == old; +} + +#define atomic_compare_exchange_weak_u16(object, expected, desired) \ + atomic_compare_exchange_strong_u16(object, expected, desired) + +#define atomic_fetch_add_u16(object, operand) \ + InterlockedExchangeAdd16(object, operand) + +#define atomic_fetch_sub_u16(object, operand) \ + InterlockedExchangeAdd16(object, -(operand)) + +#define atomic_fetch_or_u16(object, operand) \ + InterlockedOr16(object, operand) + +#define atomic_fetch_xor_u16(object, operand) \ + InterlockedXor16(object, operand) + +#define atomic_fetch_and_u16(object, operand) \ + InterlockedAnd16(object, operand) + + + #define atomic_fetch_add_explicit(object, operand, order) \ atomic_fetch_add(object, operand) @@ -204,4 +367,4 @@ static inline int atomic_compare_exchange_strong(intptr_t *object, intptr_t *exp #define atomic_flag_clear_explicit(object, order) \ atomic_flag_clear(object) -#endif /* COMPAT_ATOMICS_WIN32_STDATOMIC_H */ \ No newline at end of file +#endif /* COMPAT_ATOMICS_WIN32_STDATOMIC_H */ diff --git a/vlib/builtin/cfns.c.v b/vlib/builtin/cfns.c.v index acdc56268a..34fd045655 100644 --- a/vlib/builtin/cfns.c.v +++ b/vlib/builtin/cfns.c.v @@ -348,7 +348,7 @@ fn C.MAKELANGID() int fn C.FormatMessage() voidptr -fn C.CloseHandle() +fn C.CloseHandle(voidptr) int fn C.GetExitCodeProcess() @@ -411,6 +411,7 @@ fn C.ReleaseSRWLockExclusive(voidptr) fn C.pthread_mutex_init(voidptr, voidptr) int fn C.pthread_mutex_lock(voidptr) int fn C.pthread_mutex_unlock(voidptr) int +fn C.pthread_mutex_destroy(voidptr) int fn C.pthread_rwlockattr_init(voidptr) int fn C.pthread_rwlockattr_setkind_np(voidptr, int) int @@ -422,16 +423,19 @@ fn C.pthread_rwlock_unlock(voidptr) int fn C.pthread_condattr_init(voidptr) int fn C.pthread_condattr_setpshared(voidptr, int) int +fn C.pthread_condattr_destroy(voidptr) int fn C.pthread_cond_init(voidptr, voidptr) int fn C.pthread_cond_signal(voidptr) int fn C.pthread_cond_wait(voidptr, voidptr) int fn C.pthread_cond_timedwait(voidptr, voidptr, voidptr) int +fn C.pthread_cond_destroy(voidptr) int fn C.sem_init(voidptr, int, u32) int 
fn C.sem_post(voidptr) int fn C.sem_wait(voidptr) int fn C.sem_trywait(voidptr) int fn C.sem_timedwait(voidptr, voidptr) int +fn C.sem_destroy(voidptr) int fn C.read(fd int, buf voidptr, count size_t) int fn C.write(fd int, buf voidptr, count size_t) int diff --git a/vlib/sync/atomic2/atomic.v b/vlib/sync/atomic2/atomic.v index e1067edacd..e427d8a79a 100644 --- a/vlib/sync/atomic2/atomic.v +++ b/vlib/sync/atomic2/atomic.v @@ -17,10 +17,17 @@ further tested. #flag freebsd -I @VROOT/thirdparty/stdatomic/nix #flag solaris -I @VROOT/thirdparty/stdatomic/nix -#include "atomic.h" +$if linux { + $if tinyc { + // most Linux distributions have /usr/lib/libatomic.so, but Ubuntu uses gcc version specific dir + #flag -L/usr/lib/gcc/x86_64-linux-gnu/8 -L/usr/lib/gcc/x86_64-linux-gnu/9 -latomic + } +} -fn C.atomic_fetch_add_explicit() int -fn C.atomic_fetch_sub_explicit() int +#include + +fn C.atomic_fetch_add_explicit(voidptr, i64) i64 +fn C.atomic_fetch_sub_explicit(voidptr, i64) i64 [typedef] struct C.atomic_ullong { diff --git a/vlib/sync/channels.v b/vlib/sync/channels.v new file mode 100644 index 0000000000..dd7953a5be --- /dev/null +++ b/vlib/sync/channels.v @@ -0,0 +1,505 @@ +module sync + +import time +import rand + +#flag windows -I @VROOT/thirdparty/stdatomic/win +#flag linux -I @VROOT/thirdparty/stdatomic/nix +#flag darwin -I @VROOT/thirdparty/stdatomic/nix +#flag freebsd -I @VROOT/thirdparty/stdatomic/nix +#flag solaris -I @VROOT/thirdparty/stdatomic/nix + +$if linux { + $if tinyc { + // most Linux distributions have /usr/lib/libatomic.so, but Ubuntu uses gcc version specific dir + #flag -L/usr/lib/gcc/x86_64-linux-gnu/8 -L/usr/lib/gcc/x86_64-linux-gnu/9 -latomic + } +} + +#include + +// the following functions are actually generic in C +fn C.atomic_load_ptr(voidptr) voidptr +fn C.atomic_store_ptr(voidptr, voidptr) +fn C.atomic_compare_exchange_weak_ptr(voidptr, voidptr, voidptr) bool +fn C.atomic_compare_exchange_strong_ptr(voidptr, voidptr, voidptr) bool +fn C.atomic_exchange_ptr(voidptr, voidptr) voidptr +fn C.atomic_fetch_add_ptr(voidptr, voidptr) voidptr +fn C.atomic_fetch_sub_ptr(voidptr, voidptr) voidptr + +fn C.atomic_load_u16(voidptr) u16 +fn C.atomic_store_u16(voidptr, u16) +fn C.atomic_compare_exchange_weak_u16(voidptr, voidptr, u16) bool +fn C.atomic_compare_exchange_strong_u16(voidptr, voidptr, u16) bool +fn C.atomic_exchange_u16(voidptr, u16) u16 +fn C.atomic_fetch_add_u16(voidptr, u16) u16 +fn C.atomic_fetch_sub_u16(voidptr, u16) u16 + +fn C.atomic_load_u32(voidptr) u32 +fn C.atomic_store_u32(voidptr, u32) +fn C.atomic_compare_exchange_weak_u32(voidptr, voidptr, u32) bool +fn C.atomic_compare_exchange_strong_u32(voidptr, voidptr, u32) bool +fn C.atomic_exchange_u32(voidptr, u32) u32 +fn C.atomic_fetch_add_u32(voidptr, u32) u32 +fn C.atomic_fetch_sub_u32(voidptr, u32) u32 + +fn C.atomic_load_u64(voidptr) u64 +fn C.atomic_store_u64(voidptr, u64) +fn C.atomic_compare_exchange_weak_u64(voidptr, voidptr, u64) bool +fn C.atomic_compare_exchange_strong_u64(voidptr, voidptr, u64) bool +fn C.atomic_exchange_u64(voidptr, u64) u64 +fn C.atomic_fetch_add_u64(voidptr, u64) u64 +fn C.atomic_fetch_sub_u64(voidptr, u64) u64 + +const ( + // how often to try to get data without blocking before to wait for semaphore + spinloops = 750 + spinloops_sem = 4000 +) + +enum BufferElemStat { + unused = 0 + writing + written + reading +} + +struct Subscription { +mut: + sem Semaphore + prev &&Subscription + nxt &Subscription +} + +struct Channel { + writesem Semaphore // to wake thread that wanted to 
write, but buffer was full + readsem Semaphore // to wake thread that wanted to read, but buffer was empty + writesem_im Semaphore + readsem_im Semaphore + ringbuf byteptr // queue for buffered channels + statusbuf byteptr // flags to synchronize write/read in ringbuf + objsize u32 + queue_length u32 // in #objects +mut: // atomic + write_adr C.atomic_uintptr_t // if != NULL the next obj can be written here without wait + read_adr C.atomic_uintptr_t // if != NULL an obj can be read from here without wait + adr_read C.atomic_uintptr_t // used to identify origin of writesem + adr_written C.atomic_uintptr_t // used to identify origin of readsem + write_free u32 // for queue state + read_avail u32 + buf_elem_write_idx u32 + buf_elem_read_idx u32 + // for select + write_subscriber &Subscription + read_subscriber &Subscription + write_sub_mtx u16 + read_sub_mtx u16 +} + +pub fn new_channel(n u32) &Channel { + return &Channel{ + writesem: new_semaphore_init(if n > 0 { n + 1 } else { 1 }) + readsem: new_semaphore_init(if n > 0 { u32(0) } else { 1 }) + writesem_im: new_semaphore() + readsem_im: new_semaphore() + objsize: sizeof(T) + queue_length: n + write_free: n + read_avail: 0 + ringbuf: if n > 0 { malloc(int(n * sizeof(T))) } else { byteptr(0) } + statusbuf: if n > 0 { vcalloc(int(n * sizeof(u16))) } else { byteptr(0) } + write_subscriber: 0 + read_subscriber: 0 + } +} + +pub fn (mut ch Channel) push(src voidptr) { + ch.try_push(src, false) +} + +fn (mut ch Channel) try_push(src voidptr, no_block bool) bool { + spinloops_, spinloops_sem_ := if no_block { 1, 1 } else { spinloops, spinloops_sem } + mut have_swapped := false + for { + mut got_sem := false + mut wradr := C.atomic_load_ptr(&ch.write_adr) + for wradr != C.NULL { + if C.atomic_compare_exchange_strong_ptr(&ch.write_adr, &wradr, voidptr(0)) { + // there is a reader waiting for us + unsafe { C.memcpy(wradr, src, ch.objsize) } + mut nulladr := voidptr(0) + for !C.atomic_compare_exchange_weak_ptr(&ch.adr_written, &nulladr, wradr) { + nulladr = voidptr(0) + } + ch.readsem_im.post() + return true + } + } + if no_block && ch.queue_length == 0 { + return false + } + // get token to read + for _ in 0 .. 
spinloops_sem_ { + if got_sem { + break + } + got_sem = ch.writesem.try_wait() + } + if !got_sem { + ch.writesem.wait() + } + if ch.queue_length == 0 { + // try to advertise current object as readable + mut read_in_progress := false + C.atomic_store_ptr(&ch.read_adr, src) + wradr = C.atomic_load_ptr(&ch.write_adr) + if wradr != C.NULL { + mut src2 := src + if C.atomic_compare_exchange_strong_ptr(&ch.read_adr, &src2, voidptr(0)) { + ch.writesem.post() + continue + } else { + read_in_progress = true + } + } + if !read_in_progress { + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + if ch.read_subscriber != voidptr(0) { + ch.read_subscriber.sem.post() + } + C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) + } + mut src2 := src + for sp := u32(0); sp < spinloops_ || read_in_progress; sp++ { + if C.atomic_compare_exchange_strong_ptr(&ch.adr_read, &src2, voidptr(0)) { + have_swapped = true + read_in_progress = true + break + } + src2 = src + } + mut got_im_sem := false + for sp := u32(0); sp < spinloops_sem_ || read_in_progress; sp++ { + got_im_sem = ch.writesem_im.try_wait() + if got_im_sem { + break + } + } + for { + if got_im_sem { + got_im_sem = false + } else { + ch.writesem_im.wait() + } + if have_swapped || C.atomic_compare_exchange_strong_ptr(&ch.adr_read, &src2, voidptr(0)) { + ch.writesem.post() + break + } else { + // this semaphore was not for us - repost in + ch.writesem_im.post() + src2 = src + } + } + return true + } else { + // buffered channel + mut space_in_queue := false + mut wr_free := C.atomic_load_u32(&ch.write_free) + for wr_free > 0 { + space_in_queue = C.atomic_compare_exchange_weak_u32(&ch.write_free, &wr_free, wr_free-1) + if space_in_queue { + break + } + } + if space_in_queue { + mut wr_idx := C.atomic_load_u32(&ch.buf_elem_write_idx) + for { + mut new_wr_idx := wr_idx + 1 + for new_wr_idx >= ch.queue_length { + new_wr_idx -= ch.queue_length + } + if C.atomic_compare_exchange_strong_u32(&ch.buf_elem_write_idx, &wr_idx, new_wr_idx) { + break + } + } + mut wr_ptr := ch.ringbuf + mut status_adr := ch.statusbuf + unsafe { + wr_ptr += wr_idx * ch.objsize + status_adr += wr_idx * sizeof(u16) + } + mut expected_status := u16(BufferElemStat.unused) + for !C.atomic_compare_exchange_weak_u16(status_adr, &expected_status, u16(BufferElemStat.writing)) { + expected_status = u16(BufferElemStat.unused) + } + unsafe { + C.memcpy(wr_ptr, src, ch.objsize) + } + C.atomic_store_u16(status_adr, u16(BufferElemStat.written)) + old_read_avail := C.atomic_fetch_add_u32(&ch.read_avail, 1) + ch.readsem.post() + if old_read_avail == 0 { + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + if ch.read_subscriber != voidptr(0) { + ch.read_subscriber.sem.post() + } + C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) + } + return true + } else { + ch.writesem.post() + } + } + } +} + +pub fn (mut ch Channel) pop(dest voidptr) { + ch.try_pop(dest, false) +} + +fn (mut ch Channel) try_pop(dest voidptr, no_block bool) bool { + spinloops_, spinloops_sem_ := if no_block { 1, 1 } else { spinloops, spinloops_sem } + mut have_swapped := false + mut write_in_progress := false + for { + mut got_sem := false + if ch.queue_length == 0 { + // unbuffered channel - first see if a `push()` has adversized + mut rdadr := C.atomic_load_ptr(&ch.read_adr) + for rdadr != C.NULL { + if C.atomic_compare_exchange_strong_ptr(&ch.read_adr, &rdadr, voidptr(0)) { + // there is a writer 
waiting for us + unsafe { C.memcpy(dest, rdadr, ch.objsize) } + mut nulladr := voidptr(0) + for !C.atomic_compare_exchange_weak_ptr(&ch.adr_read, &nulladr, rdadr) { + nulladr = voidptr(0) + } + ch.writesem_im.post() + return true + } + } + if no_block { + return false + } + } + // get token to read + for _ in 0 .. spinloops_sem_ { + if got_sem { + break + } + got_sem = ch.readsem.try_wait() + } + if !got_sem { + if no_block { + return false + } + ch.readsem.wait() + } + if ch.queue_length > 0 { + // try to get buffer token + mut obj_in_queue := false + mut rd_avail := C.atomic_load_u32(&ch.read_avail) + for rd_avail > 0 { + obj_in_queue = C.atomic_compare_exchange_weak_u32(&ch.read_avail, &rd_avail, rd_avail-1) + if obj_in_queue { + break + } + } + if obj_in_queue { + mut rd_idx := C.atomic_load_u32(&ch.buf_elem_read_idx) + for { + mut new_rd_idx := rd_idx + 1 + for new_rd_idx >= ch.queue_length { + new_rd_idx -= ch.queue_length + } + if C.atomic_compare_exchange_weak_u32(&ch.buf_elem_read_idx, &rd_idx, new_rd_idx) { + break + } + } + mut rd_ptr := ch.ringbuf + mut status_adr := ch.statusbuf + unsafe { + rd_ptr += rd_idx * ch.objsize + status_adr += rd_idx * sizeof(u16) + } + mut expected_status := u16(BufferElemStat.written) + for !C.atomic_compare_exchange_weak_u16(status_adr, &expected_status, u16(BufferElemStat.reading)) { + expected_status = u16(BufferElemStat.written) + } + unsafe { + C.memcpy(dest, rd_ptr, ch.objsize) + } + C.atomic_store_u16(status_adr, u16(BufferElemStat.unused)) + old_write_free := C.atomic_fetch_add_u32(&ch.write_free, 1) + ch.writesem.post() + if old_write_free == 0 { + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + if ch.write_subscriber != voidptr(0) { + ch.write_subscriber.sem.post() + } + C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) + } + return true + } + } + // try to advertise `dest` as writable + C.atomic_store_ptr(&ch.write_adr, dest) + if ch.queue_length == 0 { + mut rdadr := C.atomic_load_ptr(&ch.read_adr) + if rdadr != C.NULL { + mut dest2 := dest + if C.atomic_compare_exchange_strong_ptr(&ch.write_adr, &dest2, voidptr(0)) { + ch.readsem.post() + continue + } else { + write_in_progress = true + } + } + } + if ch.queue_length == 0 && !write_in_progress { + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + if ch.write_subscriber != voidptr(0) { + ch.write_subscriber.sem.post() + } + C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) + } + mut dest2 := dest + for sp := u32(0); sp < spinloops_ || write_in_progress; sp++ { + if C.atomic_compare_exchange_strong_ptr(&ch.adr_written, &dest2, voidptr(0)) { + have_swapped = true + break + } + dest2 = dest + } + mut got_im_sem := false + for sp := u32(0); sp < spinloops_sem_ || write_in_progress; sp++ { + got_im_sem = ch.readsem_im.try_wait() + if got_im_sem { + break + } + } + for { + if got_im_sem { + got_im_sem = false + } else { + ch.readsem_im.wait() + } + if have_swapped || C.atomic_compare_exchange_strong_ptr(&ch.adr_written, &dest2, voidptr(0)) { + ch.readsem.post() + break + } else { + // this semaphore was not for us - repost in + ch.readsem_im.post() + dest2 = dest + } + } + return true + } +} + +// Wait `timeout` on any of `channels[i]` until one of them can push (`is_push[i] = true`) or pop (`is_push[i] = false`) +// object referenced by `objrefs[i]`. 
`timeout = 0` means wait unlimited time + +pub fn channel_select(mut channels []&Channel, is_push []bool, mut objrefs []voidptr, timeout time.Duration) int { + assert channels.len == is_push.len + assert is_push.len == objrefs.len + mut subscr := []Subscription{len: channels.len} + sem := new_semaphore() + for i, ch in channels { + if is_push[i] { + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + subscr[i].sem = sem + subscr[i].prev = &ch.write_subscriber + subscr[i].nxt = C.atomic_exchange_ptr(&ch.write_subscriber, &subscr[i]) + if voidptr(subscr[i].nxt) != voidptr(0) { + subscr[i].nxt.prev = &subscr[i] + } + C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) + } else { + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + subscr[i].sem = sem + subscr[i].prev = &ch.read_subscriber + subscr[i].nxt = C.atomic_exchange_ptr(&ch.read_subscriber, &subscr[i]) + if voidptr(subscr[i].nxt) != voidptr(0) { + subscr[i].nxt.prev = &subscr[i] + } + C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) + } + } + stopwatch := if timeout == 0 { time.StopWatch{} } else { time.new_stopwatch({}) } + mut event_idx := -1 // negative index means `timed out` + for { + rnd := rand.u32_in_range(0, u32(channels.len)) + for j, _ in channels { + mut i := j + int(rnd) + if i >= channels.len { + i -= channels.len + } + if is_push[i] { + if channels[i].try_push(objrefs[i], true) { + event_idx = i + goto restore + } + } else { + if channels[i].try_pop(objrefs[i], true) { + event_idx = i + goto restore + } + } + } + if timeout > 0 { + remaining := timeout - stopwatch.elapsed() + if !sem.timed_wait(remaining) { + goto restore + } + } else { + sem.wait() + } + } +restore: + // reset subscribers + for i, ch in channels { + if is_push[i] { + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + subscr[i].prev = subscr[i].nxt + if subscr[i].nxt != 0 { + // just in case we have missed a semaphore during restore + subscr[i].nxt.sem.post() + } + C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) + } else { + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + subscr[i].prev = subscr[i].nxt + if subscr[i].nxt != 0 { + subscr[i].nxt.sem.post() + } + C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) + } + } + sem.destroy() + return event_idx +} diff --git a/vlib/sync/sync_nix.c.v b/vlib/sync/sync_nix.c.v index 027e4dbf88..67df5a5c5b 100644 --- a/vlib/sync/sync_nix.c.v +++ b/vlib/sync/sync_nix.c.v @@ -26,11 +26,13 @@ struct RwMutexAttr { /* MacOSX has no unnamed semaphores and no `timed_wait()` at all so we emulate the behaviour with other devices */ +[ref_only] struct MacOSX_Semaphore { mtx C.pthread_mutex_t cond C.pthread_cond_t + attr C.pthread_condattr_t mut: - count int + count u32 } [ref_only] @@ -38,19 +40,8 @@ struct PosixSemaphore { sem C.sem_t } -[ref_only] -struct CondAttr { - attr C.pthread_condattr_t -} - pub struct Semaphore { - /* - $if macos { - sem &MacOSX_Semaphore - } $else { - sem &PosixSemaphore - } - */ +mut: sem voidptr // since the above does not work, yet } @@ -99,22 +90,26 @@ pub fn (mut m RwMutex) w_unlock() { C.pthread_rwlock_unlock(&m.mutex) } +[inline] pub fn new_semaphore() Semaphore { + return new_semaphore_init(0) +} + +pub fn new_semaphore_init(n u32) Semaphore { $if macos { s := Semaphore{ - sem: &MacOSX_Semaphore{count: 0} + sem: 
&MacOSX_Semaphore{count: n} } C.pthread_mutex_init(&&MacOSX_Semaphore(s.sem).mtx, C.NULL) - a := &CondAttr{} - C.pthread_condattr_init(&a.attr) - C.pthread_condattr_setpshared(&a.attr, C.PTHREAD_PROCESS_PRIVATE) - C.pthread_cond_init(&&MacOSX_Semaphore(s.sem).cond, &a.attr) + C.pthread_condattr_init(&&MacOSX_Semaphore(s.sem).attr) + C.pthread_condattr_setpshared(&&MacOSX_Semaphore(s.sem).attr, C.PTHREAD_PROCESS_PRIVATE) + C.pthread_cond_init(&&MacOSX_Semaphore(s.sem).cond, &&MacOSX_Semaphore(s.sem).attr) return s } $else { s := Semaphore{ sem: &PosixSemaphore{} } - unsafe { C.sem_init(&&PosixSemaphore(s.sem).sem, 0, 0) } + unsafe { C.sem_init(&&PosixSemaphore(s.sem).sem, 0, n) } return s } } @@ -186,3 +181,13 @@ pub fn (s Semaphore) timed_wait(timeout time.Duration) bool { return unsafe { C.sem_timedwait(&&PosixSemaphore(s.sem).sem, &t_spec) == 0 } } } + +pub fn (s Semaphore) destroy() bool { + $if macos { + return C.pthread_cond_destroy(&&MacOSX_Semaphore(s.sem).cond) == 0 && + C.pthread_condattr_destroy(&&MacOSX_Semaphore(s.sem).attr) == 0 && + C.pthread_mutex_destroy(&&MacOSX_Semaphore(s.sem).mtx) == 0 + } $else { + return unsafe { C.sem_destroy(&&PosixSemaphore(s.sem).sem) == 0 } + } +} diff --git a/vlib/sync/sync_windows.c.v b/vlib/sync/sync_windows.c.v index 9574fa52db..d7a2ee325e 100644 --- a/vlib/sync/sync_windows.c.v +++ b/vlib/sync/sync_windows.c.v @@ -128,9 +128,14 @@ pub fn (mut m Mutex) destroy() { m.state = .destroyed // setting up reference to invalid state } +[inline] pub fn new_semaphore() Semaphore { + return new_semaphore_init(0) +} + +pub fn new_semaphore_init(n u32) Semaphore { return Semaphore{ - sem: SHANDLE(C.CreateSemaphore(0, 0, C.INT32_MAX, 0)) + sem: SHANDLE(C.CreateSemaphore(0, n, C.INT32_MAX, 0)) } } @@ -149,3 +154,7 @@ pub fn (s Semaphore) try_wait() bool { pub fn (s Semaphore) timed_wait(timeout time.Duration) bool { return C.WaitForSingleObject(s.sem, timeout / time.millisecond) == 0 } + +pub fn (s Semaphore) destroy() bool { + return C.CloseHandle(s.sem) != 0 +} diff --git a/vlib/v/tests/bench/channel_bench_go.go b/vlib/v/tests/bench/channel_bench_go.go new file mode 100644 index 0000000000..a0afbbcee1 --- /dev/null +++ b/vlib/v/tests/bench/channel_bench_go.go @@ -0,0 +1,68 @@ +package main + +import "fmt" +import "log" +import "os" +import "time" +import "strconv" + +func assert_eq(a, b int64) { + if a != b { + log.Fatalf("assertion failed\nleft: %d, right: %d\n", a, b) + } +} + +func do_rec(ch chan int32, resch chan int64, n int32) { + var sum int64 + var i int32 + for i = 0; i < n; i++ { + sum += int64(<- ch) + } + fmt.Println(sum) + resch <- sum +} + +func do_send(ch chan int32, start, end int32) { + for i := start; i < end; i++ { + ch <- i + } +} + +func main() { + if len(os.Args) != 5 { + log.Fatalf("usage:\n\t%s \n", os.Args[0]) + } + nsend, _ := strconv.Atoi(os.Args[1]) + nrec, _ := strconv.Atoi(os.Args[2]) + buflen, _ := strconv.Atoi(os.Args[3]) + nobj, _ := strconv.Atoi(os.Args[4]) + stopwatch := time.Now() + ch := make(chan int32, buflen) + resch := make(chan int64, 0) + no := nobj + for i := 0; i < nrec; i++ { + n := no / (nrec - i) + go do_rec(ch, resch, int32(n)) + no -= n + } + assert_eq(int64(no), 0) + no = nobj + for i := 0; i < nsend; i++ { + n := no / (nsend - i) + end := no + no -= n + go do_send(ch, int32(no), int32(end)) + } + assert_eq(int64(no), 0) + var sum int64 + for i := 0; i < nrec; i++ { + sum += <-resch + } + elapsed := time.Now().Sub(stopwatch) + rate := float64(nobj)/float64(elapsed.Nanoseconds())*1000.0 + duration := 
1.0e-09 * float64(elapsed.Nanoseconds()) + fmt.Printf("%d objects in %g s (%.2f objs/µs)\n", nobj, duration, rate) + expected_sum := int64(nobj)*int64(nobj-1)/2 + fmt.Printf("got: %d, expected: %d\n", sum, expected_sum) + assert_eq(sum, expected_sum) +} diff --git a/vlib/v/tests/bench/channel_bench_v.v b/vlib/v/tests/bench/channel_bench_v.v new file mode 100644 index 0000000000..f390bfdb84 --- /dev/null +++ b/vlib/v/tests/bench/channel_bench_v.v @@ -0,0 +1,70 @@ +// Channel Benchmark +// +// `nobj` integers are sent thru a channel with queue length`buflen` +// using `nsend` sender threads and `nrec` receiver threads. +// +// The receive threads add all received numbers and send them to the +// main thread where the total sum is compare to the expected value. + +import sync +import time +import os + +fn do_rec(mut ch sync.Channel, mut resch sync.Channel, n int) { + mut sum := i64(0) + for _ in 0 .. n { + mut a := 0 + ch.pop(&a) + sum += a + } + println(sum) + resch.push(&sum) +} + +fn do_send(mut ch sync.Channel, start, end int) { + for i in start .. end { + ch.push(&i) + } +} + +fn main() { + if os.args.len != 5 { + eprintln('usage:\n\t${os.args[0]} ') + exit(1) + } + nsend := os.args[1].int() + nrec := os.args[2].int() + buflen := os.args[3].int() + nobj := os.args[4].int() + stopwatch := time.new_stopwatch({}) + mut ch := sync.new_channel(buflen) + mut resch := sync.new_channel(0) + mut no := nobj + for i in 0 .. nrec { + n := no / (nrec - i) + go do_rec(mut ch, mut resch, n) + no -= n + } + assert no == 0 + no = nobj + for i in 0 .. nsend { + n := no / (nsend - i) + end := no + no -= n + go do_send(mut ch, no, end) + } + assert no == 0 + mut sum := i64(0) + for _ in 0 .. nrec { + mut r := i64(0) + resch.pop(&r) + sum += r + } + elapsed := stopwatch.elapsed() + rate := f64(nobj)/elapsed*time.microsecond + println('$nobj objects in ${f64(elapsed)/time.second} s (${rate:.2f} objs/µs)') + // use sum formula by Gauß to calculate the expected result + expected_sum := i64(nobj)*(nobj-1)/2 + println('got: $sum, expected: $expected_sum') + assert sum == expected_sum +} diff --git a/vlib/v/tests/channel_1_test.v b/vlib/v/tests/channel_1_test.v new file mode 100644 index 0000000000..8fe7c9b5d8 --- /dev/null +++ b/vlib/v/tests/channel_1_test.v @@ -0,0 +1,23 @@ +import sync + +const ( + num_iterations = 10000 +) + +fn do_send(mut ch sync.Channel) { + for i in 0 .. num_iterations { + ch.push(&i) + } +} + +fn test_channel_buffered() { + mut ch := sync.new_channel(1000) + go do_send(mut ch) + mut sum := i64(0) + for _ in 0 .. num_iterations { + a := 0 + ch.pop(&a) + sum += a + } + assert sum == u64(num_iterations)*(num_iterations-1)/2 +} diff --git a/vlib/v/tests/channel_2_test.v b/vlib/v/tests/channel_2_test.v new file mode 100644 index 0000000000..be52470f81 --- /dev/null +++ b/vlib/v/tests/channel_2_test.v @@ -0,0 +1,23 @@ +import sync + +const ( + num_iterations = 10000 +) + +fn do_send(mut ch sync.Channel) { + for i in 0 .. num_iterations { + ch.push(&i) + } +} + +fn test_channel_unbuffered() { + mut ch := sync.new_channel(0) + go do_send(mut ch) + mut sum := i64(0) + for _ in 0 .. num_iterations { + a := 0 + ch.pop(&a) + sum += a + } + assert sum == u64(num_iterations)*(num_iterations-1)/2 +} diff --git a/vlib/v/tests/channel_3_test.v b/vlib/v/tests/channel_3_test.v new file mode 100644 index 0000000000..40781261f4 --- /dev/null +++ b/vlib/v/tests/channel_3_test.v @@ -0,0 +1,38 @@ +import sync + +fn do_rec(mut ch sync.Channel, mut resch sync.Channel) { + mut sum := i64(0) + for _ in 0 .. 
2000 { + mut a := 0 + ch.pop(&a) + sum += a + } + println(sum) + resch.push(&sum) +} + +fn do_send(mut ch sync.Channel) { + for i in 0 .. 2000 { + ch.push(&i) + } +} + +fn test_channel_multi_unbuffered() { + mut ch := sync.new_channel(0) + mut resch := sync.new_channel(0) + go do_rec(mut ch, mut resch) + go do_rec(mut ch, mut resch) + go do_rec(mut ch, mut resch) + go do_rec(mut ch, mut resch) + go do_send(mut ch) + go do_send(mut ch) + go do_send(mut ch) + go do_send(mut ch) + mut sum := i64(0) + for _ in 0 .. 4 { + mut r := i64(0) + resch.pop(&r) + sum += r + } + assert sum == i64(4) * 2000 * (2000 - 1) / 2 +} diff --git a/vlib/v/tests/channel_4_test.v b/vlib/v/tests/channel_4_test.v new file mode 100644 index 0000000000..2348bcff7b --- /dev/null +++ b/vlib/v/tests/channel_4_test.v @@ -0,0 +1,38 @@ +import sync + +fn do_rec(mut ch sync.Channel, mut resch sync.Channel) { + mut sum := i64(0) + for _ in 0 .. 2000 { + mut a := 0 + ch.pop(&a) + sum += a + } + println(sum) + resch.push(&sum) +} + +fn do_send(mut ch sync.Channel) { + for i in 0 .. 2000 { + ch.push(&i) + } +} + +fn test_channel_multi_buffered() { + mut ch := sync.new_channel(100) + mut resch := sync.new_channel(0) + go do_rec(mut ch, mut resch) + go do_rec(mut ch, mut resch) + go do_rec(mut ch, mut resch) + go do_rec(mut ch, mut resch) + go do_send(mut ch) + go do_send(mut ch) + go do_send(mut ch) + go do_send(mut ch) + mut sum := i64(0) + for _ in 0 .. 4 { + mut r := i64(0) + resch.pop(&r) + sum += r + } + assert sum == i64(4) * 2000 * (2000 - 1) / 2 +} diff --git a/vlib/v/tests/channel_select_test.v b/vlib/v/tests/channel_select_test.v new file mode 100644 index 0000000000..05db0cbb01 --- /dev/null +++ b/vlib/v/tests/channel_select_test.v @@ -0,0 +1,76 @@ +import sync + +fn do_rec_i64(mut ch sync.Channel) { + mut sum := i64(0) + for _ in 0 .. 300 { + mut a := i64(0) + ch.pop(&a) + sum += a + } + assert sum == 300 * (300 - 1) / 2 +} + +fn do_send_int(mut ch sync.Channel) { + for i in 0 .. 300 { + ch.push(&i) + } +} + +fn do_send_byte(mut ch sync.Channel) { + for i in 0 .. 300 { + ii := byte(i) + ch.push(&ii) + } +} + +fn do_send_i64(mut ch sync.Channel) { + for i in 0 .. 300 { + ii := i64(i) + ch.push(&ii) + } +} + +fn test_select() { + mut chi := sync.new_channel(0) + mut chl := sync.new_channel(1) + mut chb := sync.new_channel(10) + mut recch := sync.new_channel(0) + go do_rec_i64(mut recch) + go do_send_int(mut chi) + go do_send_byte(mut chb) + go do_send_i64(mut chl) + mut channels := [chi, recch, chl, chb] + directions := [false, true, false, false] + mut sum := i64(0) + mut rl := i64(0) + mut ri := int(0) + mut rb := byte(0) + mut sl := i64(0) + mut objs := [voidptr(&ri), &sl, &rl, &rb] + for _ in 0 .. 1200 { + idx := sync.channel_select(mut channels, directions, mut objs, 0) + match idx { + 0 { + sum += ri + } + 1 { + sl++ + } + 2 { + sum += rl + } + 3 { + sum += rb + } + else { + println('got $idx (timeout)') + } + } + } + // Use Gauß' formula for the first 2 contributions + expected_sum := 2 * (300 * (300 - 1) / 2) + + // the 3rd contribution is `byte` and must be seen modulo 256 + 256 * (256 - 1) / 2 + + 44 * (44 - 1) / 2 + assert sum == expected_sum +}