package main

import (
	"context"
	"errors"
	"sync"
)

// Sentinel errors returned by ByteFIFO and StringFIFO. Compare with errors.Is.
var (
	// ErrBufferOverflow is returned by ByteFIFO.Enqueue when the data would
	// not fit in the remaining capacity.
	ErrBufferOverflow = errors.New("buffer overflow")
	// ErrBufferEmpty is returned by the non-blocking Dequeue methods when
	// there is nothing queued.
	ErrBufferEmpty = errors.New("buffer is empty")
	// ErrNotEnoughData is returned by the Peek methods when fewer elements
	// are queued than were requested.
	ErrNotEnoughData = errors.New("not enough data in buffer")
	// ErrNegativeCount is returned when a negative byte count is requested.
	ErrNegativeCount = errors.New("negative byte count")
	// ErrExceedsCapacity is returned by the blocking ByteFIFO dequeue methods
	// when more bytes are requested than the FIFO can ever hold, since such a
	// request could never be satisfied.
	ErrExceedsCapacity = errors.New("requested size exceeds buffer capacity")
)

// broadcastOnDone starts a goroutine that broadcasts on cond once ctx is done,
// so that goroutines blocked in cond.Wait wake up and can observe the
// cancellation. The broadcast is issued while holding cond.L; a waiter that
// checks ctx.Err() under the same lock before calling Wait therefore cannot
// miss it. The returned stop function terminates the goroutine and must be
// called exactly once.
func broadcastOnDone(ctx context.Context, cond *sync.Cond) (stop func()) {
	quit := make(chan struct{})
	go func() {
		select {
		case <-ctx.Done():
			cond.L.Lock()
			cond.Broadcast()
			cond.L.Unlock()
		case <-quit:
		}
	}()
	return func() { close(quit) }
}

// ByteFIFO is a fixed-capacity first-in-first-out queue of bytes. All methods
// take the internal mutex, so a ByteFIFO may be shared between goroutines.
// It must not be copied after first use.
type ByteFIFO struct {
	buffer []byte     // queued bytes, oldest first
	mutex  sync.Mutex // guards buffer
	cond   *sync.Cond // broadcast whenever bytes are enqueued
	cap    int        // maximum number of queued bytes; fixed at construction
}

// NewByteFIFO creates a new ByteFIFO that holds at most capacity bytes.
// A negative capacity makes the underlying make call panic.
func NewByteFIFO(capacity int) *ByteFIFO {
	fifo := &ByteFIFO{
		buffer: make([]byte, 0, capacity),
		cap:    capacity,
	}
	fifo.cond = sync.NewCond(&fifo.mutex)
	return fifo
}

// checkCount validates a byte count passed to a blocking dequeue.
func (f *ByteFIFO) checkCount(n int) error {
	switch {
	case n < 0:
		return ErrNegativeCount
	case n > f.cap:
		return ErrExceedsCapacity
	}
	return nil
}

// take removes the first n bytes and returns them in a freshly allocated
// slice. The caller must hold f.mutex and guarantee 0 <= n <= len(f.buffer).
func (f *ByteFIFO) take(n int) []byte {
	out := make([]byte, n)
	copy(out, f.buffer)
	// Shift the remainder to the front so the preallocated backing array keeps
	// being reused instead of shrinking with every dequeue.
	rest := copy(f.buffer, f.buffer[n:])
	f.buffer = f.buffer[:rest]
	return out
}

// Enqueue appends data to the end of the queue. It returns ErrBufferOverflow,
// leaving the queue unchanged, if data does not fit in the remaining capacity.
func (f *ByteFIFO) Enqueue(data []byte) error {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	if len(f.buffer)+len(data) > f.cap {
		return ErrBufferOverflow
	}
	f.buffer = append(f.buffer, data...)
	// Waiters may be waiting for different byte counts, so waking only one of
	// them could leave a satisfiable waiter asleep; wake them all.
	f.cond.Broadcast()
	return nil
}

// Dequeue removes and returns the first n bytes of the queue, or everything
// that is queued if fewer than n bytes are available. The returned slice is a
// copy owned by the caller. It returns ErrBufferEmpty if nothing is queued and
// ErrNegativeCount if n is negative.
func (f *ByteFIFO) Dequeue(n int) ([]byte, error) {
	if n < 0 {
		return nil, ErrNegativeCount
	}

	f.mutex.Lock()
	defer f.mutex.Unlock()

	if len(f.buffer) == 0 {
		return nil, ErrBufferEmpty
	}
	if n > len(f.buffer) {
		n = len(f.buffer)
	}
	return f.take(n), nil
}

// DequeueOrWait removes and returns exactly the first n bytes, blocking until
// at least n bytes are queued. The returned slice is a copy owned by the
// caller. It returns ErrNegativeCount if n is negative and ErrExceedsCapacity
// if n is larger than the FIFO's capacity.
func (f *ByteFIFO) DequeueOrWait(n int) ([]byte, error) {
	if err := f.checkCount(n); err != nil {
		return nil, err
	}

	f.mutex.Lock()
	defer f.mutex.Unlock()

	for len(f.buffer) < n {
		f.cond.Wait()
	}
	return f.take(n), nil
}

