first commit in new repo

This commit is contained in:
Torsten Harenberg
2025-02-02 19:19:16 +01:00
commit 255137345f
10 changed files with 2009 additions and 0 deletions

241
fifo.go Normal file
View File

@@ -0,0 +1,241 @@
package main
import (
"context"
"errors"
"sync"
)
type ByteFIFO struct {
buffer []byte // The underlying byte slice
mutex sync.Mutex // Mutex for thread safety
cond *sync.Cond // Condition variable for signaling
cap int // Capacity of the buffer
}
// NewByteFIFO creates a new ByteFIFO with the given capacity.
func NewByteFIFO(capacity int) *ByteFIFO {
fifo := &ByteFIFO{
buffer: make([]byte, 0, capacity),
cap: capacity,
}
fifo.cond = sync.NewCond(&fifo.mutex) // Initialize condition variable
return fifo
}
// Enqueue adds bytes to the end of the buffer. Returns an error if the buffer is full.
func (f *ByteFIFO) Enqueue(data []byte) error {
f.mutex.Lock()
defer f.mutex.Unlock()
if len(f.buffer)+len(data) > f.cap {
return errors.New("buffer overflow")
}
f.buffer = append(f.buffer, data...)
f.cond.Signal() // Notify waiting goroutines that data is available
return nil
}
// Dequeue removes and returns the first `n` bytes from the buffer (or what's left if n>len(f.buffer)).
// Returns an error if nothing is in the queue
func (f *ByteFIFO) Dequeue(n int) ([]byte, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
if len(f.buffer) == 0 {
return nil, errors.New("buffer is empty")
}
if n > len(f.buffer) {
n = len(f.buffer)
}
data := f.buffer[:n]
f.buffer = f.buffer[n:]
return data, nil
}
// DequeueOrWait removes and returns the first `n` bytes, waiting if the buffer is empty until data is available.
func (f *ByteFIFO) DequeueOrWait(n int) ([]byte, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
// Wait until enough data is available
for len(f.buffer) < n {
f.cond.Wait()
}
data := f.buffer[:n]
f.buffer = f.buffer[n:]
return data, nil
}
// DequeueOrWaitContext removes and returns the first `n` bytes, waiting if necessary until data is available or the context is canceled.
func (f *ByteFIFO) DequeueOrWaitContext(ctx context.Context, n int) ([]byte, error) {
done := make(chan struct{})
var data []byte
var err error
go func() {
f.mutex.Lock()
defer f.mutex.Unlock()
defer close(done)
// Wait until enough data is available
for len(f.buffer) < n {
select {
case <-ctx.Done():
err = ctx.Err()
return
default:
f.cond.Wait()
}
}
data = f.buffer[:n]
f.buffer = f.buffer[n:]
}()
select {
case <-done:
return data, err
case <-ctx.Done():
return nil, ctx.Err()
}
}
// Peek returns the first `n` bytes without removing them from the buffer. Returns an error if there are not enough bytes.
func (f *ByteFIFO) Peek(n int) ([]byte, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
if n > len(f.buffer) {
return nil, errors.New("not enough data in buffer")
}
return f.buffer[:n], nil
}
// Size returns the current number of bytes in the buffer.
func (f *ByteFIFO) GetLen() int {
f.mutex.Lock()
defer f.mutex.Unlock()
return len(f.buffer)
}
// Capacity returns the maximum capacity of the buffer.
func (f *ByteFIFO) Capacity() int {
return f.cap
}
// *****
type StringFIFO struct {
buffer []string // The underlying byte slice
mutex sync.Mutex // Mutex for thread safety
cond *sync.Cond // Condition variable for signaling
}
// NewByteFIFO creates a new ByteFIFO with the given capacity.
func NewStringFIFO() *StringFIFO {
fifo := &StringFIFO{
buffer: make([]string, 0),
}
fifo.cond = sync.NewCond(&fifo.mutex) // Initialize condition variable
return fifo
}
// Enqueue adds a string to the end of the buffer. Returns an error if the buffer is full.
func (f *StringFIFO) Enqueue(data string) error {
f.mutex.Lock()
defer f.mutex.Unlock()
f.buffer = append(f.buffer, data)
f.cond.Signal() // Notify waiting goroutines that data is available
return nil
}
// Dequeue removes and returns the first `n` bytes from the buffer (or what's left if n>len(f.buffer)).
// Returns an error if nothing is in the queue
func (f *StringFIFO) Dequeue() (string, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
if len(f.buffer) == 0 {
return "", errors.New("buffer is empty")
}
data := f.buffer[0]
f.buffer = f.buffer[1:]
return data, nil
}
// DequeueOrWait removes and returns the first `n` bytes, waiting if the buffer is empty until data is available.
func (f *StringFIFO) DequeueOrWait() (string, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
// Wait until enough data is available
for len(f.buffer) == 0 {
f.cond.Wait()
}
data := f.buffer[0]
f.buffer = f.buffer[1:]
return data, nil
}
// DequeueOrWaitContext removes and returns the first `n` bytes, waiting if necessary until data is available or the context is canceled.
func (f *StringFIFO) DequeueOrWaitContext(ctx context.Context) (string, error) {
done := make(chan struct{})
var data string
var err error
go func() {
f.mutex.Lock()
defer f.mutex.Unlock()
defer close(done)
// Wait until enough data is available
for len(f.buffer) == 0 {
select {
case <-ctx.Done():
err = ctx.Err()
return
default:
f.cond.Wait()
}
}
data = f.buffer[0]
f.buffer = f.buffer[1:]
}()
select {
case <-done:
return data, err
case <-ctx.Done():
return "", ctx.Err()
}
}
// Peek returns the first `n` bytes without removing them from the buffer. Returns an error if there are not enough bytes.
func (f *StringFIFO) Peek() (string, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
if len(f.buffer) == 0 {
return "", errors.New("not enough data in buffer")
}
return f.buffer[0], nil
}
// GetLen returns the current number of bytes in the buffer.
func (f *StringFIFO) GetLen() int {
f.mutex.Lock()
defer f.mutex.Unlock()
return len(f.buffer)
}