1
0
mirror of https://github.com/vlang/v.git synced 2023-08-10 21:13:21 +03:00

channels: fix C warnings (#9732)

This commit is contained in:
ka-weihe
2021-04-15 01:44:39 +02:00
committed by GitHub
parent 1250ce4353
commit 25a9d30a70
5 changed files with 109 additions and 142 deletions

View File

@ -60,7 +60,7 @@ fn C.atomic_fetch_sub_u64(voidptr, u64) u64
const (
// how often to try to get data without blocking before to wait for semaphore
spinloops = 750
spinloops = 750
spinloops_sem = 4000
)
@ -73,9 +73,9 @@ enum BufferElemStat {
struct Subscription {
mut:
sem &Semaphore
sem &Semaphore
prev &&Subscription
nxt &Subscription
nxt &Subscription
}
enum Direction {
@ -84,9 +84,9 @@ enum Direction {
}
struct Channel {
ringbuf byteptr // queue for buffered channels
statusbuf byteptr // flags to synchronize write/read in ringbuf
objsize u32
ringbuf &byte // queue for buffered channels
statusbuf &byte // flags to synchronize write/read in ringbuf
objsize u32
mut: // atomic
writesem Semaphore // to wake thread that wanted to write, but buffer was full
readsem Semaphore // to wake thread that wanted to read, but buffer was empty
@ -101,13 +101,13 @@ mut: // atomic
buf_elem_write_idx u32
buf_elem_read_idx u32
// for select
write_subscriber &Subscription
read_subscriber &Subscription
write_sub_mtx u16
read_sub_mtx u16
closed u16
write_subscriber &Subscription
read_subscriber &Subscription
write_sub_mtx u16
read_sub_mtx u16
closed u16
pub:
cap u32 // queue length in #objects
cap u32 // queue length in #objects
}
pub fn new_channel<T>(n u32) &Channel {
@ -118,8 +118,8 @@ pub fn new_channel<T>(n u32) &Channel {
fn new_channel_st(n u32, st u32) &Channel {
wsem := if n > 0 { n } else { 1 }
rsem := if n > 0 { u32(0) } else { 1 }
rbuf := if n > 0 { unsafe {malloc(int(n * st))} } else { byteptr(0) }
sbuf := if n > 0 { vcalloc(int(n * 2)) } else { byteptr(0) }
rbuf := if n > 0 { unsafe { malloc(int(n * st)) } } else { &byte(0) }
sbuf := if n > 0 { vcalloc(int(n * 2)) } else { &byte(0) }
mut ch := &Channel{
objsize: st
cap: n
@ -143,7 +143,8 @@ pub fn (mut ch Channel) close() {
return
}
mut nulladr := voidptr(0)
for !C.atomic_compare_exchange_weak_ptr(&ch.adr_written, &nulladr, voidptr(-1)) {
for !C.atomic_compare_exchange_weak_ptr(unsafe { &voidptr(&ch.adr_written) }, &nulladr,
voidptr(-1)) {
nulladr = voidptr(0)
}
ch.readsem_im.post()
@ -166,7 +167,7 @@ pub fn (mut ch Channel) close() {
C.atomic_store_u16(&ch.write_sub_mtx, u16(0))
ch.writesem.post()
if ch.cap == 0 {
C.atomic_store_ptr(&ch.read_adr, voidptr(0))
C.atomic_store_ptr(unsafe { &voidptr(&ch.read_adr) }, voidptr(0))
}
ch.writesem_im.post()
}
@ -197,17 +198,20 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState {
if C.atomic_load_u16(&ch.closed) != 0 {
return .closed
}
spinloops_sem_, spinloops_ := if no_block { 1, 1 } else { spinloops, spinloops_sem }
spinloops_sem_, spinloops_ := if no_block { 1, 1 } else { sync.spinloops, sync.spinloops_sem }
mut have_swapped := false
for {
mut got_sem := false
mut wradr := C.atomic_load_ptr(&ch.write_adr)
mut wradr := C.atomic_load_ptr(unsafe { &voidptr(&ch.write_adr) })
for wradr != C.NULL {
if C.atomic_compare_exchange_strong_ptr(&ch.write_adr, &wradr, voidptr(0)) {
if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.write_adr) },
&wradr, voidptr(0))
{
// there is a reader waiting for us
unsafe { C.memcpy(wradr, src, ch.objsize) }
mut nulladr := voidptr(0)
for !C.atomic_compare_exchange_weak_ptr(&ch.adr_written, &nulladr, wradr) {
for !C.atomic_compare_exchange_weak_ptr(unsafe { &voidptr(&ch.adr_written) },
&nulladr, wradr) {
nulladr = voidptr(0)
}
ch.readsem_im.post()
@ -237,11 +241,13 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState {
if ch.cap == 0 {
// try to advertise current object as readable
mut read_in_progress := false
C.atomic_store_ptr(&ch.read_adr, src)
wradr = C.atomic_load_ptr(&ch.write_adr)
C.atomic_store_ptr(unsafe { &voidptr(&ch.read_adr) }, src)
wradr = C.atomic_load_ptr(unsafe { &voidptr(&ch.write_adr) })
if wradr != C.NULL {
mut src2 := src
if C.atomic_compare_exchange_strong_ptr(&ch.read_adr, &src2, voidptr(0)) {
if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.read_adr) },
&src2, voidptr(0))
{
ch.writesem.post()
continue
} else {
@ -250,7 +256,8 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState {
}
if !read_in_progress {
mut null16 := u16(0)
for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) {
for !C.atomic_compare_exchange_weak_u16(voidptr(&ch.read_sub_mtx), &null16,
u16(1)) {
null16 = u16(0)
}
if ch.read_subscriber != voidptr(0) {
@ -260,7 +267,9 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState {
}
mut src2 := src
for sp := u32(0); sp < spinloops_ || read_in_progress; sp++ {
if C.atomic_compare_exchange_strong_ptr(&ch.adr_read, &src2, voidptr(0)) {
if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_read) },
&src2, voidptr(0))
{
have_swapped = true
read_in_progress = true
break
@ -281,14 +290,16 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState {
ch.writesem_im.wait()
}
if C.atomic_load_u16(&ch.closed) != 0 {
if have_swapped || C.atomic_compare_exchange_strong_ptr(&ch.adr_read, &src2, voidptr(0)) {
if have_swapped
|| C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_read) }, &src2, voidptr(0)) {
ch.writesem.post()
return .success
} else {
return .closed
}
}
if have_swapped || C.atomic_compare_exchange_strong_ptr(&ch.adr_read, &src2, voidptr(0)) {
if have_swapped
|| C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_read) }, &src2, voidptr(0)) {
ch.writesem.post()
break
} else {
@ -307,7 +318,8 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState {
mut space_in_queue := false
mut wr_free := C.atomic_load_u32(&ch.write_free)
for wr_free > 0 {
space_in_queue = C.atomic_compare_exchange_weak_u32(&ch.write_free, &wr_free, wr_free-1)
space_in_queue = C.atomic_compare_exchange_weak_u32(&ch.write_free, &wr_free,
wr_free - 1)
if space_in_queue {
break
}
@ -319,7 +331,9 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState {
for new_wr_idx >= ch.cap {
new_wr_idx -= ch.cap
}
if C.atomic_compare_exchange_strong_u32(&ch.buf_elem_write_idx, &wr_idx, new_wr_idx) {
if C.atomic_compare_exchange_strong_u32(&ch.buf_elem_write_idx, &wr_idx,
new_wr_idx)
{
break
}
}
@ -330,13 +344,14 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState {
status_adr += wr_idx * sizeof(u16)
}
mut expected_status := u16(BufferElemStat.unused)
for !C.atomic_compare_exchange_weak_u16(status_adr, &expected_status, u16(BufferElemStat.writing)) {
for !C.atomic_compare_exchange_weak_u16(unsafe { &u16(status_adr) },
&expected_status, u16(BufferElemStat.writing)) {
expected_status = u16(BufferElemStat.unused)
}
unsafe {
C.memcpy(wr_ptr, src, ch.objsize)
}
C.atomic_store_u16(status_adr, u16(BufferElemStat.written))
C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.written))
C.atomic_fetch_add_u32(&ch.read_avail, 1)
ch.readsem.post()
mut null16 := u16(0)
@ -372,20 +387,23 @@ pub fn (mut ch Channel) try_pop(dest voidptr) ChanState {
}
fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState {
spinloops_sem_, spinloops_ := if no_block { 1, 1 } else { spinloops, spinloops_sem }
spinloops_sem_, spinloops_ := if no_block { 1, 1 } else { sync.spinloops, sync.spinloops_sem }
mut have_swapped := false
mut write_in_progress := false
for {
mut got_sem := false
if ch.cap == 0 {
// unbuffered channel - first see if a `push()` has adversized
mut rdadr := C.atomic_load_ptr(&ch.read_adr)
mut rdadr := C.atomic_load_ptr(unsafe { &voidptr(&ch.read_adr) })
for rdadr != C.NULL {
if C.atomic_compare_exchange_strong_ptr(&ch.read_adr, &rdadr, voidptr(0)) {
if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.read_adr) },
&rdadr, voidptr(0))
{
// there is a writer waiting for us
unsafe { C.memcpy(dest, rdadr, ch.objsize) }
mut nulladr := voidptr(0)
for !C.atomic_compare_exchange_weak_ptr(&ch.adr_read, &nulladr, rdadr) {
for !C.atomic_compare_exchange_weak_ptr(unsafe { &voidptr(&ch.adr_read) },
&nulladr, rdadr) {
nulladr = voidptr(0)
}
ch.writesem_im.post()
@ -422,7 +440,8 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState {
mut obj_in_queue := false
mut rd_avail := C.atomic_load_u32(&ch.read_avail)
for rd_avail > 0 {
obj_in_queue = C.atomic_compare_exchange_weak_u32(&ch.read_avail, &rd_avail, rd_avail-1)
obj_in_queue = C.atomic_compare_exchange_weak_u32(&ch.read_avail, &rd_avail,
rd_avail - 1)
if obj_in_queue {
break
}
@ -434,7 +453,9 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState {
for new_rd_idx >= ch.cap {
new_rd_idx -= ch.cap
}
if C.atomic_compare_exchange_weak_u32(&ch.buf_elem_read_idx, &rd_idx, new_rd_idx) {
if C.atomic_compare_exchange_weak_u32(&ch.buf_elem_read_idx, &rd_idx,
new_rd_idx)
{
break
}
}
@ -445,13 +466,14 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState {
status_adr += rd_idx * sizeof(u16)
}
mut expected_status := u16(BufferElemStat.written)
for !C.atomic_compare_exchange_weak_u16(status_adr, &expected_status, u16(BufferElemStat.reading)) {
for !C.atomic_compare_exchange_weak_u16(unsafe { &u16(status_adr) },
&expected_status, u16(BufferElemStat.reading)) {
expected_status = u16(BufferElemStat.written)
}
unsafe {
C.memcpy(dest, rd_ptr, ch.objsize)
}
C.atomic_store_u16(status_adr, u16(BufferElemStat.unused))
C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.unused))
C.atomic_fetch_add_u32(&ch.write_free, 1)
ch.writesem.post()
mut null16 := u16(0)
@ -466,12 +488,14 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState {
}
}
// try to advertise `dest` as writable
C.atomic_store_ptr(&ch.write_adr, dest)
C.atomic_store_ptr(unsafe { &voidptr(&ch.write_adr) }, dest)
if ch.cap == 0 {
mut rdadr := C.atomic_load_ptr(&ch.read_adr)
mut rdadr := C.atomic_load_ptr(unsafe { &voidptr(&ch.read_adr) })
if rdadr != C.NULL {
mut dest2 := dest
if C.atomic_compare_exchange_strong_ptr(&ch.write_adr, &dest2, voidptr(0)) {
if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.write_adr) },
&dest2, voidptr(0))
{
ch.readsem.post()
continue
} else {
@ -491,7 +515,9 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState {
}
mut dest2 := dest
for sp := u32(0); sp < spinloops_ || write_in_progress; sp++ {
if C.atomic_compare_exchange_strong_ptr(&ch.adr_written, &dest2, voidptr(0)) {
if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_written) },
&dest2, voidptr(0))
{
have_swapped = true
break
} else if dest2 == voidptr(-1) {
@ -513,7 +539,8 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState {
} else {
ch.readsem_im.wait()
}
if have_swapped || C.atomic_compare_exchange_strong_ptr(&ch.adr_written, &dest2, voidptr(0)) {
if have_swapped
|| C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_written) }, &dest2, voidptr(0)) {
ch.readsem.post()
break
} else {
@ -526,7 +553,7 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState {
dest2 = dest
}
}
break
break
}
return .success
}
@ -553,7 +580,8 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
}
subscr[i].prev = &ch.write_subscriber
unsafe {
subscr[i].nxt = C.atomic_exchange_ptr(&ch.write_subscriber, &subscr[i])
subscr[i].nxt = &Subscription(C.atomic_exchange_ptr(&voidptr(&ch.write_subscriber),
&subscr[i]))
}
if voidptr(subscr[i].nxt) != voidptr(0) {
subscr[i].nxt.prev = &subscr[i].nxt
@ -566,7 +594,8 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
}
subscr[i].prev = &ch.read_subscriber
unsafe {
subscr[i].nxt = C.atomic_exchange_ptr(&ch.read_subscriber, &subscr[i])
subscr[i].nxt = &Subscription(C.atomic_exchange_ptr(&voidptr(&ch.read_subscriber),
&subscr[i]))
}
if voidptr(subscr[i].nxt) != voidptr(0) {
subscr[i].nxt.prev = &subscr[i].nxt
@ -576,8 +605,8 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
}
stopwatch := if timeout <= 0 { time.StopWatch{} } else { time.new_stopwatch({}) }
mut event_idx := -1 // negative index means `timed out`
outer:
for {
outer: for {
rnd := rand.u32_in_range(0, u32(channels.len))
mut num_closed := 0
for j, _ in channels {