mirror of
https://github.com/vlang/v.git
synced 2023-08-10 21:13:21 +03:00
644 lines
16 KiB
V
644 lines
16 KiB
V
module sync
|
|
|
|
import time
|
|
import rand
|
|
import sync.stdatomic
|
|
|
|
const aops_used = stdatomic.used
|
|
|
|
const (
|
|
// how often to try to get data without blocking before to wait for semaphore
|
|
spinloops = 750
|
|
spinloops_sem = 4000
|
|
)
|
|
|
|
enum BufferElemStat {
|
|
unused = 0
|
|
writing
|
|
written
|
|
reading
|
|
}
|
|
|
|
struct Subscription {
|
|
mut:
|
|
sem &Semaphore = unsafe { nil }
|
|
prev &&Subscription = unsafe { nil }
|
|
nxt &Subscription = unsafe { nil }
|
|
}
|
|
|
|
pub enum Direction {
|
|
pop
|
|
push
|
|
}
|
|
|
|
pub struct Channel {
|
|
ringbuf &u8 = unsafe { nil } // queue for buffered channels
|
|
statusbuf &u8 = unsafe { nil } // flags to synchronize write/read in ringbuf
|
|
objsize u32
|
|
mut: // atomic
|
|
writesem Semaphore // to wake thread that wanted to write, but buffer was full
|
|
readsem Semaphore // to wake thread that wanted to read, but buffer was empty
|
|
writesem_im Semaphore
|
|
readsem_im Semaphore
|
|
write_adr C.atomic_uintptr_t // if != NULL the next obj can be written here without wait
|
|
read_adr C.atomic_uintptr_t // if != NULL an obj can be read from here without wait
|
|
adr_read C.atomic_uintptr_t // used to identify origin of writesem
|
|
adr_written C.atomic_uintptr_t // used to identify origin of readsem
|
|
write_free u32 // for queue state
|
|
read_avail u32
|
|
buf_elem_write_idx u32
|
|
buf_elem_read_idx u32
|
|
// for select
|
|
write_subscriber &Subscription = unsafe { nil }
|
|
read_subscriber &Subscription = unsafe { nil }
|
|
write_sub_mtx u16
|
|
read_sub_mtx u16
|
|
closed u16
|
|
pub:
|
|
cap u32 // queue length in #objects
|
|
}
|
|
|
|
pub fn new_channel[T](n u32) &Channel {
|
|
st := if sizeof(T) > 0 { sizeof(T) } else { 1 }
|
|
if isreftype(T) {
|
|
return new_channel_st(n, st)
|
|
} else {
|
|
return new_channel_st_noscan(n, st)
|
|
}
|
|
}
|
|
|
|
fn new_channel_st(n u32, st u32) &Channel {
|
|
wsem := if n > 0 { n } else { 1 }
|
|
rsem := if n > 0 { u32(0) } else { 1 }
|
|
rbuf := if n > 0 { unsafe { malloc(int(n * st)) } } else { &u8(0) }
|
|
sbuf := if n > 0 { vcalloc_noscan(int(n * 2)) } else { &u8(0) }
|
|
mut ch := Channel{
|
|
objsize: st
|
|
cap: n
|
|
write_free: n
|
|
read_avail: 0
|
|
ringbuf: rbuf
|
|
statusbuf: sbuf
|
|
write_subscriber: 0
|
|
read_subscriber: 0
|
|
}
|
|
ch.writesem.init(wsem)
|
|
ch.readsem.init(rsem)
|
|
ch.writesem_im.init(0)
|
|
ch.readsem_im.init(0)
|
|
return &ch
|
|
}
|
|
|
|
fn new_channel_st_noscan(n u32, st u32) &Channel {
|
|
$if gcboehm_opt ? {
|
|
wsem := if n > 0 { n } else { 1 }
|
|
rsem := if n > 0 { u32(0) } else { 1 }
|
|
rbuf := if n > 0 { unsafe { malloc_noscan(int(n * st)) } } else { &u8(0) }
|
|
sbuf := if n > 0 { vcalloc_noscan(int(n * 2)) } else { &u8(0) }
|
|
mut ch := Channel{
|
|
objsize: st
|
|
cap: n
|
|
write_free: n
|
|
read_avail: 0
|
|
ringbuf: rbuf
|
|
statusbuf: sbuf
|
|
write_subscriber: 0
|
|
read_subscriber: 0
|
|
}
|
|
ch.writesem.init(wsem)
|
|
ch.readsem.init(rsem)
|
|
ch.writesem_im.init(0)
|
|
ch.readsem_im.init(0)
|
|
return &ch
|
|
} $else {
|
|
return new_channel_st(n, st)
|
|
}
|
|
}
|
|
|
|
pub fn (ch &Channel) auto_str(typename string) string {
|
|
return 'chan ${typename}{cap: ${ch.cap}, closed: ${ch.closed}}'
|
|
}
|
|
|
|
pub fn (mut ch Channel) close() {
|
|
open_val := u16(0)
|
|
if !C.atomic_compare_exchange_strong_u16(&ch.closed, &open_val, 1) {
|
|
return
|
|
}
|
|
mut nulladr := unsafe { nil }
|
|
for !C.atomic_compare_exchange_weak_ptr(unsafe { &voidptr(&ch.adr_written) }, &nulladr,
|
|
voidptr(-1)) {
|
|
nulladr = unsafe { nil }
|
|
}
|
|
ch.readsem_im.post()
|
|
ch.readsem.post()
|
|
mut null16 := u16(0)
|
|
for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) {
|
|
null16 = u16(0)
|
|
}
|
|
if ch.read_subscriber != unsafe { nil } {
|
|
ch.read_subscriber.sem.post()
|
|
}
|
|
C.atomic_store_u16(&ch.read_sub_mtx, u16(0))
|
|
null16 = u16(0)
|
|
for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
|
|
null16 = u16(0)
|
|
}
|
|
if ch.write_subscriber != unsafe { nil } {
|
|
ch.write_subscriber.sem.post()
|
|
}
|
|
C.atomic_store_u16(&ch.write_sub_mtx, u16(0))
|
|
ch.writesem.post()
|
|
if ch.cap == 0 {
|
|
C.atomic_store_ptr(unsafe { &voidptr(&ch.read_adr) }, unsafe { nil })
|
|
}
|
|
ch.writesem_im.post()
|
|
}
|
|
|
|
[inline]
|
|
pub fn (mut ch Channel) len() int {
|
|
return int(C.atomic_load_u32(&ch.read_avail))
|
|
}
|
|
|
|
[inline]
|
|
pub fn (mut ch Channel) closed() bool {
|
|
return C.atomic_load_u16(&ch.closed) != 0
|
|
}
|
|
|
|
[inline]
|
|
pub fn (mut ch Channel) push(src voidptr) {
|
|
if ch.try_push_priv(src, false) == .closed {
|
|
panic('push on closed channel')
|
|
}
|
|
}
|
|
|
|
[inline]
|
|
pub fn (mut ch Channel) try_push(src voidptr) ChanState {
|
|
return ch.try_push_priv(src, true)
|
|
}
|
|
|
|
fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState {
|
|
if C.atomic_load_u16(&ch.closed) != 0 {
|
|
return .closed
|
|
}
|
|
spinloops_sem_, spinloops_ := if no_block { 1, 1 } else { sync.spinloops, sync.spinloops_sem }
|
|
mut have_swapped := false
|
|
for {
|
|
mut got_sem := false
|
|
mut wradr := C.atomic_load_ptr(unsafe { &voidptr(&ch.write_adr) })
|
|
for wradr != C.NULL {
|
|
if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.write_adr) },
|
|
&wradr, unsafe { nil })
|
|
{
|
|
// there is a reader waiting for us
|
|
unsafe { C.memcpy(wradr, src, ch.objsize) }
|
|
mut nulladr := unsafe { nil }
|
|
for !C.atomic_compare_exchange_weak_ptr(unsafe { &voidptr(&ch.adr_written) },
|
|
&nulladr, wradr) {
|
|
nulladr = unsafe { nil }
|
|
}
|
|
ch.readsem_im.post()
|
|
return .success
|
|
}
|
|
}
|
|
if no_block && ch.cap == 0 {
|
|
return .not_ready
|
|
}
|
|
// get token to read
|
|
for _ in 0 .. spinloops_sem_ {
|
|
if got_sem {
|
|
break
|
|
}
|
|
got_sem = ch.writesem.try_wait()
|
|
}
|
|
if !got_sem {
|
|
if no_block {
|
|
return .not_ready
|
|
}
|
|
ch.writesem.wait()
|
|
}
|
|
if C.atomic_load_u16(&ch.closed) != 0 {
|
|
ch.writesem.post()
|
|
return .closed
|
|
}
|
|
if ch.cap == 0 {
|
|
// try to advertise current object as readable
|
|
mut read_in_progress := false
|
|
C.atomic_store_ptr(unsafe { &voidptr(&ch.read_adr) }, src)
|
|
wradr = C.atomic_load_ptr(unsafe { &voidptr(&ch.write_adr) })
|
|
if wradr != C.NULL {
|
|
mut src2 := src
|
|
if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.read_adr) },
|
|
&src2, unsafe { nil })
|
|
{
|
|
ch.writesem.post()
|
|
continue
|
|
} else {
|
|
read_in_progress = true
|
|
}
|
|
}
|
|
if !read_in_progress {
|
|
mut null16 := u16(0)
|
|
for !C.atomic_compare_exchange_weak_u16(voidptr(&ch.read_sub_mtx), &null16,
|
|
u16(1)) {
|
|
null16 = u16(0)
|
|
}
|
|
if ch.read_subscriber != unsafe { nil } {
|
|
ch.read_subscriber.sem.post()
|
|
}
|
|
C.atomic_store_u16(&ch.read_sub_mtx, u16(0))
|
|
}
|
|
mut src2 := src
|
|
for sp := u32(0); sp < spinloops_ || read_in_progress; sp++ {
|
|
if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_read) },
|
|
&src2, unsafe { nil })
|
|
{
|
|
have_swapped = true
|
|
read_in_progress = true
|
|
break
|
|
}
|
|
src2 = src
|
|
}
|
|
mut got_im_sem := false
|
|
for sp := u32(0); sp < spinloops_sem_ || read_in_progress; sp++ {
|
|
got_im_sem = ch.writesem_im.try_wait()
|
|
if got_im_sem {
|
|
break
|
|
}
|
|
}
|
|
for {
|
|
if got_im_sem {
|
|
got_im_sem = false
|
|
} else {
|
|
ch.writesem_im.wait()
|
|
}
|
|
if C.atomic_load_u16(&ch.closed) != 0 {
|
|
if have_swapped
|
|
|| C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_read) }, &src2, unsafe { nil }) {
|
|
ch.writesem.post()
|
|
return .success
|
|
} else {
|
|
return .closed
|
|
}
|
|
}
|
|
if have_swapped
|
|
|| C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_read) }, &src2, unsafe { nil }) {
|
|
ch.writesem.post()
|
|
break
|
|
} else {
|
|
// this semaphore was not for us - repost in
|
|
ch.writesem_im.post()
|
|
if src2 == voidptr(-1) {
|
|
ch.readsem.post()
|
|
return .closed
|
|
}
|
|
src2 = src
|
|
}
|
|
}
|
|
return .success
|
|
} else {
|
|
// buffered channel
|
|
mut space_in_queue := false
|
|
mut wr_free := C.atomic_load_u32(&ch.write_free)
|
|
for wr_free > 0 {
|
|
space_in_queue = C.atomic_compare_exchange_weak_u32(&ch.write_free, &wr_free,
|
|
wr_free - 1)
|
|
if space_in_queue {
|
|
break
|
|
}
|
|
}
|
|
if space_in_queue {
|
|
mut wr_idx := C.atomic_load_u32(&ch.buf_elem_write_idx)
|
|
for {
|
|
mut new_wr_idx := wr_idx + 1
|
|
for new_wr_idx >= ch.cap {
|
|
new_wr_idx -= ch.cap
|
|
}
|
|
if C.atomic_compare_exchange_strong_u32(&ch.buf_elem_write_idx, &wr_idx,
|
|
new_wr_idx)
|
|
{
|
|
break
|
|
}
|
|
}
|
|
mut wr_ptr := ch.ringbuf
|
|
mut status_adr := ch.statusbuf
|
|
unsafe {
|
|
wr_ptr += (wr_idx * ch.objsize)
|
|
status_adr += wr_idx * sizeof(u16)
|
|
}
|
|
mut expected_status := u16(BufferElemStat.unused)
|
|
for !C.atomic_compare_exchange_weak_u16(status_adr, &expected_status,
|
|
u16(BufferElemStat.writing)) {
|
|
expected_status = u16(BufferElemStat.unused)
|
|
}
|
|
unsafe {
|
|
C.memcpy(wr_ptr, src, ch.objsize)
|
|
}
|
|
C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.written))
|
|
C.atomic_fetch_add_u32(&ch.read_avail, 1)
|
|
ch.readsem.post()
|
|
mut null16 := u16(0)
|
|
for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) {
|
|
null16 = u16(0)
|
|
}
|
|
if ch.read_subscriber != unsafe { nil } {
|
|
ch.read_subscriber.sem.post()
|
|
}
|
|
C.atomic_store_u16(&ch.read_sub_mtx, u16(0))
|
|
return .success
|
|
} else {
|
|
if no_block {
|
|
return .not_ready
|
|
}
|
|
ch.writesem.post()
|
|
}
|
|
}
|
|
}
|
|
// we should not get here but the V compiler want's to see a return statement
|
|
panic('unknown `try_push_priv` state')
|
|
}
|
|
|
|
[inline]
|
|
pub fn (mut ch Channel) pop(dest voidptr) bool {
|
|
return ch.try_pop_priv(dest, false) == .success
|
|
}
|
|
|
|
[inline]
|
|
pub fn (mut ch Channel) try_pop(dest voidptr) ChanState {
|
|
return ch.try_pop_priv(dest, true)
|
|
}
|
|
|
|
fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState {
|
|
spinloops_sem_, spinloops_ := if no_block { 1, 1 } else { sync.spinloops, sync.spinloops_sem }
|
|
mut have_swapped := false
|
|
mut write_in_progress := false
|
|
for {
|
|
mut got_sem := false
|
|
if ch.cap == 0 {
|
|
// unbuffered channel - first see if a `push()` has adversized
|
|
mut rdadr := C.atomic_load_ptr(unsafe { &voidptr(&ch.read_adr) })
|
|
for rdadr != C.NULL {
|
|
if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.read_adr) },
|
|
&rdadr, unsafe { nil })
|
|
{
|
|
// there is a writer waiting for us
|
|
unsafe { C.memcpy(dest, rdadr, ch.objsize) }
|
|
mut nulladr := unsafe { nil }
|
|
for !C.atomic_compare_exchange_weak_ptr(unsafe { &voidptr(&ch.adr_read) },
|
|
&nulladr, rdadr) {
|
|
nulladr = unsafe { nil }
|
|
}
|
|
ch.writesem_im.post()
|
|
return .success
|
|
}
|
|
}
|
|
if no_block {
|
|
if C.atomic_load_u16(&ch.closed) == 0 {
|
|
return .not_ready
|
|
} else {
|
|
return .closed
|
|
}
|
|
}
|
|
}
|
|
// get token to read
|
|
for _ in 0 .. spinloops_sem_ {
|
|
if got_sem {
|
|
break
|
|
}
|
|
got_sem = ch.readsem.try_wait()
|
|
}
|
|
if !got_sem {
|
|
if no_block {
|
|
if C.atomic_load_u16(&ch.closed) == 0 {
|
|
return .not_ready
|
|
} else {
|
|
return .closed
|
|
}
|
|
}
|
|
ch.readsem.wait()
|
|
}
|
|
if ch.cap > 0 {
|
|
// try to get buffer token
|
|
mut obj_in_queue := false
|
|
mut rd_avail := C.atomic_load_u32(&ch.read_avail)
|
|
for rd_avail > 0 {
|
|
obj_in_queue = C.atomic_compare_exchange_weak_u32(&ch.read_avail, &rd_avail,
|
|
rd_avail - 1)
|
|
if obj_in_queue {
|
|
break
|
|
}
|
|
}
|
|
if obj_in_queue {
|
|
mut rd_idx := C.atomic_load_u32(&ch.buf_elem_read_idx)
|
|
for {
|
|
mut new_rd_idx := rd_idx + 1
|
|
for new_rd_idx >= ch.cap {
|
|
new_rd_idx -= ch.cap
|
|
}
|
|
if C.atomic_compare_exchange_weak_u32(&ch.buf_elem_read_idx, &rd_idx,
|
|
new_rd_idx)
|
|
{
|
|
break
|
|
}
|
|
}
|
|
mut rd_ptr := ch.ringbuf
|
|
mut status_adr := ch.statusbuf
|
|
unsafe {
|
|
rd_ptr += rd_idx * ch.objsize
|
|
status_adr += rd_idx * sizeof(u16)
|
|
}
|
|
mut expected_status := u16(BufferElemStat.written)
|
|
for !C.atomic_compare_exchange_weak_u16(status_adr, &expected_status,
|
|
u16(BufferElemStat.reading)) {
|
|
expected_status = u16(BufferElemStat.written)
|
|
}
|
|
unsafe {
|
|
C.memcpy(dest, rd_ptr, ch.objsize)
|
|
}
|
|
C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.unused))
|
|
C.atomic_fetch_add_u32(&ch.write_free, 1)
|
|
ch.writesem.post()
|
|
mut null16 := u16(0)
|
|
for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
|
|
null16 = u16(0)
|
|
}
|
|
if ch.write_subscriber != unsafe { nil } {
|
|
ch.write_subscriber.sem.post()
|
|
}
|
|
C.atomic_store_u16(&ch.write_sub_mtx, u16(0))
|
|
return .success
|
|
}
|
|
}
|
|
// try to advertise `dest` as writable
|
|
C.atomic_store_ptr(unsafe { &voidptr(&ch.write_adr) }, dest)
|
|
if ch.cap == 0 {
|
|
mut rdadr := C.atomic_load_ptr(unsafe { &voidptr(&ch.read_adr) })
|
|
if rdadr != C.NULL {
|
|
mut dest2 := dest
|
|
if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.write_adr) },
|
|
&dest2, unsafe { nil })
|
|
{
|
|
ch.readsem.post()
|
|
continue
|
|
} else {
|
|
write_in_progress = true
|
|
}
|
|
}
|
|
}
|
|
if ch.cap == 0 && !write_in_progress {
|
|
mut null16 := u16(0)
|
|
for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
|
|
null16 = u16(0)
|
|
}
|
|
if ch.write_subscriber != unsafe { nil } {
|
|
ch.write_subscriber.sem.post()
|
|
}
|
|
C.atomic_store_u16(&ch.write_sub_mtx, u16(0))
|
|
}
|
|
mut dest2 := dest
|
|
for sp := u32(0); sp < spinloops_ || write_in_progress; sp++ {
|
|
if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_written) },
|
|
&dest2, unsafe { nil })
|
|
{
|
|
have_swapped = true
|
|
break
|
|
} else if dest2 == voidptr(-1) {
|
|
ch.readsem.post()
|
|
return .closed
|
|
}
|
|
dest2 = dest
|
|
}
|
|
mut got_im_sem := false
|
|
for sp := u32(0); sp < spinloops_sem_ || write_in_progress; sp++ {
|
|
got_im_sem = ch.readsem_im.try_wait()
|
|
if got_im_sem {
|
|
break
|
|
}
|
|
}
|
|
for {
|
|
if got_im_sem {
|
|
got_im_sem = false
|
|
} else {
|
|
ch.readsem_im.wait()
|
|
}
|
|
if have_swapped
|
|
|| C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_written) }, &dest2, unsafe { nil }) {
|
|
ch.readsem.post()
|
|
break
|
|
} else {
|
|
// this semaphore was not for us - repost in
|
|
ch.readsem_im.post()
|
|
if dest2 == voidptr(-1) {
|
|
ch.readsem.post()
|
|
return .closed
|
|
}
|
|
dest2 = dest
|
|
}
|
|
}
|
|
break
|
|
}
|
|
return .success
|
|
}
|
|
|
|
// Wait `timeout` on any of `channels[i]` until one of them can push (`is_push[i] = true`) or pop (`is_push[i] = false`)
|
|
// object referenced by `objrefs[i]`. `timeout = time.infinite` means wait unlimited time. `timeout <= 0` means return
|
|
// immediately if no transaction can be performed without waiting.
|
|
// return value: the index of the channel on which a transaction has taken place
|
|
// -1 if waiting for a transaction has exceeded timeout
|
|
// -2 if all channels are closed
|
|
|
|
pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []voidptr, timeout time.Duration) int {
|
|
$if debug {
|
|
assert channels.len == dir.len
|
|
assert dir.len == objrefs.len
|
|
}
|
|
mut subscr := []Subscription{len: channels.len}
|
|
mut sem := unsafe { Semaphore{} }
|
|
sem.init(0)
|
|
for i, ch in channels {
|
|
subscr[i].sem = unsafe { &sem }
|
|
sub_mtx, subscriber := if dir[i] == .push {
|
|
&ch.write_sub_mtx, &ch.write_subscriber
|
|
} else {
|
|
&ch.read_sub_mtx, &ch.read_subscriber
|
|
}
|
|
mut null16 := u16(0)
|
|
for !C.atomic_compare_exchange_weak_u16(sub_mtx, &null16, u16(1)) {
|
|
null16 = u16(0)
|
|
}
|
|
subscr[i].prev = unsafe { subscriber }
|
|
unsafe {
|
|
subscr[i].nxt = &Subscription(C.atomic_exchange_ptr(&voidptr(subscriber),
|
|
&subscr[i]))
|
|
}
|
|
if voidptr(subscr[i].nxt) != unsafe { nil } {
|
|
subscr[i].nxt.prev = unsafe { &subscr[i].nxt }
|
|
}
|
|
C.atomic_store_u16(sub_mtx, u16(0))
|
|
}
|
|
stopwatch := if timeout == time.infinite || timeout <= 0 {
|
|
time.StopWatch{}
|
|
} else {
|
|
time.new_stopwatch()
|
|
}
|
|
mut event_idx := -1 // negative index means `timed out`
|
|
|
|
outer: for {
|
|
rnd := rand.intn(channels.len) or { 0 }
|
|
mut num_closed := 0
|
|
for j, _ in channels {
|
|
mut i := j + rnd
|
|
if i >= channels.len {
|
|
i -= channels.len
|
|
}
|
|
stat := if dir[i] == .push {
|
|
channels[i].try_push_priv(objrefs[i], true)
|
|
} else {
|
|
channels[i].try_pop_priv(objrefs[i], true)
|
|
}
|
|
if stat == .success {
|
|
event_idx = i
|
|
break outer
|
|
} else if stat == .closed {
|
|
num_closed++
|
|
}
|
|
}
|
|
if num_closed == channels.len {
|
|
event_idx = -2
|
|
break outer
|
|
}
|
|
if timeout <= 0 {
|
|
break outer
|
|
}
|
|
if timeout != time.infinite {
|
|
remaining := timeout - stopwatch.elapsed()
|
|
if !sem.timed_wait(remaining) {
|
|
break outer
|
|
}
|
|
} else {
|
|
sem.wait()
|
|
}
|
|
}
|
|
// reset subscribers
|
|
for i, ch in channels {
|
|
sub_mtx := if dir[i] == .push {
|
|
&ch.write_sub_mtx
|
|
} else {
|
|
&ch.read_sub_mtx
|
|
}
|
|
mut null16 := u16(0)
|
|
for !C.atomic_compare_exchange_weak_u16(sub_mtx, &null16, u16(1)) {
|
|
null16 = u16(0)
|
|
}
|
|
unsafe {
|
|
*subscr[i].prev = subscr[i].nxt
|
|
}
|
|
if unsafe { subscr[i].nxt != 0 } {
|
|
subscr[i].nxt.prev = subscr[i].prev
|
|
subscr[i].nxt.sem.post()
|
|
}
|
|
C.atomic_store_u16(sub_mtx, u16(0))
|
|
}
|
|
sem.destroy()
|
|
return event_idx
|
|
}
|