1
0
mirror of https://github.com/vlang/v.git synced 2023-08-10 21:13:21 +03:00

sync: don't force Mutex and Semaphore to be reference (#8331)

This commit is contained in:
Uwe Krüger 2021-01-29 19:52:14 +01:00 committed by GitHub
parent d370e4de9f
commit 4a955d9c54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 541 additions and 296 deletions

View File

@ -5,7 +5,7 @@ const (
queue_fill = 763 queue_fill = 763
) )
fn do_send(ch chan int, fin sync.Semaphore) { fn do_send(ch chan int, mut fin sync.Semaphore) {
for i in 0 .. queue_fill { for i in 0 .. queue_fill {
ch <- i ch <- i
} }
@ -14,8 +14,8 @@ fn do_send(ch chan int, fin sync.Semaphore) {
fn test_channel_len_cap() { fn test_channel_len_cap() {
ch := chan int{cap: queue_len} ch := chan int{cap: queue_len}
sem := sync.new_semaphore() mut sem := sync.new_semaphore()
go do_send(ch, sem) go do_send(ch, mut sem)
sem.wait() sem.wait()
assert ch.cap == queue_len assert ch.cap == queue_len
assert ch.len == queue_fill assert ch.len == queue_fill

View File

@ -10,7 +10,7 @@ fn get_val_from_chan(ch chan i64) ?i64 {
} }
// this function gets an array of channels for `i64` // this function gets an array of channels for `i64`
fn do_rec_calc_send(chs []chan i64, sem sync.Semaphore) { fn do_rec_calc_send(chs []chan i64, mut sem sync.Semaphore) {
mut msg := '' mut msg := ''
for { for {
mut s := get_val_from_chan(chs[0]) or { mut s := get_val_from_chan(chs[0]) or {
@ -26,8 +26,8 @@ fn do_rec_calc_send(chs []chan i64, sem sync.Semaphore) {
fn test_channel_array_mut() { fn test_channel_array_mut() {
mut chs := [chan i64{}, chan i64{cap: 10}] mut chs := [chan i64{}, chan i64{cap: 10}]
sem := sync.new_semaphore() mut sem := sync.new_semaphore()
go do_rec_calc_send(chs, sem) go do_rec_calc_send(chs, mut sem)
mut t := i64(100) mut t := i64(100)
for _ in 0 .. num_iterations { for _ in 0 .. num_iterations {
chs[0] <- t chs[0] <- t

View File

@ -9,7 +9,7 @@ fn getint() int {
return 8 return 8
} }
fn f1(ch1 chan int, ch2 chan St, ch3 chan int, ch4 chan int, ch5 chan int, sem sync.Semaphore) { fn f1(ch1 chan int, ch2 chan St, ch3 chan int, ch4 chan int, ch5 chan int, mut sem sync.Semaphore) {
mut a := 5 mut a := 5
select { select {
a = <-ch3 { a = <-ch3 {
@ -40,7 +40,7 @@ fn f1(ch1 chan int, ch2 chan St, ch3 chan int, ch4 chan int, ch5 chan int, sem s
sem.post() sem.post()
} }
fn f2(ch1 chan St, ch2 chan int, sem sync.Semaphore) { fn f2(ch1 chan St, ch2 chan int, mut sem sync.Semaphore) {
mut r := 23 mut r := 23
for i in 0 .. 2 { for i in 0 .. 2 {
select { select {
@ -66,7 +66,7 @@ fn test_select_blocks() {
ch3 := chan int{} ch3 := chan int{}
ch4 := chan int{} ch4 := chan int{}
ch5 := chan int{} ch5 := chan int{}
sem := sync.new_semaphore() mut sem := sync.new_semaphore()
mut r := false mut r := false
t := select { t := select {
b := <-ch1 { b := <-ch1 {
@ -79,7 +79,7 @@ fn test_select_blocks() {
} }
assert r == true assert r == true
assert t == true assert t == true
go f2(ch2, ch3, sem) go f2(ch2, ch3, mut sem)
n := <-ch3 n := <-ch3
assert n == 23 assert n == 23
ch2 <- St{ ch2 <- St{
@ -87,7 +87,7 @@ fn test_select_blocks() {
} }
sem.wait() sem.wait()
stopwatch := time.new_stopwatch({}) stopwatch := time.new_stopwatch({})
go f1(ch1, ch2, ch3, ch4, ch5, sem) go f1(ch1, ch2, ch3, ch4, ch5, mut sem)
sem.wait() sem.wait()
elapsed_ms := f64(stopwatch.elapsed()) / time.millisecond elapsed_ms := f64(stopwatch.elapsed()) / time.millisecond
// https://docs.microsoft.com/en-us/windows-hardware/drivers/kernel/high-resolution-timers // https://docs.microsoft.com/en-us/windows-hardware/drivers/kernel/high-resolution-timers

View File

@ -66,7 +66,7 @@ enum BufferElemStat {
struct Subscription { struct Subscription {
mut: mut:
sem Semaphore sem &Semaphore
prev &&Subscription prev &&Subscription
nxt &Subscription nxt &Subscription
} }
@ -77,14 +77,14 @@ enum Direction {
} }
struct Channel { struct Channel {
writesem Semaphore // to wake thread that wanted to write, but buffer was full
readsem Semaphore // to wake thread that wanted to read, but buffer was empty
writesem_im Semaphore
readsem_im Semaphore
ringbuf byteptr // queue for buffered channels ringbuf byteptr // queue for buffered channels
statusbuf byteptr // flags to synchronize write/read in ringbuf statusbuf byteptr // flags to synchronize write/read in ringbuf
objsize u32 objsize u32
mut: // atomic mut: // atomic
writesem Semaphore // to wake thread that wanted to write, but buffer was full
readsem Semaphore // to wake thread that wanted to read, but buffer was empty
writesem_im Semaphore
readsem_im Semaphore
write_adr C.atomic_uintptr_t // if != NULL the next obj can be written here without wait write_adr C.atomic_uintptr_t // if != NULL the next obj can be written here without wait
read_adr C.atomic_uintptr_t // if != NULL an obj can be read from here without wait read_adr C.atomic_uintptr_t // if != NULL an obj can be read from here without wait
adr_read C.atomic_uintptr_t // used to identify origin of writesem adr_read C.atomic_uintptr_t // used to identify origin of writesem
@ -113,11 +113,7 @@ fn new_channel_st(n u32, st u32) &Channel {
rsem := if n > 0 { u32(0) } else { 1 } rsem := if n > 0 { u32(0) } else { 1 }
rbuf := if n > 0 { malloc(int(n * st)) } else { byteptr(0) } rbuf := if n > 0 { malloc(int(n * st)) } else { byteptr(0) }
sbuf := if n > 0 { vcalloc(int(n * 2)) } else { byteptr(0) } sbuf := if n > 0 { vcalloc(int(n * 2)) } else { byteptr(0) }
return &Channel{ mut ch := &Channel{
writesem: new_semaphore_init(wsem)
readsem: new_semaphore_init(rsem)
writesem_im: new_semaphore()
readsem_im: new_semaphore()
objsize: st objsize: st
cap: n cap: n
write_free: n write_free: n
@ -127,6 +123,11 @@ fn new_channel_st(n u32, st u32) &Channel {
write_subscriber: 0 write_subscriber: 0
read_subscriber: 0 read_subscriber: 0
} }
ch.writesem.init(wsem)
ch.readsem.init(rsem)
ch.writesem_im.init(0)
ch.readsem_im.init(0)
return ch
} }
pub fn (mut ch Channel) close() { pub fn (mut ch Channel) close() {
@ -528,9 +529,10 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
assert channels.len == dir.len assert channels.len == dir.len
assert dir.len == objrefs.len assert dir.len == objrefs.len
mut subscr := []Subscription{len: channels.len} mut subscr := []Subscription{len: channels.len}
sem := new_semaphore() mut sem := Semaphore{}
sem.init(0)
for i, ch in channels { for i, ch in channels {
subscr[i].sem = sem subscr[i].sem = &sem
if dir[i] == .push { if dir[i] == .push {
mut null16 := u16(0) mut null16 := u16(0)
for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {

134
vlib/sync/sync_default.c.v Normal file
View File

@ -0,0 +1,134 @@
// Copyright (c) 2019-2021 Alexander Medvednikov. All rights reserved.
// Use of this source code is governed by an MIT license
// that can be found in the LICENSE file.
module sync
import time
#flag -lpthread
#include <semaphore.h>
[trusted]
fn C.pthread_mutex_init(voidptr, voidptr) int
fn C.pthread_mutex_lock(voidptr) int
fn C.pthread_mutex_unlock(voidptr) int
fn C.pthread_mutex_destroy(voidptr) int
fn C.pthread_rwlockattr_init(voidptr) int
fn C.pthread_rwlockattr_setkind_np(voidptr, int) int
fn C.pthread_rwlockattr_setpshared(voidptr, int) int
fn C.pthread_rwlock_init(voidptr, voidptr) int
fn C.pthread_rwlock_rdlock(voidptr) int
fn C.pthread_rwlock_wrlock(voidptr) int
fn C.pthread_rwlock_unlock(voidptr) int
fn C.sem_init(voidptr, int, u32) int
fn C.sem_post(voidptr) int
fn C.sem_wait(voidptr) int
fn C.sem_trywait(voidptr) int
fn C.sem_timedwait(voidptr, voidptr) int
fn C.sem_destroy(voidptr) int
// [init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function.
pub struct Mutex {
mutex C.pthread_mutex_t
}
pub struct RwMutex {
mutex C.pthread_rwlock_t
}
struct RwMutexAttr {
attr C.pthread_rwlockattr_t
}
struct Semaphore {
sem C.sem_t
}
pub fn new_mutex() &Mutex {
mut m := &Mutex{}
m.init()
return m
}
pub fn (mut m Mutex) init() {
C.pthread_mutex_init(&m.mutex, C.NULL)
}
pub fn new_rwmutex() &RwMutex {
mut m := &RwMutex{}
m.init()
return m
}
pub fn (mut m RwMutex) init() {
a := RwMutexAttr{}
C.pthread_rwlockattr_init(&a.attr)
// Give writer priority over readers
C.pthread_rwlockattr_setkind_np(&a.attr, C.PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP)
C.pthread_rwlockattr_setpshared(&a.attr, C.PTHREAD_PROCESS_PRIVATE)
C.pthread_rwlock_init(&m.mutex, &a.attr)
}
// m_lock(), for *manual* mutex handling, since `lock` is a keyword
pub fn (mut m Mutex) m_lock() {
C.pthread_mutex_lock(&m.mutex)
}
pub fn (mut m Mutex) unlock() {
C.pthread_mutex_unlock(&m.mutex)
}
// RwMutex has separate read- and write locks
pub fn (mut m RwMutex) r_lock() {
C.pthread_rwlock_rdlock(&m.mutex)
}
pub fn (mut m RwMutex) w_lock() {
C.pthread_rwlock_wrlock(&m.mutex)
}
// Windows SRWLocks have different function to unlock
// So provide two functions here, too, to have a common interface
pub fn (mut m RwMutex) r_unlock() {
C.pthread_rwlock_unlock(&m.mutex)
}
pub fn (mut m RwMutex) w_unlock() {
C.pthread_rwlock_unlock(&m.mutex)
}
[inline]
pub fn new_semaphore() &Semaphore {
return new_semaphore_init(0)
}
pub fn new_semaphore_init(n u32) &Semaphore {
mut sem := &Semaphore{}
sem.init(n)
return sem
}
pub fn (mut sem Semaphore) init(n u32) {
C.sem_init(&sem.sem, 0, n)
}
pub fn (mut sem Semaphore) post() {
C.sem_post(&sem.sem)
}
pub fn (mut sem Semaphore) wait() {
C.sem_wait(&sem.sem)
}
pub fn (mut sem Semaphore) try_wait() bool {
return C.sem_trywait(&sem.sem) == 0
}
pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool {
t_spec := timeout.timespec()
return C.sem_timedwait(&sem.sem, &t_spec) == 0
}
pub fn (sem Semaphore) destroy() bool {
return C.sem_destroy(&sem.sem) == 0
}

212
vlib/sync/sync_macos.c.v Normal file
View File

@ -0,0 +1,212 @@
// Copyright (c) 2019-2021 Alexander Medvednikov. All rights reserved.
// Use of this source code is governed by an MIT license
// that can be found in the LICENSE file.
module sync
import time
#flag -lpthread
#include <semaphore.h>
[trusted]
fn C.pthread_mutex_init(voidptr, voidptr) int
fn C.pthread_mutex_lock(voidptr) int
fn C.pthread_mutex_unlock(voidptr) int
fn C.pthread_mutex_destroy(voidptr) int
fn C.pthread_rwlockattr_init(voidptr) int
fn C.pthread_rwlockattr_setkind_np(voidptr, int) int
fn C.pthread_rwlockattr_setpshared(voidptr, int) int
fn C.pthread_rwlock_init(voidptr, voidptr) int
fn C.pthread_rwlock_rdlock(voidptr) int
fn C.pthread_rwlock_wrlock(voidptr) int
fn C.pthread_rwlock_unlock(voidptr) int
fn C.pthread_condattr_init(voidptr) int
fn C.pthread_condattr_setpshared(voidptr, int) int
fn C.pthread_condattr_destroy(voidptr) int
fn C.pthread_cond_init(voidptr, voidptr) int
fn C.pthread_cond_signal(voidptr) int
fn C.pthread_cond_wait(voidptr, voidptr) int
fn C.pthread_cond_timedwait(voidptr, voidptr, voidptr) int
fn C.pthread_cond_destroy(voidptr) int
// [init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function.
pub struct Mutex {
mutex C.pthread_mutex_t
}
pub struct RwMutex {
mutex C.pthread_rwlock_t
}
struct RwMutexAttr {
attr C.pthread_rwlockattr_t
}
struct CondAttr {
attr C.pthread_condattr_t
}
/* MacOSX has no unnamed semaphores and no `timed_wait()` at all
so we emulate the behaviour with other devices */
struct Semaphore {
mtx C.pthread_mutex_t
cond C.pthread_cond_t
mut:
count u32
}
pub fn new_mutex() &Mutex {
mut m := &Mutex{}
m.init()
return m
}
pub fn (mut m Mutex) init() {
C.pthread_mutex_init(&m.mutex, C.NULL)
}
pub fn new_rwmutex() &RwMutex {
mut m := &RwMutex{}
m.init()
return m
}
pub fn (mut m RwMutex) init() {
a := RwMutexAttr{}
C.pthread_rwlockattr_init(&a.attr)
// Give writer priority over readers
C.pthread_rwlockattr_setkind_np(&a.attr, C.PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP)
C.pthread_rwlockattr_setpshared(&a.attr, C.PTHREAD_PROCESS_PRIVATE)
C.pthread_rwlock_init(&m.mutex, &a.attr)
}
// m_lock(), for *manual* mutex handling, since `lock` is a keyword
pub fn (mut m Mutex) m_lock() {
C.pthread_mutex_lock(&m.mutex)
}
pub fn (mut m Mutex) unlock() {
C.pthread_mutex_unlock(&m.mutex)
}
// RwMutex has separate read- and write locks
pub fn (mut m RwMutex) r_lock() {
C.pthread_rwlock_rdlock(&m.mutex)
}
pub fn (mut m RwMutex) w_lock() {
C.pthread_rwlock_wrlock(&m.mutex)
}
// Windows SRWLocks have different function to unlock
// So provide two functions here, too, to have a common interface
pub fn (mut m RwMutex) r_unlock() {
C.pthread_rwlock_unlock(&m.mutex)
}
pub fn (mut m RwMutex) w_unlock() {
C.pthread_rwlock_unlock(&m.mutex)
}
[inline]
pub fn new_semaphore() &Semaphore {
return new_semaphore_init(0)
}
pub fn new_semaphore_init(n u32) &Semaphore {
mut sem := &Semaphore{}
sem.init(n)
return sem
}
pub fn (mut sem Semaphore) init(n u32) {
C.atomic_store_u32(&sem.count, n)
C.pthread_mutex_init(&sem.mtx, C.NULL)
attr := CondAttr{}
C.pthread_condattr_init(&attr.attr)
C.pthread_condattr_setpshared(&attr.attr, C.PTHREAD_PROCESS_PRIVATE)
C.pthread_cond_init(&sem.cond, &attr.attr)
C.pthread_condattr_destroy(&attr.attr)
}
pub fn (mut sem Semaphore) post() {
mut c := C.atomic_load_u32(&sem.count)
for c > 1 {
if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c+1) { return }
}
C.pthread_mutex_lock(&sem.mtx)
c = C.atomic_fetch_add_u32(&sem.count, 1)
if c == 0 {
C.pthread_cond_signal(&sem.cond)
}
C.pthread_mutex_unlock(&sem.mtx)
}
pub fn (mut sem Semaphore) wait() {
mut c := C.atomic_load_u32(&sem.count)
for c > 0 {
if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c-1) { return }
}
C.pthread_mutex_lock(&sem.mtx)
c = C.atomic_load_u32(&sem.count)
for {
if c == 0 {
C.pthread_cond_wait(&sem.cond, &sem.mtx)
c = C.atomic_load_u32(&sem.count)
}
for c > 0 {
if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c-1) {
if c > 1 {
C.pthread_cond_signal(&sem.cond)
}
goto unlock
}
}
}
unlock:
C.pthread_mutex_unlock(&sem.mtx)
}
pub fn (mut sem Semaphore) try_wait() bool {
mut c := C.atomic_load_u32(&sem.count)
for c > 0 {
if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c-1) { return true }
}
return false
}
pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool {
mut c := C.atomic_load_u32(&sem.count)
for c > 0 {
if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c-1) { return true }
}
C.pthread_mutex_lock(&sem.mtx)
t_spec := timeout.timespec()
mut res := 0
c = C.atomic_load_u32(&sem.count)
for {
if c == 0 {
res = C.pthread_cond_timedwait(&sem.cond, &sem.mtx, &t_spec)
if res == C.ETIMEDOUT {
goto unlock
}
c = C.atomic_load_u32(&sem.count)
}
for c > 0 {
if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c-1) {
if c > 1 {
C.pthread_cond_signal(&sem.cond)
}
goto unlock
}
}
}
unlock:
C.pthread_mutex_unlock(&sem.mtx)
return res == 0
}
pub fn (mut sem Semaphore) destroy() bool {
return C.pthread_cond_destroy(&sem.cond) == 0 &&
C.pthread_mutex_destroy(&sem.mtx) == 0
}

View File

@ -1,153 +0,0 @@
// Copyright (c) 2019-2021 Alexander Medvednikov. All rights reserved.
// Use of this source code is governed by an MIT license
// that can be found in the LICENSE file.
module sync
import time
#flag -lpthread
$if macos {
#include <dispatch/dispatch.h>
}
#include <semaphore.h>
// [init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function.
[ref_only]
pub struct Mutex {
mutex C.pthread_mutex_t
}
[ref_only]
pub struct RwMutex {
mutex C.pthread_rwlock_t
}
[ref_only]
struct RwMutexAttr {
attr C.pthread_rwlockattr_t
}
/* MacOSX has no unnamed semaphores and no `timed_wait()` at all
so we emulate the behaviour with other devices */
// [ref_only]
// struct MacOSX_Semaphore {
// sem C.dispatch_semaphore_t
// }
[ref_only]
struct PosixSemaphore {
sem C.sem_t
}
pub struct Semaphore {
mut:
sem voidptr
}
pub fn new_mutex() &Mutex {
m := &Mutex{}
C.pthread_mutex_init(&m.mutex, C.NULL)
return m
}
pub fn new_rwmutex() &RwMutex {
m := &RwMutex{}
a := &RwMutexAttr{}
C.pthread_rwlockattr_init(&a.attr)
// Give writer priority over readers
C.pthread_rwlockattr_setkind_np(&a.attr, C.PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP)
C.pthread_rwlockattr_setpshared(&a.attr, C.PTHREAD_PROCESS_PRIVATE)
C.pthread_rwlock_init(&m.mutex, &a.attr)
return m
}
// m_lock(), for *manual* mutex handling, since `lock` is a keyword
pub fn (mut m Mutex) m_lock() {
C.pthread_mutex_lock(&m.mutex)
}
pub fn (mut m Mutex) unlock() {
C.pthread_mutex_unlock(&m.mutex)
}
// RwMutex has separate read- and write locks
pub fn (mut m RwMutex) r_lock() {
C.pthread_rwlock_rdlock(&m.mutex)
}
pub fn (mut m RwMutex) w_lock() {
C.pthread_rwlock_wrlock(&m.mutex)
}
// Windows SRWLocks have different function to unlock
// So provide two functions here, too, to have a common interface
pub fn (mut m RwMutex) r_unlock() {
C.pthread_rwlock_unlock(&m.mutex)
}
pub fn (mut m RwMutex) w_unlock() {
C.pthread_rwlock_unlock(&m.mutex)
}
[inline]
pub fn new_semaphore() Semaphore {
return new_semaphore_init(0)
}
pub fn new_semaphore_init(n u32) Semaphore {
$if macos {
s := Semaphore{
sem: C.dispatch_semaphore_create(n)
}
return s
} $else {
s := Semaphore{
sem: &PosixSemaphore{}
}
unsafe { C.sem_init(&&PosixSemaphore(s.sem).sem, 0, n) }
return s
}
}
pub fn (s Semaphore) post() {
$if macos {
C.dispatch_semaphore_signal(s.sem)
} $else {
unsafe { C.sem_post(&&PosixSemaphore(s.sem).sem) }
}
}
pub fn (s Semaphore) wait() {
$if macos {
C.dispatch_semaphore_wait(s.sem, C.DISPATCH_TIME_FOREVER)
} $else {
unsafe { C.sem_wait(&&PosixSemaphore(s.sem).sem) }
}
}
pub fn (s Semaphore) try_wait() bool {
$if macos {
return C.dispatch_semaphore_wait(s.sem, C.DISPATCH_TIME_NOW) == 0
} $else {
return unsafe { C.sem_trywait(&&PosixSemaphore(s.sem).sem) == 0 }
}
}
pub fn (s Semaphore) timed_wait(timeout time.Duration) bool {
$if macos {
return C.dispatch_semaphore_wait(s.sem, C.dispatch_time(C.DISPATCH_TIME_NOW, i64(timeout))) == 0
} $else {
t_spec := timeout.timespec()
return unsafe { C.sem_timedwait(&&PosixSemaphore(s.sem).sem, &t_spec) == 0 }
}
}
pub fn (s Semaphore) destroy() bool {
$if macos {
for s.try_wait() {}
C.dispatch_release(s.sem)
return true
} $else {
return unsafe { C.sem_destroy(&&PosixSemaphore(s.sem).sem) == 0 }
}
}

View File

@ -5,6 +5,10 @@ module sync
import time import time
fn C.InitializeConditionVariable(voidptr)
fn C.WakeConditionVariable(voidptr)
fn C.SleepConditionVariableSRW(voidptr, voidptr, u32, u32) int
// TODO: The suggestion of using CriticalSection instead of mutex // TODO: The suggestion of using CriticalSection instead of mutex
// was discussed. Needs consideration. // was discussed. Needs consideration.
@ -15,90 +19,50 @@ type SHANDLE = voidptr
//[init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function. //[init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function.
[ref_only] // `SRWLOCK` is much more performant that `Mutex` on Windows, so use that in both cases since we don't want to share with other processes
pub struct Mutex { pub struct Mutex {
mut: mut:
mx MHANDLE // mutex handle mx C.SRWLOCK // mutex handle
state MutexState // mutex state
cycle_wait i64 // waiting cycles (implemented only with atomic)
cycle_woken i64 // woken cycles ^
reader_sem u32 // reader semarphone
writer_sem u32 // writer semaphones
} }
[ref_only]
pub struct RwMutex { pub struct RwMutex {
mut: mut:
mx C.SRWLOCK // mutex handle mx C.SRWLOCK // mutex handle
} }
pub struct Semaphore { struct Semaphore {
mtx C.SRWLOCK
cond C.CONDITION_VARIABLE
mut: mut:
sem SHANDLE count u32
}
enum MutexState {
broken
waiting
released
abandoned
destroyed
} }
pub fn new_mutex() &Mutex { pub fn new_mutex() &Mutex {
sm := &Mutex{} mut m := &Mutex{}
unsafe { m.init()
mut m := sm
m.mx = MHANDLE(C.CreateMutex(0, false, 0))
if isnil(m.mx) {
m.state = .broken // handle broken and mutex state are broken
return sm
}
}
return sm
}
pub fn new_rwmutex() &RwMutex {
m := &RwMutex{}
C.InitializeSRWLock(&m.mx)
return m return m
} }
pub fn new_rwmutex() &RwMutex {
mut m := &RwMutex{}
m.init()
return m
}
pub fn (mut m Mutex) init() {
C.InitializeSRWLock(&m.mx)
}
pub fn (mut m RwMutex) init() {
C.InitializeSRWLock(&m.mx)
}
pub fn (mut m Mutex) m_lock() { pub fn (mut m Mutex) m_lock() {
// if mutex handle not initalized C.AcquireSRWLockExclusive(&m.mx)
if isnil(m.mx) {
m.mx = MHANDLE(C.CreateMutex(0, false, 0))
if isnil(m.mx) {
m.state = .broken // handle broken and mutex state are broken
return
}
}
state := C.WaitForSingleObject(m.mx, C.INFINITE) // infinite wait
/* TODO fix match/enum combo
m.state = match state {
C.WAIT_ABANDONED { .abandoned }
C.WAIT_OBJECT_0 { .waiting }
else { .broken }
}
*/
if state == C.WAIT_ABANDONED {
m.state = .abandoned
// FIXME Use C constant instead
} else if state == 0 /* C.WAIT_OBJECT_0 */ {
m.state = .waiting
} else {
m.state = .broken
}
} }
pub fn (mut m Mutex) unlock() { pub fn (mut m Mutex) unlock() {
if m.state == .waiting { C.ReleaseSRWLockExclusive(&m.mx)
if C.ReleaseMutex(m.mx) {
m.state = .broken
return
}
}
m.state = .released
} }
// RwMutex has separate read- and write locks // RwMutex has separate read- and write locks
@ -121,40 +85,103 @@ pub fn (mut m RwMutex) w_unlock() {
} }
pub fn (mut m Mutex) destroy() { pub fn (mut m Mutex) destroy() {
if m.state == .waiting { // nothing to do
m.unlock() // unlock mutex before destroying
}
C.CloseHandle(m.mx) // destroy mutex
m.state = .destroyed // setting up reference to invalid state
} }
[inline] [inline]
pub fn new_semaphore() Semaphore { pub fn new_semaphore() &Semaphore {
return new_semaphore_init(0) return new_semaphore_init(0)
} }
pub fn new_semaphore_init(n u32) Semaphore { pub fn new_semaphore_init(n u32) &Semaphore {
return Semaphore{ mut sem := &Semaphore{}
sem: SHANDLE(C.CreateSemaphore(0, n, C.INT32_MAX, 0)) sem.init(n)
return sem
}
pub fn (mut sem Semaphore) init(n u32) {
C.atomic_store_u32(&sem.count, n)
C.InitializeSRWLock(&sem.mtx)
C.InitializeConditionVariable(&sem.cond)
}
pub fn (mut sem Semaphore) post() {
mut c := C.atomic_load_u32(&sem.count)
for c > 1 {
if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c+1) { return }
} }
C.AcquireSRWLockExclusive(&sem.mtx)
c = C.atomic_fetch_add_u32(&sem.count, 1)
if c == 0 {
C.WakeConditionVariable(&sem.cond)
}
C.ReleaseSRWLockExclusive(&sem.mtx)
} }
pub fn (s Semaphore) post() { pub fn (mut sem Semaphore) wait() {
C.ReleaseSemaphore(s.sem, 1, 0) mut c := C.atomic_load_u32(&sem.count)
for c > 0 {
if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c-1) { return }
}
C.AcquireSRWLockExclusive(&sem.mtx)
c = C.atomic_load_u32(&sem.count)
for {
if c == 0 {
C.SleepConditionVariableSRW(&sem.cond, &sem.mtx, C.INFINITE, 0)
c = C.atomic_load_u32(&sem.count)
}
for c > 0 {
if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c-1) {
if c > 1 {
C.WakeConditionVariable(&sem.cond)
}
goto unlock
}
}
}
unlock:
C.ReleaseSRWLockExclusive(&sem.mtx)
} }
pub fn (s Semaphore) wait() { pub fn (mut sem Semaphore) try_wait() bool {
C.WaitForSingleObject(s.sem, C.INFINITE) mut c := C.atomic_load_u32(&sem.count)
for c > 0 {
if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c-1) { return true }
}
return false
} }
pub fn (s Semaphore) try_wait() bool { pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool {
return C.WaitForSingleObject(s.sem, 0) == 0 mut c := C.atomic_load_u32(&sem.count)
} for c > 0 {
if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c-1) { return true }
pub fn (s Semaphore) timed_wait(timeout time.Duration) bool { }
return C.WaitForSingleObject(s.sem, timeout / time.millisecond) == 0 C.AcquireSRWLockExclusive(&sem.mtx)
t_ms := u32(timeout / time.millisecond)
mut res := 0
c = C.atomic_load_u32(&sem.count)
for {
if c == 0 {
res = C.SleepConditionVariableSRW(&sem.cond, &sem.mtx, t_ms, 0)
if res == 0 {
goto unlock
}
c = C.atomic_load_u32(&sem.count)
}
for c > 0 {
if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c-1) {
if c > 1 {
C.WakeConditionVariable(&sem.cond)
}
goto unlock
}
}
}
unlock:
C.ReleaseSRWLockExclusive(&sem.mtx)
return res != 0
} }
pub fn (s Semaphore) destroy() bool { pub fn (s Semaphore) destroy() bool {
return C.CloseHandle(s.sem) != 0 return true
} }

View File

@ -14,7 +14,7 @@ module sync
struct WaitGroup { struct WaitGroup {
mut: mut:
task_count int // current task count task_count int // current task count
task_count_mutex &Mutex = &Mutex(0) // This mutex protects the task_count count in add() task_count_mutex &Mutex // This mutex protects the task_count count in add()
wait_blocker &Waiter = &Waiter(0) // This blocks the wait() until released by add() wait_blocker &Waiter = &Waiter(0) // This blocks the wait() until released by add()
} }

View File

@ -1,5 +1,5 @@
vlib/v/checker/tests/chan_ref.vv:10:8: error: cannot push non-reference `St` on `chan &St` vlib/v/checker/tests/chan_ref.vv:10:8: error: cannot push non-reference `St` on `chan &St`
8 | fn f(ch chan &St, sem sync.Semaphore) { 8 | fn f(ch chan &St, mut sem sync.Semaphore) {
9 | w := St{} 9 | w := St{}
10 | ch <- w 10 | ch <- w
| ^ | ^

View File

@ -5,7 +5,7 @@ mut:
n int n int
} }
fn f(ch chan &St, sem sync.Semaphore) { fn f(ch chan &St, mut sem sync.Semaphore) {
w := St{} w := St{}
ch <- w ch <- w
mut x := St{} mut x := St{}
@ -21,8 +21,8 @@ fn f(ch chan &St, sem sync.Semaphore) {
fn main() { fn main() {
c := chan &St{} c := chan &St{}
sem := sync.new_semaphore() mut sem := sync.new_semaphore()
go f(c, sem) go f(c, mut sem)
y := <-c y := <-c
// this should fail // this should fail
mut z := <-c mut z := <-c

View File

@ -17,7 +17,7 @@ fn (mut g Gen) array_init(it ast.ArrayInit) {
if g.is_shared { if g.is_shared {
mut shared_typ := it.typ.set_flag(.shared_f) mut shared_typ := it.typ.set_flag(.shared_f)
shared_styp = g.typ(shared_typ) shared_styp = g.typ(shared_typ)
g.writeln('($shared_styp*)memdup(&($shared_styp){.val = ') g.writeln('($shared_styp*)__dup_shared_array(&($shared_styp){.val = ')
} else { } else {
g.write('($styp*)memdup(ADDR($styp, ') g.write('($styp*)memdup(ADDR($styp, ')
} }
@ -95,7 +95,7 @@ fn (mut g Gen) array_init(it ast.ArrayInit) {
} }
if is_amp { if is_amp {
if g.is_shared { if g.is_shared {
g.write(', .mtx = sync__new_rwmutex()}, sizeof($shared_styp))') g.write('}, sizeof($shared_styp))')
} else { } else {
g.write('), sizeof($styp))') g.write('), sizeof($styp))')
} }

View File

@ -324,6 +324,7 @@ pub fn cgen(files []ast.File, table &table.Table, pref &pref.Preferences) string
if g.shared_types.len > 0 { if g.shared_types.len > 0 {
b.writeln('\n// V shared types:') b.writeln('\n// V shared types:')
b.write(g.shared_types.str()) b.write(g.shared_types.str())
b.write(c_concurrency_helpers)
} }
if g.channel_definitions.len > 0 { if g.channel_definitions.len > 0 {
b.writeln('\n// V channel code:') b.writeln('\n// V channel code:')
@ -572,7 +573,12 @@ fn (mut g Gen) find_or_register_shared(t table.Type, base string) string {
return sh_typ return sh_typ
} }
mtx_typ := 'sync__RwMutex' mtx_typ := 'sync__RwMutex'
g.shared_types.writeln('struct $sh_typ { $base val; $mtx_typ* mtx; };') g.shared_types.writeln('struct $sh_typ { $base val; $mtx_typ mtx; };')
g.shared_types.writeln('static inline voidptr __dup${sh_typ}(voidptr src, int sz) {')
g.shared_types.writeln('\t$sh_typ* dest = memdup(src, sz);')
g.shared_types.writeln('\tsync__RwMutex_init(&dest->mtx);')
g.shared_types.writeln('\treturn dest;')
g.shared_types.writeln('}')
g.typedefs2.writeln('typedef struct $sh_typ $sh_typ;') g.typedefs2.writeln('typedef struct $sh_typ $sh_typ;')
// println('registered shared type $sh_typ') // println('registered shared type $sh_typ')
g.shareds << t_idx g.shareds << t_idx
@ -2707,7 +2713,7 @@ fn (mut g Gen) expr(node ast.Expr) {
if g.is_shared { if g.is_shared {
mut shared_typ := node.typ.set_flag(.shared_f) mut shared_typ := node.typ.set_flag(.shared_f)
shared_styp = g.typ(shared_typ) shared_styp = g.typ(shared_typ)
g.writeln('($shared_styp*)memdup(&($shared_styp){.val = ') g.writeln('($shared_styp*)__dup_shared_map(&($shared_styp){.val = ')
} else { } else {
styp = g.typ(node.typ) styp = g.typ(node.typ)
g.write('($styp*)memdup(ADDR($styp, ') g.write('($styp*)memdup(ADDR($styp, ')
@ -2742,7 +2748,7 @@ fn (mut g Gen) expr(node ast.Expr) {
g.write('new_map_2(sizeof($key_typ_str), sizeof($value_typ_str), $hash_fn, $key_eq_fn, $clone_fn, $free_fn)') g.write('new_map_2(sizeof($key_typ_str), sizeof($value_typ_str), $hash_fn, $key_eq_fn, $clone_fn, $free_fn)')
} }
if g.is_shared { if g.is_shared {
g.write(', .mtx = sync__new_rwmutex()}') g.write('}')
if is_amp { if is_amp {
g.write(', sizeof($shared_styp))') g.write(', sizeof($shared_styp))')
} }
@ -2763,7 +2769,7 @@ fn (mut g Gen) expr(node ast.Expr) {
} }
ast.PostfixExpr { ast.PostfixExpr {
if node.auto_locked != '' { if node.auto_locked != '' {
g.writeln('sync__RwMutex_w_lock($node.auto_locked->mtx);') g.writeln('sync__RwMutex_w_lock(&$node.auto_locked->mtx);')
} }
g.inside_map_postfix = true g.inside_map_postfix = true
g.expr(node.expr) g.expr(node.expr)
@ -2771,7 +2777,7 @@ fn (mut g Gen) expr(node ast.Expr) {
g.write(node.op.str()) g.write(node.op.str())
if node.auto_locked != '' { if node.auto_locked != '' {
g.writeln(';') g.writeln(';')
g.write('sync__RwMutex_w_unlock($node.auto_locked->mtx)') g.write('sync__RwMutex_w_unlock(&$node.auto_locked->mtx)')
} }
} }
ast.PrefixExpr { ast.PrefixExpr {
@ -3022,7 +3028,7 @@ fn (mut g Gen) infix_expr(node ast.InfixExpr) {
// string + string, string == string etc // string + string, string == string etc
// g.infix_op = node.op // g.infix_op = node.op
if node.auto_locked != '' { if node.auto_locked != '' {
g.writeln('sync__RwMutex_w_lock($node.auto_locked->mtx);') g.writeln('sync__RwMutex_w_lock(&$node.auto_locked->mtx);')
} }
left_type := g.unwrap_generic(node.left_type) left_type := g.unwrap_generic(node.left_type)
left_sym := g.table.get_type_symbol(left_type) left_sym := g.table.get_type_symbol(left_type)
@ -3412,7 +3418,7 @@ fn (mut g Gen) infix_expr(node ast.InfixExpr) {
} }
if node.auto_locked != '' { if node.auto_locked != '' {
g.writeln(';') g.writeln(';')
g.write('sync__RwMutex_w_unlock($node.auto_locked->mtx)') g.write('sync__RwMutex_w_unlock(&$node.auto_locked->mtx)')
} }
} }
@ -3423,7 +3429,7 @@ fn (mut g Gen) lock_expr(node ast.LockExpr) {
deref := if id.is_mut { '->' } else { '.' } deref := if id.is_mut { '->' } else { '.' }
lock_prefix := if node.is_rlock { `r` } else { `w` } lock_prefix := if node.is_rlock { `r` } else { `w` }
lock_prefixes << lock_prefix // keep for unlock lock_prefixes << lock_prefix // keep for unlock
g.writeln('sync__RwMutex_${lock_prefix:c}_lock($name${deref}mtx);') g.writeln('sync__RwMutex_${lock_prefix:c}_lock(&$name${deref}mtx);')
} }
g.stmts(node.stmts) g.stmts(node.stmts)
// unlock in reverse order // unlock in reverse order
@ -3432,7 +3438,7 @@ fn (mut g Gen) lock_expr(node ast.LockExpr) {
lock_prefix := lock_prefixes[i] lock_prefix := lock_prefixes[i]
name := id.name name := id.name
deref := if id.is_mut { '->' } else { '.' } deref := if id.is_mut { '->' } else { '.' }
g.writeln('sync__RwMutex_${lock_prefix:c}_unlock($name${deref}mtx);') g.writeln('sync__RwMutex_${lock_prefix:c}_unlock(&$name${deref}mtx);')
} }
} }
@ -4715,7 +4721,7 @@ fn (mut g Gen) struct_init(struct_init ast.StructInit) {
if g.is_shared { if g.is_shared {
mut shared_typ := struct_init.typ.set_flag(.shared_f) mut shared_typ := struct_init.typ.set_flag(.shared_f)
shared_styp = g.typ(shared_typ) shared_styp = g.typ(shared_typ)
g.writeln('($shared_styp*)memdup(&($shared_styp){.val = ($styp){') g.writeln('($shared_styp*)__dup${shared_styp}(&($shared_styp){.val = ($styp){')
} else { } else {
g.write('($styp*)memdup(&($styp){') g.write('($styp*)memdup(&($styp){')
} }
@ -4728,6 +4734,7 @@ fn (mut g Gen) struct_init(struct_init ast.StructInit) {
} }
} else { } else {
if g.is_shared { if g.is_shared {
// TODO: non-ref shared should be forbidden
g.writeln('{.val = {') g.writeln('{.val = {')
} else if is_multiline { } else if is_multiline {
g.writeln('($styp){') g.writeln('($styp){')
@ -4897,7 +4904,7 @@ fn (mut g Gen) struct_init(struct_init ast.StructInit) {
} }
g.write('}') g.write('}')
if g.is_shared { if g.is_shared {
g.write(', .mtx = sync__new_rwmutex()}') g.write('}')
if is_amp { if is_amp {
g.write(', sizeof($shared_styp))') g.write(', sizeof($shared_styp))')
} }

View File

@ -15,6 +15,22 @@ const (
#ifndef V_CURRENT_COMMIT_HASH #ifndef V_CURRENT_COMMIT_HASH
#define V_CURRENT_COMMIT_HASH "@@@" #define V_CURRENT_COMMIT_HASH "@@@"
#endif #endif
'
c_concurrency_helpers = '
typedef struct __shared_map __shared_map;
struct __shared_map { map val; sync__RwMutex mtx; };
static inline voidptr __dup_shared_map(voidptr src, int sz) {
__shared_map* dest = memdup(src, sz);
sync__RwMutex_init(&dest->mtx);
return dest;
}
typedef struct __shared_array __shared_array;
struct __shared_array { array val; sync__RwMutex mtx; };
static inline voidptr __dup_shared_array(voidptr src, int sz) {
__shared_array* dest = memdup(src, sz);
sync__RwMutex_init(&dest->mtx);
return dest;
}
' '
c_common_macros = ' c_common_macros = '
#define EMPTY_VARG_INITIALIZATION 0 #define EMPTY_VARG_INITIALIZATION 0

View File

@ -4,7 +4,7 @@ const (
iterations_per_thread2 = 100000 iterations_per_thread2 = 100000
) )
fn inc_elements(shared foo []int, n int, sem sync.Semaphore) { fn inc_elements(shared foo []int, n int, mut sem sync.Semaphore) {
for _ in 0 .. iterations_per_thread2 { for _ in 0 .. iterations_per_thread2 {
foo[n]++ foo[n]++
} }
@ -13,9 +13,9 @@ fn inc_elements(shared foo []int, n int, sem sync.Semaphore) {
fn test_autolocked_array_2() { fn test_autolocked_array_2() {
shared abc := &[0, 0, 0] shared abc := &[0, 0, 0]
sem := sync.new_semaphore() mut sem := sync.new_semaphore()
go inc_elements(shared abc, 1, sem) go inc_elements(shared abc, 1, mut sem)
go inc_elements(shared abc, 2, sem) go inc_elements(shared abc, 2, mut sem)
for _ in 0 .. iterations_per_thread2 { for _ in 0 .. iterations_per_thread2 {
unsafe { unsafe {
abc[2]++ abc[2]++

View File

@ -4,7 +4,7 @@ const (
signals_per_thread = 100000 signals_per_thread = 100000
) )
fn send_signals(sem sync.Semaphore, sem_end sync.Semaphore) { fn send_signals(mut sem sync.Semaphore, mut sem_end sync.Semaphore) {
for _ in 0 .. signals_per_thread { for _ in 0 .. signals_per_thread {
sem.post() sem.post()
} }
@ -12,10 +12,10 @@ fn send_signals(sem sync.Semaphore, sem_end sync.Semaphore) {
} }
fn test_semaphores() { fn test_semaphores() {
sem := sync.new_semaphore() mut sem := sync.new_semaphore()
sem_end := sync.new_semaphore() mut sem_end := sync.new_semaphore()
go send_signals(sem, sem_end) go send_signals(mut sem, mut sem_end)
go send_signals(sem, sem_end) go send_signals(mut sem, mut sem_end)
for _ in 0 .. 2 * signals_per_thread { for _ in 0 .. 2 * signals_per_thread {
sem.wait() sem.wait()
} }

View File

@ -1,7 +1,7 @@
import sync import sync
import time import time
fn run_forever(shared foo []int, sem sync.Semaphore) { fn run_forever(shared foo []int, mut sem sync.Semaphore) {
for { for {
foo[0]++ foo[0]++
} }
@ -10,8 +10,8 @@ fn run_forever(shared foo []int, sem sync.Semaphore) {
fn test_semaphore() { fn test_semaphore() {
shared abc := &[0] shared abc := &[0]
sem := sync.new_semaphore() mut sem := sync.new_semaphore()
go run_forever(shared abc, sem) go run_forever(shared abc, mut sem)
for _ in 0 .. 1000 { for _ in 0 .. 1000 {
unsafe { abc[0]-- } unsafe { abc[0]-- }
} }

View File

@ -1,6 +1,6 @@
import sync import sync
fn incr(shared foo map[string]int, key string, sem sync.Semaphore) { fn incr(shared foo map[string]int, key string, mut sem sync.Semaphore) {
for _ in 0 .. 100000 { for _ in 0 .. 100000 {
lock foo { lock foo {
foo[key] = foo[key] + 1 foo[key] = foo[key] + 1
@ -14,11 +14,11 @@ fn test_shared_array() {
lock foo { lock foo {
foo['q'] = 20 foo['q'] = 20
} }
sem := sync.new_semaphore() mut sem := sync.new_semaphore()
go incr(shared foo, 'p', sem) go incr(shared foo, 'p', mut sem)
go incr(shared foo, 'q', sem) go incr(shared foo, 'q', mut sem)
go incr(shared foo, 'p', sem) go incr(shared foo, 'p', mut sem)
go incr(shared foo, 'q', sem) go incr(shared foo, 'q', mut sem)
for _ in 0 .. 50000 { for _ in 0 .. 50000 {
lock foo { lock foo {
foo['p'] -= 2 foo['p'] -= 2