diff --git a/vlib/builtin/chan.v b/vlib/builtin/chan.v new file mode 100755 index 0000000000..8ee7c56d03 --- /dev/null +++ b/vlib/builtin/chan.v @@ -0,0 +1,20 @@ +module builtin + +enum ChanState { + success + not_ready // push()/pop() would have to wait, but no_block was requested + closed +} + +// The following methods are only stubs. The real implementation +// is in `vlib/sync/channels.v` + +pub fn (ch chan) close() {} + +pub fn (ch chan) try_pop(obj voidptr) ChanState { + return .success +} + +pub fn (ch chan) try_push(obj voidptr) ChanState { + return .success +} diff --git a/vlib/sync/channel_array_mut_test.v b/vlib/sync/channel_array_mut_test.v index 07b3bb0a13..adf10189b0 100644 --- a/vlib/sync/channel_array_mut_test.v +++ b/vlib/sync/channel_array_mut_test.v @@ -1,5 +1,3 @@ -import sync - const ( num_iterations = 10000 ) @@ -34,6 +32,6 @@ fn test_channel_array_mut() { chs[0] <- t t = <-chs[1] } - (&sync.Channel(chs[0])).close() + chs[0].close() assert t.n == 100 + num_iterations } diff --git a/vlib/sync/channel_polling_test.v b/vlib/sync/channel_polling_test.v new file mode 100755 index 0000000000..879352eab7 --- /dev/null +++ b/vlib/sync/channel_polling_test.v @@ -0,0 +1,53 @@ +// Channel Benchmark +// +// `nobj` integers are sent thru a channel with queue length`buflen` +// using `nsend` sender threads and `nrec` receiver threads. +// +// The receive threads add all received numbers and send them to the +// main thread where the total sum is compare to the expected value. + +const ( + nsend = 2 + nrec = 2 + buflen = 100 + nobj = 10000 + objs_per_thread = 5000 +) + +fn do_rec(ch chan int, resch chan i64, n int) { + mut sum := i64(0) + for _ in 0 .. n { + mut r := 0 + for ch.try_pop(mut r) != .success {} + sum += r + } + println(sum) + resch <- sum +} + +fn do_send(ch chan int, start, end int) { + for i in start .. end { + for ch.try_push(i) != .success {} + } +} + +fn test_channel_polling() { + ch := chan int{cap: buflen} + resch := chan i64{} + for i in 0 .. nrec { + go do_rec(ch, resch, objs_per_thread) + } + mut n := nobj + for i in 0 .. nsend { + end := n + n -= objs_per_thread + go do_send(ch, n, end) + } + mut sum := i64(0) + for _ in 0 .. nrec { + sum += <-resch + } + // use sum formula by Gauß to calculate the expected result + expected_sum := i64(nobj)*(nobj-1)/2 + assert sum == expected_sum +} diff --git a/vlib/sync/channels.v b/vlib/sync/channels.v index 755b0f7093..0bcfaecbbc 100644 --- a/vlib/sync/channels.v +++ b/vlib/sync/channels.v @@ -76,12 +76,6 @@ enum Direction { push } -enum TransactionState { - success - not_ready // push()/pop() would have to wait, but no_block was requested - closed -} - struct Channel { writesem Semaphore // to wake thread that wanted to write, but buffer was full readsem Semaphore // to wake thread that wanted to read, but buffer was empty @@ -170,11 +164,11 @@ pub fn (mut ch Channel) push(src voidptr) { } [inline] -pub fn (mut ch Channel) try_push(src voidptr) TransactionState { +pub fn (mut ch Channel) try_push(src voidptr) ChanState { return ch.try_push_priv(src, false) } -fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) TransactionState { +fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState { if C.atomic_load_u16(&ch.closed) != 0 { return .closed } @@ -329,11 +323,11 @@ pub fn (mut ch Channel) pop(dest voidptr) bool { } [inline] -pub fn (mut ch Channel) try_pop(dest voidptr) TransactionState { +pub fn (mut ch Channel) try_pop(dest voidptr) ChanState { return ch.try_pop_priv(dest, false) } -fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) TransactionState { +fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState { spinloops_sem_, spinloops_ := if no_block { spinloops, spinloops_sem } else { 1, 1 } mut have_swapped := false mut write_in_progress := false @@ -355,7 +349,11 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) TransactionState { } } if no_block { - return if C.atomic_load_u16(&ch.closed) == 0 { TransactionState.not_ready } else { TransactionState.closed } + if C.atomic_load_u16(&ch.closed) == 0 { + return .not_ready + } else { + return .closed + } } } // get token to read @@ -367,7 +365,11 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) TransactionState { } if !got_sem { if no_block { - return if C.atomic_load_u16(&ch.closed) == 0 { TransactionState.not_ready } else { TransactionState.closed } + if C.atomic_load_u16(&ch.closed) == 0 { + return .not_ready + } else { + return .closed + } } ch.readsem.wait() } diff --git a/vlib/v/gen/fn.v b/vlib/v/gen/fn.v index 95408e8dd7..25152418ac 100644 --- a/vlib/v/gen/fn.v +++ b/vlib/v/gen/fn.v @@ -391,6 +391,11 @@ fn (mut g Gen) method_call(node ast.CallExpr) { } } mut name := util.no_dots('${receiver_type_name}_$node.name') + if left_sym.kind == .chan { + if node.name in ['close', 'try_pop', 'try_push'] { + name = 'sync__Channel_$node.name' + } + } // Check if expression is: arr[a..b].clone(), arr[a..].clone() // if so, then instead of calling array_clone(&array_slice(...)) // call array_clone_static(array_slice(...))