Skip to content

datatypes.lockfree #

Lockfree Library for V

A high-performance, thread-safe collection of lock-free data structures for the V programming language. Designed for concurrent applications requiring low-latency and high-throughput data processing.

Features

  • Truly Lock-Free: No mutexes or spinlocks
  • Cross-Platform: Works on Windows, Linux, macOS
  • Configurable: Tune parameters for specific workloads
  • High Performance: Optimized for modern multi-core processors

Data Structures

1. Atomic Counter

A thread-safe counter with atomic operations.

import datatypes.lockfree

mut counter := lockfree.new_counter[int](0)
counter.increment()
counter.increment_by(5)
value := counter.get() // 6
counter.decrement()
counter.clear()

Features:- Atomic increment/decrement

  • Batch operations
  • Get current value
  • Reset functionality

2. Ring Buffer

A circular buffer for producer-consumer scenarios.

import datatypes.lockfree

mut rb := lockfree.new_ringbuffer[int](1024)
rb.push(10)
rb.push(20)
item := rb.pop() // 10
free := rb.remaining()

Features:- Single/Multi producer/consumer modes

  • Blocking/non-blocking operations
  • Batch operations
  • Configurable size
  • Memory efficient

Acknowledgements

This library incorporates research and design principles from:- Intel Threading Building Blocks (TBB)

  • Facebook Folly
  • Java Concurrent Package
  • Dmitry Vyukov's lock-free algorithms
  • DPDK rte ring

fn new_counter #

fn new_counter[T](init T) &Counter[T]

new_counter creates a new atomic counter with the specified initial value. It only supports integer types (8-bit to 64-bit integers).

fn new_ringbuffer #

fn new_ringbuffer[T](size u32, param RingBufferParam) &RingBuffer[T]

new_ringbuffer creates a new lock-free ring buffer.

Note: The buffer capacity will be expanded to the next power of two for efficient modulo operations using bitwise AND. The actual capacity may be larger than the requested size.

fn (Counter[T]) increment_by #

fn (mut c Counter[T]) increment_by(delta T) T

increment_by atomically adds a delta value to the counter and returns the previous value before the operation (fetch-and-add).

fn (Counter[T]) increment #

fn (mut c Counter[T]) increment() T

increment atomically increments the counter by 1 and returns the previous value before the operation.

fn (Counter[T]) decrement_by #

fn (mut c Counter[T]) decrement_by(delta T) T

decrement_by atomically subtracts a delta value from the counter and returns the previous value before the operation (fetch-and-sub).

fn (Counter[T]) decrement #

fn (mut c Counter[T]) decrement() T

decrement atomically decrements the counter by 1 and returns the previous value before the operation.

fn (Counter[T]) get #

fn (mut c Counter[T]) get() T

get atomically retrieves the current value of the counter.

fn (Counter[T]) clear #

fn (mut c Counter[T]) clear()

clear atomically resets the counter to zero.

fn (RingBuffer[T]) try_push #

fn (mut rb RingBuffer[T]) try_push(item T) bool

try_push tries to push a single item non-blocking.

fn (RingBuffer[T]) try_push_many #

fn (mut rb RingBuffer[T]) try_push_many(items []T) u32

try_push_many tries to push multiple items non-blocking.

fn (RingBuffer[T]) try_pop #

fn (mut rb RingBuffer[T]) try_pop() ?T

try_pop tries to pop a single item non-blocking.

fn (RingBuffer[T]) try_pop_many #

fn (mut rb RingBuffer[T]) try_pop_many(mut items []T) u32

try_pop_many tries to pop multiple items non-blocking.

fn (RingBuffer[T]) push #

fn (mut rb RingBuffer[T]) push(item T)

push blocking push of a single item.

fn (RingBuffer[T]) pop #

fn (mut rb RingBuffer[T]) pop() T

pop blocking pop of a single item.

fn (RingBuffer[T]) push_many #

fn (mut rb RingBuffer[T]) push_many(items []T)

push_many blocking push of multiple items.

fn (RingBuffer[T]) pop_many #

fn (mut rb RingBuffer[T]) pop_many(mut result []T)

pop_many blocking pop of multiple items.

fn (RingBuffer[T]) is_empty #

fn (rb RingBuffer[T]) is_empty() bool

is_empty checks if the buffer is empty.

fn (RingBuffer[T]) is_full #

fn (rb RingBuffer[T]) is_full() bool

is_full checks if the buffer is full.

fn (RingBuffer[T]) capacity #

