mirror of
https://github.com/vlang/v.git
synced 2023-08-10 21:13:21 +03:00
sync/channels: provide `try_push(), try_pop() as public methods (#6101)
This commit is contained in:
@ -152,17 +152,26 @@ pub fn (mut ch Channel) close() {
|
||||
C.atomic_store_u16(&ch.write_sub_mtx, u16(0))
|
||||
}
|
||||
|
||||
[inline]
|
||||
pub fn (mut ch Channel) push(src voidptr) {
|
||||
if ch.try_push(src, false) == .closed {
|
||||
if ch.try_push_priv(src, false) == .closed {
|
||||
panic('push on closed channel')
|
||||
}
|
||||
}
|
||||
|
||||
fn (mut ch Channel) try_push(src voidptr, no_block bool) TransactionState {
|
||||
[inline]
|
||||
pub fn (mut ch Channel) try_push(src voidptr) TransactionState {
|
||||
return ch.try_push_priv(src, false)
|
||||
}
|
||||
|
||||
fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) TransactionState {
|
||||
if C.atomic_load_u16(&ch.closed) != 0 {
|
||||
return .closed
|
||||
}
|
||||
spinloops_, spinloops_sem_ := if no_block { 1, 1 } else { spinloops, spinloops_sem }
|
||||
mut spinloops_sem_, spinloops_ := if no_block { spinloops, spinloops_sem } else { 1, 1 }
|
||||
$if macos {
|
||||
spinloops_sem_ = 1
|
||||
}
|
||||
mut have_swapped := false
|
||||
for {
|
||||
mut got_sem := false
|
||||
@ -307,12 +316,21 @@ fn (mut ch Channel) try_push(src voidptr, no_block bool) TransactionState {
|
||||
}
|
||||
}
|
||||
|
||||
[inline]
|
||||
pub fn (mut ch Channel) pop(dest voidptr) bool {
|
||||
return ch.try_pop(dest, false) == .success
|
||||
return ch.try_pop_priv(dest, false) == .success
|
||||
}
|
||||
|
||||
fn (mut ch Channel) try_pop(dest voidptr, no_block bool) TransactionState {
|
||||
spinloops_, spinloops_sem_ := if no_block { 1, 1 } else { spinloops, spinloops_sem }
|
||||
[inline]
|
||||
pub fn (mut ch Channel) try_pop(dest voidptr) TransactionState {
|
||||
return ch.try_pop_priv(dest, false)
|
||||
}
|
||||
|
||||
fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) TransactionState {
|
||||
mut spinloops_sem_, spinloops_ := if no_block { spinloops, spinloops_sem } else { 1, 1 }
|
||||
$if macos {
|
||||
spinloops_sem_ = 1
|
||||
}
|
||||
mut have_swapped := false
|
||||
mut write_in_progress := false
|
||||
for {
|
||||
@ -465,7 +483,11 @@ fn (mut ch Channel) try_pop(dest voidptr, no_block bool) TransactionState {
|
||||
}
|
||||
|
||||
// Wait `timeout` on any of `channels[i]` until one of them can push (`is_push[i] = true`) or pop (`is_push[i] = false`)
|
||||
// object referenced by `objrefs[i]`. `timeout = 0` means wait unlimited time
|
||||
// object referenced by `objrefs[i]`. `timeout < 0` means wait unlimited time. `timeout == 0` means return immediately
|
||||
// if no transaction can be performed without waiting.
|
||||
// return value: the index of the channel on which a transaction has taken place
|
||||
// -1 if waiting for a transaction has exceeded timeout
|
||||
// -2 if all channels are closed
|
||||
|
||||
pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []voidptr, timeout time.Duration) int {
|
||||
assert channels.len == dir.len
|
||||
@ -499,7 +521,7 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
|
||||
C.atomic_store_u16(&ch.read_sub_mtx, u16(0))
|
||||
}
|
||||
}
|
||||
stopwatch := if timeout == 0 { time.StopWatch{} } else { time.new_stopwatch({}) }
|
||||
stopwatch := if timeout <= 0 { time.StopWatch{} } else { time.new_stopwatch({}) }
|
||||
mut event_idx := -1 // negative index means `timed out`
|
||||
for {
|
||||
rnd := rand.u32_in_range(0, u32(channels.len))
|
||||
@ -510,7 +532,7 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
|
||||
i -= channels.len
|
||||
}
|
||||
if dir[i] == .push {
|
||||
stat := channels[i].try_push(objrefs[i], true)
|
||||
stat := channels[i].try_push_priv(objrefs[i], true)
|
||||
if stat == .success {
|
||||
event_idx = i
|
||||
goto restore
|
||||
@ -518,7 +540,7 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
|
||||
num_closed++
|
||||
}
|
||||
} else {
|
||||
stat := channels[i].try_pop(objrefs[i], true)
|
||||
stat := channels[i].try_pop_priv(objrefs[i], true)
|
||||
if stat == .success {
|
||||
event_idx = i
|
||||
goto restore
|
||||
@ -531,7 +553,9 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
|
||||
event_idx = -2
|
||||
goto restore
|
||||
}
|
||||
if timeout > 0 {
|
||||
if timeout == 0 {
|
||||
goto restore
|
||||
} else if timeout > 0 {
|
||||
remaining := timeout - stopwatch.elapsed()
|
||||
if !sem.timed_wait(remaining) {
|
||||
goto restore
|
||||
|
Reference in New Issue
Block a user