sync #

Constants #

const (
	no_result = voidptr(0)
)
  • Goal: this file provides a convenient way to run identical tasks over a list * of items in parallel, without worrying about waitgroups, mutexes and so on. * * Usage example: * struct SResult{ s string } * fn sprocess(p &sync.PoolProcessor, idx, wid int) voidptr { * item := p.get_item<string>(idx) * println('idx: $idx, wid: $wid, item: ' + item) * return &SResult{ item.reverse() } * } * pool := sync.new_pool_processor({ callback: sprocess }) * pool.work_on_items(['a','b','c','d','e','f','g']) * // optionally, you can iterate over the results too: * for x in pool.get_results<SResult>() { * println('result: $x.s') * } * * See https://github.com/vlang/v/blob/master/vlib/sync/pool_test.v for a * more detailed usage example. * * After all the work is done in parallel by the worker threads in the pool, * pool.work_on_items will return, and you can then call * pool.get_results<Result>() to retrieve a list of all the results, * that the worker callbacks returned for each item that you passed. * The parameters of new_pool_processor are: * context.maxjobs: when 0 (the default), the PoolProcessor will use an * optimal for your system number of threads to process your items * context.callback: this should be a callback function, that each worker * thread in the pool will run for each item. * The callback function will receive as parameters: * 1) the PoolProcessor instance, so it can call * p.get_item<int>(idx) to get the actual item at index idx * NB: for now, you are better off calling p.get_string_item(idx) * or p.get_int_item(idx) ; TODO: vfmt and generics * 2) idx - the index of the currently processed item * 3) task_id - the index of the worker thread in which the callback * function is running.

fn channel_select #

fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []voidptr, timeout time.Duration) int

Wait timeout on any of channels[i] until one of them can push (is_push[i] = true) or pop (is_push[i] = false) object referenced by objrefs[i]. timeout &lt; 0 means wait unlimited time. timeout == 0 means return immediately if no transaction can be performed without waiting. return value: the index of the channel on which a transaction has taken place -1 if waiting for a transaction has exceeded timeout -2 if all channels are closed

fn new_channel #

fn new_channel<T>(n u32) &Channel

fn new_mutex #

fn new_mutex() &Mutex

fn new_pool_processor #

fn new_pool_processor(context PoolProcessorConfig) &PoolProcessor

new_pool_processor returns a new PoolProcessor instance.

fn new_rwmutex #

fn new_rwmutex() &RwMutex

fn new_semaphore #

fn new_semaphore() Semaphore

fn new_semaphore_init #

fn new_semaphore_init(n u32) Semaphore

fn new_waiter #

fn new_waiter() &Waiter

fn new_waitgroup #

fn new_waitgroup() &WaitGroup

fn (Channel) close #

fn (mut ch Channel) close()

fn (Channel) len #

fn (mut ch Channel) len() int

fn (Channel) push #

fn (mut ch Channel) push(src voidptr)

fn (Channel) try_push #

fn (mut ch Channel) try_push(src voidptr) ChanState

fn (Channel) pop #

fn (mut ch Channel) pop(dest voidptr) bool

fn (Channel) try_pop #

fn (mut ch Channel) try_pop(dest voidptr) ChanState

type ThreadCB #

type ThreadCB = fn (p &PoolProcessor, idx int, task_id int) voidptr

fn (Waiter) wait #

fn (mut w Waiter) wait()

fn (Waiter) stop #

fn (mut w Waiter) stop()

fn (WaitGroup) add #

fn (mut wg WaitGroup) add(delta int)

add increments (+ve delta) or decrements (-ve delta) task count by delta and unblocks any wait() calls if task count becomes zero. add panics if task count drops below zero.

fn (WaitGroup) done #

fn (mut wg WaitGroup) done()

done is a convenience fn for add(-1)

fn (WaitGroup) wait #

fn (mut wg WaitGroup) wait()

wait blocks until all tasks are done (task count becomes zero)

struct IResult #

struct IResult {
pub:
	i int
}

struct Mutex #

struct Mutex {
	mutex C.pthread_mutex_t
}

