diff --git a/examples/news_fetcher.v b/examples/news_fetcher.v index c355209fd9..78f3d72d3b 100644 --- a/examples/news_fetcher.v +++ b/examples/news_fetcher.v @@ -5,10 +5,9 @@ import http import json import sync -import time const ( - NR_THREADS = 8 + NR_THREADS = 4 ) struct Story { @@ -21,7 +20,7 @@ mut: mu sync.Mutex ids []int cursor int - list_id int + wg &sync.WaitGroup } fn (f mut Fetcher) fetch() { @@ -32,6 +31,7 @@ fn (f mut Fetcher) fetch() { } id := f.ids[f.cursor] f.cursor++ + cursor := f.cursor f.mu.unlock() resp := http.get('https://hacker-news.firebaseio.com/v0/item/${id}.json') or { println('failed to fetch data from /v0/item/${id}.json') @@ -41,28 +41,36 @@ fn (f mut Fetcher) fetch() { println('failed to decode a story') exit(1) } - f.mu.lock() - f.list_id++ - cursor := f.list_id - f.mu.unlock() + f.wg.done() println('#$cursor) $story.title | $story.url') } } -// Fetches top HN stories in 8 coroutines +// Fetches top HN stories in 4 coroutines fn main() { resp := http.get('https://hacker-news.firebaseio.com/v0/topstories.json') or { println('failed to fetch data from /v0/topstories.json') return } - ids := json.decode( []int, resp) or { + mut ids := json.decode([]int, resp) or { println('failed to decode topstories.json') return } - fetcher := &Fetcher{ids: ids} + if ids.len > 10 { + // ids = ids[:10] + mut tmp := [0 ; 10] + for i := 0 ; i < 10 ; i++ { + tmp[i] = ids[i] + } + ids = tmp + } + + mut wg := &sync.WaitGroup{} + fetcher := &Fetcher{ids: ids, wg: wg} // wg sent via ptr + wg.add(ids.len) for i := 0; i < NR_THREADS; i++ { go fetcher.fetch() } - time.sleep(5) + wg.wait() } diff --git a/vlib/sync/waitgroup.v b/vlib/sync/waitgroup.v new file mode 100644 index 0000000000..86aeba13b7 --- /dev/null +++ b/vlib/sync/waitgroup.v @@ -0,0 +1,37 @@ +// Copyright (c) 2019 Alexander Medvednikov. All rights reserved. +// Use of this source code is governed by an MIT license +// that can be found in the LICENSE file. + +module sync + +struct WaitGroup { +mut: + mu Mutex + finished Mutex + active int +} + +pub fn (wg mut WaitGroup) add(delta int) { + wg.mu.lock() + if wg.active == 0 { + wg.finished.lock() + } + wg.active += delta + if wg.active < 0 { + panic('Negative number of jobs in waitgroup') + } + if wg.active == 0 { + wg.finished.unlock() + } + wg.mu.unlock() +} + +pub fn (wg mut WaitGroup) done() { + wg.add(-1) +} + +pub fn (wg mut WaitGroup) wait() { + wg.finished.lock() + wg.finished.unlock() +} +