Skip to content

sync.pool #

The sync.pool module provides a convenient way to run identical tasks over an array of items in parallel, without worrying about thread synchronization, waitgroups, mutexes etc.., you just need to supply a callback function, that will be called once per each item in your input array.

After all the work is done in parallel by the worker threads in the pool, pool.work_on_items will return. You can then call pool.get_resultsResult to retrieve a list of all the results, that the worker callbacks returned for each input item. Example:

import sync.pool

pub struct SResult {
    s string
}

fn sprocess(mut pp pool.PoolProcessor, idx int, wid int) &SResult {
    item := pp.get_item[string](idx)
    println('idx: ${idx}, wid: ${wid}, item: ' + item)
    return &SResult{item.reverse()}
}

fn main() {
    mut pp := pool.new_pool_processor(callback: sprocess)
    pp.work_on_items(['1abc', '2abc', '3abc', '4abc', '5abc', '6abc', '7abc'])
    // optionally, you can iterate over the results too:
    for x in pp.get_results[SResult]() {
        println('result: ${x.s}')
    }
}

See https://github.com/vlang/v/blob/master/vlib/sync/pool/pool_test.v for a more detailed usage example.

Constants #

const no_result = unsafe { nil }

fn new_pool_processor #

fn new_pool_processor(context PoolProcessorConfig) &PoolProcessor

new_pool_processor returns a new PoolProcessor instance. The parameters of new_pool_processor are: context.maxjobs: when 0 (the default), the PoolProcessor will use a number of threads, that is optimal for your system to process your items. context.callback: this should be a callback function, that each worker thread in the pool will run for each item. The callback function will receive as parameters: 1) the PoolProcessor instance, so it can call p.get_itemint to get the actual item at index idx 2) idx - the index of the currently processed item 3) task_id - the index of the worker thread in which the callback function is running.

type ThreadCB #

type ThreadCB = fn (mut p PoolProcessor, idx int, task_id int) voidptr

struct PoolProcessor #

struct PoolProcessor {
	thread_cb voidptr
mut:
	njobs           int
	items           []voidptr
	results         []voidptr
	ntask           u32 // reading/writing to this should be atomic
	waitgroup       sync.WaitGroup
	shared_context  voidptr
	thread_contexts []voidptr
}

fn (PoolProcessor) set_max_jobs #

fn (mut pool PoolProcessor) set_max_jobs(njobs int)

set_max_jobs gives you the ability to override the number of jobs after the PoolProcessor had been created already.

fn (PoolProcessor) work_on_items #

fn (mut pool PoolProcessor) work_on_items[T](items []T)

work_on_items receives a list of items of type T, then starts a work pool of pool.njobs threads, each running pool.thread_cb in a loop, until all items in the list, are processed. When pool.njobs is 0, the number of jobs is determined by the number of available cores on the system. work_on_items returns after all threads finish. You can optionally call get_results after that.

fn (PoolProcessor) work_on_pointers #

fn (mut pool PoolProcessor) work_on_pointers(items []voidptr)

fn (PoolProcessor) get_item #

fn (pool &PoolProcessor) get_item[T](idx int) T

get_item - called by the worker callback. Retrieves a type safe instance of the currently processed item

fn (PoolProcessor) get_result #

fn (pool &PoolProcessor) get_result[T](idx int) T

get_result - called by the main thread to get a specific result. Retrieves a type safe instance of the produced result.

fn (PoolProcessor) get_results #

fn (pool &PoolProcessor) get_results[T]() []T

get_results - get a list of type safe results in the main thread.

fn (PoolProcessor) get_results_ref #

fn (pool &PoolProcessor) get_results_ref[T]() []&T

get_results_ref - get a list of type safe results in the main thread.

fn (PoolProcessor) set_shared_context #

fn (mut pool PoolProcessor) set_shared_context(context voidptr)

set_shared_context - can be called during the setup so that you can provide a context that is shared between all worker threads, like common options/settings.

fn (PoolProcessor) get_shared_context #

fn (pool &PoolProcessor) get_shared_context() voidptr

get_shared_context - can be called in each worker callback, to get the context set by pool.set_shared_context

fn (PoolProcessor) set_thread_context #

fn (mut pool PoolProcessor) set_thread_context(idx int, context voidptr)

set_thread_context - can be called during the setup at the start of each worker callback, so that the worker callback can have some thread local storage area where it can write/read information that is private to the given thread, without worrying that it will get overwritten by another thread

fn (PoolProcessor) get_thread_context #

fn (pool &PoolProcessor) get_thread_context(idx int) voidptr

get_thread_context - returns a pointer, that was set with pool.set_thread_context . This pointer is private to each thread.

struct PoolProcessorConfig #

struct PoolProcessorConfig {
pub:
	maxjobs  int
	callback ThreadCB = empty_cb
}