[init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function.

fn (Mutex) m_lock #

fn (mut m Mutex) m_lock()

m_lock(), for manual mutex handling, since lock is a keyword

fn (Mutex) unlock #

fn (mut m Mutex) unlock()

struct PoolProcessor #

struct PoolProcessor {
	thread_cb voidptr
mut:
	njobs           int
	items           []voidptr
	results         []voidptr
	ntask           int
	ntask_mtx       &Mutex
	waitgroup       &WaitGroup
	shared_context  voidptr
	thread_contexts []voidptr
}

fn (PoolProcessor) set_max_jobs #

fn (mut pool PoolProcessor) set_max_jobs(njobs int)

set_max_jobs gives you the ability to override the number of jobs after the PoolProcessor had been created already.

fn (PoolProcessor) work_on_pointers #

fn (mut pool PoolProcessor) work_on_pointers(items []voidptr)

work_on_items receives a list of items of type T, then starts a work pool of pool.njobs threads, each running pool.thread_cb in a loop, untill all items in the list, are processed. When pool.njobs is 0, the number of jobs is determined by the number of available cores on the system. work_on_items returns after all threads finish. You can optionally call get_results after that. TODO: uncomment, when generics work again pub fn (mut pool PoolProcessor) work_on_items<T>(items []T) { pool.work_on_pointers( items.pointers() ) }

fn (PoolProcessor) get_string_item #

fn (pool &PoolProcessor) get_string_item(idx int) string

get_item - called by the worker callback. Retrieves a type safe instance of the currently processed item TODO: uncomment, when generics work again pub fn (pool &PoolProcessor) get_item<T>(idx int) T { return *(&T(pool.items[idx])) } get_string_item - called by the worker callback. It does not use generics so it does not mess up vfmt. TODO: remove the need for this when vfmt becomes smarter.

fn (PoolProcessor) get_int_item #

fn (pool &PoolProcessor) get_int_item(idx int) int

get_int_item - called by the worker callback. It does not use generics so it does not mess up vfmt. TODO: remove the need for this when vfmt becomes smarter.

fn (PoolProcessor) set_shared_context #

fn (mut pool PoolProcessor) set_shared_context(context voidptr)

TODO: uncomment, when generics work again pub fn (pool &PoolProcessor) get_result<T>(idx int) T { return *(&T(pool.results[idx])) } TODO: uncomment, when generics work again get_results - can be called to get a list of type safe results. pub fn (pool &PoolProcessor) get_results<T>() []T { mut res := []T{} for i in 0 .. pool.results.len { res << *(&T(pool.results[i])) } return res } set_shared_context - can be called during the setup so that you can provide a context that is shared between all worker threads, like common options/settings.

fn (PoolProcessor) get_shared_context #

fn (pool &PoolProcessor) get_shared_context() voidptr

get_shared_context - can be called in each worker callback, to get the context set by pool.set_shared_context

fn (PoolProcessor) set_thread_context #

fn (mut pool PoolProcessor) set_thread_context(idx int, context voidptr)

set_thread_context - can be called during the setup at the start of each worker callback, so that the worker callback can have some thread local storage area where it can write/read information that is private to the given thread, without worrying that it will get overwritten by another thread

fn (PoolProcessor) get_thread_context #

fn (pool &PoolProcessor) get_thread_context(idx int) voidptr

get_thread_context - returns a pointer, that was set with pool.set_thread_context . This pointer is private to each thread.

fn (PoolProcessor) work_on_items_s #

fn (mut pool PoolProcessor) work_on_items_s(items []string)

fn (PoolProcessor) work_on_items_i #

fn (mut pool PoolProcessor) work_on_items_i(items []int)

fn (PoolProcessor) get_results_s #

fn (pool &PoolProcessor) get_results_s() []SResult

fn (PoolProcessor) get_results_i #

fn (pool &PoolProcessor) get_results_i() []IResult

struct PoolProcessorConfig #

struct PoolProcessorConfig {
	maxjobs  int
	callback ThreadCB
}

struct RwMutex #

struct RwMutex {
	mutex C.pthread_rwlock_t
}

fn (RwMutex) r_lock #

fn (mut m RwMutex) r_lock()

RwMutex has separate read- and write locks

fn (RwMutex) w_lock #

fn (mut m RwMutex) w_lock()

fn (RwMutex) r_unlock #

fn (mut m RwMutex) r_unlock()

Windows SRWLocks have different function to unlock So provide two functions here, too, to have a common interface

fn (RwMutex) w_unlock #

fn (mut m RwMutex) w_unlock()

struct Semaphore #

struct Semaphore {
mut:
	sem voidptr
}

fn (Semaphore) post #

fn (s Semaphore) post()

fn (Semaphore) wait #

fn (s Semaphore) wait()

fn (Semaphore) try_wait #

fn (s Semaphore) try_wait() bool

fn (Semaphore) timed_wait #

fn (s Semaphore) timed_wait(timeout time.Duration) bool

fn (Semaphore) destroy #

fn (s Semaphore) destroy() bool

struct SResult #

struct SResult {
pub:
	s string
}

TODO: remove everything below this line after generics are fixed: