From c3cdfa1c96766e6252dd1ce3b3a370d70a9d78b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uwe=20Kr=C3=BCger?= <45282134+UweKrueger@users.noreply.github.com> Date: Mon, 10 Aug 2020 18:06:42 +0200 Subject: [PATCH] sync/channels: provide `try_push(), try_pop() as public methods (#6101) --- doc/upcoming.md | 2 +- vlib/sync/bench/results.md | 39 ++++++++++++++++++++++++++++ vlib/sync/channel_select_test.v | 2 +- vlib/sync/channels.v | 46 +++++++++++++++++++++++++-------- vlib/sync/select_close_test.v | 3 ++- vlib/time/time.v | 7 ++--- 6 files changed, 82 insertions(+), 17 deletions(-) create mode 100644 vlib/sync/bench/results.md diff --git a/doc/upcoming.md b/doc/upcoming.md index 29030861a8..ff881d1a21 100644 --- a/doc/upcoming.md +++ b/doc/upcoming.md @@ -252,7 +252,7 @@ if ch.pop(&m) { The select call is somewhat tricky. The `channel_select()` function needs three arrays that contain the channels, the directions (pop/push) and the object references and -a timeout of type `time.Duration` (or `0` to wait unlimited) as parameters. It returns the +a timeout of type `time.Duration` (`time.infinite` or `-1` to wait unlimited) as parameters. It returns the index of the object that was pushed or popped or `-1` for timeout. ```v diff --git a/vlib/sync/bench/results.md b/vlib/sync/bench/results.md new file mode 100644 index 0000000000..3298861c06 --- /dev/null +++ b/vlib/sync/bench/results.md @@ -0,0 +1,39 @@ +# Channel Benchmark Results + +This documents lists several benchmark results for different platforms in order to +identify performance regressions and improvements. + +The are measured using the command + +``` +> channel_bench_* + +nsend ... number of threads that push objects into the channel +nrec .... number of threads that pop objects from the channel +buflen .. length of channel buffer queue - `0` means unbuffered channel +nobj .... number of objects to pass thru the channel +``` + +## AMD Ryzen 7 3800X, Ubuntu-20.04 x86_64 + +10000000 Objects transfered, results in Objects/µs + +| nsend | nrec | buflen | **V (gcc -O2)** | **V (tcc)** | **Go (glang)** | **Go (gccgo -O2)** | +| :---: | :---:| :---: | :---: | :---: | :---: | :---: | +| 1 | 1 | 0 | 0.95 | 0.66 | 4.65 | 0.56 | +| 1 | 1 | 100 | 3.26 | 2.24 | 18.90 | 6.08 | +| 4 | 4 | 0 | 0.25 | 0.24 | 1.84 | 0.84 | +| 4 | 4 | 100 | 2.78 | 2.63 | 7.43 | 3.71 | + +## Raspberry Pi 3B+, Void Linux musl 32 bit + +10000000 Objects transfered, results in Objects/µs + +| nsend | nrec | buflen | **V (gcc -O2)** | **Go (glang)** | +| :---: | :---:| :---: | :---: | :---: | +| 1 | 1 | 0 | 0.37 | 0.21 | +| 1 | 1 | 100 | 1.03 | 0.74 | +| 4 | 4 | 0 | 0.04 | 0.38 | +| 4 | 4 | 100 | 2.78 | 2.63 | +| 2 | 2 | 0 | 0.05 | 0.38 | +| 2 | 2 | 100 | 1.26 | 0.75 | diff --git a/vlib/sync/channel_select_test.v b/vlib/sync/channel_select_test.v index 10e2a0ec8c..3131bc5658 100644 --- a/vlib/sync/channel_select_test.v +++ b/vlib/sync/channel_select_test.v @@ -48,7 +48,7 @@ fn test_select() { mut sl := i64(0) mut objs := [voidptr(&ri), &sl, &rl, &rb] for _ in 0 .. 1200 { - idx := sync.channel_select(mut channels, directions, mut objs, 0) + idx := sync.channel_select(mut channels, directions, mut objs, -1) match idx { 0 { sum += ri diff --git a/vlib/sync/channels.v b/vlib/sync/channels.v index 4dfb765065..a3f6910771 100644 --- a/vlib/sync/channels.v +++ b/vlib/sync/channels.v @@ -152,17 +152,26 @@ pub fn (mut ch Channel) close() { C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) } +[inline] pub fn (mut ch Channel) push(src voidptr) { - if ch.try_push(src, false) == .closed { + if ch.try_push_priv(src, false) == .closed { panic('push on closed channel') } } -fn (mut ch Channel) try_push(src voidptr, no_block bool) TransactionState { +[inline] +pub fn (mut ch Channel) try_push(src voidptr) TransactionState { + return ch.try_push_priv(src, false) +} + +fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) TransactionState { if C.atomic_load_u16(&ch.closed) != 0 { return .closed } - spinloops_, spinloops_sem_ := if no_block { 1, 1 } else { spinloops, spinloops_sem } + mut spinloops_sem_, spinloops_ := if no_block { spinloops, spinloops_sem } else { 1, 1 } + $if macos { + spinloops_sem_ = 1 + } mut have_swapped := false for { mut got_sem := false @@ -307,12 +316,21 @@ fn (mut ch Channel) try_push(src voidptr, no_block bool) TransactionState { } } +[inline] pub fn (mut ch Channel) pop(dest voidptr) bool { - return ch.try_pop(dest, false) == .success + return ch.try_pop_priv(dest, false) == .success } -fn (mut ch Channel) try_pop(dest voidptr, no_block bool) TransactionState { - spinloops_, spinloops_sem_ := if no_block { 1, 1 } else { spinloops, spinloops_sem } +[inline] +pub fn (mut ch Channel) try_pop(dest voidptr) TransactionState { + return ch.try_pop_priv(dest, false) +} + +fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) TransactionState { + mut spinloops_sem_, spinloops_ := if no_block { spinloops, spinloops_sem } else { 1, 1 } + $if macos { + spinloops_sem_ = 1 + } mut have_swapped := false mut write_in_progress := false for { @@ -465,7 +483,11 @@ fn (mut ch Channel) try_pop(dest voidptr, no_block bool) TransactionState { } // Wait `timeout` on any of `channels[i]` until one of them can push (`is_push[i] = true`) or pop (`is_push[i] = false`) -// object referenced by `objrefs[i]`. `timeout = 0` means wait unlimited time +// object referenced by `objrefs[i]`. `timeout < 0` means wait unlimited time. `timeout == 0` means return immediately +// if no transaction can be performed without waiting. +// return value: the index of the channel on which a transaction has taken place +// -1 if waiting for a transaction has exceeded timeout +// -2 if all channels are closed pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []voidptr, timeout time.Duration) int { assert channels.len == dir.len @@ -499,7 +521,7 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) } } - stopwatch := if timeout == 0 { time.StopWatch{} } else { time.new_stopwatch({}) } + stopwatch := if timeout <= 0 { time.StopWatch{} } else { time.new_stopwatch({}) } mut event_idx := -1 // negative index means `timed out` for { rnd := rand.u32_in_range(0, u32(channels.len)) @@ -510,7 +532,7 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo i -= channels.len } if dir[i] == .push { - stat := channels[i].try_push(objrefs[i], true) + stat := channels[i].try_push_priv(objrefs[i], true) if stat == .success { event_idx = i goto restore @@ -518,7 +540,7 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo num_closed++ } } else { - stat := channels[i].try_pop(objrefs[i], true) + stat := channels[i].try_pop_priv(objrefs[i], true) if stat == .success { event_idx = i goto restore @@ -531,7 +553,9 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo event_idx = -2 goto restore } - if timeout > 0 { + if timeout == 0 { + goto restore + } else if timeout > 0 { remaining := timeout - stopwatch.elapsed() if !sem.timed_wait(remaining) { goto restore diff --git a/vlib/sync/select_close_test.v b/vlib/sync/select_close_test.v index b4ec30ed21..6b5e5e2b99 100644 --- a/vlib/sync/select_close_test.v +++ b/vlib/sync/select_close_test.v @@ -1,4 +1,5 @@ import sync +import time fn do_rec_i64(mut ch sync.Channel) { mut sum := i64(0) @@ -55,7 +56,7 @@ fn test_select() { mut sl := i64(0) mut objs := [voidptr(&ri), &sl, &rl, &rb] for j in 0 .. 1101 { - idx := sync.channel_select(mut channels, directions, mut objs, 0) + idx := sync.channel_select(mut channels, directions, mut objs, time.infinite) match idx { 0 { sum += ri diff --git a/vlib/time/time.v b/vlib/time/time.v index be9ea75165..28df6c7abb 100644 --- a/vlib/time/time.v +++ b/vlib/time/time.v @@ -363,9 +363,10 @@ pub const( nanosecond = Duration(1) microsecond = Duration(1000) * nanosecond millisecond = Duration(1000) * microsecond - second = Duration(1000) * millisecond - minute = Duration(60) * second - hour = Duration(60) * minute + second = Duration(1000) * millisecond + minute = Duration(60) * second + hour = Duration(60) * minute + infinite = Duration(-1) ) // nanoseconds returns the duration as an integer number of nanoseconds.