Skip to content
Prev Previous commit
Next Next commit
RTSPClient: error corrections
  • Loading branch information
pschatzmann committed Sep 23, 2025
commit dd24c15a27b84361fd9e0af21c563f65102c132d
153 changes: 98 additions & 55 deletions src/AudioTools/Communication/RTSP/RTSPClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,14 @@ class RTSPClient : public AudioInfoSource, public AudioInfoSupport {

// OPTIONS
LOGI("OPTIONS");
if (!sendSimpleRequest("OPTIONS", m_baseUrl, nullptr, 0, m_hdrBuf,
sizeof(m_hdrBuf), nullptr, 0)) {
// Some servers expect "OPTIONS *" instead of URL
if (!sendSimpleRequest("OPTIONS", "*", nullptr, 0, m_hdrBuf,
sizeof(m_hdrBuf), nullptr, 0)) {
int retry = m_connectRetries;
while (!sendSimpleRequest("OPTIONS", m_baseUrl, nullptr, 0, m_hdrBuf,
sizeof(m_hdrBuf), nullptr, 0)) {
if (--retry == 0) {
return fail("OPTIONS failed");
} else {
LOGW("RTSPClient: retrying OPTIONS");
delay(800);
}
}

Expand All @@ -182,6 +184,8 @@ class RTSPClient : public AudioInfoSource, public AudioInfoSupport {
// Parse a=control and build the correct track URL for SETUP
parseControlFromSdp(m_bodyBuf);
buildTrackUrlFromBaseAndControl();
LOGI("RTSPClient: SDP control='%s' content-base='%s'", m_sdpControl,
m_contentBase);
LOGI("RTSPClient: SETUP url: %s", m_trackUrl);

// Prepare UDP (client_port)
Expand Down Expand Up @@ -215,6 +219,7 @@ class RTSPClient : public AudioInfoSource, public AudioInfoSupport {
primeUdpPath();

// PLAY
LOGI("PLAY");
char sessionHdr[128];
snprintf(sessionHdr, sizeof(sessionHdr), "Session: %s\r\n", m_sessionId);
if (!sendSimpleRequest("PLAY", m_baseUrl, sessionHdr, strlen(sessionHdr),
Expand Down Expand Up @@ -257,10 +262,8 @@ class RTSPClient : public AudioInfoSource, public AudioInfoSupport {
}
}

if (m_udp) {
m_udp->stop();
delete m_udp;
m_udp = nullptr;
if (m_udp_active) {
m_udp.stop();
}
if (m_tcp.connected()) m_tcp.stop();
m_started = false;
Expand All @@ -282,7 +285,7 @@ class RTSPClient : public AudioInfoSource, public AudioInfoSupport {
return 0;
}
serviceUdp();
int avail = (int)(m_pktSize > m_pktPos ? (m_pktSize - m_pktPos) : 0);
int avail = m_pktBuf.available();
if (avail == 0) delay(m_idleDelayMs);
return avail;
}
Expand Down Expand Up @@ -358,7 +361,7 @@ class RTSPClient : public AudioInfoSource, public AudioInfoSupport {
if (ok) {
m_isPlaying = false;
// drop any buffered payload
m_pktPos = m_pktSize = 0;
m_pktBuf.clear();
}
}
return ok;
Expand Down Expand Up @@ -391,7 +394,7 @@ class RTSPClient : public AudioInfoSource, public AudioInfoSupport {
return 0;
}
serviceUdp();
if (m_pktPos >= m_pktSize) {
if (m_pktBuf.isEmpty()) {
LOGW("no data");
delay(m_idleDelayMs);
return 0;
Expand All @@ -412,10 +415,10 @@ class RTSPClient : public AudioInfoSource, public AudioInfoSupport {
}
}

size_t n = m_pktSize - m_pktPos;
size_t written = m_multi_decoder.write(m_pktBuf.data() + m_pktPos, n);
m_pktPos = m_pktSize = 0;
int n = m_pktBuf.available();
size_t written = m_multi_decoder.write(m_pktBuf.data(), n);
LOGI("copy: %d -> %d", (int)n, (int)written);
m_pktBuf.clearArray(written);
return written;
}

Expand Down Expand Up @@ -445,7 +448,8 @@ class RTSPClient : public AudioInfoSource, public AudioInfoSupport {
protected:
// Connection
TcpClient m_tcp;
UdpSocket* m_udp = nullptr;
UdpSocket m_udp;
bool m_udp_active = false;
IPAddress m_addr{};
uint16_t m_port = 0;

Expand All @@ -464,9 +468,8 @@ class RTSPClient : public AudioInfoSource, public AudioInfoSupport {
const uint32_t m_keepaliveIntervalMs = 25000; // 25s

// Buffers
audio_tools::Vector<uint8_t> m_pktBuf{0};
size_t m_pktPos = 0;
size_t m_pktSize = 0;
SingleBuffer<uint8_t> m_pktBuf{0};
SingleBuffer<uint8_t> m_tcpCmd{0};
char m_hdrBuf[1024];
char m_bodyBuf[1024];

Expand Down Expand Up @@ -497,9 +500,9 @@ class RTSPClient : public AudioInfoSource, public AudioInfoSupport {
m_clientRtpPort = 0;
m_cseq = 1;
m_pktBuf.resize(2048);
m_pktPos = m_pktSize = 0;
m_pktBuf.clear();
m_decoderReady = false;
// keep default decoders registered once per instance
m_udp_active = false;
}

void buildUrls(const char* path) {
Expand All @@ -523,19 +526,15 @@ class RTSPClient : public AudioInfoSource, public AudioInfoSupport {
snprintf(m_trackUrl, sizeof(m_trackUrl), "%strackID=0", m_baseUrl);
}

// Always routed: MultiDecoder -> Resampler -> User sink

bool openUdpPorts() {
// Try a few even RTP ports starting at 5004
for (uint16_t p = 5004; p < 65000; p += 2) {
UdpSocket* s = new UdpSocket();
if (s->begin(p)) {
if (m_udp.begin(p)) {
LOGI("RTSPClient: bound UDP RTP port %u", (unsigned)p);
m_udp = s;
m_clientRtpPort = p;
m_udp_active = true;
return true;
}
delete s;
}
return false;
}
Expand All @@ -562,54 +561,78 @@ class RTSPClient : public AudioInfoSource, public AudioInfoSupport {
}
}

// Compute the RTP payload offset inside a UDP packet
// Considers fixed RTP header (12 bytes), CSRC count, and configured extra
// offset
size_t computeRtpPayloadOffset(const uint8_t* data, size_t length) {
if (length <= 12) return length;
size_t offset = 12;
uint8_t cc = data[0] & 0x0F; // CSRC count
offset += cc * 4;
// Apply any configured additional payload offset (e.g., RFC2250)
offset += m_payloadOffset;
return offset;
}

void serviceUdp() {
// Keep RTSP session alive
maybeKeepalive();

if (!m_udp) return;
if (m_pktPos < m_pktSize) return; // still have data buffered
if (!m_udp_active) {
LOGE("no UDP");
return;
}
if (m_pktBuf.available() > 0) {
LOGI("Still have unprocessed data");
return; // still have data buffered
}

int packetSize = m_udp->parsePacket();
if (packetSize <= 0) return;
// parse next UDP packet
int packetSize = m_udp.parsePacket();
if (packetSize <= 0) {
LOGW("packet size: %d", packetSize);
return;
}

// Fill buffer
if ((size_t)packetSize > m_pktBuf.size()) m_pktBuf.resize(packetSize);
int n = m_udp->read(m_pktBuf.data(), packetSize);
if (n <= 12) return; // too small to contain RTP

// Very basic RTP parsing: skip 12-byte header
size_t payloadOffset = 12;
uint8_t cc = m_pktBuf[0] & 0x0F;
uint8_t payloadType = m_pktBuf[1] & 0x7F;
payloadOffset += cc * 4; // skip CSRCs if present
// Apply any configured additional payload offset (e.g., RFC2250)
payloadOffset += m_payloadOffset;
if (payloadOffset >= (size_t)n) return;
int n = m_udp.read(m_pktBuf.data(), packetSize);
m_pktBuf.setAvailable(n);
if (n <= 12) {
LOGE("packet too small: %d", n);
return; // too small to contain RTP
}

// Very basic RTP parsing: compute payload offset
uint8_t* data = m_pktBuf.data();
size_t payloadOffset = computeRtpPayloadOffset(data, (size_t)n);
if (payloadOffset >= (size_t)n) {
LOGW("no payload: %d", n);
}

m_pktPos = 0;
m_pktSize = n - payloadOffset;
// move payload to beginning for contiguous read
memmove(m_pktBuf.data(), m_pktBuf.data() + payloadOffset, m_pktSize);
m_pktBuf.clearArray(payloadOffset);
}

void primeUdpPath() {
if (!m_udp) return;
if (!m_udp_active) return;
if (m_serverRtpPort == 0) return;
// Send a tiny datagram to server RTP port to open NAT/flows
// Not required by RTSP, but improves interoperability
for (int i = 0; i < 2; ++i) {
m_udp->beginPacket(m_addr, m_serverRtpPort);
m_udp.beginPacket(m_addr, m_serverRtpPort);
uint8_t b = 0x00;
m_udp->write(&b, 1);
m_udp->endPacket();
m_udp.write(&b, 1);
m_udp.endPacket();
delay(2);
}
}

bool sniffUdpFor(uint32_t ms) {
if (!m_udp) return false;
if (!m_udp_active) return false;
uint32_t start = millis();
while ((millis() - start) < ms) {
int packetSize = m_udp->parsePacket();
int packetSize = m_udp.parsePacket();
if (packetSize > 0) {
// restore to be processed by normal path
return true;
Expand All @@ -619,6 +642,19 @@ class RTSPClient : public AudioInfoSource, public AudioInfoSupport {
return false;
}

// Centralized TCP write helper
size_t tcpWrite(const uint8_t* data, size_t len) {
if (m_tcpCmd.size() < 400) m_tcpCmd.resize(400);
return m_tcpCmd.writeArray(data, len);
}

bool tcpCommit() {
bool rc = m_tcp.write(m_tcpCmd.data(), m_tcpCmd.available()) ==
m_tcpCmd.available();
m_tcpCmd.clear();
return rc;
}

bool sendSimpleRequest(const char* method, const char* url,
const char* extraHeaders, size_t extraLen,
char* outHeaders, size_t outHeadersLen, char* outBody,
Expand All @@ -632,18 +668,25 @@ class RTSPClient : public AudioInfoSource, public AudioInfoSupport {
if (reqLen <= 0) return false;

// Send start line + mandatory headers
if (m_tcp.write((const uint8_t*)reqStart, reqLen) != (size_t)reqLen)
if (tcpWrite((const uint8_t*)reqStart, reqLen) != (size_t)reqLen) {
return false;
}
// Optional extra headers
if (extraHeaders && extraLen) {
if (m_tcp.write((const uint8_t*)extraHeaders, extraLen) != extraLen)
if (tcpWrite((const uint8_t*)extraHeaders, extraLen) != extraLen) {
return false;
}
}
// End of headers
const char* end = "\r\n";
if (m_tcp.write((const uint8_t*)end, 2) != 2) return false;
if (tcpWrite((const uint8_t*)end, 2) != 2) {
return false;
}

m_tcp.flush();
if (!tcpCommit()) {
LOGE("TCP write failed");
return false;
}

// Read response headers until CRLFCRLF
int hdrUsed = 0;
Expand Down