From b0ece3a9d874efa76c83160492c301037110b9d3 Mon Sep 17 00:00:00 2001 From: Alexander Medvednikov Date: Wed, 4 Mar 2020 20:28:42 +0100 Subject: [PATCH] sync: implement pool.work_on_items to process a list of items in parallel --- cmd/tools/modules/testing/common.v | 197 ++++++++++++--------------- examples/news_fetcher.v | 72 +++------- vlib/benchmark/benchmark.v | 7 + vlib/builtin/array.v | 10 ++ vlib/compiler/tests/repl/repl_test.v | 97 ++++++------- vlib/sync/pool.v | 197 +++++++++++++++++++++++++++ vlib/sync/pool_test.v | 58 ++++++++ 7 files changed, 423 insertions(+), 215 deletions(-) create mode 100644 vlib/sync/pool.v create mode 100644 vlib/sync/pool_test.v diff --git a/cmd/tools/modules/testing/common.v b/cmd/tools/modules/testing/common.v index 72116a1eee..0c6376dbaa 100644 --- a/cmd/tools/modules/testing/common.v +++ b/cmd/tools/modules/testing/common.v @@ -5,22 +5,17 @@ import ( term benchmark filepath - runtime sync v.pref ) pub struct TestSession { pub mut: - files []string - vexe string - vargs string - failed bool - benchmark benchmark.Benchmark - - ntask int // writing to this should be locked by mu. - ntask_mtx &sync.Mutex - waitgroup &sync.WaitGroup + files []string + vexe string + vargs string + failed bool + benchmark benchmark.Benchmark show_ok_tests bool } @@ -28,11 +23,6 @@ pub fn new_test_session(vargs string) TestSession { return TestSession{ vexe: pref.vexe_path() vargs: vargs - - ntask: 0 - ntask_mtx: sync.new_mutex() - waitgroup: sync.new_waitgroup() - show_ok_tests: !vargs.contains('-silent') } } @@ -69,109 +59,96 @@ pub fn (ts mut TestSession) test() { } remaining_files << dot_relative_file } - ts.files = remaining_files ts.benchmark.set_total_expected_steps(remaining_files.len) - - mut njobs := runtime.nr_jobs() + mut pool_of_test_runners := sync.new_pool_processor({ + callback: worker_trunner + }) + pool_of_test_runners.set_shared_context(ts) $if msvc { // NB: MSVC can not be launched in parallel, without giving it // the option /FS because it uses a shared PDB file, which should // be locked, but that makes writing slower... // See: https://docs.microsoft.com/en-us/cpp/build/reference/fs-force-synchronous-pdb-writes?view=vs-2019 // Instead, just run tests on 1 core for now. - njobs = 1 + pool_of_test_runners.set_max_jobs(1) } - ts.waitgroup.add( njobs ) - for i:=0; i < njobs; i++ { - go process_in_thread(ts) - } - ts.waitgroup.wait() + pool_of_test_runners.work_on_pointers(remaining_files.pointers()) ts.benchmark.stop() eprintln(term.h_divider('-')) } - -fn process_in_thread(ts mut TestSession){ - ts.process_files() - ts.waitgroup.done() -} - -fn (ts mut TestSession) process_files() { +fn worker_trunner(p mut sync.PoolProcessor, idx int, thread_id int) voidptr { + mut ts := &TestSession(p.get_shared_context()) tmpd := os.tmpdir() show_stats := '-stats' in ts.vargs.split(' ') - - mut tls_bench := benchmark.new_benchmark() // tls_bench is used to format the step messages/timings - tls_bench.set_total_expected_steps( ts.benchmark.nexpected_steps ) - for { - - ts.ntask_mtx.lock() - ts.ntask++ - idx := ts.ntask-1 - ts.ntask_mtx.unlock() - - if idx >= ts.files.len { break } - tls_bench.cstep = idx - - dot_relative_file := ts.files[ idx ] - relative_file := dot_relative_file.replace('./', '') - file := os.realpath(relative_file) - // Ensure that the generated binaries will be stored in the temporary folder. - // Remove them after a test passes/fails. - fname := filepath.filename(file) - generated_binary_fname := if os.user_os() == 'windows' { fname.replace('.v', '.exe') } else { fname.replace('.v', '') } - generated_binary_fpath := filepath.join(tmpd,generated_binary_fname) - if os.exists(generated_binary_fpath) { - os.rm(generated_binary_fpath) - } - mut cmd_options := [ts.vargs] - if !ts.vargs.contains('fmt') { - cmd_options << ' -o "$generated_binary_fpath"' - } - cmd := '"${ts.vexe}" ' + cmd_options.join(' ') + ' "${file}"' - // eprintln('>>> v cmd: $cmd') - ts.benchmark.step() - tls_bench.step() - if show_stats { - eprintln(term.h_divider('-')) - status := os.system(cmd) - if status == 0 { - ts.benchmark.ok() - tls_bench.ok() - } - else { - ts.failed = true - ts.benchmark.fail() - tls_bench.fail() - continue - } + // tls_bench is used to format the step messages/timings + mut tls_bench := &benchmark.Benchmark(p.get_thread_context(idx)) + if isnil(tls_bench) { + tls_bench = benchmark.new_benchmark_pointer() + tls_bench.set_total_expected_steps(ts.benchmark.nexpected_steps) + p.set_thread_context(idx, tls_bench) + } + tls_bench.cstep = idx + dot_relative_file := p.get_string_item(idx) + relative_file := dot_relative_file.replace('./', '') + file := os.realpath(relative_file) + // Ensure that the generated binaries will be stored in the temporary folder. + // Remove them after a test passes/fails. + fname := filepath.filename(file) + generated_binary_fname := if os.user_os() == 'windows' { fname.replace('.v', '.exe') } else { fname.replace('.v', '') } + generated_binary_fpath := filepath.join(tmpd,generated_binary_fname) + if os.exists(generated_binary_fpath) { + os.rm(generated_binary_fpath) + } + mut cmd_options := [ts.vargs] + if !ts.vargs.contains('fmt') { + cmd_options << ' -o "$generated_binary_fpath"' + } + cmd := '"${ts.vexe}" ' + cmd_options.join(' ') + ' "${file}"' + // eprintln('>>> v cmd: $cmd') + ts.benchmark.step() + tls_bench.step() + if show_stats { + eprintln(term.h_divider('-')) + status := os.system(cmd) + if status == 0 { + ts.benchmark.ok() + tls_bench.ok() } else { - r := os.exec(cmd) or { - ts.failed = true - ts.benchmark.fail() - tls_bench.fail() - eprintln(tls_bench.step_message_fail(relative_file)) - continue - } - if r.exit_code != 0 { - ts.failed = true - ts.benchmark.fail() - tls_bench.fail() - eprintln(tls_bench.step_message_fail('${relative_file}\n`$file`\n (\n$r.output\n)')) - } - else { - ts.benchmark.ok() - tls_bench.ok() - if ts.show_ok_tests { - eprintln(tls_bench.step_message_ok(relative_file)) - } - } - } - if os.exists(generated_binary_fpath) { - os.rm(generated_binary_fpath) + ts.failed = true + ts.benchmark.fail() + tls_bench.fail() + return sync.no_result } } + else { + r := os.exec(cmd) or { + ts.failed = true + ts.benchmark.fail() + tls_bench.fail() + eprintln(tls_bench.step_message_fail(relative_file)) + return sync.no_result + } + if r.exit_code != 0 { + ts.failed = true + ts.benchmark.fail() + tls_bench.fail() + eprintln(tls_bench.step_message_fail('${relative_file}\n`$file`\n (\n$r.output\n)')) + } + else { + ts.benchmark.ok() + tls_bench.ok() + if ts.show_ok_tests { + eprintln(tls_bench.step_message_ok(relative_file)) + } + } + } + if os.exists(generated_binary_fpath) { + os.rm(generated_binary_fpath) + } + return sync.no_result } pub fn vlib_should_be_present(parent_dir string) { @@ -193,17 +170,17 @@ pub fn v_build_failing(zargs string, folder string) bool { eprintln('v compiler args: "$vargs"') mut session := new_test_session(vargs) files := os.walk_ext(filepath.join(parent_dir,folder), '.v') - mut mains := files.filter(!it.contains('modules') && !it.contains('preludes')) - $if windows { - // skip pico example on windows - // there was a bug using filter here - mut mains_filtered := []string - for file in mains { - if !file.ends_with('examples\\pico\\pico.v') { - mains_filtered << file + mut mains := []string + for f in files { + if !f.contains('modules') && !f.contains('preludes') { + $if windows { + // skip pico example on windows + if f.ends_with('examples\\pico\\pico.v') { + continue + } } + mains << f } - mains = mains_filtered } session.files << mains session.test() @@ -257,9 +234,9 @@ pub fn building_any_v_binaries_failed() bool { } pub fn eheader(msg string) { - eprintln(term.header(msg,'-')) + eprintln(term.header(msg, '-')) } pub fn header(msg string) { - println(term.header(msg,'-')) + println(term.header(msg, '-')) } diff --git a/examples/news_fetcher.v b/examples/news_fetcher.v index f15e87b68e..96db33f56d 100644 --- a/examples/news_fetcher.v +++ b/examples/news_fetcher.v @@ -5,47 +5,26 @@ import net.http import json import sync -const ( - nr_threads = 4 -) - struct Story { title string url string } -struct Fetcher { -mut: - mu &sync.Mutex - ids []int - cursor int - wg &sync.WaitGroup -} - -fn (f mut Fetcher) fetch() { - for { - if f.cursor >= f.ids.len { - return - } - id := f.ids[f.cursor] - f.mu.lock() - f.cursor++ - f.mu.unlock() - cursor := f.cursor - resp := http.get('https://hacker-news.firebaseio.com/v0/item/${id}.json') or { - println('failed to fetch data from /v0/item/${id}.json') - exit(1) - } - story := json.decode(Story,resp.text) or { - println('failed to decode a story') - exit(1) - } - println('#$cursor) $story.title | $story.url') - f.wg.done() +fn worker_fetch(p &sync.PoolProcessor, cursor int, worker_id int) voidptr { + id := p.get_item(cursor) + resp := http.get('https://hacker-news.firebaseio.com/v0/item/${id}.json') or { + println('failed to fetch data from /v0/item/${id}.json') + return sync.no_result } + story := json.decode(Story,resp.text) or { + println('failed to decode a story') + return sync.no_result + } + println('# $cursor) $story.title | $story.url') + return sync.no_result } -// Fetches top HN stories in 4 coroutines +// Fetches top HN stories in parallel, depending on how many cores you have fn main() { resp := http.get('https://hacker-news.firebaseio.com/v0/topstories.json') or { println('failed to fetch data from /v0/topstories.json') @@ -56,22 +35,15 @@ fn main() { return } if ids.len > 10 { - // ids = ids[:10] - mut tmp := [0].repeat(10) - for i in 0..10 { - tmp[i] = ids[i] - } - ids = tmp + ids = ids[0..10] } - mut fetcher := &Fetcher{ - ids: ids - mu: sync.new_mutex() - wg: sync.new_waitgroup() - } - fetcher.wg.add(ids.len) - for i in 0..nr_threads { - go fetcher.fetch() - } - fetcher.wg.wait() + mut fetcher_pool := sync.new_pool_processor({ + callback: worker_fetch + }) + // NB: if you do not call set_max_jobs, the pool will try to use an optimal + // number of threads, one per each core in your system, which in most + // cases is what you want anyway... You can override the automatic choice + // by setting the VJOBS environment variable too. + // fetcher_pool.set_max_jobs( 4 ) + fetcher_pool.work_on_items(ids) } - diff --git a/vlib/benchmark/benchmark.v b/vlib/benchmark/benchmark.v index a7f1837b76..d2511e18ba 100644 --- a/vlib/benchmark/benchmark.v +++ b/vlib/benchmark/benchmark.v @@ -74,6 +74,13 @@ pub fn new_benchmark() Benchmark { } } +pub fn new_benchmark_pointer() &Benchmark { + return &Benchmark{ + bench_start_time: benchmark.now() + verbose: true + } +} + pub fn (b mut Benchmark) set_total_expected_steps(n int) { b.nexpected_steps = n } diff --git a/vlib/builtin/array.v b/vlib/builtin/array.v index 2167ea08df..407f46318a 100644 --- a/vlib/builtin/array.v +++ b/vlib/builtin/array.v @@ -577,3 +577,13 @@ pub fn compare_f32(a, b &f32) int { } return 0 } + +// a.pointers() returns a new array, where each element +// is the address of the corresponding element in a. +pub fn (a array) pointers() []voidptr { + mut res := []voidptr + for i in 0..a.len { + res << a.data + i * a.element_size + } + return res +} diff --git a/vlib/compiler/tests/repl/repl_test.v b/vlib/compiler/tests/repl/repl_test.v index 76df228f8f..aeb325929d 100644 --- a/vlib/compiler/tests/repl/repl_test.v +++ b/vlib/compiler/tests/repl/repl_test.v @@ -3,7 +3,6 @@ module main import os import compiler.tests.repl.runner import benchmark -import runtime import sync import filepath @@ -28,77 +27,65 @@ fn test_the_v_compiler_can_be_invoked() { struct Session { mut: - options runner.RunnerOptions - bmark benchmark.Benchmark - ntask int - ntask_mtx &sync.Mutex - waitgroup &sync.WaitGroup + options runner.RunnerOptions + bmark benchmark.Benchmark } fn test_all_v_repl_files() { mut session := &Session{ options: runner.new_options() bmark: benchmark.new_benchmark() - ntask: 0 - ntask_mtx: sync.new_mutex() - waitgroup: sync.new_waitgroup() } // warmup, and ensure that the vrepl is compiled in single threaded mode if it does not exist runner.run_repl_file(os.cachedir(), session.options.vexec, 'vlib/compiler/tests/repl/nothing.repl') or { panic(err) } - - session.bmark.set_total_expected_steps( session.options.files.len ) - mut ncpus := 0 - ncpus = runtime.nr_cpus() + session.bmark.set_total_expected_steps(session.options.files.len) + mut pool_repl := sync.new_pool_processor({ + callback: worker_repl + }) + pool_repl.set_shared_context(session) $if windows { - // See: https://docs.microsoft.com/en-us/cpp/build/reference/fs-force-synchronous-pdb-writes?view=vs-2019 - ncpus = 1 + // See: https://docs.microsoft.com/en-us/cpp/build/reference/fs-force-synchronous-pdb-writes?view=vs-2019 + pool_repl.set_max_jobs(1) } - session.waitgroup.add( ncpus ) - for i:=0; i < ncpus; i++ { - go process_in_thread(session,i) - } - session.waitgroup.wait() + pool_repl.work_on_items(session.options.files) session.bmark.stop() println(session.bmark.total_message('total time spent running REPL files')) } -fn process_in_thread( session mut Session, thread_id int ){ +fn worker_repl(p mut sync.PoolProcessor, idx int, thread_id int) voidptr { cdir := os.cachedir() - mut tls_bench := benchmark.new_benchmark() - tls_bench.set_total_expected_steps( session.bmark.nexpected_steps ) - for { - session.ntask_mtx.lock() - session.ntask++ - idx := session.ntask-1 - session.ntask_mtx.unlock() - - if idx >= session.options.files.len { break } - tls_bench.cstep = idx - - tfolder := filepath.join( cdir, 'vrepl_tests_$idx') - if os.is_dir( tfolder ) { - os.rmdir_all( tfolder ) - } - os.mkdir( tfolder ) or { panic(err) } - - file := session.options.files[ idx ] - session.bmark.step() - tls_bench.step() - fres := runner.run_repl_file(tfolder, session.options.vexec, file) or { - session.bmark.fail() - tls_bench.fail() - os.rmdir_all( tfolder ) - eprintln(tls_bench.step_message_fail(err)) - assert false - continue - } - session.bmark.ok() - tls_bench.ok() - os.rmdir_all( tfolder ) - println(tls_bench.step_message_ok(fres)) - assert true + mut session := &Session(p.get_shared_context()) + mut tls_bench := &benchmark.Benchmark(p.get_thread_context(idx)) + if isnil(tls_bench) { + tls_bench = benchmark.new_benchmark_pointer() + tls_bench.set_total_expected_steps(session.bmark.nexpected_steps) + p.set_thread_context(idx, tls_bench) } - session.waitgroup.done() + tls_bench.cstep = idx + tfolder := filepath.join(cdir,'vrepl_tests_$idx') + if os.is_dir(tfolder) { + os.rmdir_all(tfolder) + } + os.mkdir(tfolder) or { + panic(err) + } + file := p.get_string_item(idx) + session.bmark.step() + tls_bench.step() + fres := runner.run_repl_file(tfolder, session.options.vexec, file) or { + session.bmark.fail() + tls_bench.fail() + os.rmdir_all(tfolder) + eprintln(tls_bench.step_message_fail(err)) + assert false + return sync.no_result + } + session.bmark.ok() + tls_bench.ok() + os.rmdir_all(tfolder) + println(tls_bench.step_message_ok(fres)) + assert true + return sync.no_result } diff --git a/vlib/sync/pool.v b/vlib/sync/pool.v new file mode 100644 index 0000000000..a3bf183d79 --- /dev/null +++ b/vlib/sync/pool.v @@ -0,0 +1,197 @@ +module sync +// * Goal: this file provides a convenient way to run identical tasks over a list +// * of items in parallel, without worrying about waitgroups, mutexes and so on. +// * +// * Usage example: +// * pool := sync.new_pool_processor({ callback: worker_cb }) +// * //pool.work_on_items(['a','b','c']) // TODO: vfmt and generics +// * pool.work_on_pointers(['a','b','c'].pointers()) +// * // optionally, you can iterate over the results too: +// * for x in pool.get_results() { +// * // do stuff with x +// * } +// * +// * See https://github.com/vlang/v/blob/master/vlib/sync/pool_test.v for a +// * more detailed usage example. +// * +// * After all the work is done in parallel by the worker threads in the pool, +// * pool.work_on_items will return, and you can then call +// * pool.get_results() to retrieve a list of all the results, +// * that the worker callbacks returned for each item that you passed. +// * The parameters of new_pool_processor are: +// * context.maxjobs: when 0 (the default), the PoolProcessor will use an +// * optimal for your system number of threads to process your items +// * context.callback: this should be a callback function, that each worker +// * thread in the pool will run for each item. +// * The callback function will receive as parameters: +// * 1) the PoolProcessor instance, so it can call +// * p.get_item(idx) to get the actual item at index idx +// * NB: for now, you are better off calling p.get_string_item(idx) +// * or p.get_int_item(idx) ; TODO: vfmt and generics +// * 2) idx - the index of the currently processed item +// * 3) task_id - the index of the worker thread in which the callback +// * function is running. +import runtime + +pub const ( + no_result = voidptr(0) +) + +pub struct PoolProcessor { + thread_cb voidptr +mut: + njobs int + items []voidptr + results []voidptr + ntask int // writing to this should be locked by ntask_mtx. + ntask_mtx &sync.Mutex + waitgroup &sync.WaitGroup + shared_context voidptr + thread_contexts []voidptr +} + +pub type ThreadCB fn(p &PoolProcessor, idx int, task_id int)voidptr + +pub struct PoolProcessorConfig { + maxjobs int + callback ThreadCB +} + +// new_pool_processor returns a new PoolProcessor instance. +pub fn new_pool_processor(context PoolProcessorConfig) &PoolProcessor { + if isnil(context.callback) { + panic('You need to pass a valid callback to new_pool_processor.') + } + // TODO: remove this call. + // It prevents a V warning about unused module runtime. + runtime.nr_jobs() + pool := &PoolProcessor { + items: [] + results: [] + shared_context: voidptr(0) + thread_contexts: [] + njobs: context.maxjobs + ntask: 0 + ntask_mtx: sync.new_mutex() + waitgroup: sync.new_waitgroup() + thread_cb: context.callback + } + return pool +} + +// set_max_jobs gives you the ability to override the number +// of jobs *after* the PoolProcessor had been created already. +pub fn (pool mut PoolProcessor) set_max_jobs(njobs int) { + pool.njobs = njobs +} + +// work_on_items receives a list of items of type T, +// then starts a work pool of pool.njobs threads, each running +// pool.thread_cb in a loop, untill all items in the list, +// are processed. +// When pool.njobs is 0, the number of jobs is determined +// by the number of available cores on the system. +// work_on_items returns *after* all threads finish. +// You can optionally call get_results after that. +pub fn (pool mut PoolProcessor) work_on_items(items []T) { + pool.work_on_pointers( items.pointers() ) +} + +pub fn (pool mut PoolProcessor) work_on_pointers(items []voidptr) { + mut njobs := runtime.nr_jobs() + if pool.njobs > 0 { + njobs = pool.njobs + } + pool.items = [] + pool.results = [] + pool.thread_contexts = [] + pool.items << items + pool.results = [voidptr(0)].repeat(pool.items.len) + pool.thread_contexts << [voidptr(0)].repeat(pool.items.len) + pool.waitgroup.add(njobs) + for i := 0; i < njobs; i++ { + go process_in_thread(pool,i) + } + pool.waitgroup.wait() +} + +// process_in_thread does the actual work of worker thread. +// It is a workaround for the current inability to pass a +// method in a callback. +fn process_in_thread(pool mut PoolProcessor, task_id int) { + cb := ThreadCB(pool.thread_cb) + mut idx := 0 + ilen := pool.items.len + for { + if pool.ntask >= ilen { + break + } + pool.ntask_mtx.lock() + idx = pool.ntask + pool.ntask++ + pool.ntask_mtx.unlock() + pool.results[idx] = cb(pool, idx, task_id) + } + pool.waitgroup.done() +} + +// get_item - called by the worker callback. +// Retrieves a type safe instance of the currently processed item +pub fn (pool &PoolProcessor) get_item(idx int) T { + return *(&T(pool.items[idx])) +} + +// get_string_item - called by the worker callback. +// It does not use generics so it does not mess up vfmt. +// TODO: remove the need for this when vfmt becomes smarter. +pub fn (pool &PoolProcessor) get_string_item(idx int) string { + return *(&string(pool.items[idx])) +} + +// get_int_item - called by the worker callback. +// It does not use generics so it does not mess up vfmt. +// TODO: remove the need for this when vfmt becomes smarter. +pub fn (pool &PoolProcessor) get_int_item(idx int) int { + return *(&int(pool.items[idx])) +} + +pub fn (pool &PoolProcessor) get_result(idx int) T { + return *(&T(pool.results[idx])) +} + +// get_results - can be called to get a list of type safe results. +pub fn (pool &PoolProcessor) get_results() []T { + mut res := []T + for i in 0 .. pool.results.len { + res << *(&T(pool.results[i])) + } + return res +} + +// set_shared_context - can be called during the setup so that you can +// provide a context that is shared between all worker threads, like +// common options/settings. +pub fn (pool mut PoolProcessor) set_shared_context(context voidptr) { + pool.shared_context = context +} + +// get_shared_context - can be called in each worker callback, to get +// the context set by pool.set_shared_context +pub fn (pool &PoolProcessor) get_shared_context() voidptr { + return pool.shared_context +} + +// set_thread_context - can be called during the setup at the start of +// each worker callback, so that the worker callback can have some thread +// local storage area where it can write/read information that is private +// to the given thread, without worrying that it will get overwritten by +// another thread +pub fn (pool mut PoolProcessor) set_thread_context(idx int, context voidptr) { + pool.thread_contexts[idx] = context +} + +// get_thread_context - returns a pointer, that was set with +// pool.set_thread_context . This pointer is private to each thread. +pub fn (pool &PoolProcessor) get_thread_context(idx int) voidptr { + return pool.thread_contexts[idx] +} diff --git a/vlib/sync/pool_test.v b/vlib/sync/pool_test.v new file mode 100644 index 0000000000..557a96869f --- /dev/null +++ b/vlib/sync/pool_test.v @@ -0,0 +1,58 @@ +import sync +import time +import rand + +struct SResult { + s string +} + +fn worker_s(p &sync.PoolProcessor, idx int, worker_id int) voidptr { + // TODO: this works, but confuses vfmt. It should be used instead of + // p.get_int_item when vfmt becomes smarter. + // item := p.get_item(idx) + item := p.get_string_item(idx) + println('worker_s worker_id: $worker_id | idx: $idx | item: ${item}') + time.sleep_ms(rand.next(3)) + return &SResult{item + item} +} + +struct IResult { + i int +} + +fn worker_i(p &sync.PoolProcessor, idx int, worker_id int) voidptr { + // TODO: this works, but confuses vfmt. See the comment above. + // item := p.get_item(idx) + item := p.get_int_item(idx) + println('worker_i worker_id: $worker_id | idx: $idx | item: ${item}') + time.sleep_ms(rand.next(5)) + return &IResult{item * 1000} +} + +fn test_work_on_strings() { + rand.seed(0) + mut pool_s := sync.new_pool_processor({ + callback: worker_s + maxjobs: 8 + }) + pool_s.work_on_items(['a','b','c','d','e','f','g','h','i','j']) + for x in pool_s.get_results() { + println( x.s ) + assert x.s.len > 1 + } +} + +fn test_work_on_ints() { + rand.seed(0) + // NB: since maxjobs is left empty here, + // the pool processor will use njobs = runtime.nr_jobs so that + // it will work optimally without overloading the system + mut pool_i := sync.new_pool_processor({ + callback: worker_i + }) + pool_i.work_on_items([1,2,3,4,5,6,7,8]) + for x in pool_i.get_results() { + println( x.i ) + assert x.i > 100 + } +}