1
0
mirror of https://github.com/vlang/v.git synced 2023-08-10 21:13:21 +03:00

sync: channel implementation (#6074)

This commit is contained in:
Uwe Krüger 2020-08-06 15:28:19 +02:00 committed by GitHub
parent 09f1362305
commit 863cf8af60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1532 additions and 51 deletions

View File

@ -12,6 +12,7 @@ for the current state of V***
* [Weaknesses](#weaknesses)
* [Compatibility](#compatibility)
* [Automatic Lock](#automatic-lock)
* [Channels](#channels)
## Concurrency
@ -190,3 +191,73 @@ are sometimes surprising. Each statement should be seen as a single
transaction that is unrelated to the previous or following
statement. Therefore - but also for performance reasons - it's often
better to group consecutive coherent statements in an explicit `lock` block.
### Channels
Channels in V work basically like those in Go. You can `push()` objects into
a channel and `pop()` objects from a channel. They can be buffered or unbuffered
and it is possible to `select` from multiple channels.
#### Syntax and Usage
There is no support for channels in the core language (yet), so all functions
are in the `sync` library. Channels must be created as `mut` objects.
```v
mut ch := sync.new_channel<int>(0) // unbuffered
mut ch2 := sync.new_channel<f64>(100) // buffer length 100
```
Channels can be passed to coroutines like normal `mut` variables:
```v
fn f(mut ch sync.Channel) {
...
}
fn main() {
...
go f(mut ch)
...
}
```
The routines `push()` and `pop()` both use *references* to objects. This way
unnecessary copies of large objects are avoided and the call to `cannel_select()`
(see below) is simpler:
```v
n := 5
x := 7.3
ch.push(&n)
ch2.push(&x)
mut m := int(0)
mut y := f64(0.0)
ch.pop(&m)
ch2.pop(&y)
```
The select call is somewhat tricky. The `channel_select()` function needs three arrays that
contain the channels, the directions (pop/push) and the object references and
a timeout of type `time.Duration` (or `0` to wait unlimited) as parameters. It returns the
index of the object that was pushed or popped or `-1` for timeout.
```v
mut chans := [ch, ch2] // the channels to monitor
directions := [false, false] // `true` means push, `false` means pop
mut objs := [voidptr(&m), &y] // the objects to push or pop
// idx contains the index of the object that was pushed or popped, -1 means timeout occured
idx := sync.channel_select(mut chans, directions, mut objs, 0) // wait unlimited
match idx {
0 {
println('got $m')
}
1 {
println('got $y')
}
else {
// idx = -1
println('Timeout')
}
}
```

View File

@ -1,8 +1,11 @@
/*
Compability header for stdatomic.h that works for all compilers supported
by V. For TCC the atomic features missing are implemented using mutex locks
by V. For TCC libatomic from the operating system is used
*/
#ifndef __ATOMIC_H
#define __ATOMIC_H
#ifndef __cplusplus
// If C just use stdatomic.h
#ifndef __TINYC__
@ -14,41 +17,419 @@
#endif
#ifdef __TINYC__
#include <pthread.h>
typedef intptr_t atomic_llong;
typedef intptr_t atomic_ullong;
typedef volatile long long atomic_llong;
typedef volatile unsigned long long atomic_ullong;
typedef volatile uintptr_t atomic_uintptr_t;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
// use functions for 64, 32 and 8 bit from libatomic directly
// since tcc is not capible to use "generic" C functions
// there is no header file for libatomic so we provide function declarations here
/*
Wrapper for TCC to use mutex locks since it lacks the atomic functions
*/
static inline intptr_t atomic_fetch_add_explicit(intptr_t *x, size_t offset, int mo)
{
pthread_mutex_lock(&lock);
extern unsigned long long __atomic_load_8(unsigned long long* x, int mo);
extern void __atomic_store_8(unsigned long long* x, unsigned long long y, int mo);
extern _Bool __atomic_compare_exchange_8(unsigned long long* x, unsigned long long* expected, unsigned long long y, int mo, int mo2);
extern _Bool __atomic_compare_exchange_8(unsigned long long* x, unsigned long long* expected, unsigned long long y, int mo, int mo2);
extern unsigned long long __atomic_exchange_8(unsigned long long* x, unsigned long long y, int mo);
extern unsigned long long __atomic_fetch_add_8(unsigned long long* x, unsigned long long y, int mo);
extern unsigned long long __atomic_fetch_sub_8(unsigned long long* x, unsigned long long y, int mo);
extern unsigned long long __atomic_fetch_and_8(unsigned long long* x, unsigned long long y, int mo);
extern unsigned long long __atomic_fetch_or_8(unsigned long long* x, unsigned long long y, int mo);
extern unsigned long long __atomic_fetch_xor_8(unsigned long long* x, unsigned long long y, int mo);
intptr_t old_value = *x;
*x = *x + offset;
extern unsigned int __atomic_load_4(unsigned int* x, int mo);
extern void __atomic_store_4(unsigned int* x, unsigned int y, int mo);
extern _Bool __atomic_compare_exchange_4(unsigned int* x, unsigned int* expected, unsigned int y, int mo, int mo2);
extern _Bool __atomic_compare_exchange_4(unsigned int* x, unsigned int* expected, unsigned int y, int mo, int mo2);
extern unsigned int __atomic_exchange_4(unsigned int* x, unsigned int y, int mo);
extern unsigned int __atomic_fetch_add_4(unsigned int* x, unsigned int y, int mo);
extern unsigned int __atomic_fetch_sub_4(unsigned int* x, unsigned int y, int mo);
extern unsigned int __atomic_fetch_and_4(unsigned int* x, unsigned int y, int mo);
extern unsigned int __atomic_fetch_or_4(unsigned int* x, unsigned int y, int mo);
extern unsigned int __atomic_fetch_xor_4(unsigned int* x, unsigned int y, int mo);
pthread_mutex_unlock(&lock);
extern unsigned short __atomic_load_2(unsigned short* x, int mo);
extern void __atomic_store_2(unsigned short* x, unsigned short y, int mo);
extern _Bool __atomic_compare_exchange_2(unsigned short* x, unsigned short* expected, unsigned short y, int mo, int mo2);
extern _Bool __atomic_compare_exchange_2(unsigned short* x, unsigned short* expected, unsigned short y, int mo, int mo2);
extern unsigned short __atomic_exchange_2(unsigned short* x, unsigned short y, int mo);
extern unsigned short __atomic_fetch_add_2(unsigned short* x, unsigned short y, int mo);
extern unsigned short __atomic_fetch_sub_2(unsigned short* x, unsigned short y, int mo);
extern unsigned short __atomic_fetch_and_2(unsigned short* x, unsigned short y, int mo);
extern unsigned short __atomic_fetch_or_2(unsigned short* x, unsigned short y, int mo);
extern unsigned short __atomic_fetch_xor_2(unsigned short* x, unsigned short y, int mo);
return old_value;
extern unsigned char __atomic_load_1(unsigned char* x, int mo);
extern void __atomic_store_1(unsigned char* x, unsigned char y, int mo);
extern _Bool __atomic_compare_exchange_1(unsigned char* x, unsigned char* expected, unsigned char y, int mo, int mo2);
extern _Bool __atomic_compare_exchange_1(unsigned char* x, unsigned char* expected, unsigned char y, int mo, int mo2);
extern unsigned char __atomic_exchange_1(unsigned char* x, unsigned char y, int mo);
extern unsigned char __atomic_fetch_add_1(unsigned char* x, unsigned char y, int mo);
extern unsigned char __atomic_fetch_sub_1(unsigned char* x, unsigned char y, int mo);
extern unsigned char __atomic_fetch_and_1(unsigned char* x, unsigned char y, int mo);
extern unsigned char __atomic_fetch_or_1(unsigned char* x, unsigned char y, int mo);
extern unsigned char __atomic_fetch_xor_1(unsigned char* x, unsigned char y, int mo);
// The default functions should work with pointers so we have to decide based on pointer size
#if UINTPTR_MAX == 0xFFFFFFFF
#define atomic_load_explicit __atomic_load_4
#define atomic_store_explicit __atomic_store_4
#define atomic_compare_exchange_weak_explicit __atomic_compare_exchange_4
#define atomic_compare_exchange_strong_explicit __atomic_compare_exchange_4
#define atomic_exchange_explicit __atomic_exchange_4
#define atomic_fetch_add_explicit __atomic_fetch_add_4
#define atomic_fetch_sub_explicit __atomic_sub_fetch_4
#else
#define atomic_load_explicit __atomic_load_8
#define atomic_store_explicit __atomic_store_8
#define atomic_compare_exchange_weak_explicit __atomic_compare_exchange_8
#define atomic_compare_exchange_strong_explicit __atomic_compare_exchange_8
#define atomic_exchange_explicit __atomic_exchange_8
#define atomic_fetch_add_explicit __atomic_fetch_add_8
#define atomic_fetch_sub_explicit __atomic_sub_fetch_8
#endif
// memory order policies - we use "sequentially consistent" by default
#define memory_order_relaxed 0
#define memory_order_consume 1
#define memory_order_acquire 2
#define memory_order_release 3
#define memory_order_acq_rel 4
#define memory_order_seq_cst 5
static inline uintptr_t atomic_load(uintptr_t* x) {
return atomic_load_explicit(x, memory_order_seq_cst);
}
static inline void atomic_store(uintptr_t* x, uintptr_t y) {
atomic_store_explicit(x, y, memory_order_seq_cst);
}
static inline int atomic_compare_exchange_weak(uintptr_t* x, uintptr_t* expected, uintptr_t y) {
return (int)atomic_compare_exchange_weak_explicit(x, expected, y, memory_order_seq_cst, memory_order_seq_cst);
}
static inline int atomic_compare_exchange_strong(uintptr_t* x, uintptr_t* expected, uintptr_t y) {
return (int)atomic_compare_exchange_strong_explicit(x, expected, y, memory_order_seq_cst, memory_order_seq_cst);
}
static inline uintptr_t atomic_exchange(uintptr_t* x, uintptr_t y) {
return atomic_exchange_explicit(x, y, memory_order_seq_cst);
}
static inline uintptr_t atomic_fetch_add(uintptr_t* x, uintptr_t y) {
return atomic_fetch_add_explicit(x, y, memory_order_seq_cst);
}
static inline uintptr_t atomic_fetch_sub(uintptr_t* x, uintptr_t y) {
return atomic_fetch_sub_explicit(x, y, memory_order_seq_cst);
}
static inline uintptr_t atomic_fetch_and(uintptr_t* x, uintptr_t y) {
return atomic_fetch_and_explicit(x, y, memory_order_seq_cst);
}
static inline uintptr_t atomic_fetch_or(uintptr_t* x, uintptr_t y) {
return atomic_fetch_or_explicit(x, y, memory_order_seq_cst);
}
static inline uintptr_t atomic_fetch_xor(uintptr_t* x, uintptr_t y) {
return atomic_fetch_xor_explicit(x, y, memory_order_seq_cst);
}
/*
Wrapper for TCC to use mutex locks since it lacks the atomic functions
*/
static inline intptr_t atomic_fetch_sub_explicit(intptr_t *x, size_t offset, int mo)
{
pthread_mutex_lock(&lock);
#define atomic_load_ptr atomic_load
#define atomic_store_ptr atomic_store
#define atomic_compare_exchange_weak_ptr atomic_compare_exchange_weak
#define atomic_compare_exchange_strong_ptr atomic_compare_exchange_strong
#define atomic_exchange_ptr atomic_exchange
#define atomic_fetch_add_ptr atomic_fetch_add
#define atomic_fetch_sub_ptr atomic_fetch_sub
#define atomic_fetch_and_ptr atomic_fetch_and
#define atomic_fetch_or_ptr atomic_fetch_or
#define atomic_fetch_xor_ptr atomic_fetch_xor
intptr_t old_value = *x;
*x = *x - offset;
// specialized versions for 64 bit
pthread_mutex_unlock(&lock);
static inline unsigned long long atomic_load_u64(unsigned long long* x) {
return __atomic_load_8(x, memory_order_seq_cst);
}
static inline void atomic_store_u64(unsigned long long* x, unsigned long long y) {
__atomic_store_8(x, y, memory_order_seq_cst);
}
static inline int atomic_compare_exchange_weak_u64(unsigned long long* x, unsigned long long* expected, unsigned long long y) {
return (int)__atomic_compare_exchange_8(x, expected, y, memory_order_seq_cst, memory_order_seq_cst);
}
static inline int atomic_compare_exchange_strong_u64(unsigned long long* x, unsigned long long* expected, unsigned long long y) {
return (int)__atomic_compare_exchange_8(x, expected, y, memory_order_seq_cst, memory_order_seq_cst);
}
static inline unsigned long long atomic_exchange_u64(unsigned long long* x, unsigned long long y) {
return __atomic_exchange_8(x, y, memory_order_seq_cst);
}
static inline unsigned long long atomic_fetch_add_u64(unsigned long long* x, unsigned long long y) {
return __atomic_fetch_add_8(x, y, memory_order_seq_cst);
}
static inline unsigned long long atomic_fetch_sub_u64(unsigned long long* x, unsigned long long y) {
return __atomic_fetch_sub_8(x, y, memory_order_seq_cst);
}
static inline unsigned long long atomic_fetch_and_u64(unsigned long long* x, unsigned long long y) {
return __atomic_fetch_and_8(x, y, memory_order_seq_cst);
}
static inline unsigned long long atomic_fetch_or_u64(unsigned long long* x, unsigned long long y) {
return __atomic_fetch_or_8(x, y, memory_order_seq_cst);
}
static inline unsigned long long atomic_fetch_xor_u64(unsigned long long* x, unsigned long long y) {
return __atomic_fetch_xor_8(x, y, memory_order_seq_cst);
}
return old_value;
static inline unsigned atomic_load_u32(unsigned* x) {
return __atomic_load_4(x, memory_order_seq_cst);
}
static inline void atomic_store_u32(unsigned* x, unsigned y) {
__atomic_store_4(x, y, memory_order_seq_cst);
}
static inline int atomic_compare_exchange_weak_u32(unsigned* x, unsigned* expected, unsigned y) {
return (int)__atomic_compare_exchange_4(x, expected, y, memory_order_seq_cst, memory_order_seq_cst);
}
static inline int atomic_compare_exchange_strong_u32(unsigned* x, unsigned* expected, unsigned y) {
return (int)__atomic_compare_exchange_4(x, expected, y, memory_order_seq_cst, memory_order_seq_cst);
}
static inline unsigned atomic_exchange_u32(unsigned* x, unsigned y) {
return __atomic_exchange_4(x, y, memory_order_seq_cst);
}
static inline unsigned atomic_fetch_add_u32(unsigned* x, unsigned y) {
return __atomic_fetch_add_4(x, y, memory_order_seq_cst);
}
static inline unsigned atomic_fetch_sub_u32(unsigned* x, unsigned y) {
return __atomic_fetch_sub_4(x, y, memory_order_seq_cst);
}
static inline unsigned atomic_fetch_and_u32(unsigned* x, unsigned y) {
return __atomic_fetch_and_4(x, y, memory_order_seq_cst);
}
static inline unsigned atomic_fetch_or_u32(unsigned* x, unsigned y) {
return __atomic_fetch_or_4(x, y, memory_order_seq_cst);
}
static inline unsigned atomic_fetch_xor_u32(unsigned* x, unsigned y) {
return __atomic_fetch_xor_4(x, y, memory_order_seq_cst);
}
static inline unsigned short atomic_load_u16(unsigned short* x) {
return __atomic_load_2(x, memory_order_seq_cst);
}
static inline void atomic_store_u16(unsigned short* x, unsigned short y) {
__atomic_store_2(x, y, memory_order_seq_cst);
}
static inline int atomic_compare_exchange_weak_u16(unsigned short* x, unsigned short* expected, unsigned short y) {
return (int)__atomic_compare_exchange_2(x, expected, y, memory_order_seq_cst, memory_order_seq_cst);
}
static inline int atomic_compare_exchange_strong_u16(unsigned short* x, unsigned short* expected, unsigned short y) {
return (int)__atomic_compare_exchange_2(x, expected, y, memory_order_seq_cst, memory_order_seq_cst);
}
static inline unsigned short atomic_exchange_u16(unsigned short* x, unsigned short y) {
return __atomic_exchange_2(x, y, memory_order_seq_cst);
}
static inline unsigned short atomic_fetch_add_u16(unsigned short* x, unsigned short y) {
return __atomic_fetch_add_2(x, y, memory_order_seq_cst);
}
static inline unsigned short atomic_fetch_sub_u16(unsigned short* x, unsigned short y) {
return __atomic_fetch_sub_2(x, y, memory_order_seq_cst);
}
static inline unsigned short atomic_fetch_and_u16(unsigned short* x, unsigned short y) {
return __atomic_fetch_and_2(x, y, memory_order_seq_cst);
}
static inline unsigned short atomic_fetch_or_u16(unsigned short* x, unsigned short y) {
return __atomic_fetch_or_2(x, y, memory_order_seq_cst);
}
static inline unsigned short atomic_fetch_xor_u16(unsigned short* x, unsigned short y) {
return __atomic_fetch_xor_2(x, y, memory_order_seq_cst);
}
static inline unsigned char atomic_load_byte(unsigned char* x) {
return __atomic_load_1(x, memory_order_seq_cst);
}
static inline void atomic_store_byte(unsigned char* x, unsigned char y) {
__atomic_store_1(x, y, memory_order_seq_cst);
}
static inline int atomic_compare_exchange_weak_byte(unsigned char* x, unsigned char* expected, unsigned char y) {
return __atomic_compare_exchange_1(x, expected, y, memory_order_seq_cst, memory_order_seq_cst);
}
static inline int atomic_compare_exchange_strong_byte(unsigned char* x, unsigned char* expected, unsigned char y) {
return __atomic_compare_exchange_1(x, expected, y, memory_order_seq_cst, memory_order_seq_cst);
}
static inline unsigned char atomic_exchange_byte(unsigned char* x, unsigned char y) {
return __atomic_exchange_1(x, y, memory_order_seq_cst);
}
static inline unsigned char atomic_fetch_add_byte(unsigned char* x, unsigned char y) {
return __atomic_fetch_add_1(x, y, memory_order_seq_cst);
}
static inline unsigned char atomic_fetch_sub_byte(unsigned char* x, unsigned char y) {
return __atomic_fetch_sub_1(x, y, memory_order_seq_cst);
}
static inline unsigned char atomic_fetch_and_byte(unsigned char* x, unsigned char y) {
return __atomic_fetch_and_1(x, y, memory_order_seq_cst);
}
static inline unsigned char atomic_fetch_or_byte(unsigned char* x, unsigned char y) {
return __atomic_fetch_or_1(x, y, memory_order_seq_cst);
}
static inline unsigned char atomic_fetch_xor_byte(unsigned char* x, unsigned char y) {
return __atomic_fetch_xor_1(x, y, memory_order_seq_cst);
}
#else
// Since V might be confused with "generic" C functions either we provide special versions
// for gcc/clang, too
static inline unsigned long long atomic_load_u64(unsigned long long* x) {
return atomic_load_explicit((_Atomic unsigned long long*)x, memory_order_seq_cst);
}
static inline void atomic_store_u64(unsigned long long* x, unsigned long long y) {
atomic_store_explicit((_Atomic unsigned long long*)x, y, memory_order_seq_cst);
}
static inline int atomic_compare_exchange_weak_u64(unsigned long long* x, unsigned long long* expected, unsigned long long y) {
return (int)atomic_compare_exchange_weak_explicit((_Atomic unsigned long long*)x, expected, y, memory_order_seq_cst, memory_order_seq_cst);
}
static inline int atomic_compare_exchange_strong_u64(unsigned long long* x, unsigned long long* expected, unsigned long long y) {
return (int)atomic_compare_exchange_strong_explicit((_Atomic unsigned long long*)x, expected, y, memory_order_seq_cst, memory_order_seq_cst);
}
static inline unsigned long long atomic_exchange_u64(unsigned long long* x, unsigned long long y) {
return atomic_exchange_explicit((_Atomic unsigned long long*)x, y, memory_order_seq_cst);
}
static inline unsigned long long atomic_fetch_add_u64(unsigned long long* x, unsigned long long y) {
return atomic_fetch_add_explicit((_Atomic unsigned long long*)x, y, memory_order_seq_cst);
}
static inline unsigned long long atomic_fetch_sub_u64(unsigned long long* x, unsigned long long y) {
return atomic_fetch_sub_explicit((_Atomic unsigned long long*)x, y, memory_order_seq_cst);
}
static inline unsigned long long atomic_fetch_and_u64(unsigned long long* x, unsigned long long y) {
return atomic_fetch_and_explicit((_Atomic unsigned long long*)x, y, memory_order_seq_cst);
}
static inline unsigned long long atomic_fetch_or_u64(unsigned long long* x, unsigned long long y) {
return atomic_fetch_or_explicit((_Atomic unsigned long long*)x, y, memory_order_seq_cst);
}
static inline unsigned long long atomic_fetch_xor_u64(unsigned long long* x, unsigned long long y) {
return atomic_fetch_xor_explicit((_Atomic unsigned long long*)x, y, memory_order_seq_cst);
}
static inline void* atomic_load_ptr(void** x) {
return atomic_load_explicit((_Atomic uintptr_t*)x, memory_order_seq_cst);
}
static inline void atomic_store_ptr(void** x, void* y) {
atomic_store_explicit((_Atomic uintptr_t*)x, y, memory_order_seq_cst);
}
static inline int atomic_compare_exchange_weak_ptr(void** x, void** expected, void* y) {
return (int)atomic_compare_exchange_weak_explicit((_Atomic uintptr_t*)x, expected, y, memory_order_seq_cst, memory_order_seq_cst);
}
static inline int atomic_compare_exchange_strong_ptr(void** x, void** expected, void* y) {
return (int)atomic_compare_exchange_strong_explicit((_Atomic uintptr_t*)x, expected, y, memory_order_seq_cst, memory_order_seq_cst);
}
static inline void* atomic_exchange_ptr(void** x, void* y) {
return atomic_exchange_explicit((_Atomic uintptr_t*)x, y, memory_order_seq_cst);
}
static inline void* atomic_fetch_add_ptr(void** x, void* y) {
return atomic_fetch_add_explicit((_Atomic uintptr_t*)x, y, memory_order_seq_cst);
}
static inline void* atomic_fetch_sub_ptr(void** x, void* y) {
return atomic_fetch_sub_explicit((_Atomic uintptr_t*)x, y, memory_order_seq_cst);
}
static inline void* atomic_fetch_and_ptr(void** x, void* y) {
return atomic_fetch_and_explicit((_Atomic uintptr_t*)x, y, memory_order_seq_cst);
}
static inline void* atomic_fetch_or_ptr(void** x, void* y) {
return atomic_fetch_or_explicit((_Atomic uintptr_t*)x, y, memory_order_seq_cst);
}
static inline void* atomic_fetch_xor_ptr(void** x, void* y) {
return atomic_fetch_xor_explicit((_Atomic uintptr_t*)x, y, memory_order_seq_cst);
}
static inline unsigned atomic_load_u32(unsigned* x) {
return atomic_load_explicit((_Atomic unsigned*)x, memory_order_seq_cst);
}
static inline void atomic_store_u32(unsigned* x, unsigned y) {
atomic_store_explicit((_Atomic unsigned*)x, y, memory_order_seq_cst);
}
static inline int atomic_compare_exchange_weak_u32(unsigned* x, unsigned* expected, unsigned y) {
return (int)atomic_compare_exchange_weak_explicit((_Atomic unsigned*)x, expected, y, memory_order_seq_cst, memory_order_seq_cst);
}
static inline int atomic_compare_exchange_strong_u32(unsigned* x, unsigned* expected, unsigned y) {
return (int)atomic_compare_exchange_strong_explicit((_Atomic unsigned*)x, expected, y, memory_order_seq_cst, memory_order_seq_cst);
}
static inline unsigned atomic_exchange_u32(unsigned* x, unsigned y) {
return atomic_exchange_explicit((_Atomic unsigned*)x, y, memory_order_seq_cst);
}
static inline unsigned atomic_fetch_add_u32(unsigned* x, unsigned y) {
return atomic_fetch_add_explicit((_Atomic unsigned*)x, y, memory_order_seq_cst);
}
static inline unsigned atomic_fetch_sub_u32(unsigned* x, unsigned y) {
return atomic_fetch_sub_explicit((_Atomic unsigned*)x, y, memory_order_seq_cst);
}
static inline unsigned atomic_fetch_and_u32(unsigned* x, unsigned y) {
return atomic_fetch_and_explicit((_Atomic unsigned*)x, y, memory_order_seq_cst);
}
static inline unsigned atomic_fetch_or_u32(unsigned* x, unsigned y) {
return atomic_fetch_or_explicit((_Atomic unsigned*)x, y, memory_order_seq_cst);
}
static inline unsigned atomic_fetch_xor_u32(unsigned* x, unsigned y) {
return atomic_fetch_xor_explicit((_Atomic unsigned*)x, y, memory_order_seq_cst);
}
static inline unsigned short atomic_load_u16(unsigned short* x) {
return atomic_load_explicit((_Atomic unsigned short*)x, memory_order_seq_cst);
}
static inline void atomic_store_u16(unsigned short* x, unsigned short y) {
atomic_store_explicit((_Atomic unsigned short*)x, y, memory_order_seq_cst);
}
static inline int atomic_compare_exchange_weak_u16(unsigned short* x, unsigned short* expected, unsigned short y) {
return (int)atomic_compare_exchange_weak_explicit((_Atomic unsigned short*)x, expected, y, memory_order_seq_cst, memory_order_seq_cst);
}
static inline int atomic_compare_exchange_strong_u16(unsigned short* x, unsigned short* expected, unsigned short y) {
return (int)atomic_compare_exchange_strong_explicit((_Atomic unsigned short*)x, expected, y, memory_order_seq_cst, memory_order_seq_cst);
}
static inline unsigned short atomic_exchange_u16(unsigned short* x, unsigned short y) {
return atomic_exchange_explicit((_Atomic unsigned short*)x, y, memory_order_seq_cst);
}
static inline unsigned short atomic_fetch_add_u16(unsigned short* x, unsigned short y) {
return atomic_fetch_add_explicit((_Atomic unsigned short*)x, y, memory_order_seq_cst);
}
static inline unsigned short atomic_fetch_sub_u16(unsigned short* x, unsigned short y) {
return atomic_fetch_sub_explicit((_Atomic unsigned short*)x, y, memory_order_seq_cst);
}
static inline unsigned short atomic_fetch_and_u16(unsigned short* x, unsigned short y) {
return atomic_fetch_and_explicit((_Atomic unsigned short*)x, y, memory_order_seq_cst);
}
static inline unsigned short atomic_fetch_or_u16(unsigned short* x, unsigned short y) {
return atomic_fetch_or_explicit((_Atomic unsigned short*)x, y, memory_order_seq_cst);
}
static inline unsigned short atomic_fetch_xor_u16(unsigned short* x, unsigned short y) {
return atomic_fetch_xor_explicit((_Atomic unsigned short*)x, y, memory_order_seq_cst);
}
static inline unsigned char atomic_load_byte(unsigned char* x) {
return atomic_load_explicit((_Atomic unsigned char*)x, memory_order_seq_cst);
}
static inline void atomic_store_byte(unsigned char* x, unsigned char y) {
atomic_store_explicit((_Atomic unsigned char*)x, y, memory_order_seq_cst);
}
static inline int atomic_compare_exchange_weak_byte(unsigned char* x, unsigned char* expected, unsigned char y) {
return (int)atomic_compare_exchange_weak_explicit((_Atomic unsigned char*)x, expected, y, memory_order_seq_cst, memory_order_seq_cst);
}
static inline int atomic_compare_exchange_strong_byte(unsigned char* x, unsigned char* expected, unsigned char y) {
return (int)atomic_compare_exchange_strong_explicit((_Atomic unsigned char*)x, expected, y, memory_order_seq_cst, memory_order_seq_cst);
}
static inline unsigned char atomic_exchange_byte(unsigned char* x, unsigned char y) {
return atomic_exchange_explicit((_Atomic unsigned char*)x, y, memory_order_seq_cst);
}
static inline unsigned char atomic_fetch_add_byte(unsigned char* x, unsigned char y) {
return atomic_fetch_add_explicit((_Atomic unsigned char*)x, y, memory_order_seq_cst);
}
static inline unsigned char atomic_fetch_sub_byte(unsigned char* x, unsigned char y) {
return atomic_fetch_sub_explicit((_Atomic unsigned char*)x, y, memory_order_seq_cst);
}
static inline unsigned char atomic_fetch_and_byte(unsigned char* x, unsigned char y) {
return atomic_fetch_and_explicit((_Atomic unsigned char*)x, y, memory_order_seq_cst);
}
static inline unsigned char atomic_fetch_or_byte(unsigned char* x, unsigned char y) {
return atomic_fetch_or_explicit((_Atomic unsigned char*)x, y, memory_order_seq_cst);
}
static inline unsigned char atomic_fetch_xor_byte(unsigned char* x, unsigned char y) {
return atomic_fetch_xor_explicit((_Atomic unsigned char*)x, y, memory_order_seq_cst);
}
#endif
#endif

View File

@ -100,8 +100,32 @@ __CRT_INLINE LONGLONG _InterlockedExchangeAdd64(LONGLONG volatile *Addend, LONGL
return Old;
}
__CRT_INLINE LONG _InterlockedExchangeAdd(LONG volatile *Addend, LONG Value)
{
LONG Old;
do
{
Old = *Addend;
} while (InterlockedCompareExchange(Addend, Old + Value, Old) != Old);
return Old;
}
__CRT_INLINE SHORT _InterlockedExchangeAdd16(SHORT volatile *Addend, SHORT Value)
{
SHORT Old;
do
{
Old = *Addend;
} while (InterlockedCompareExchange16(Addend, Old + Value, Old) != Old);
return Old;
}
#define InterlockedIncrement64 _InterlockedExchangeAdd64
__CRT_INLINE VOID __faststorefence() {
__asm__ __volatile__ ("sfence");
}
#endif
#define atomic_store(object, desired) \
@ -121,7 +145,7 @@ __CRT_INLINE LONGLONG _InterlockedExchangeAdd64(LONGLONG volatile *Addend, LONGL
atomic_load(object)
#define atomic_exchange(object, desired) \
InterlockedExchangePointer(object, desired);
InterlockedExchangePointer(object, desired)
#define atomic_exchange_explicit(object, desired, order) \
atomic_exchange(object, desired)
@ -177,6 +201,145 @@ static inline int atomic_compare_exchange_strong(intptr_t *object, intptr_t *exp
InterlockedAnd(object, operand)
#endif /* _WIN64 */
/* specialized versions with explicit object size */
#define atomic_load_ptr atomic_load
#define atomic_store_ptr atomic_store
#define atomic_compare_exchange_weak_ptr atomic_compare_exchange_weak
#define atomic_compare_exchange_strong_ptr atomic_compare_exchange_strong
#define atomic_exchange_ptr atomic_exchange
#define atomic_fetch_add_ptr atomic_fetch_add
#define atomic_fetch_sub_ptr atomic_fetch_sub
#define atomic_fetch_and_ptr atomic_fetch_and
#define atomic_fetch_or_ptr atomic_fetch_or
#define atomic_fetch_xor_ptr atomic_fetch_xor
static inline void atomic_store_u64(unsigned long long* object, unsigned long long desired) {
do {
*(object) = (desired);
MemoryBarrier();
} while (0);
}
static inline unsigned long long atomic_load_u64(unsigned long long* object) {
return (MemoryBarrier(), *(object));
}
#define atomic_exchange_u64(object, desired) \
InterlockedExchange64(object, desired)
static inline int atomic_compare_exchange_strong_u64(unsigned long long* object, unsigned long long* expected,
unsigned long long desired)
{
unsigned long long old = *expected;
*expected = InterlockedCompareExchange64(object, desired, old);
return *expected == old;
}
#define atomic_compare_exchange_weak_u64(object, expected, desired) \
atomic_compare_exchange_strong_u64(object, expected, desired)
#define atomic_fetch_add_u64(object, operand) \
InterlockedExchangeAdd64(object, operand)
#define atomic_fetch_sub_u64(object, operand) \
InterlockedExchangeAdd64(object, -(operand))
#define atomic_fetch_or_u64(object, operand) \
InterlockedOr64(object, operand)
#define atomic_fetch_xor_u64(object, operand) \
InterlockedXor64(object, operand)
#define atomic_fetch_and_u64(object, operand) \
InterlockedAnd64(object, operand)
static inline void atomic_store_u32(unsigned* object, unsigned desired) {
do {
*(object) = (desired);
MemoryBarrier();
} while (0);
}
static inline unsigned atomic_load_u32(unsigned* object) {
return (MemoryBarrier(), *(object));
}
#define atomic_exchange_u32(object, desired) \
InterlockedExchange(object, desired)
static inline int atomic_compare_exchange_strong_u32(unsigned* object, unsigned* expected,
unsigned desired)
{
unsigned old = *expected;
*expected = InterlockedCompareExchange(object, desired, old);
return *expected == old;
}
#define atomic_compare_exchange_weak_u32(object, expected, desired) \
atomic_compare_exchange_strong_u32(object, expected, desired)
#define atomic_fetch_add_u32(object, operand) \
InterlockedExchangeAdd(object, operand)
#define atomic_fetch_sub_u32(object, operand) \
InterlockedExchangeAdd(object, -(operand))
#define atomic_fetch_or_u32(object, operand) \
InterlockedOr(object, operand)
#define atomic_fetch_xor_u32(object, operand) \
InterlockedXor(object, operand)
#define atomic_fetch_and_u32(object, operand) \
InterlockedAnd(object, operand)
static inline void atomic_store_u16(unsigned short* object, unsigned short desired) {
do {
*(object) = (desired);
MemoryBarrier();
} while (0);
}
static inline unsigned short atomic_load_u16(unsigned short* object) {
return (MemoryBarrier(), *(object));
}
#define atomic_exchange_u16(object, desired) \
InterlockedExchange16(object, desired)
static inline int atomic_compare_exchange_strong_u16(unsigned short* object, unsigned short* expected,
unsigned short desired)
{
unsigned short old = *expected;
*expected = InterlockedCompareExchange16(object, desired, old);
return *expected == old;
}
#define atomic_compare_exchange_weak_u16(object, expected, desired) \
atomic_compare_exchange_strong_u16(object, expected, desired)
#define atomic_fetch_add_u16(object, operand) \
InterlockedExchangeAdd16(object, operand)
#define atomic_fetch_sub_u16(object, operand) \
InterlockedExchangeAdd16(object, -(operand))
#define atomic_fetch_or_u16(object, operand) \
InterlockedOr16(object, operand)
#define atomic_fetch_xor_u16(object, operand) \
InterlockedXor16(object, operand)
#define atomic_fetch_and_u16(object, operand) \
InterlockedAnd16(object, operand)
#define atomic_fetch_add_explicit(object, operand, order) \
atomic_fetch_add(object, operand)
@ -204,4 +367,4 @@ static inline int atomic_compare_exchange_strong(intptr_t *object, intptr_t *exp
#define atomic_flag_clear_explicit(object, order) \
atomic_flag_clear(object)
#endif /* COMPAT_ATOMICS_WIN32_STDATOMIC_H */
#endif /* COMPAT_ATOMICS_WIN32_STDATOMIC_H */

View File

@ -348,7 +348,7 @@ fn C.MAKELANGID() int
fn C.FormatMessage() voidptr
fn C.CloseHandle()
fn C.CloseHandle(voidptr) int
fn C.GetExitCodeProcess()
@ -411,6 +411,7 @@ fn C.ReleaseSRWLockExclusive(voidptr)
fn C.pthread_mutex_init(voidptr, voidptr) int
fn C.pthread_mutex_lock(voidptr) int
fn C.pthread_mutex_unlock(voidptr) int
fn C.pthread_mutex_destroy(voidptr) int
fn C.pthread_rwlockattr_init(voidptr) int
fn C.pthread_rwlockattr_setkind_np(voidptr, int) int
@ -422,16 +423,19 @@ fn C.pthread_rwlock_unlock(voidptr) int
fn C.pthread_condattr_init(voidptr) int
fn C.pthread_condattr_setpshared(voidptr, int) int
fn C.pthread_condattr_destroy(voidptr) int
fn C.pthread_cond_init(voidptr, voidptr) int
fn C.pthread_cond_signal(voidptr) int
fn C.pthread_cond_wait(voidptr, voidptr) int
fn C.pthread_cond_timedwait(voidptr, voidptr, voidptr) int
fn C.pthread_cond_destroy(voidptr) int
fn C.sem_init(voidptr, int, u32) int
fn C.sem_post(voidptr) int
fn C.sem_wait(voidptr) int
fn C.sem_trywait(voidptr) int
fn C.sem_timedwait(voidptr, voidptr) int
fn C.sem_destroy(voidptr) int
fn C.read(fd int, buf voidptr, count size_t) int
fn C.write(fd int, buf voidptr, count size_t) int

View File

@ -17,10 +17,17 @@ further tested.
#flag freebsd -I @VROOT/thirdparty/stdatomic/nix
#flag solaris -I @VROOT/thirdparty/stdatomic/nix
#include "atomic.h"
$if linux {
$if tinyc {
// most Linux distributions have /usr/lib/libatomic.so, but Ubuntu uses gcc version specific dir
#flag -L/usr/lib/gcc/x86_64-linux-gnu/8 -L/usr/lib/gcc/x86_64-linux-gnu/9 -latomic
}
}
fn C.atomic_fetch_add_explicit() int
fn C.atomic_fetch_sub_explicit() int
#include <atomic.h>
fn C.atomic_fetch_add_explicit(voidptr, i64) i64
fn C.atomic_fetch_sub_explicit(voidptr, i64) i64
[typedef]
struct C.atomic_ullong {

505
vlib/sync/channels.v Normal file
View File

@ -0,0 +1,505 @@
module sync
import time
import rand
#flag windows -I @VROOT/thirdparty/stdatomic/win
#flag linux -I @VROOT/thirdparty/stdatomic/nix
#flag darwin -I @VROOT/thirdparty/stdatomic/nix
#flag freebsd -I @VROOT/thirdparty/stdatomic/nix
#flag solaris -I @VROOT/thirdparty/stdatomic/nix
$if linux {
$if tinyc {
// most Linux distributions have /usr/lib/libatomic.so, but Ubuntu uses gcc version specific dir
#flag -L/usr/lib/gcc/x86_64-linux-gnu/8 -L/usr/lib/gcc/x86_64-linux-gnu/9 -latomic
}
}
#include <atomic.h>
// the following functions are actually generic in C
fn C.atomic_load_ptr(voidptr) voidptr
fn C.atomic_store_ptr(voidptr, voidptr)
fn C.atomic_compare_exchange_weak_ptr(voidptr, voidptr, voidptr) bool
fn C.atomic_compare_exchange_strong_ptr(voidptr, voidptr, voidptr) bool
fn C.atomic_exchange_ptr(voidptr, voidptr) voidptr
fn C.atomic_fetch_add_ptr(voidptr, voidptr) voidptr
fn C.atomic_fetch_sub_ptr(voidptr, voidptr) voidptr
fn C.atomic_load_u16(voidptr) u16
fn C.atomic_store_u16(voidptr, u16)
fn C.atomic_compare_exchange_weak_u16(voidptr, voidptr, u16) bool
fn C.atomic_compare_exchange_strong_u16(voidptr, voidptr, u16) bool
fn C.atomic_exchange_u16(voidptr, u16) u16
fn C.atomic_fetch_add_u16(voidptr, u16) u16
fn C.atomic_fetch_sub_u16(voidptr, u16) u16
fn C.atomic_load_u32(voidptr) u32
fn C.atomic_store_u32(voidptr, u32)
fn C.atomic_compare_exchange_weak_u32(voidptr, voidptr, u32) bool
fn C.atomic_compare_exchange_strong_u32(voidptr, voidptr, u32) bool
fn C.atomic_exchange_u32(voidptr, u32) u32
fn C.atomic_fetch_add_u32(voidptr, u32) u32
fn C.atomic_fetch_sub_u32(voidptr, u32) u32
fn C.atomic_load_u64(voidptr) u64
fn C.atomic_store_u64(voidptr, u64)
fn C.atomic_compare_exchange_weak_u64(voidptr, voidptr, u64) bool
fn C.atomic_compare_exchange_strong_u64(voidptr, voidptr, u64) bool
fn C.atomic_exchange_u64(voidptr, u64) u64
fn C.atomic_fetch_add_u64(voidptr, u64) u64
fn C.atomic_fetch_sub_u64(voidptr, u64) u64
const (
// how often to try to get data without blocking before to wait for semaphore
spinloops = 750
spinloops_sem = 4000
)
enum BufferElemStat {
unused = 0
writing
written
reading
}
struct Subscription {
mut:
sem Semaphore
prev &&Subscription
nxt &Subscription
}
struct Channel {
writesem Semaphore // to wake thread that wanted to write, but buffer was full
readsem Semaphore // to wake thread that wanted to read, but buffer was empty
writesem_im Semaphore
readsem_im Semaphore
ringbuf byteptr // queue for buffered channels
statusbuf byteptr // flags to synchronize write/read in ringbuf
objsize u32
queue_length u32 // in #objects
mut: // atomic
write_adr C.atomic_uintptr_t // if != NULL the next obj can be written here without wait
read_adr C.atomic_uintptr_t // if != NULL an obj can be read from here without wait
adr_read C.atomic_uintptr_t // used to identify origin of writesem
adr_written C.atomic_uintptr_t // used to identify origin of readsem
write_free u32 // for queue state
read_avail u32
buf_elem_write_idx u32
buf_elem_read_idx u32
// for select
write_subscriber &Subscription
read_subscriber &Subscription
write_sub_mtx u16
read_sub_mtx u16
}
pub fn new_channel<T>(n u32) &Channel {
return &Channel{
writesem: new_semaphore_init(if n > 0 { n + 1 } else { 1 })
readsem: new_semaphore_init(if n > 0 { u32(0) } else { 1 })
writesem_im: new_semaphore()
readsem_im: new_semaphore()
objsize: sizeof(T)
queue_length: n
write_free: n
read_avail: 0
ringbuf: if n > 0 { malloc(int(n * sizeof(T))) } else { byteptr(0) }
statusbuf: if n > 0 { vcalloc(int(n * sizeof(u16))) } else { byteptr(0) }
write_subscriber: 0
read_subscriber: 0
}
}
pub fn (mut ch Channel) push(src voidptr) {
ch.try_push(src, false)
}
fn (mut ch Channel) try_push(src voidptr, no_block bool) bool {
spinloops_, spinloops_sem_ := if no_block { 1, 1 } else { spinloops, spinloops_sem }
mut have_swapped := false
for {
mut got_sem := false
mut wradr := C.atomic_load_ptr(&ch.write_adr)
for wradr != C.NULL {
if C.atomic_compare_exchange_strong_ptr(&ch.write_adr, &wradr, voidptr(0)) {
// there is a reader waiting for us
unsafe { C.memcpy(wradr, src, ch.objsize) }
mut nulladr := voidptr(0)
for !C.atomic_compare_exchange_weak_ptr(&ch.adr_written, &nulladr, wradr) {
nulladr = voidptr(0)
}
ch.readsem_im.post()
return true
}
}
if no_block && ch.queue_length == 0 {
return false
}
// get token to read
for _ in 0 .. spinloops_sem_ {
if got_sem {
break
}
got_sem = ch.writesem.try_wait()
}
if !got_sem {
ch.writesem.wait()
}
if ch.queue_length == 0 {
// try to advertise current object as readable
mut read_in_progress := false
C.atomic_store_ptr(&ch.read_adr, src)
wradr = C.atomic_load_ptr(&ch.write_adr)
if wradr != C.NULL {
mut src2 := src
if C.atomic_compare_exchange_strong_ptr(&ch.read_adr, &src2, voidptr(0)) {
ch.writesem.post()
continue
} else {
read_in_progress = true
}
}
if !read_in_progress {
mut null16 := u16(0)
for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) {
null16 = u16(0)
}
if ch.read_subscriber != voidptr(0) {
ch.read_subscriber.sem.post()
}
C.atomic_store_u16(&ch.read_sub_mtx, u16(0))
}
mut src2 := src
for sp := u32(0); sp < spinloops_ || read_in_progress; sp++ {
if C.atomic_compare_exchange_strong_ptr(&ch.adr_read, &src2, voidptr(0)) {
have_swapped = true
read_in_progress = true
break
}
src2 = src
}
mut got_im_sem := false
for sp := u32(0); sp < spinloops_sem_ || read_in_progress; sp++ {
got_im_sem = ch.writesem_im.try_wait()
if got_im_sem {
break
}
}
for {
if got_im_sem {
got_im_sem = false
} else {
ch.writesem_im.wait()
}
if have_swapped || C.atomic_compare_exchange_strong_ptr(&ch.adr_read, &src2, voidptr(0)) {
ch.writesem.post()
break
} else {
// this semaphore was not for us - repost in
ch.writesem_im.post()
src2 = src
}
}
return true
} else {
// buffered channel
mut space_in_queue := false
mut wr_free := C.atomic_load_u32(&ch.write_free)
for wr_free > 0 {
space_in_queue = C.atomic_compare_exchange_weak_u32(&ch.write_free, &wr_free, wr_free-1)
if space_in_queue {
break
}
}
if space_in_queue {
mut wr_idx := C.atomic_load_u32(&ch.buf_elem_write_idx)
for {
mut new_wr_idx := wr_idx + 1
for new_wr_idx >= ch.queue_length {
new_wr_idx -= ch.queue_length
}
if C.atomic_compare_exchange_strong_u32(&ch.buf_elem_write_idx, &wr_idx, new_wr_idx) {
break
}
}
mut wr_ptr := ch.ringbuf
mut status_adr := ch.statusbuf
unsafe {
wr_ptr += wr_idx * ch.objsize
status_adr += wr_idx * sizeof(u16)
}
mut expected_status := u16(BufferElemStat.unused)
for !C.atomic_compare_exchange_weak_u16(status_adr, &expected_status, u16(BufferElemStat.writing)) {
expected_status = u16(BufferElemStat.unused)
}
unsafe {
C.memcpy(wr_ptr, src, ch.objsize)
}
C.atomic_store_u16(status_adr, u16(BufferElemStat.written))
old_read_avail := C.atomic_fetch_add_u32(&ch.read_avail, 1)
ch.readsem.post()
if old_read_avail == 0 {
mut null16 := u16(0)
for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) {
null16 = u16(0)
}
if ch.read_subscriber != voidptr(0) {
ch.read_subscriber.sem.post()
}
C.atomic_store_u16(&ch.read_sub_mtx, u16(0))
}
return true
} else {
ch.writesem.post()
}
}
}
}
pub fn (mut ch Channel) pop(dest voidptr) {
ch.try_pop(dest, false)
}
fn (mut ch Channel) try_pop(dest voidptr, no_block bool) bool {
spinloops_, spinloops_sem_ := if no_block { 1, 1 } else { spinloops, spinloops_sem }
mut have_swapped := false
mut write_in_progress := false
for {
mut got_sem := false
if ch.queue_length == 0 {
// unbuffered channel - first see if a `push()` has adversized
mut rdadr := C.atomic_load_ptr(&ch.read_adr)
for rdadr != C.NULL {
if C.atomic_compare_exchange_strong_ptr(&ch.read_adr, &rdadr, voidptr(0)) {
// there is a writer waiting for us
unsafe { C.memcpy(dest, rdadr, ch.objsize) }
mut nulladr := voidptr(0)
for !C.atomic_compare_exchange_weak_ptr(&ch.adr_read, &nulladr, rdadr) {
nulladr = voidptr(0)
}
ch.writesem_im.post()
return true
}
}
if no_block {
return false
}
}
// get token to read
for _ in 0 .. spinloops_sem_ {
if got_sem {
break
}
got_sem = ch.readsem.try_wait()
}
if !got_sem {
if no_block {
return false
}
ch.readsem.wait()
}
if ch.queue_length > 0 {
// try to get buffer token
mut obj_in_queue := false
mut rd_avail := C.atomic_load_u32(&ch.read_avail)
for rd_avail > 0 {
obj_in_queue = C.atomic_compare_exchange_weak_u32(&ch.read_avail, &rd_avail, rd_avail-1)
if obj_in_queue {
break
}
}
if obj_in_queue {
mut rd_idx := C.atomic_load_u32(&ch.buf_elem_read_idx)
for {
mut new_rd_idx := rd_idx + 1
for new_rd_idx >= ch.queue_length {
new_rd_idx -= ch.queue_length
}
if C.atomic_compare_exchange_weak_u32(&ch.buf_elem_read_idx, &rd_idx, new_rd_idx) {
break
}
}
mut rd_ptr := ch.ringbuf
mut status_adr := ch.statusbuf
unsafe {
rd_ptr += rd_idx * ch.objsize
status_adr += rd_idx * sizeof(u16)
}
mut expected_status := u16(BufferElemStat.written)
for !C.atomic_compare_exchange_weak_u16(status_adr, &expected_status, u16(BufferElemStat.reading)) {
expected_status = u16(BufferElemStat.written)
}
unsafe {
C.memcpy(dest, rd_ptr, ch.objsize)
}
C.atomic_store_u16(status_adr, u16(BufferElemStat.unused))
old_write_free := C.atomic_fetch_add_u32(&ch.write_free, 1)
ch.writesem.post()
if old_write_free == 0 {
mut null16 := u16(0)
for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
null16 = u16(0)
}
if ch.write_subscriber != voidptr(0) {
ch.write_subscriber.sem.post()
}
C.atomic_store_u16(&ch.write_sub_mtx, u16(0))
}
return true
}
}
// try to advertise `dest` as writable
C.atomic_store_ptr(&ch.write_adr, dest)
if ch.queue_length == 0 {
mut rdadr := C.atomic_load_ptr(&ch.read_adr)
if rdadr != C.NULL {
mut dest2 := dest
if C.atomic_compare_exchange_strong_ptr(&ch.write_adr, &dest2, voidptr(0)) {
ch.readsem.post()
continue
} else {
write_in_progress = true
}
}
}
if ch.queue_length == 0 && !write_in_progress {
mut null16 := u16(0)
for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
null16 = u16(0)
}
if ch.write_subscriber != voidptr(0) {
ch.write_subscriber.sem.post()
}
C.atomic_store_u16(&ch.write_sub_mtx, u16(0))
}
mut dest2 := dest
for sp := u32(0); sp < spinloops_ || write_in_progress; sp++ {
if C.atomic_compare_exchange_strong_ptr(&ch.adr_written, &dest2, voidptr(0)) {
have_swapped = true
break
}
dest2 = dest
}
mut got_im_sem := false
for sp := u32(0); sp < spinloops_sem_ || write_in_progress; sp++ {
got_im_sem = ch.readsem_im.try_wait()
if got_im_sem {
break
}
}
for {
if got_im_sem {
got_im_sem = false
} else {
ch.readsem_im.wait()
}
if have_swapped || C.atomic_compare_exchange_strong_ptr(&ch.adr_written, &dest2, voidptr(0)) {
ch.readsem.post()
break
} else {
// this semaphore was not for us - repost in
ch.readsem_im.post()
dest2 = dest
}
}
return true
}
}
// Wait `timeout` on any of `channels[i]` until one of them can push (`is_push[i] = true`) or pop (`is_push[i] = false`)
// object referenced by `objrefs[i]`. `timeout = 0` means wait unlimited time
pub fn channel_select(mut channels []&Channel, is_push []bool, mut objrefs []voidptr, timeout time.Duration) int {
assert channels.len == is_push.len
assert is_push.len == objrefs.len
mut subscr := []Subscription{len: channels.len}
sem := new_semaphore()
for i, ch in channels {
if is_push[i] {
mut null16 := u16(0)
for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
null16 = u16(0)
}
subscr[i].sem = sem
subscr[i].prev = &ch.write_subscriber
subscr[i].nxt = C.atomic_exchange_ptr(&ch.write_subscriber, &subscr[i])
if voidptr(subscr[i].nxt) != voidptr(0) {
subscr[i].nxt.prev = &subscr[i]
}
C.atomic_store_u16(&ch.write_sub_mtx, u16(0))
} else {
mut null16 := u16(0)
for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) {
null16 = u16(0)
}
subscr[i].sem = sem
subscr[i].prev = &ch.read_subscriber
subscr[i].nxt = C.atomic_exchange_ptr(&ch.read_subscriber, &subscr[i])
if voidptr(subscr[i].nxt) != voidptr(0) {
subscr[i].nxt.prev = &subscr[i]
}
C.atomic_store_u16(&ch.read_sub_mtx, u16(0))
}
}
stopwatch := if timeout == 0 { time.StopWatch{} } else { time.new_stopwatch({}) }
mut event_idx := -1 // negative index means `timed out`
for {
rnd := rand.u32_in_range(0, u32(channels.len))
for j, _ in channels {
mut i := j + int(rnd)
if i >= channels.len {
i -= channels.len
}
if is_push[i] {
if channels[i].try_push(objrefs[i], true) {
event_idx = i
goto restore
}
} else {
if channels[i].try_pop(objrefs[i], true) {
event_idx = i
goto restore
}
}
}
if timeout > 0 {
remaining := timeout - stopwatch.elapsed()
if !sem.timed_wait(remaining) {
goto restore
}
} else {
sem.wait()
}
}
restore:
// reset subscribers
for i, ch in channels {
if is_push[i] {
mut null16 := u16(0)
for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
null16 = u16(0)
}
subscr[i].prev = subscr[i].nxt
if subscr[i].nxt != 0 {
// just in case we have missed a semaphore during restore
subscr[i].nxt.sem.post()
}
C.atomic_store_u16(&ch.write_sub_mtx, u16(0))
} else {
mut null16 := u16(0)
for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) {
null16 = u16(0)
}
subscr[i].prev = subscr[i].nxt
if subscr[i].nxt != 0 {
subscr[i].nxt.sem.post()
}
C.atomic_store_u16(&ch.read_sub_mtx, u16(0))
}
}
sem.destroy()
return event_idx
}

View File

@ -26,11 +26,13 @@ struct RwMutexAttr {
/* MacOSX has no unnamed semaphores and no `timed_wait()` at all
so we emulate the behaviour with other devices */
[ref_only]
struct MacOSX_Semaphore {
mtx C.pthread_mutex_t
cond C.pthread_cond_t
attr C.pthread_condattr_t
mut:
count int
count u32
}
[ref_only]
@ -38,19 +40,8 @@ struct PosixSemaphore {
sem C.sem_t
}
[ref_only]
struct CondAttr {
attr C.pthread_condattr_t
}
pub struct Semaphore {
/*
$if macos {
sem &MacOSX_Semaphore
} $else {
sem &PosixSemaphore
}
*/
mut:
sem voidptr // since the above does not work, yet
}
@ -99,22 +90,26 @@ pub fn (mut m RwMutex) w_unlock() {
C.pthread_rwlock_unlock(&m.mutex)
}
[inline]
pub fn new_semaphore() Semaphore {
return new_semaphore_init(0)
}
pub fn new_semaphore_init(n u32) Semaphore {
$if macos {
s := Semaphore{
sem: &MacOSX_Semaphore{count: 0}
sem: &MacOSX_Semaphore{count: n}
}
C.pthread_mutex_init(&&MacOSX_Semaphore(s.sem).mtx, C.NULL)
a := &CondAttr{}
C.pthread_condattr_init(&a.attr)
C.pthread_condattr_setpshared(&a.attr, C.PTHREAD_PROCESS_PRIVATE)
C.pthread_cond_init(&&MacOSX_Semaphore(s.sem).cond, &a.attr)
C.pthread_condattr_init(&&MacOSX_Semaphore(s.sem).attr)
C.pthread_condattr_setpshared(&&MacOSX_Semaphore(s.sem).attr, C.PTHREAD_PROCESS_PRIVATE)
C.pthread_cond_init(&&MacOSX_Semaphore(s.sem).cond, &&MacOSX_Semaphore(s.sem).attr)
return s
} $else {
s := Semaphore{
sem: &PosixSemaphore{}
}
unsafe { C.sem_init(&&PosixSemaphore(s.sem).sem, 0, 0) }
unsafe { C.sem_init(&&PosixSemaphore(s.sem).sem, 0, n) }
return s
}
}
@ -186,3 +181,13 @@ pub fn (s Semaphore) timed_wait(timeout time.Duration) bool {
return unsafe { C.sem_timedwait(&&PosixSemaphore(s.sem).sem, &t_spec) == 0 }
}
}
pub fn (s Semaphore) destroy() bool {
$if macos {
return C.pthread_cond_destroy(&&MacOSX_Semaphore(s.sem).cond) == 0 &&
C.pthread_condattr_destroy(&&MacOSX_Semaphore(s.sem).attr) == 0 &&
C.pthread_mutex_destroy(&&MacOSX_Semaphore(s.sem).mtx) == 0
} $else {
return unsafe { C.sem_destroy(&&PosixSemaphore(s.sem).sem) == 0 }
}
}

View File

@ -128,9 +128,14 @@ pub fn (mut m Mutex) destroy() {
m.state = .destroyed // setting up reference to invalid state
}
[inline]
pub fn new_semaphore() Semaphore {
return new_semaphore_init(0)
}
pub fn new_semaphore_init(n u32) Semaphore {
return Semaphore{
sem: SHANDLE(C.CreateSemaphore(0, 0, C.INT32_MAX, 0))
sem: SHANDLE(C.CreateSemaphore(0, n, C.INT32_MAX, 0))
}
}
@ -149,3 +154,7 @@ pub fn (s Semaphore) try_wait() bool {
pub fn (s Semaphore) timed_wait(timeout time.Duration) bool {
return C.WaitForSingleObject(s.sem, timeout / time.millisecond) == 0
}
pub fn (s Semaphore) destroy() bool {
return C.CloseHandle(s.sem) != 0
}

View File

@ -0,0 +1,68 @@
package main
import "fmt"
import "log"
import "os"
import "time"
import "strconv"
func assert_eq(a, b int64) {
if a != b {
log.Fatalf("assertion failed\nleft: %d, right: %d\n", a, b)
}
}
func do_rec(ch chan int32, resch chan int64, n int32) {
var sum int64
var i int32
for i = 0; i < n; i++ {
sum += int64(<- ch)
}
fmt.Println(sum)
resch <- sum
}
func do_send(ch chan int32, start, end int32) {
for i := start; i < end; i++ {
ch <- i
}
}
func main() {
if len(os.Args) != 5 {
log.Fatalf("usage:\n\t%s <nsend> <nrec> <buflen> <nobj>\n", os.Args[0])
}
nsend, _ := strconv.Atoi(os.Args[1])
nrec, _ := strconv.Atoi(os.Args[2])
buflen, _ := strconv.Atoi(os.Args[3])
nobj, _ := strconv.Atoi(os.Args[4])
stopwatch := time.Now()
ch := make(chan int32, buflen)
resch := make(chan int64, 0)
no := nobj
for i := 0; i < nrec; i++ {
n := no / (nrec - i)
go do_rec(ch, resch, int32(n))
no -= n
}
assert_eq(int64(no), 0)
no = nobj
for i := 0; i < nsend; i++ {
n := no / (nsend - i)
end := no
no -= n
go do_send(ch, int32(no), int32(end))
}
assert_eq(int64(no), 0)
var sum int64
for i := 0; i < nrec; i++ {
sum += <-resch
}
elapsed := time.Now().Sub(stopwatch)
rate := float64(nobj)/float64(elapsed.Nanoseconds())*1000.0
duration := 1.0e-09 * float64(elapsed.Nanoseconds())
fmt.Printf("%d objects in %g s (%.2f objs/µs)\n", nobj, duration, rate)
expected_sum := int64(nobj)*int64(nobj-1)/2
fmt.Printf("got: %d, expected: %d\n", sum, expected_sum)
assert_eq(sum, expected_sum)
}

View File

@ -0,0 +1,70 @@
// Channel Benchmark
//
// `nobj` integers are sent thru a channel with queue length`buflen`
// using `nsend` sender threads and `nrec` receiver threads.
//
// The receive threads add all received numbers and send them to the
// main thread where the total sum is compare to the expected value.
import sync
import time
import os
fn do_rec(mut ch sync.Channel, mut resch sync.Channel, n int) {
mut sum := i64(0)
for _ in 0 .. n {
mut a := 0
ch.pop(&a)
sum += a
}
println(sum)
resch.push(&sum)
}
fn do_send(mut ch sync.Channel, start, end int) {
for i in start .. end {
ch.push(&i)
}
}
fn main() {
if os.args.len != 5 {
eprintln('usage:\n\t${os.args[0]} <nsend> <nrec> <buflen> <nobj>')
exit(1)
}
nsend := os.args[1].int()
nrec := os.args[2].int()
buflen := os.args[3].int()
nobj := os.args[4].int()
stopwatch := time.new_stopwatch({})
mut ch := sync.new_channel<int>(buflen)
mut resch := sync.new_channel<i64>(0)
mut no := nobj
for i in 0 .. nrec {
n := no / (nrec - i)
go do_rec(mut ch, mut resch, n)
no -= n
}
assert no == 0
no = nobj
for i in 0 .. nsend {
n := no / (nsend - i)
end := no
no -= n
go do_send(mut ch, no, end)
}
assert no == 0
mut sum := i64(0)
for _ in 0 .. nrec {
mut r := i64(0)
resch.pop(&r)
sum += r
}
elapsed := stopwatch.elapsed()
rate := f64(nobj)/elapsed*time.microsecond
println('$nobj objects in ${f64(elapsed)/time.second} s (${rate:.2f} objs/µs)')
// use sum formula by Gauß to calculate the expected result
expected_sum := i64(nobj)*(nobj-1)/2
println('got: $sum, expected: $expected_sum')
assert sum == expected_sum
}

View File

@ -0,0 +1,23 @@
import sync
const (
num_iterations = 10000
)
fn do_send(mut ch sync.Channel) {
for i in 0 .. num_iterations {
ch.push(&i)
}
}
fn test_channel_buffered() {
mut ch := sync.new_channel<int>(1000)
go do_send(mut ch)
mut sum := i64(0)
for _ in 0 .. num_iterations {
a := 0
ch.pop(&a)
sum += a
}
assert sum == u64(num_iterations)*(num_iterations-1)/2
}

View File

@ -0,0 +1,23 @@
import sync
const (
num_iterations = 10000
)
fn do_send(mut ch sync.Channel) {
for i in 0 .. num_iterations {
ch.push(&i)
}
}
fn test_channel_unbuffered() {
mut ch := sync.new_channel<int>(0)
go do_send(mut ch)
mut sum := i64(0)
for _ in 0 .. num_iterations {
a := 0
ch.pop(&a)
sum += a
}
assert sum == u64(num_iterations)*(num_iterations-1)/2
}

View File

@ -0,0 +1,38 @@
import sync
fn do_rec(mut ch sync.Channel, mut resch sync.Channel) {
mut sum := i64(0)
for _ in 0 .. 2000 {
mut a := 0
ch.pop(&a)
sum += a
}
println(sum)
resch.push(&sum)
}
fn do_send(mut ch sync.Channel) {
for i in 0 .. 2000 {
ch.push(&i)
}
}
fn test_channel_multi_unbuffered() {
mut ch := sync.new_channel<int>(0)
mut resch := sync.new_channel<i64>(0)
go do_rec(mut ch, mut resch)
go do_rec(mut ch, mut resch)
go do_rec(mut ch, mut resch)
go do_rec(mut ch, mut resch)
go do_send(mut ch)
go do_send(mut ch)
go do_send(mut ch)
go do_send(mut ch)
mut sum := i64(0)
for _ in 0 .. 4 {
mut r := i64(0)
resch.pop(&r)
sum += r
}
assert sum == i64(4) * 2000 * (2000 - 1) / 2
}

View File

@ -0,0 +1,38 @@
import sync
fn do_rec(mut ch sync.Channel, mut resch sync.Channel) {
mut sum := i64(0)
for _ in 0 .. 2000 {
mut a := 0
ch.pop(&a)
sum += a
}
println(sum)
resch.push(&sum)
}
fn do_send(mut ch sync.Channel) {
for i in 0 .. 2000 {
ch.push(&i)
}
}
fn test_channel_multi_buffered() {
mut ch := sync.new_channel<int>(100)
mut resch := sync.new_channel<i64>(0)
go do_rec(mut ch, mut resch)
go do_rec(mut ch, mut resch)
go do_rec(mut ch, mut resch)
go do_rec(mut ch, mut resch)
go do_send(mut ch)
go do_send(mut ch)
go do_send(mut ch)
go do_send(mut ch)
mut sum := i64(0)
for _ in 0 .. 4 {
mut r := i64(0)
resch.pop(&r)
sum += r
}
assert sum == i64(4) * 2000 * (2000 - 1) / 2
}

View File

@ -0,0 +1,76 @@
import sync
fn do_rec_i64(mut ch sync.Channel) {
mut sum := i64(0)
for _ in 0 .. 300 {
mut a := i64(0)
ch.pop(&a)
sum += a
}
assert sum == 300 * (300 - 1) / 2
}
fn do_send_int(mut ch sync.Channel) {
for i in 0 .. 300 {
ch.push(&i)
}
}
fn do_send_byte(mut ch sync.Channel) {
for i in 0 .. 300 {
ii := byte(i)
ch.push(&ii)
}
}
fn do_send_i64(mut ch sync.Channel) {
for i in 0 .. 300 {
ii := i64(i)
ch.push(&ii)
}
}
fn test_select() {
mut chi := sync.new_channel<int>(0)
mut chl := sync.new_channel<i64>(1)
mut chb := sync.new_channel<byte>(10)
mut recch := sync.new_channel<i64>(0)
go do_rec_i64(mut recch)
go do_send_int(mut chi)
go do_send_byte(mut chb)
go do_send_i64(mut chl)
mut channels := [chi, recch, chl, chb]
directions := [false, true, false, false]
mut sum := i64(0)
mut rl := i64(0)
mut ri := int(0)
mut rb := byte(0)
mut sl := i64(0)
mut objs := [voidptr(&ri), &sl, &rl, &rb]
for _ in 0 .. 1200 {
idx := sync.channel_select(mut channels, directions, mut objs, 0)
match idx {
0 {
sum += ri
}
1 {
sl++
}
2 {
sum += rl
}
3 {
sum += rb
}
else {
println('got $idx (timeout)')
}
}
}
// Use Gauß' formula for the first 2 contributions
expected_sum := 2 * (300 * (300 - 1) / 2) +
// the 3rd contribution is `byte` and must be seen modulo 256
256 * (256 - 1) / 2 +
44 * (44 - 1) / 2
assert sum == expected_sum
}