diff --git a/fifo.go b/fifo.go index fa6b7c5..8ae43a1 100644 --- a/fifo.go +++ b/fifo.go @@ -3,6 +3,7 @@ package main import ( "context" "errors" + "fmt" "sync" ) @@ -50,7 +51,8 @@ func (f *ByteFIFO) Dequeue(n int) ([]byte, error) { n = len(f.buffer) } - data := f.buffer[:n] + data := make([]byte, n) + copy(data, f.buffer[:n]) f.buffer = f.buffer[n:] return data, nil } @@ -65,7 +67,8 @@ func (f *ByteFIFO) DequeueOrWait(n int) ([]byte, error) { f.cond.Wait() } - data := f.buffer[:n] + data := make([]byte, n) + copy(data, f.buffer[:n]) f.buffer = f.buffer[n:] return data, nil } @@ -92,8 +95,18 @@ func (f *ByteFIFO) DequeueOrWaitContext(ctx context.Context, n int) ([]byte, err } } - data = f.buffer[:n] + // Debug: print first 16 bytes of buffer before dequeueing + debugLen := len(f.buffer) + if debugLen > 16 { + debugLen = 16 + } + writeDebug(fmt.Sprintf("DequeueOrWaitContext: buffer has %d bytes, first %d bytes: %x, dequeueing %d bytes", len(f.buffer), debugLen, f.buffer[:debugLen], n), 1) + + data = make([]byte, n) + copy(data, f.buffer[:n]) f.buffer = f.buffer[n:] + + writeDebug(fmt.Sprintf("DequeueOrWaitContext: dequeued data: %x, buffer now has %d bytes", data, len(f.buffer)), 1) }() select { @@ -129,6 +142,14 @@ func (f *ByteFIFO) Capacity() int { return f.cap } +// Clear removes all data from the buffer. +func (f *ByteFIFO) Clear() { + f.mutex.Lock() + defer f.mutex.Unlock() + + f.buffer = make([]byte, 0, f.cap) +} + // ***** type StringFIFO struct { diff --git a/ptc.go b/ptc.go index ac0fefa..924c583 100644 --- a/ptc.go +++ b/ptc.go @@ -392,6 +392,7 @@ func (p *Modem) modemThread() { writeDebug("response: "+string(res)+"\n"+hex.Dump(res), 3) switch c { case PactorChannel: + writeDebug(fmt.Sprintf("Modem received %d bytes from PACTOR, enqueueing to Data.Response: %s", len(res), hex.EncodeToString(res)), 1) err := s.Data.Response.Enqueue(res) if err != nil { writeDebug(err.Error(), 0) diff --git a/tcpserver.go b/tcpserver.go index 1684823..102cdf1 100644 --- a/tcpserver.go +++ b/tcpserver.go @@ -156,38 +156,62 @@ func handleTCPDataConnection(conn net.Conn) { }() s.Protocol <- fmt.Sprintf(color.InGreen("TCP Data Connection established with %s\n"), conn.RemoteAddr()) + s.Status |= StatusTCPDataActive ctx, cancel := context.WithCancel(context.Background()) defer cancel() + + // Clear only the outgoing data buffer (Data.Data) to discard any unsent data from previous connection + // But keep Data.Response so we don't lose any data received from the modem that should be sent to the client + s.Data.Data.Clear() + go func() { for ctx.Err() == nil { - msg, err := s.Data.Response.DequeueOrWaitContext(ctx, 1) - if err != nil { - s.Protocol <- fmt.Sprintf(color.InRed("End of TCP Data Connection %s\n"), conn.RemoteAddr()) - return - } else { - //TODO: VARA - if !s.DaemonMode { - err := s.FromPactor.Enqueue(msg) - if err != nil { - writeDebug(err.Error(), 0) + writeDebug("TCP Data send goroutine: waiting for data...", 1) + + // Wait until buffer has at least 1 byte without dequeueing + for { + if ctx.Err() != nil { + writeDebug("TCP Data send goroutine: context cancelled, exiting", 1) + return + } + + bufLen := s.Data.Response.GetLen() + if bufLen > 0 { + writeDebug(fmt.Sprintf("TCP Data send goroutine: buffer has %d bytes", bufLen), 1) + // Sleep briefly to allow more data to arrive + time.Sleep(10 * time.Millisecond) + + // Dequeue ALL available data + bufLen = s.Data.Response.GetLen() + msg, err := s.Data.Response.Dequeue(bufLen) + if err == nil { + writeDebug(fmt.Sprintf("TCP Data send goroutine: sending %d bytes to client", len(msg)), 1) + //TODO: VARA + if !s.DaemonMode { + err := s.FromPactor.Enqueue(msg) + if err != nil { + writeDebug(err.Error(), 0) + } + } + _, err = conn.Write(msg) + if err != nil { + writeDebug(err.Error(), 0) + } } + break } - _, err = conn.Write(msg) - if err != nil { - writeDebug(err.Error(), 0) - } + + // Sleep a bit before checking again + time.Sleep(1 * time.Millisecond) } } }() reader := bufio.NewReader(conn) for { - var temp []byte buf := make([]byte, 1024) n, err := reader.Read(buf) - if n > 0 { - temp = append(temp, buf[:n]...) - } + temp := buf[:n] if err != nil { s.Status &^= StatusTCPDataActive cancel()