datatypes.lockfree #
Lockfree Library for V
A high-performance, thread-safe collection of lock-free data structures for the V programming language. Designed for concurrent applications requiring low-latency and high-throughput data processing.
Features
- Truly Lock-Free: No mutexes or spinlocks
- Cross-Platform: Works on Windows, Linux, macOS
- Configurable: Tune parameters for specific workloads
- High Performance: Optimized for modern multi-core processors
Data Structures
1. Atomic Counter
A thread-safe counter with atomic operations.
import datatypes.lockfree
mut counter := lockfree.new_counter[int](0)
counter.increment()
counter.increment_by(5)
value := counter.get() // 6
counter.decrement()
counter.clear()
Features:- Atomic increment/decrement
- Batch operations
- Get current value
- Reset functionality
2. Ring Buffer
A circular buffer for producer-consumer scenarios.
import datatypes.lockfree
mut rb := lockfree.new_ringbuffer[int](1024)
rb.push(10)
rb.push(20)
item := rb.pop() // 10
free := rb.remaining()
Features:- Single/Multi producer/consumer modes
- Blocking/non-blocking operations
- Batch operations
- Configurable size
- Memory efficient
Acknowledgements
This library incorporates research and design principles from:- Intel Threading Building Blocks (TBB)
- Facebook Folly
- Java Concurrent Package
- Dmitry Vyukov's lock-free algorithms
- DPDK rte ring
fn new_counter #
fn new_counter[T](init T) &Counter[T]
new_counter creates a new atomic counter with the specified initial value. It only supports integer types (8-bit to 64-bit integers).
fn new_ringbuffer #
fn new_ringbuffer[T](size u32, param RingBufferParam) &RingBuffer[T]
new_ringbuffer creates a new lock-free ring buffer.
Note: The buffer capacity will be expanded to the next power of two for efficient modulo operations using bitwise AND. The actual capacity may be larger than the requested size.
fn (Counter[T]) increment_by #
fn (mut c Counter[T]) increment_by(delta T) T
increment_by atomically adds a delta value to the counter and returns the previous value before the operation (fetch-and-add).
fn (Counter[T]) increment #
fn (mut c Counter[T]) increment() T
increment atomically increments the counter by 1 and returns the previous value before the operation.
fn (Counter[T]) decrement_by #
fn (mut c Counter[T]) decrement_by(delta T) T
decrement_by atomically subtracts a delta value from the counter and returns the previous value before the operation (fetch-and-sub).
fn (Counter[T]) decrement #
fn (mut c Counter[T]) decrement() T
decrement atomically decrements the counter by 1 and returns the previous value before the operation.
fn (Counter[T]) get #
fn (mut c Counter[T]) get() T
get atomically retrieves the current value of the counter.
fn (Counter[T]) clear #
fn (mut c Counter[T]) clear()
clear atomically resets the counter to zero.
fn (RingBuffer[T]) try_push #
fn (mut rb RingBuffer[T]) try_push(item T) bool
try_push tries to push a single item non-blocking.
fn (RingBuffer[T]) try_push_many #
fn (mut rb RingBuffer[T]) try_push_many(items []T) u32
try_push_many tries to push multiple items non-blocking.
fn (RingBuffer[T]) try_pop #
fn (mut rb RingBuffer[T]) try_pop() ?T
try_pop tries to pop a single item non-blocking.
fn (RingBuffer[T]) try_pop_many #
fn (mut rb RingBuffer[T]) try_pop_many(mut items []T) u32
try_pop_many tries to pop multiple items non-blocking.
fn (RingBuffer[T]) push #
fn (mut rb RingBuffer[T]) push(item T)
push blocking push of a single item.
fn (RingBuffer[T]) pop #
fn (mut rb RingBuffer[T]) pop() T
pop blocking pop of a single item.
fn (RingBuffer[T]) push_many #
fn (mut rb RingBuffer[T]) push_many(items []T)
push_many blocking push of multiple items.
fn (RingBuffer[T]) pop_many #
fn (mut rb RingBuffer[T]) pop_many(mut result []T)
pop_many blocking pop of multiple items.
fn (RingBuffer[T]) is_empty #
fn (rb RingBuffer[T]) is_empty() bool
is_empty checks if the buffer is empty.
fn (RingBuffer[T]) is_full #
fn (rb RingBuffer[T]) is_full() bool
is_full checks if the buffer is full.
fn (RingBuffer[T]) capacity #
fn (rb RingBuffer[T]) capacity() u32
capacity returns the total capacity of the buffer.
fn (RingBuffer[T]) occupied #
fn (rb RingBuffer[T]) occupied() u32
occupied returns the number of occupied slots.
fn (RingBuffer[T]) remaining #
fn (rb RingBuffer[T]) remaining() u32
remaining returns the number of free slots.
fn (RingBuffer[T]) clear #
fn (mut rb RingBuffer[T]) clear() bool
clear clears the ring buffer and resets all pointers.
fn (RingBuffer[T]) stat #
fn (rb RingBuffer[T]) stat() RingBufferStat
stat retrieves current performance statistics of the ring buffer.
This method fetches all recorded operation counters:- push_full_count: Times producers encountered full buffer
- push_fail_count: Times producers failed to reserve space
- push_wait_prev_count: Times producers waited for predecessors
- push_waiting_count: Current number of producers in waiting state
- pop_empty_count: Times consumers found empty buffer
- pop_fail_count: Times consumers failed to reserve items
- pop_wait_prev_count: Times consumers waited for predecessors
- pop_waiting_count: Current number of consumers in waiting state
enum RingBufferMode #
enum RingBufferMode {
spsc = 0 // Single Producer, Single Consumer (optimized for single-threaded access)
spmc = 1 // Single Producer, Multiple Consumers (one writer, multiple readers)
mpsc = 2 // Multiple Producers, Single Consumer (multiple writers, one reader)
mpmc = 3 // Multiple Producers, Multiple Consumers (default, fully concurrent)
}
RingBufferMode Operation modes for the ring buffer.
struct RingBuffer #
struct RingBuffer[T] {
mut:
mode u32 // Current operation mode (from RingBufferMode)
capacity u32 // Total capacity (always power of two)
mask u32 // Bitmask for index calculation (capacity - 1)
clear_flag u32 // Flag indicating clear operation in progress
max_waiting_prod_cons u32 // Max allowed waiting producers/consumers
pad0 [cache_line_size - 20]u8 // Padding to align to cache line boundary
// Producer state (isolated to prevent false sharing)
prod_head u32 // Producer head (next write position)
pad1 [cache_line_size - 4]u8 // Cache line padding
prod_tail u32 // Producer tail (last committed position)
pad2 [cache_line_size - 4]u8 // Cache line padding
// Consumer state (isolated to prevent false sharing)
cons_head u32 // Consumer head (next read position)
pad3 [cache_line_size - 4]u8 // Cache line padding
cons_tail u32 // Consumer tail (last committed position)
pad4 [cache_line_size - 4]u8 // Cache line padding
// Data storage area
slots []T // Array holding actual data elements
// Performance counters
push_full_count u32 // Count of full buffer encounters
push_fail_count u32 // Count of failed push attempts
push_wait_prev_count u32 // Count of waits for previous producers
push_waiting_count u32 // Current number of waiting producers
pop_empty_count u32 // Count of empty buffer encounters
pop_fail_count u32 // Count of failed pop attempts
pop_wait_prev_count u32 // Count of waits for previous consumers
pop_waiting_count u32 // Current number of waiting consumers
}
RingBuffer Lock-free multiple producer/multiple consumer ring buffer. Requires explicit initialization
struct RingBufferParam #
struct RingBufferParam {
pub:
mode RingBufferMode = .mpmc // Default to most concurrent mode
max_waiting_prod_cons int = 1 // Max allowed waiting producers/consumers before rejecting operations
}
RingBufferParam Configuration parameters for ring buffer creation.- max_waiting_prod_cons: Setting this to a larger value may improve performance,but in scenarios with many producers/consumers, it could lead to severe contention issues.
struct RingBufferStat #
struct RingBufferStat {
pub mut:
push_full_count u32 // Times producers encountered full buffer
push_fail_count u32 // Times producers failed to reserve space
push_wait_prev_count u32 // Times producers waited for predecessors
push_waiting_count u32 // Current number of producers in waiting state
pop_empty_count u32 // Times consumers found empty buffer
pop_fail_count u32 // Times consumers failed to reserve items
pop_wait_prev_count u32 // Times consumers waited for predecessors
pop_waiting_count u32 // Current number of consumers in waiting state
}
RingBufferStat holds performance counters for ring buffer operations.