1
0
mirror of https://github.com/vlang/v.git synced 2023-08-10 21:13:21 +03:00

channels: refactor the channel_select function (#18711)

This commit is contained in:
Herman 2023-07-02 15:45:30 +03:00 committed by GitHub
parent 329e063752
commit af38f8b3ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -556,35 +556,24 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
sem.init(0) sem.init(0)
for i, ch in channels { for i, ch in channels {
subscr[i].sem = unsafe { &sem } subscr[i].sem = unsafe { &sem }
if dir[i] == .push { sub_mtx, subscriber := if dir[i] == .push {
mut null16 := u16(0) &ch.write_sub_mtx, &ch.write_subscriber
for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
null16 = u16(0)
}
subscr[i].prev = unsafe { &ch.write_subscriber }
unsafe {
subscr[i].nxt = &Subscription(C.atomic_exchange_ptr(&voidptr(&ch.write_subscriber),
&subscr[i]))
}
if voidptr(subscr[i].nxt) != unsafe { nil } {
subscr[i].nxt.prev = unsafe { &subscr[i].nxt }
}
C.atomic_store_u16(&ch.write_sub_mtx, u16(0))
} else { } else {
&ch.read_sub_mtx, &ch.read_subscriber
}
mut null16 := u16(0) mut null16 := u16(0)
for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { for !C.atomic_compare_exchange_weak_u16(sub_mtx, &null16, u16(1)) {
null16 = u16(0) null16 = u16(0)
} }
subscr[i].prev = unsafe { &ch.read_subscriber } subscr[i].prev = unsafe { subscriber }
unsafe { unsafe {
subscr[i].nxt = &Subscription(C.atomic_exchange_ptr(&voidptr(&ch.read_subscriber), subscr[i].nxt = &Subscription(C.atomic_exchange_ptr(&voidptr(subscriber),
&subscr[i])) &subscr[i]))
} }
if voidptr(subscr[i].nxt) != unsafe { nil } { if voidptr(subscr[i].nxt) != unsafe { nil } {
subscr[i].nxt.prev = unsafe { &subscr[i].nxt } subscr[i].nxt.prev = unsafe { &subscr[i].nxt }
} }
C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) C.atomic_store_u16(sub_mtx, u16(0))
}
} }
stopwatch := if timeout == time.infinite || timeout <= 0 { stopwatch := if timeout == time.infinite || timeout <= 0 {
time.StopWatch{} time.StopWatch{}
@ -601,16 +590,11 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
if i >= channels.len { if i >= channels.len {
i -= channels.len i -= channels.len
} }
if dir[i] == .push { stat := if dir[i] == .push {
stat := channels[i].try_push_priv(objrefs[i], true) channels[i].try_push_priv(objrefs[i], true)
if stat == .success {
event_idx = i
break outer
} else if stat == .closed {
num_closed++
}
} else { } else {
stat := channels[i].try_pop_priv(objrefs[i], true) channels[i].try_pop_priv(objrefs[i], true)
}
if stat == .success { if stat == .success {
event_idx = i event_idx = i
break outer break outer
@ -618,7 +602,6 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
num_closed++ num_closed++
} }
} }
}
if num_closed == channels.len { if num_closed == channels.len {
event_idx = -2 event_idx = -2
break outer break outer
@ -637,23 +620,13 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
} }
// reset subscribers // reset subscribers
for i, ch in channels { for i, ch in channels {
if dir[i] == .push { sub_mtx := if dir[i] == .push {
mut null16 := u16(0) &ch.write_sub_mtx
for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
null16 = u16(0)
}
unsafe {
*subscr[i].prev = subscr[i].nxt
}
if unsafe { subscr[i].nxt != 0 } {
subscr[i].nxt.prev = subscr[i].prev
// just in case we have missed a semaphore during restore
subscr[i].nxt.sem.post()
}
C.atomic_store_u16(&ch.write_sub_mtx, u16(0))
} else { } else {
&ch.read_sub_mtx
}
mut null16 := u16(0) mut null16 := u16(0)
for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { for !C.atomic_compare_exchange_weak_u16(sub_mtx, &null16, u16(1)) {
null16 = u16(0) null16 = u16(0)
} }
unsafe { unsafe {
@ -663,8 +636,7 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
subscr[i].nxt.prev = subscr[i].prev subscr[i].nxt.prev = subscr[i].prev
subscr[i].nxt.sem.post() subscr[i].nxt.sem.post()
} }
C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) C.atomic_store_u16(sub_mtx, u16(0))
}
} }
sem.destroy() sem.destroy()
return event_idx return event_idx