241 lines
5.5 KiB
Go
241 lines
5.5 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
)
|
|
|
|
type ByteFIFO struct {
|
|
buffer []byte // The underlying byte slice
|
|
mutex sync.Mutex // Mutex for thread safety
|
|
cond *sync.Cond // Condition variable for signaling
|
|
cap int // Capacity of the buffer
|
|
}
|
|
|
|
// NewByteFIFO creates a new ByteFIFO with the given capacity.
|
|
func NewByteFIFO(capacity int) *ByteFIFO {
|
|
fifo := &ByteFIFO{
|
|
buffer: make([]byte, 0, capacity),
|
|
cap: capacity,
|
|
}
|
|
fifo.cond = sync.NewCond(&fifo.mutex) // Initialize condition variable
|
|
return fifo
|
|
}
|
|
|
|
// Enqueue adds bytes to the end of the buffer. Returns an error if the buffer is full.
|
|
func (f *ByteFIFO) Enqueue(data []byte) error {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
|
|
if len(f.buffer)+len(data) > f.cap {
|
|
return errors.New("buffer overflow")
|
|
}
|
|
|
|
f.buffer = append(f.buffer, data...)
|
|
f.cond.Signal() // Notify waiting goroutines that data is available
|
|
return nil
|
|
}
|
|
|
|
// Dequeue removes and returns the first `n` bytes from the buffer (or what's left if n>len(f.buffer)).
|
|
// Returns an error if nothing is in the queue
|
|
func (f *ByteFIFO) Dequeue(n int) ([]byte, error) {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
|
|
if len(f.buffer) == 0 {
|
|
return nil, errors.New("buffer is empty")
|
|
}
|
|
if n > len(f.buffer) {
|
|
n = len(f.buffer)
|
|
}
|
|
|
|
data := f.buffer[:n]
|
|
f.buffer = f.buffer[n:]
|
|
return data, nil
|
|
}
|
|
|
|
// DequeueOrWait removes and returns the first `n` bytes, waiting if the buffer is empty until data is available.
|
|
func (f *ByteFIFO) DequeueOrWait(n int) ([]byte, error) {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
|
|
// Wait until enough data is available
|
|
for len(f.buffer) < n {
|
|
f.cond.Wait()
|
|
}
|
|
|
|
data := f.buffer[:n]
|
|
f.buffer = f.buffer[n:]
|
|
return data, nil
|
|
}
|
|
|
|
// DequeueOrWaitContext removes and returns the first `n` bytes, waiting if necessary until data is available or the context is canceled.
|
|
func (f *ByteFIFO) DequeueOrWaitContext(ctx context.Context, n int) ([]byte, error) {
|
|
done := make(chan struct{})
|
|
var data []byte
|
|
var err error
|
|
|
|
go func() {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
defer close(done)
|
|
|
|
// Wait until enough data is available
|
|
for len(f.buffer) < n {
|
|
select {
|
|
case <-ctx.Done():
|
|
err = ctx.Err()
|
|
return
|
|
default:
|
|
f.cond.Wait()
|
|
}
|
|
}
|
|
|
|
data = f.buffer[:n]
|
|
f.buffer = f.buffer[n:]
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
return data, err
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
|
|
// Peek returns the first `n` bytes without removing them from the buffer. Returns an error if there are not enough bytes.
|
|
func (f *ByteFIFO) Peek(n int) ([]byte, error) {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
|
|
if n > len(f.buffer) {
|
|
return nil, errors.New("not enough data in buffer")
|
|
}
|
|
|
|
return f.buffer[:n], nil
|
|
}
|
|
|
|
// GetLen returns the current number of bytes in the buffer.
|
|
func (f *ByteFIFO) GetLen() int {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
|
|
return len(f.buffer)
|
|
}
|
|
|
|
// Capacity returns the maximum capacity of the buffer.
|
|
func (f *ByteFIFO) Capacity() int {
|
|
return f.cap
|
|
}
|
|
|
|
// *****
|
|
|
|
type StringFIFO struct {
|
|
buffer []string // The underlying byte slice
|
|
mutex sync.Mutex // Mutex for thread safety
|
|
cond *sync.Cond // Condition variable for signaling
|
|
}
|
|
|
|
// NewStringFIFO creates a new StringFIFO.
|
|
func NewStringFIFO() *StringFIFO {
|
|
fifo := &StringFIFO{
|
|
buffer: make([]string, 0),
|
|
}
|
|
fifo.cond = sync.NewCond(&fifo.mutex) // Initialize condition variable
|
|
return fifo
|
|
}
|
|
|
|
// Enqueue adds a string to the end of the buffer. Returns an error if the buffer is full.
|
|
func (f *StringFIFO) Enqueue(data string) {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
|
|
f.buffer = append(f.buffer, data)
|
|
f.cond.Signal() // Notify waiting goroutines that data is available
|
|
}
|
|
|
|
// Dequeue removes and returns the first `n` bytes from the buffer (or what's left if n>len(f.buffer)).
|
|
// Returns an error if nothing is in the queue
|
|
func (f *StringFIFO) Dequeue() (string, error) {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
|
|
if len(f.buffer) == 0 {
|
|
return "", errors.New("buffer is empty")
|
|
}
|
|
|
|
data := f.buffer[0]
|
|
f.buffer = f.buffer[1:]
|
|
return data, nil
|
|
}
|
|
|
|
// DequeueOrWait removes and returns the first `n` bytes, waiting if the buffer is empty until data is available.
|
|
func (f *StringFIFO) DequeueOrWait() (string, error) {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
|
|
// Wait until enough data is available
|
|
for len(f.buffer) == 0 {
|
|
f.cond.Wait()
|
|
}
|
|
|
|
data := f.buffer[0]
|
|
f.buffer = f.buffer[1:]
|
|
return data, nil
|
|
}
|
|
|
|
// DequeueOrWaitContext removes and returns the first `n` bytes, waiting if necessary until data is available or the context is canceled.
|
|
func (f *StringFIFO) DequeueOrWaitContext(ctx context.Context) (string, error) {
|
|
done := make(chan struct{})
|
|
var data string
|
|
var err error
|
|
|
|
go func() {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
defer close(done)
|
|
|
|
// Wait until enough data is available
|
|
for len(f.buffer) == 0 {
|
|
select {
|
|
case <-ctx.Done():
|
|
err = ctx.Err()
|
|
return
|
|
default:
|
|
f.cond.Wait()
|
|
}
|
|
}
|
|
|
|
data = f.buffer[0]
|
|
f.buffer = f.buffer[1:]
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
return data, err
|
|
case <-ctx.Done():
|
|
return "", ctx.Err()
|
|
}
|
|
}
|
|
|
|
// Peek returns the first `n` bytes without removing them from the buffer. Returns an error if there are not enough bytes.
|
|
func (f *StringFIFO) Peek() (string, error) {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
|
|
if len(f.buffer) == 0 {
|
|
return "", errors.New("not enough data in buffer")
|
|
}
|
|
|
|
return f.buffer[0], nil
|
|
}
|
|
|
|
// GetLen returns the current number of bytes in the buffer.
|
|
func (f *StringFIFO) GetLen() int {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
|
|
return len(f.buffer)
|
|
}
|