Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion apps/pyth-lazer-agent/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-lazer-agent"
version = "0.6.1"
version = "0.7.0"
edition = "2024"
description = "Pyth Lazer Agent"
license = "Apache-2.0"
Expand Down Expand Up @@ -32,6 +32,7 @@ serde_json = "1.0.140"
soketto = { version = "0.8.1", features = ["http"] }
solana-keypair = "2.2.1"
tokio = { version = "1.44.1", features = ["full"] }
tokio-native-tls = "0.3.1"
tokio-tungstenite = { version = "0.26.2", features = ["native-tls", "url"] }
tokio-util = { version = "0.7.14", features = ["compat"] }
tracing = "0.1.41"
Expand Down
4 changes: 4 additions & 0 deletions apps/pyth-lazer-agent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ authorization_token = "your_token"
listen_address = "0.0.0.0:8910"
publish_interval_duration = "25ms"
enable_update_deduplication = false
# Optional proxy configuration
# proxy_url = "http://proxy.example.com:8080"
# proxy_url = "http://username:[email protected]:8080" # With authentication
```

- `relayers_urls`: The Lazer team will provide these.
Expand All @@ -58,3 +61,4 @@ enable_update_deduplication = false
- `listen_address`: The local port the agent will be listening on; can be anything you want.
- `publisher_interval`: The agent will batch and send transaction bundles at this interval. The Lazer team will provide guidance here.
- `enable_update_deduplication`: The agent will deduplicate updates based inside each batch before sending it to Lazer.
- `proxy_url` (optional): HTTP/HTTPS proxy URL for WebSocket connections. Supports Basic authentication via URL credentials (e.g., `http://user:pass@proxy:port`).
1 change: 1 addition & 0 deletions apps/pyth-lazer-agent/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub struct Config {
pub enable_update_deduplication: bool,
#[serde(with = "humantime_serde", default = "default_update_deduplication_ttl")]
pub update_deduplication_ttl: Duration,
pub proxy_url: Option<Url>,
}

#[derive(Deserialize, Derivative, Clone, PartialEq)]
Expand Down
1 change: 1 addition & 0 deletions apps/pyth-lazer-agent/src/jrpc_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ pub mod tests {
history_service_url: None,
enable_update_deduplication: false,
update_deduplication_ttl: Default::default(),
proxy_url: None,
};

println!("{:?}", get_metadata(config).await.unwrap());
Expand Down
2 changes: 2 additions & 0 deletions apps/pyth-lazer-agent/src/lazer_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl LazerPublisher {
token: authorization_token.clone(),
receiver: relayer_sender.subscribe(),
is_ready: is_ready.clone(),
proxy_url: config.proxy_url.clone(),
};
tokio::spawn(async move { task.run().await });
}
Expand Down Expand Up @@ -301,6 +302,7 @@ mod tests {
history_service_url: None,
enable_update_deduplication: false,
update_deduplication_ttl: Default::default(),
proxy_url: None,
};

let (relayer_sender, mut relayer_receiver) = broadcast::channel(CHANNEL_CAPACITY);
Expand Down
141 changes: 129 additions & 12 deletions apps/pyth-lazer-agent/src/relayer_session.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::{Result, bail};
use anyhow::{Context, Result, bail};
use backoff::ExponentialBackoffBuilder;
use backoff::backoff::Backoff;
use base64::Engine;
use futures_util::stream::{SplitSink, SplitStream};
use futures_util::{SinkExt, StreamExt};
use http::HeaderValue;
Expand All @@ -9,32 +10,150 @@ use pyth_lazer_publisher_sdk::transaction::SignedLazerTransaction;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::select;
use tokio::sync::broadcast;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::{
MaybeTlsStream, WebSocketStream, connect_async_with_config,
MaybeTlsStream, WebSocketStream, client_async, connect_async_with_config,
tungstenite::Message as TungsteniteMessage,
};
use url::Url;

type RelayerWsSender = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>;
type RelayerWsReceiver = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;

