datatypes.lockfree #
Lockfree Library for V
A high-performance, thread-safe collection of lock-free data structures for the V programming language. Designed for concurrent applications requiring low-latency and high-throughput data processing.
Features
- Truly Lock-Free: No mutexes or spinlocks
- Cross-Platform: Works on Windows, Linux, macOS
- Configurable: Tune parameters for specific workloads
- High Performance: Optimized for modern multi-core processors
Data Structures
1. Atomic Counter
A thread-safe counter with atomic operations.
import datatypes.lockfree
mut counter := lockfree.new_counter[int](0)
counter.increment() // counter is now 1
counter.increment_by(5) // counter is now 6
value := counter.get() // 6
counter.decrement() // counter is now 5
counter.clear() // counter is reset to 0
Features:
- Atomic increment/decrement
- Batch operations
- Get current value
- Reset functionality
2. Ring Buffer
A circular buffer for producer-consumer scenarios.
import datatypes.lockfree
mut rb := lockfree.new_ringbuffer[int](1024)
rb.push(10) // blocking push
rb.push(20)
item := rb.pop() // 10 (items come out in FIFO order)
free := rb.remaining() // number of free slots
Features:
- Single/Multi producer/consumer modes
- Blocking/non-blocking operations
- Batch operations
- Configurable size
- Memory efficient
Acknowledgements
This library incorporates research and design principles from:
- Intel Threading Building Blocks (TBB)
- Facebook Folly
- Java Concurrent Package
- Dmitry Vyukov's lock-free algorithms
- DPDK rte ring
fn new_counter #
fn new_counter[T](init T) &Counter[T]
new_counter creates a new atomic counter with the specified initial value. It only supports integer types (8-bit to 64-bit integers).
fn new_ringbuffer #
fn new_ringbuffer[T](size u32, param RingBufferParam) &RingBuffer[T]
new_ringbuffer creates a new lock-free ring buffer.
Note: The requested size is rounded up to the next power of two so that index wrapping can use a bitwise AND instead of a modulo operation. The actual capacity may therefore be larger than the requested size.
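The arithmetic behind this note can be sketched without the library. In the snippet below, `next_power_of_two` is a hypothetical helper written only for illustration (it is not part of `datatypes.lockfree`, and the library's own rounding code may differ); it shows why a power-of-two capacity lets `index & mask` stand in for `index % capacity`.

```v
// next_power_of_two is a hypothetical helper for illustration only;
// it is not part of datatypes.lockfree.
fn next_power_of_two(n u32) u32 {
	mut p := u32(1)
	for p < n {
		p <<= 1
	}
	return p
}

fn main() {
	capacity := next_power_of_two(1000)
	mask := capacity - 1
	println(capacity) // 1024
	// For a power-of-two capacity, masking and modulo give the same slot index.
	for index in [u32(0), 1023, 1024, 1025, 5000] {
		assert (index & mask) == (index % capacity)
	}
	println(u32(1025) & mask) // 1
}
```

The mask is simply `capacity - 1`, which matches the `mask` field described in the `RingBuffer` struct.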
fn (Counter[T]) increment_by #
fn (mut c Counter[T]) increment_by(delta T) T
increment_by atomically adds a delta value to the counter and returns the previous value before the operation (fetch-and-add).
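A minimal sketch of the fetch-and-add behaviour described above, using only the functions documented on this page. The starting value and deltas are arbitrary; the printed values follow from the documented "returns the previous value" semantics.

```v
import datatypes.lockfree

fn main() {
	mut counter := lockfree.new_counter[int](10)
	// Each call returns the value held before the update.
	before_add := counter.increment_by(5)
	before_sub := counter.decrement_by(3)
	println(before_add) // 10
	println(before_sub) // 15
	println(counter.get()) // 12
}
```

Because the previous value is returned, callers that need the updated value have to add the delta themselves or call `get()` afterwards.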
fn (Counter[T]) increment #
fn (mut c Counter[T]) increment() T
increment atomically increments the counter by 1 and returns the previous value before the operation.
fn (Counter[T]) decrement_by #
fn (mut c Counter[T]) decrement_by(delta T) T
decrement_by atomically subtracts a delta value from the counter and returns the previous value before the operation (fetch-and-sub).
fn (Counter[T]) decrement #
fn (mut c Counter[T]) decrement() T
decrement atomically decrements the counter by 1 and returns the previous value before the operation.
fn (Counter[T]) get #
fn (mut c Counter[T]) get() T
get atomically retrieves the current value of the counter.
fn (Counter[T]) clear #
fn (mut c Counter[T]) clear()
clear atomically resets the counter to zero.
fn (RingBuffer[T]) try_push #
fn (mut rb RingBuffer[T]) try_push(item T) bool
try_push attempts to push a single item without blocking.
fn (RingBuffer[T]) try_push_many #
fn (mut rb RingBuffer[T]) try_push_many(items []T) u32
try_push_many attempts to push multiple items without blocking.
fn (RingBuffer[T]) try_pop #
fn (mut rb RingBuffer[T]) try_pop() ?T
try_pop attempts to pop a single item without blocking.
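Since `try_pop` returns an option (`?T`), callers have to handle the empty case explicitly. The sketch below assumes that `try_push` returns `true` when the item was stored and that `try_pop` yields `none` on an empty buffer; neither is spelled out on this page, so treat both as assumptions.

```v
import datatypes.lockfree

fn main() {
	mut rb := lockfree.new_ringbuffer[int](8)
	// Nothing has been pushed yet, so the `else` branch is expected here
	// (assuming try_pop yields `none` on an empty buffer).
	if item := rb.try_pop() {
		println('unexpected item: ${item}')
	} else {
		println('buffer is empty')
	}
	// Assumption: `true` means the item was stored.
	if rb.try_push(42) {
		println('pushed')
	}
	// Fall back to -1 if the buffer turns out to be empty.
	value := rb.try_pop() or { -1 }
	println(value)
}
```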
fn (RingBuffer[T]) try_pop_many #
fn (mut rb RingBuffer[T]) try_pop_many(mut items []T) u32
try_pop_many attempts to pop multiple items without blocking.
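The batch variants might be used as follows. This is a sketch under two assumptions that this page does not confirm: that the returned `u32` is the number of items transferred, and that `try_pop_many` fills the slice it is given, so the slice is sized before the call.

```v
import datatypes.lockfree

fn main() {
	mut rb := lockfree.new_ringbuffer[int](16)
	// Assumption: the return value is the number of items actually pushed.
	pushed := rb.try_push_many([1, 2, 3, 4])
	println('pushed: ${pushed}')
	// Assumption: try_pop_many writes into the existing elements of `out`,
	// so allocate room for the items up front.
	mut out := []int{len: 4}
	popped := rb.try_pop_many(mut out)
	println('popped: ${popped}, items: ${out}')
}
```

Checking the returned counts rather than assuming success keeps the caller correct whether or not a batch is accepted as a whole.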
fn (RingBuffer[T]) push #
fn (mut rb RingBuffer[T]) push(item T)
push pushes a single item, blocking until space is available.
fn (RingBuffer[T]) pop #
fn (mut rb RingBuffer[T]) pop() T
pop pops a single item, blocking until an item is available.
fn (RingBuffer[T]) push_many #
fn (mut rb RingBuffer[T]) push_many(items []T)
push_many pushes multiple items, blocking until they have been stored.
fn (RingBuffer[T]) pop_many #
fn (mut rb RingBuffer[T]) pop_many(mut result []T)
pop_many pops multiple items, blocking until they are available.
fn (RingBuffer[T]) is_empty #
fn (rb RingBuffer[T]) is_empty() bool
is_empty checks if the buffer is empty.
fn (RingBuffer[T]) is_full #
fn (rb RingBuffer[T]) is_full() bool
is_full checks if the buffer is full.
fn (RingBuffer[T]) capacity #
fn (rb RingBuffer[T]) capacity() u32
capacity returns the total capacity of the buffer.
fn (RingBuffer[T]) occupied #
fn (rb RingBuffer[T]) occupied() u32
occupied returns the number of occupied slots.
fn (RingBuffer[T]) remaining #
fn (rb RingBuffer[T]) remaining() u32
remaining returns the number of free slots.
fn (RingBuffer[T]) clear #
fn (mut rb RingBuffer[T]) clear() bool
clear clears the ring buffer and resets all pointers.
fn (RingBuffer[T]) stat #
fn (rb RingBuffer[T]) stat() RingBufferStat
stat retrieves current performance statistics of the ring buffer.
This method fetches all recorded operation counters:
- push_full_count: Times producers encountered full buffer
- push_fail_count: Times producers failed to reserve space
- push_wait_prev_count: Times producers waited for predecessors
- push_waiting_count: Current number of producers in waiting state
- pop_empty_count: Times consumers found empty buffer
- pop_fail_count: Times consumers failed to reserve items
- pop_wait_prev_count: Times consumers waited for predecessors
- pop_waiting_count: Current number of consumers in waiting state
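As a rough illustration, the counters listed above can be read from the returned `RingBufferStat` after a few operations. The buffer size and the choice of fields to print are arbitrary.

```v
import datatypes.lockfree

fn main() {
	mut rb := lockfree.new_ringbuffer[int](4)
	rb.push(1)
	item := rb.pop()
	println('popped: ${item}')
	// Take a snapshot of the counters after the operations above.
	s := rb.stat()
	println('push_full_count: ${s.push_full_count}')
	println('pop_empty_count: ${s.pop_empty_count}')
	println('push_waiting_count: ${s.push_waiting_count}')
}
```

Comparing two snapshots taken some time apart is one way to see whether producers are frequently hitting a full buffer or consumers an empty one.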
enum RingBufferMode #
enum RingBufferMode {
spsc = 0 // Single Producer, Single Consumer (one writer thread, one reader thread)
spmc = 1 // Single Producer, Multiple Consumers (one writer, multiple readers)
mpsc = 2 // Multiple Producers, Single Consumer (multiple writers, one reader)
mpmc = 3 // Multiple Producers, Multiple Consumers (default, fully concurrent)
}
RingBufferMode defines the operation modes for the ring buffer.
struct RingBuffer #
struct RingBuffer[T] {
mut:
mode u32 // Current operation mode (from RingBufferMode)
capacity u32 // Total capacity (always power of two)
mask u32 // Bitmask for index calculation (capacity - 1)
clear_flag u32 // Flag indicating clear operation in progress
max_waiting_prod_cons u32 // Max allowed waiting producers/consumers
pad0 [cache_line_size - 20]u8 // Padding to align to cache line boundary
// Producer state (isolated to prevent false sharing)
prod_head u32 // Producer head (next write position)
pad1 [cache_line_size - 4]u8 // Cache line padding
prod_tail u32 // Producer tail (last committed position)
pad2 [cache_line_size - 4]u8 // Cache line padding
// Consumer state (isolated to prevent false sharing)
cons_head u32 // Consumer head (next read position)
pad3 [cache_line_size - 4]u8 // Cache line padding
cons_tail u32 // Consumer tail (last committed position)
pad4 [cache_line_size - 4]u8 // Cache line padding
// Data storage area
slots []T // Array holding actual data elements
// Performance counters
push_full_count u32 // Count of full buffer encounters
push_fail_count u32 // Count of failed push attempts
push_wait_prev_count u32 // Count of waits for previous producers
push_waiting_count u32 // Current number of waiting producers
pop_empty_count u32 // Count of empty buffer encounters
pop_fail_count u32 // Count of failed pop attempts
pop_wait_prev_count u32 // Count of waits for previous consumers
pop_waiting_count u32 // Current number of waiting consumers
}
RingBuffer is a lock-free multiple-producer/multiple-consumer ring buffer. It requires explicit initialization.
struct RingBufferParam #
struct RingBufferParam {
pub:
mode RingBufferMode = .mpmc // Default to most concurrent mode
max_waiting_prod_cons int = 1 // Max allowed waiting producers/consumers before rejecting operations
}
RingBufferParam holds the configuration parameters for ring buffer creation.
- max_waiting_prod_cons: A larger value may improve performance, but in scenarios with many producers/consumers it could lead to severe contention.
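A sketch of how these parameters might be passed. The README example calls `new_ringbuffer` with only a size, which suggests `RingBufferParam` is accepted as optional trailing `field: value` arguments; that is an inference from this page, not something it states. The value `4` is purely illustrative.

```v
import datatypes.lockfree

fn main() {
	// One producer thread and one consumer thread: select the SPSC mode.
	mut spsc := lockfree.new_ringbuffer[int](256, mode: .spsc)
	// Keep the default .mpmc mode but allow more waiting producers/consumers.
	// The value 4 is an arbitrary choice for illustration.
	mut mpmc := lockfree.new_ringbuffer[int](256, max_waiting_prod_cons: 4)
	spsc.push(1)
	mpmc.push(2)
	println(spsc.pop())
	println(mpmc.pop())
}
```

Choosing the narrowest mode that matches the real threading layout is the usual guidance for rings of this kind; the mode must match how many threads actually push and pop.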
struct RingBufferStat #
struct RingBufferStat {
pub mut:
push_full_count u32 // Times producers encountered full buffer
push_fail_count u32 // Times producers failed to reserve space
push_wait_prev_count u32 // Times producers waited for predecessors
push_waiting_count u32 // Current number of producers in waiting state
pop_empty_count u32 // Times consumers found empty buffer
pop_fail_count u32 // Times consumers failed to reserve items
pop_wait_prev_count u32 // Times consumers waited for predecessors
pop_waiting_count u32 // Current number of consumers in waiting state
}
RingBufferStat holds performance counters for ring buffer operations.