fn (rb RingBuffer[T]) capacity() u32

capacity returns the total capacity of the buffer.

fn (RingBuffer[T]) occupied #

fn (rb RingBuffer[T]) occupied() u32

occupied returns the number of occupied slots.

fn (RingBuffer[T]) remaining #

fn (rb RingBuffer[T]) remaining() u32

remaining returns the number of free slots.

fn (RingBuffer[T]) clear #

fn (mut rb RingBuffer[T]) clear() bool

clear clears the ring buffer and resets all pointers.

fn (RingBuffer[T]) stat #

fn (rb RingBuffer[T]) stat() RingBufferStat

stat retrieves current performance statistics of the ring buffer.

This method fetches all recorded operation counters:- push_full_count: Times producers encountered full buffer

  • push_fail_count: Times producers failed to reserve space
  • push_wait_prev_count: Times producers waited for predecessors
  • push_waiting_count: Current number of producers in waiting state
  • pop_empty_count: Times consumers found empty buffer
  • pop_fail_count: Times consumers failed to reserve items
  • pop_wait_prev_count: Times consumers waited for predecessors
  • pop_waiting_count: Current number of consumers in waiting state

enum RingBufferMode #

enum RingBufferMode {
	spsc = 0 // Single Producer, Single Consumer (optimized for single-threaded access)
	spmc = 1 // Single Producer, Multiple Consumers (one writer, multiple readers)
	mpsc = 2 // Multiple Producers, Single Consumer (multiple writers, one reader)
	mpmc = 3 // Multiple Producers, Multiple Consumers (default, fully concurrent)
}

RingBufferMode Operation modes for the ring buffer.

struct RingBuffer #

@[noinit]
struct RingBuffer[T] {
mut:
	mode                  u32                      // Current operation mode (from RingBufferMode)
	capacity              u32                      // Total capacity (always power of two)
	mask                  u32                      // Bitmask for index calculation (capacity - 1)
	clear_flag            u32                      // Flag indicating clear operation in progress
	max_waiting_prod_cons u32                      // Max allowed waiting producers/consumers
	pad0                  [cache_line_size - 20]u8 // Padding to align to cache line boundary

	// Producer state (isolated to prevent false sharing)
	prod_head u32                     // Producer head (next write position)
	pad1      [cache_line_size - 4]u8 // Cache line padding
	prod_tail u32                     // Producer tail (last committed position)
	pad2      [cache_line_size - 4]u8 // Cache line padding

	// Consumer state (isolated to prevent false sharing)
	cons_head u32                     // Consumer head (next read position)
	pad3      [cache_line_size - 4]u8 // Cache line padding
	cons_tail u32                     // Consumer tail (last committed position)
	pad4      [cache_line_size - 4]u8 // Cache line padding

	// Data storage area
	slots []T // Array holding actual data elements

	// Performance counters
	push_full_count      u32 // Count of full buffer encounters
	push_fail_count      u32 // Count of failed push attempts
	push_wait_prev_count u32 // Count of waits for previous producers
	push_waiting_count   u32 // Current number of waiting producers
	pop_empty_count      u32 // Count of empty buffer encounters
	pop_fail_count       u32 // Count of failed pop attempts
	pop_wait_prev_count  u32 // Count of waits for previous consumers
	pop_waiting_count    u32 // Current number of waiting consumers
}

RingBuffer Lock-free multiple producer/multiple consumer ring buffer. Requires explicit initialization

struct RingBufferParam #

@[params]
struct RingBufferParam {
pub:
	mode                  RingBufferMode = .mpmc // Default to most concurrent mode
	max_waiting_prod_cons int            = 1     // Max allowed waiting producers/consumers before rejecting operations
}

RingBufferParam Configuration parameters for ring buffer creation.- max_waiting_prod_cons: Setting this to a larger value may improve performance,but in scenarios with many producers/consumers, it could lead to severe contention issues.

struct RingBufferStat #

struct RingBufferStat {
pub mut:
	push_full_count      u32 // Times producers encountered full buffer
	push_fail_count      u32 // Times producers failed to reserve space
	push_wait_prev_count u32 // Times producers waited for predecessors
	push_waiting_count   u32 // Current number of producers in waiting state
	pop_empty_count      u32 // Times consumers found empty buffer
	pop_fail_count       u32 // Times consumers failed to reserve items
	pop_wait_prev_count  u32 // Times consumers waited for predecessors
	pop_waiting_count    u32 // Current number of consumers in waiting state
}

RingBufferStat holds performance counters for ring buffer operations.