async fn connect_to_relayer(url: Url, token: &str) -> Result<(RelayerWsSender, RelayerWsReceiver)> {
tracing::info!("connecting to the relayer at {}", url);
let mut req = url.clone().into_client_request()?;
async fn connect_through_proxy(
proxy_url: &Url,
target_url: &Url,
token: &str,
) -> Result<(RelayerWsSender, RelayerWsReceiver)> {
tracing::info!(
"connecting to the relayer at {} via proxy {}",
target_url,
proxy_url
);

let proxy_host = proxy_url.host_str().context("Proxy URL must have a host")?;
let proxy_port = proxy_url
.port()
.unwrap_or(if proxy_url.scheme() == "https" {
443
} else {
80
});

let proxy_addr = format!("{proxy_host}:{proxy_port}");
let mut stream = TcpStream::connect(&proxy_addr)
.await
.context(format!("Failed to connect to proxy at {proxy_addr}"))?;

let target_host = target_url
.host_str()
.context("Target URL must have a host")?;
let target_port = target_url
.port()
.unwrap_or(if target_url.scheme() == "wss" {
443
} else {
80
});

let mut connect_request = format!(
"CONNECT {target_host}:{target_port} HTTP/1.1\r\nHost: {target_host}:{target_port}\r\n"
);

let username = proxy_url.username();
if !username.is_empty() {
let password = proxy_url.password().unwrap_or("");
let credentials = format!("{username}:{password}");
let encoded = base64::engine::general_purpose::STANDARD.encode(credentials.as_bytes());
connect_request = format!("{connect_request}Proxy-Authorization: Basic {encoded}\r\n");
}

connect_request = format!("{connect_request}\r\n");

stream
.write_all(connect_request.as_bytes())
.await
.context(format!(
"Failed to send CONNECT request to proxy at {proxy_url}"
))?;

let mut response = vec![0u8; 1024];
let n = stream.read(&mut response).await.context(format!(
"Failed to read CONNECT response from proxy at {proxy_url}"
))?;

let response_str =
String::from_utf8_lossy(response.get(..n).context("Invalid response slice range")?);

if !response_str.starts_with("HTTP/1.1 200") && !response_str.starts_with("HTTP/1.0 200") {
bail!(
"Proxy CONNECT failed: {}",
response_str.lines().next().unwrap_or("Unknown error")
);
}

tracing::info!("Successfully connected through proxy at {}", proxy_url);

let mut req = target_url.clone().into_client_request()?;
let headers = req.headers_mut();
headers.insert(
"Authorization",
HeaderValue::from_str(&format!("Bearer {token}"))?,
);
let (ws_stream, _) = connect_async_with_config(req, None, true).await?;
tracing::info!("connected to the relayer at {}", url);

let maybe_tls_stream = if target_url.scheme() == "wss" {
let tls_connector = tokio_native_tls::native_tls::TlsConnector::builder()
.build()
.context("Failed to build TLS connector")?;
let tokio_connector = tokio_native_tls::TlsConnector::from(tls_connector);
let domain = target_host;
let tls_stream = tokio_connector
.connect(domain, stream)
.await
.context("Failed to establish TLS connection")?;

MaybeTlsStream::NativeTls(tls_stream)
} else {
MaybeTlsStream::Plain(stream)
};

let (ws_stream, _) = client_async(req, maybe_tls_stream)
.await
.context("Failed to complete WebSocket handshake")?;

tracing::info!(
"WebSocket connection established to relayer at {} via proxy {}",
target_url,
proxy_url
);
Ok(ws_stream.split())
}

async fn connect_to_relayer(
url: Url,
token: &str,
proxy_url: Option<&Url>,
) -> Result<(RelayerWsSender, RelayerWsReceiver)> {
if let Some(proxy) = proxy_url {
connect_through_proxy(proxy, &url, token).await
} else {
tracing::info!("connecting to the relayer at {}", url);
let mut req = url.clone().into_client_request()?;
let headers = req.headers_mut();
headers.insert(
"Authorization",
HeaderValue::from_str(&format!("Bearer {token}"))?,
);
let (ws_stream, _) = connect_async_with_config(req, None, true).await?;
tracing::info!("connected to the relayer at {}", url);
Ok(ws_stream.split())
}
}

struct RelayerWsSession {
ws_sender: RelayerWsSender,
}
Expand All @@ -58,11 +177,11 @@ impl RelayerWsSession {
}

pub struct RelayerSessionTask {
// connection state
pub url: Url,
pub token: String,
pub receiver: broadcast::Receiver<SignedLazerTransaction>,
pub is_ready: Arc<AtomicBool>,
pub proxy_url: Option<Url>,
}

impl RelayerSessionTask {
Expand Down Expand Up @@ -108,10 +227,8 @@ impl RelayerSessionTask {
}

pub async fn run_relayer_connection(&mut self) -> Result<()> {
// Establish relayer connection
// Relayer will drop the connection if no data received in 5s
let (relayer_ws_sender, mut relayer_ws_receiver) =
connect_to_relayer(self.url.clone(), &self.token).await?;
connect_to_relayer(self.url.clone(), &self.token, self.proxy_url.as_ref()).await?;
let mut relayer_ws_session = RelayerWsSession {
ws_sender: relayer_ws_sender,
};
Expand Down Expand Up @@ -236,11 +353,11 @@ mod tests {
let (relayer_sender, relayer_receiver) = broadcast::channel(RELAYER_CHANNEL_CAPACITY);

let mut relayer_session_task = RelayerSessionTask {
// connection state
url: Url::parse("ws://127.0.0.1:12346").unwrap(),
token: "token1".to_string(),
receiver: relayer_receiver,
is_ready: Arc::new(AtomicBool::new(false)),
proxy_url: None,
};
tokio::spawn(async move { relayer_session_task.run().await });
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
Expand Down