From dfca7ed80c5fb638013a6024887330d60882c6ec Mon Sep 17 00:00:00 2001 From: Torsten Harenberg Date: Thu, 30 Oct 2025 09:34:50 +0100 Subject: [PATCH] Fix data loss on TCP reconnect: 1. Fixed ByteFIFO (fifo.go): Changed Dequeue(), DequeueOrWait(), and DequeueOrWaitContext() to always copy() data into a new byte slice instead of just slicing the buffer. 2. Fixed TCP server logic (tcpserver.go): Changed the send goroutine to poll for data availability (without dequeueing) using GetLen(), then sleep briefly to batch incoming data, and finally dequeue all available data at once. This avoids the race condition where we were splitting dequeue operations and losing bytes in between. --- fifo.go | 27 ++++++++++++++++++++--- ptc.go | 1 + tcpserver.go | 60 ++++++++++++++++++++++++++++++++++++---------------- 3 files changed, 67 insertions(+), 21 deletions(-) diff --git a/fifo.go b/fifo.go index fa6b7c5..8ae43a1 100644 --- a/fifo.go +++ b/fifo.go @@ -3,6 +3,7 @@ package main import ( "context" "errors" + "fmt" "sync" ) @@ -50,7 +51,8 @@ func (f *ByteFIFO) Dequeue(n int) ([]byte, error) { n = len(f.buffer) } - data := f.buffer[:n] + data := make([]byte, n) + copy(data, f.buffer[:n]) f.buffer = f.buffer[n:] return data, nil } @@ -65,7 +67,8 @@ func (f *ByteFIFO) DequeueOrWait(n int) ([]byte, error) { f.cond.Wait() } - data := f.buffer[:n] + data := make([]byte, n) + copy(data, f.buffer[:n]) f.buffer = f.buffer[n:] return data, nil } @@ -92,8 +95,18 @@ func (f *ByteFIFO) DequeueOrWaitContext(ctx context.Context, n int) ([]byte, err } } - data = f.buffer[:n] + // Debug: print first 16 bytes of buffer before dequeueing + debugLen := len(f.buffer) + if debugLen > 16 { + debugLen = 16 + } + writeDebug(fmt.Sprintf("DequeueOrWaitContext: buffer has %d bytes, first %d bytes: %x, dequeueing %d bytes", len(f.buffer), debugLen, f.buffer[:debugLen], n), 1) + + data = make([]byte, n) + copy(data, f.buffer[:n]) f.buffer = f.buffer[n:] + + writeDebug(fmt.Sprintf("DequeueOrWaitContext: dequeued data: %x, buffer now has %d bytes", data, len(f.buffer)), 1) }() select { @@ -129,6 +142,14 @@ func (f *ByteFIFO) Capacity() int { return f.cap } +// Clear removes all data from the buffer. +func (f *ByteFIFO) Clear() { + f.mutex.Lock() + defer f.mutex.Unlock() + + f.buffer = make([]byte, 0, f.cap) +} + // ***** type StringFIFO struct { diff --git a/ptc.go b/ptc.go index ac0fefa..924c583 100644 --- a/ptc.go +++ b/ptc.go @@ -392,6 +392,7 @@ func (p *Modem) modemThread() { writeDebug("response: "+string(res)+"\n"+hex.Dump(res), 3) switch c { case PactorChannel: + writeDebug(fmt.Sprintf("Modem received %d bytes from PACTOR, enqueueing to Data.Response: %s", len(res), hex.EncodeToString(res)), 1) err := s.Data.Response.Enqueue(res) if err != nil { writeDebug(err.Error(), 0) diff --git a/tcpserver.go b/tcpserver.go index 1684823..102cdf1 100644 --- a/tcpserver.go +++ b/tcpserver.go @@ -156,38 +156,62 @@ func handleTCPDataConnection(conn net.Conn) { }() s.Protocol <- fmt.Sprintf(color.InGreen("TCP Data Connection established with %s\n"), conn.RemoteAddr()) + s.Status |= StatusTCPDataActive ctx, cancel := context.WithCancel(context.Background()) defer cancel() + + // Clear only the outgoing data buffer (Data.Data) to discard any unsent data from previous connection + // But keep Data.Response so we don't lose any data received from the modem that should be sent to the client + s.Data.Data.Clear() + go func() { for ctx.Err() == nil { - msg, err := s.Data.Response.DequeueOrWaitContext(ctx, 1) - if err != nil { - s.Protocol <- fmt.Sprintf(color.InRed("End of TCP Data Connection %s\n"), conn.RemoteAddr()) - return - } else { - //TODO: VARA - if !s.DaemonMode { - err := s.FromPactor.Enqueue(msg) - if err != nil { - writeDebug(err.Error(), 0) + writeDebug("TCP Data send goroutine: waiting for data...", 1) + + // Wait until buffer has at least 1 byte without dequeueing + for { + if ctx.Err() != nil { + writeDebug("TCP Data send goroutine: context cancelled, exiting", 1) + return + } + + bufLen := s.Data.Response.GetLen() + if bufLen > 0 { + writeDebug(fmt.Sprintf("TCP Data send goroutine: buffer has %d bytes", bufLen), 1) + // Sleep briefly to allow more data to arrive + time.Sleep(10 * time.Millisecond) + + // Dequeue ALL available data + bufLen = s.Data.Response.GetLen() + msg, err := s.Data.Response.Dequeue(bufLen) + if err == nil { + writeDebug(fmt.Sprintf("TCP Data send goroutine: sending %d bytes to client", len(msg)), 1) + //TODO: VARA + if !s.DaemonMode { + err := s.FromPactor.Enqueue(msg) + if err != nil { + writeDebug(err.Error(), 0) + } + } + _, err = conn.Write(msg) + if err != nil { + writeDebug(err.Error(), 0) + } } + break } - _, err = conn.Write(msg) - if err != nil { - writeDebug(err.Error(), 0) - } + + // Sleep a bit before checking again + time.Sleep(1 * time.Millisecond) } } }() reader := bufio.NewReader(conn) for { - var temp []byte buf := make([]byte, 1024) n, err := reader.Read(buf) - if n > 0 { - temp = append(temp, buf[:n]...) - } + temp := buf[:n] if err != nil { s.Status &^= StatusTCPDataActive cancel()