sync.pool #
The sync.pool
module provides a convenient way to run identical tasks over an array of items in parallel, without worrying about thread synchronization, waitgroups, mutexes etc.., you just need to supply a callback function, that will be called once per each item in your input array.
After all the work is done in parallel by the worker threads in the pool, pool.work_on_items will return. You can then call pool.get_resultsResult to retrieve a list of all the results, that the worker callbacks returned for each input item. Example:
import sync.pool
pub struct SResult {
s string
}
fn sprocess(mut pp pool.PoolProcessor, idx int, wid int) &SResult {
item := pp.get_item[string](idx)
println('idx: ${idx}, wid: ${wid}, item: ' + item)
return &SResult{item.reverse()}
}
fn main() {
mut pp := pool.new_pool_processor(callback: sprocess)
pp.work_on_items(['1abc', '2abc', '3abc', '4abc', '5abc', '6abc', '7abc'])
// optionally, you can iterate over the results too:
for x in pp.get_results[SResult]() {
println('result: ${x.s}')
}
}
See https://github.com/vlang/v/blob/master/vlib/sync/pool/pool_test.v for a more detailed usage example.
Constants #
const no_result = unsafe { nil }
fn new_pool_processor #
fn new_pool_processor(context PoolProcessorConfig) &PoolProcessor
new_pool_processor returns a new PoolProcessor instance. The parameters of new_pool_processor are: context.maxjobs: when 0 (the default), the PoolProcessor will use a number of threads, that is optimal for your system to process your items. context.callback: this should be a callback function, that each worker thread in the pool will run for each item. The callback function will receive as parameters: 1) the PoolProcessor instance, so it can call p.get_itemint to get the actual item at index idx 2) idx - the index of the currently processed item 3) task_id - the index of the worker thread in which the callback function is running.
type ThreadCB #
type ThreadCB = fn (mut p PoolProcessor, idx int, task_id int) voidptr
struct PoolProcessor #
struct PoolProcessor {
thread_cb voidptr
mut:
njobs int
items []voidptr
results []voidptr
ntask u32 // reading/writing to this should be atomic
waitgroup sync.WaitGroup
shared_context voidptr
thread_contexts []voidptr
}
fn (PoolProcessor) set_max_jobs #
fn (mut pool PoolProcessor) set_max_jobs(njobs int)
set_max_jobs gives you the ability to override the number of jobs after the PoolProcessor had been created already.
fn (PoolProcessor) work_on_items #
fn (mut pool PoolProcessor) work_on_items[T](items []T)
work_on_items receives a list of items of type T, then starts a work pool of pool.njobs threads, each running pool.thread_cb in a loop, until all items in the list, are processed. When pool.njobs is 0, the number of jobs is determined by the number of available cores on the system. work_on_items returns after all threads finish. You can optionally call get_results after that.
fn (PoolProcessor) work_on_pointers #
fn (mut pool PoolProcessor) work_on_pointers(items []voidptr)
fn (PoolProcessor) get_item #
fn (pool &PoolProcessor) get_item[T](idx int) T
get_item - called by the worker callback. Retrieves a type safe instance of the currently processed item
fn (PoolProcessor) get_result #
fn (pool &PoolProcessor) get_result[T](idx int) T
get_result - called by the main thread to get a specific result. Retrieves a type safe instance of the produced result.
fn (PoolProcessor) get_results #
fn (pool &PoolProcessor) get_results[T]() []T
get_results - get a list of type safe results in the main thread.
fn (PoolProcessor) get_results_ref #
fn (pool &PoolProcessor) get_results_ref[T]() []&T
get_results_ref - get a list of type safe results in the main thread.
fn (PoolProcessor) set_thread_context #
fn (mut pool PoolProcessor) set_thread_context(idx int, context voidptr)
set_thread_context - can be called during the setup at the start of each worker callback, so that the worker callback can have some thread local storage area where it can write/read information that is private to the given thread, without worrying that it will get overwritten by another thread
fn (PoolProcessor) get_thread_context #
fn (pool &PoolProcessor) get_thread_context(idx int) voidptr
get_thread_context - returns a pointer, that was set with pool.set_thread_context . This pointer is private to each thread.
struct PoolProcessorConfig #
struct PoolProcessorConfig {
pub:
maxjobs int
callback ThreadCB = empty_cb
}