// DequeueOrWaitContext removes and returns exactly the first n bytes, blocking
// until at least n bytes are queued or ctx is done. If enough data is already
// queued it is returned even when ctx is done; otherwise a done context yields
// ctx.Err() and the queue is left untouched. It returns ErrNegativeCount if n
// is negative and ErrExceedsCapacity if n is larger than the FIFO's capacity.
func (f *ByteFIFO) DequeueOrWaitContext(ctx context.Context, n int) ([]byte, error) {
	if err := f.checkCount(n); err != nil {
		return nil, err
	}

	f.mutex.Lock()
	defer f.mutex.Unlock()

	// The wait happens on the calling goroutine so that nothing is left behind
	// to consume data after cancellation. A watcher is only needed when we
	// actually have to block.
	if len(f.buffer) < n {
		stop := broadcastOnDone(ctx, f.cond)
		defer stop()
	}
	for len(f.buffer) < n {
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		f.cond.Wait()
	}
	return f.take(n), nil
}

// Peek returns a copy of the first n bytes without removing them. It returns
// ErrNotEnoughData if fewer than n bytes are queued and ErrNegativeCount if n
// is negative.
func (f *ByteFIFO) Peek(n int) ([]byte, error) {
	if n < 0 {
		return nil, ErrNegativeCount
	}

	f.mutex.Lock()
	defer f.mutex.Unlock()

	if n > len(f.buffer) {
		return nil, ErrNotEnoughData
	}
	out := make([]byte, n)
	copy(out, f.buffer)
	return out, nil
}

// GetLen returns the number of bytes currently queued.
func (f *ByteFIFO) GetLen() int {
	f.mutex.Lock()
	defer f.mutex.Unlock()
	return len(f.buffer)
}

// Capacity returns the maximum number of bytes the FIFO can hold.
func (f *ByteFIFO) Capacity() int {
	return f.cap
}

// *****

// StringFIFO is an unbounded first-in-first-out queue of strings. All methods
// take the internal mutex, so a StringFIFO may be shared between goroutines.
// It must not be copied after first use.
type StringFIFO struct {
	buffer []string   // queued strings, oldest first
	mutex  sync.Mutex // guards buffer
	cond   *sync.Cond // signaled whenever a string is enqueued
}

// NewStringFIFO creates a new, empty StringFIFO.
func NewStringFIFO() *StringFIFO {
	fifo := &StringFIFO{
		buffer: make([]string, 0),
	}
	fifo.cond = sync.NewCond(&fifo.mutex)
	return fifo
}

// pop removes and returns the string at the front of the queue. The caller
// must hold f.mutex and guarantee the queue is not empty.
func (f *StringFIFO) pop() string {
	head := f.buffer[0]
	// Clear the slot so the backing array does not keep the string alive.
	f.buffer[0] = ""
	f.buffer = f.buffer[1:]
	return head
}

// Enqueue appends data to the end of the queue. The queue is unbounded, so
// this never fails.
func (f *StringFIFO) Enqueue(data string) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	f.buffer = append(f.buffer, data)
	// Every waiter needs exactly one element, and a woken waiter always takes
	// an available element before looking at its context, so waking a single
	// waiter per enqueued string is sufficient.
	f.cond.Signal()
}

// Dequeue removes and returns the string at the front of the queue.
// It returns ErrBufferEmpty if the queue is empty.
func (f *StringFIFO) Dequeue() (string, error) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	if len(f.buffer) == 0 {
		return "", ErrBufferEmpty
	}
	return f.pop(), nil
}

// DequeueOrWait removes and returns the string at the front of the queue,
// blocking until one is available. The returned error is always nil.
func (f *StringFIFO) DequeueOrWait() (string, error) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	for len(f.buffer) == 0 {
		f.cond.Wait()
	}
	return f.pop(), nil
}

// DequeueOrWaitContext removes and returns the string at the front of the
// queue, blocking until one is available or ctx is done. If a string is
// already queued it is returned even when ctx is done; otherwise a done
// context yields ctx.Err() and the queue is left untouched.
func (f *StringFIFO) DequeueOrWaitContext(ctx context.Context) (string, error) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	// The wait happens on the calling goroutine so that nothing is left behind
	// to consume data after cancellation. A watcher is only needed when we
	// actually have to block.
	if len(f.buffer) == 0 {
		stop := broadcastOnDone(ctx, f.cond)
		defer stop()
	}
	for len(f.buffer) == 0 {
		if err := ctx.Err(); err != nil {
			return "", err
		}
		f.cond.Wait()
	}
	return f.pop(), nil
}

// Peek returns the string at the front of the queue without removing it.
// It returns ErrNotEnoughData if the queue is empty.
func (f *StringFIFO) Peek() (string, error) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	if len(f.buffer) == 0 {
		return "", ErrNotEnoughData
	}
	return f.buffer[0], nil
}

// GetLen returns the number of strings currently queued.
func (f *StringFIFO) GetLen() int {
	f.mutex.Lock()
	defer f.mutex.Unlock()
	return len(f.buffer)
}