Skip to content

sync #

Description:

sync provides cross platform handling of concurrency primitives.

fn channel_select #

fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []voidptr, timeout time.Duration) int

Wait timeout on any of channels[i] until one of them can push (is_push[i] = true) or pop (is_push[i] = false) object referenced by objrefs[i]. timeout = time.infinite means wait unlimited time. timeout <= 0 means return immediately if no transaction can be performed without waiting.
return value: the index of the channel on which a transaction has taken place -1 if waiting for a transaction has exceeded timeout -2 if all channels are closed

fn new_channel #

fn new_channel[T](n u32) &Channel

fn new_many_times #

fn new_many_times(times u64) &ManyTimes

new_many_times return a new ManyTimes struct.

fn new_mutex #

fn new_mutex() &Mutex

fn new_once #

fn new_once() &Once

new_once return a new Once struct.

fn new_rwmutex #

fn new_rwmutex() &RwMutex

fn new_semaphore #

fn new_semaphore() &Semaphore

fn new_semaphore_init #

fn new_semaphore_init(n u32) &Semaphore

fn new_waitgroup #

fn new_waitgroup() &WaitGroup

fn thread_id #

fn thread_id() u64

thread_id returns a unique identifier for the caller thread.
All currently running threads in the same process, will have different thread identifiers.
Note: if a thread finishes, and another starts, the identifier of the old thread may be reused for the newly started thread.
In other words, thread IDs are guaranteed to be unique only within a process.
A thread ID may be reused after a terminated thread has been joined (with t.wait()), or when the thread has terminated.

struct Channel #

struct Channel {
	ringbuf   &u8 = unsafe { nil } // queue for buffered channels
	statusbuf &u8 = unsafe { nil } // flags to synchronize write/read in ringbuf
	objsize   u32
mut: // atomic
	writesem           Semaphore // to wake thread that wanted to write, but buffer was full
	readsem            Semaphore // to wake thread that wanted to read, but buffer was empty
	writesem_im        Semaphore
	readsem_im         Semaphore
	write_adr          C.atomic_uintptr_t // if != NULL the next obj can be written here without wait
	read_adr           C.atomic_uintptr_t // if != NULL an obj can be read from here without wait
	adr_read           C.atomic_uintptr_t // used to identify origin of writesem
	adr_written        C.atomic_uintptr_t // used to identify origin of readsem
	write_free         u32 // for queue state
	read_avail         u32
	buf_elem_write_idx u32
	buf_elem_read_idx  u32
// for select
	write_subscriber &Subscription = unsafe { nil }
	read_subscriber  &Subscription = unsafe { nil }
	write_sub_mtx    u16
	read_sub_mtx     u16
	closed           u16
pub:
	cap u32 // queue length in #objects
}

fn (Channel) auto_str #

fn (ch &Channel) auto_str(typename string) string

fn (Channel) close #

fn (mut ch Channel) close()

fn (Channel) len #

fn (mut ch Channel) len() int

fn (Channel) closed #

fn (mut ch Channel) closed() bool

fn (Channel) push #

fn (mut ch Channel) push(src voidptr)

fn (Channel) try_push #

fn (mut ch Channel) try_push(src voidptr) ChanState

fn (Channel) pop #

fn (mut ch Channel) pop(dest voidptr) bool

fn (Channel) try_pop #

fn (mut ch Channel) try_pop(dest voidptr) ChanState

struct ManyTimes #

struct ManyTimes {
mut:
	m RwMutex
pub:
	times u64 = 1
	count u64
}

fn (ManyTimes) do #

fn (mut m ManyTimes) do(f fn ())

do execute the function only setting times.

struct Mutex #

struct Mutex {
	mutex C.pthread_mutex_t
}

[init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function.

fn (Mutex) init #

fn (mut m Mutex) init()

fn (Mutex) @lock #

fn (mut m Mutex) @lock()

@lock(), for manual mutex handling, since lock is a keyword

fn (Mutex) unlock #

fn (mut m Mutex) unlock()

struct Once #

struct Once {
mut:
	m RwMutex
pub:
	count u64
}

fn (Once) do #

fn (mut o Once) do(f fn ())

do executes the function f() only once

fn (Once) do_with_param #

fn (mut o Once) do_with_param(f fn (voidptr), param voidptr)

do_with_param executes f(param) only once` This method can be used as a workaround for passing closures to once.do/1 on Windows (they are not implemented there yet) - just pass your data explicitly.
i.e. instead of:

   once.do(fn [mut o] () {
       o.add(5)
})

... you can use:

   once.do_with_param(fn (mut o One) {
       o.add(5)
   }, o)

struct RwMutex #

struct RwMutex {
	mutex C.pthread_rwlock_t
}

fn (RwMutex) init #

fn (mut m RwMutex) init()

fn (RwMutex) @rlock #

fn (mut m RwMutex) @rlock()

RwMutex has separate read- and write locks

fn (RwMutex) @lock #

fn (mut m RwMutex) @lock()

fn (RwMutex) runlock #

fn (mut m RwMutex) runlock()

Windows SRWLocks have different function to unlock So provide two functions here, too, to have a common interface

fn (RwMutex) unlock #

fn (mut m RwMutex) unlock()

struct Semaphore #

struct Semaphore {
	sem C.sem_t
}

fn (Semaphore) init #

fn (mut sem Semaphore) init(n u32)

fn (Semaphore) post #

fn (mut sem Semaphore) post()

fn (Semaphore) wait #

fn (mut sem Semaphore) wait()

fn (Semaphore) try_wait #

fn (mut sem Semaphore) try_wait() bool

try_wait() should return as fast as possible so error handling is only done when debugging

fn (Semaphore) timed_wait #

fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool

fn (Semaphore) destroy #

fn (sem Semaphore) destroy()

struct WaitGroup #

struct WaitGroup {
mut:
	task_count u32       // current task count - reading/writing should be atomic
	wait_count u32       // current wait count - reading/writing should be atomic
	sem        Semaphore // This blocks wait() until tast_countreleased by add()
}

WaitGroup Do not copy an instance of WaitGroup, use a ref instead.

usage: in main thread: wg := sync.new_waitgroup() wg.add(nr_jobs)before starting jobs withgo ... wg.wait()` to wait for all jobs to have finished

in each parallel job: wg.done() when finished

[init_with=new_waitgroup] // TODO: implement support for init_with struct attribute, and disallow WaitGroup{} from outside the sync.new_waitgroup() function.

fn (WaitGroup) init #

fn (mut wg WaitGroup) init()

fn (WaitGroup) add #

fn (mut wg WaitGroup) add(delta int)

add increments (+ve delta) or decrements (-ve delta) task count by delta and unblocks any wait() calls if task count becomes zero.
add panics if task count drops below zero.

fn (WaitGroup) done #

fn (mut wg WaitGroup) done()

done is a convenience fn for add(-1)

fn (WaitGroup) wait #

fn (mut wg WaitGroup) wait()

wait blocks until all tasks are done (task count becomes zero)