1. Fixed ByteFIFO (fifo.go): Changed Dequeue(), DequeueOrWait(), and DequeueOrWaitContext() to always copy() data into a new byte slice instead of just slicing the buffer. 2. Fixed TCP server logic (tcpserver.go): Changed the send goroutine to poll for data availability (without dequeueing) using GetLen(), then sleep briefly to batch incoming data, and finally dequeue all available data at once. This avoids the race condition where we were splitting dequeue operations and losing bytes in between.
262 lines
6.1 KiB
Go
262 lines
6.1 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
)
|
|
|
|
// ByteFIFO is a bounded first-in, first-out queue of bytes. Its methods lock
// mutex around every access to buffer.
type ByteFIFO struct {
	// buffer holds the queued bytes, oldest first.
	buffer []byte
	// mutex guards buffer.
	mutex sync.Mutex
	// cond is tied to mutex and is notified when bytes are enqueued.
	cond *sync.Cond
	// cap is the maximum number of bytes the queue may hold.
	cap int
}
|
|
|
|
// NewByteFIFO creates a new ByteFIFO with the given capacity.
|
|
func NewByteFIFO(capacity int) *ByteFIFO {
|
|
fifo := &ByteFIFO{
|
|
buffer: make([]byte, 0, capacity),
|
|
cap: capacity,
|
|
}
|
|
fifo.cond = sync.NewCond(&fifo.mutex) // Initialize condition variable
|
|
return fifo
|
|
}
|
|
|
|
// Enqueue adds bytes to the end of the buffer. Returns an error if the buffer is full.
|
|
func (f *ByteFIFO) Enqueue(data []byte) error {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
|
|
if len(f.buffer)+len(data) > f.cap {
|
|
return errors.New("buffer overflow")
|
|
}
|
|
|
|
f.buffer = append(f.buffer, data...)
|
|
f.cond.Signal() // Notify waiting goroutines that data is available
|
|
return nil
|
|
}
|
|
|
|
// Dequeue removes and returns the first `n` bytes from the buffer (or what's left if n>len(f.buffer)).
|
|
// Returns an error if nothing is in the queue
|
|
func (f *ByteFIFO) Dequeue(n int) ([]byte, error) {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
|
|
if len(f.buffer) == 0 {
|
|
return nil, errors.New("buffer is empty")
|
|
}
|
|
if n > len(f.buffer) {
|
|
n = len(f.buffer)
|
|
}
|
|
|
|
data := make([]byte, n)
|
|
copy(data, f.buffer[:n])
|
|
f.buffer = f.buffer[n:]
|
|
return data, nil
|
|
}
|
|
|
|
// DequeueOrWait removes and returns the first `n` bytes, waiting if the buffer is empty until data is available.
|
|
func (f *ByteFIFO) DequeueOrWait(n int) ([]byte, error) {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
|
|
// Wait until enough data is available
|
|
for len(f.buffer) < n {
|
|
f.cond.Wait()
|
|
}
|
|
|
|
data := make([]byte, n)
|
|
copy(data, f.buffer[:n])
|
|
f.buffer = f.buffer[n:]
|
|
return data, nil
|
|
}
|
|
|
|
// DequeueOrWaitContext removes and returns the first `n` bytes, waiting if necessary until data is available or the context is canceled.
|
|
func (f *ByteFIFO) DequeueOrWaitContext(ctx context.Context, n int) ([]byte, error) {
|
|
done := make(chan struct{})
|
|
var data []byte
|
|
var err error
|
|
|
|
go func() {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
defer close(done)
|
|
|
|
// Wait until enough data is available
|
|
for len(f.buffer) < n {
|
|
select {
|
|
case <-ctx.Done():
|
|
err = ctx.Err()
|
|
return
|
|
default:
|
|
f.cond.Wait()
|
|
}
|
|
}
|
|
|
|
// Debug: print first 16 bytes of buffer before dequeueing
|
|
debugLen := len(f.buffer)
|
|
if debugLen > 16 {
|
|
debugLen = 16
|
|
}
|
|
writeDebug(fmt.Sprintf("DequeueOrWaitContext: buffer has %d bytes, first %d bytes: %x, dequeueing %d bytes", len(f.buffer), debugLen, f.buffer[:debugLen], n), 1)
|
|
|
|
data = make([]byte, n)
|
|
copy(data, f.buffer[:n])
|
|
f.buffer = f.buffer[n:]
|
|
|
|
writeDebug(fmt.Sprintf("DequeueOrWaitContext: dequeued data: %x, buffer now has %d bytes", data, len(f.buffer)), 1)
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
return data, err
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
|
|
// Peek returns the first `n` bytes without removing them from the buffer. Returns an error if there are not enough bytes.
|
|
func (f *ByteFIFO) Peek(n int) ([]byte, error) {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
|
|
if n > len(f.buffer) {
|
|
return nil, errors.New("not enough data in buffer")
|
|
}
|
|
|
|
return f.buffer[:n], nil
|
|
}
|
|
|
|
// GetLen returns the current number of bytes in the buffer.
|
|
func (f *ByteFIFO) GetLen() int {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
|
|
return len(f.buffer)
|
|
}
|
|
|
|
// Capacity returns the maximum capacity of the buffer.
|
|
func (f *ByteFIFO) Capacity() int {
|
|
return f.cap
|
|
}
|
|
|
|
// Clear removes all data from the buffer.
|
|
func (f *ByteFIFO) Clear() {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
|
|
f.buffer = make([]byte, 0, f.cap)
|
|
}
|
|
|
|
// *****
|
|
|
|
// StringFIFO is a first-in, first-out queue of strings with no size limit of
// its own. Its methods lock mutex around every access to buffer.
type StringFIFO struct {
	// buffer holds the queued strings, oldest first.
	buffer []string
	// mutex guards buffer.
	mutex sync.Mutex
	// cond is tied to mutex and is notified when a string is enqueued.
	cond *sync.Cond
}
|
|
|
|
// NewStringFIFO creates a new StringFIFO.
|
|
func NewStringFIFO() *StringFIFO {
|
|
fifo := &StringFIFO{
|
|
buffer: make([]string, 0),
|
|
}
|
|
fifo.cond = sync.NewCond(&fifo.mutex) // Initialize condition variable
|
|
return fifo
|
|
}
|
|
|
|
// Enqueue adds a string to the end of the buffer. Returns an error if the buffer is full.
|
|
func (f *StringFIFO) Enqueue(data string) {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
|
|
f.buffer = append(f.buffer, data)
|
|
f.cond.Signal() // Notify waiting goroutines that data is available
|
|
}
|
|
|
|
// Dequeue removes and returns the first `n` bytes from the buffer (or what's left if n>len(f.buffer)).
|
|
// Returns an error if nothing is in the queue
|
|
func (f *StringFIFO) Dequeue() (string, error) {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
|
|
if len(f.buffer) == 0 {
|
|
return "", errors.New("buffer is empty")
|
|
}
|
|
|
|
data := f.buffer[0]
|
|
f.buffer = f.buffer[1:]
|
|
return data, nil
|
|
}
|
|
|
|
// DequeueOrWait removes and returns the first `n` bytes, waiting if the buffer is empty until data is available.
|
|
func (f *StringFIFO) DequeueOrWait() (string, error) {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
|
|
// Wait until enough data is available
|
|
for len(f.buffer) == 0 {
|
|
f.cond.Wait()
|
|
}
|
|
|
|
data := f.buffer[0]
|
|
f.buffer = f.buffer[1:]
|
|
return data, nil
|
|
}
|
|
|
|
// DequeueOrWaitContext removes and returns the first `n` bytes, waiting if necessary until data is available or the context is canceled.
|
|
func (f *StringFIFO) DequeueOrWaitContext(ctx context.Context) (string, error) {
|
|
done := make(chan struct{})
|
|
var data string
|
|
var err error
|
|
|
|
go func() {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
defer close(done)
|
|
|
|
// Wait until enough data is available
|
|
for len(f.buffer) == 0 {
|
|
select {
|
|
case <-ctx.Done():
|
|
err = ctx.Err()
|
|
return
|
|
default:
|
|
f.cond.Wait()
|
|
}
|
|
}
|
|
|
|
data = f.buffer[0]
|
|
f.buffer = f.buffer[1:]
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
return data, err
|
|
case <-ctx.Done():
|
|
return "", ctx.Err()
|
|
}
|
|
}
|
|
|
|
// Peek returns the first `n` bytes without removing them from the buffer. Returns an error if there are not enough bytes.
|
|
func (f *StringFIFO) Peek() (string, error) {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
|
|
if len(f.buffer) == 0 {
|
|
return "", errors.New("not enough data in buffer")
|
|
}
|
|
|
|
return f.buffer[0], nil
|
|
}
|
|
|
|
// GetLen returns the current number of bytes in the buffer.
|
|
func (f *StringFIFO) GetLen() int {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
|
|
return len(f.buffer)
|
|
}
|