Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Reconnects
  • Loading branch information
nicolad committed Nov 5, 2025
commit aad34a113b65c4a94f8b5971b46c2085b0045e2b
21 changes: 0 additions & 21 deletions crates/adapters/dydx/src/common/enums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,27 +336,6 @@ pub enum DydxCandleResolution {
#[cfg(test)]
mod tests {
use super::*;
use nautilus_model::enums::OrderSide;

#[test]
fn test_order_side_serialization() {
// Test that OrderSide serializes to SCREAMING_SNAKE_CASE as expected by dYdX API
let buy = OrderSide::Buy;
let sell = OrderSide::Sell;

assert_eq!(serde_json::to_string(&buy).unwrap(), r#""BUY""#);
assert_eq!(serde_json::to_string(&sell).unwrap(), r#""SELL""#);

// Test deserialization
assert_eq!(
serde_json::from_str::<OrderSide>(r#""BUY""#).unwrap(),
OrderSide::Buy
);
assert_eq!(
serde_json::from_str::<OrderSide>(r#""SELL""#).unwrap(),
OrderSide::Sell
);
}

#[test]
fn test_order_status_conversion() {
Expand Down
55 changes: 0 additions & 55 deletions crates/adapters/dydx/src/common/urls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,58 +64,3 @@ pub const fn grpc_urls(is_testnet: bool) -> &'static [&'static str] {
pub const fn grpc_url(is_testnet: bool) -> &'static str {
grpc_urls(is_testnet)[0]
}

////////////////////////////////////////////////////////////////////////////////
// Tests
////////////////////////////////////////////////////////////////////////////////

#[cfg(test)]
mod tests {
use rstest::rstest;

use super::*;

#[rstest]
fn test_http_urls() {
assert_eq!(http_base_url(false), "https://indexer.dydx.trade");
assert_eq!(
http_base_url(true),
"https://indexer.v4testnet.dydx.exchange"
);
}

#[rstest]
fn test_ws_urls() {
assert_eq!(ws_url(false), "wss://indexer.dydx.trade/v4/ws");
assert_eq!(ws_url(true), "wss://indexer.v4testnet.dydx.exchange/v4/ws");
}

#[rstest]
fn test_grpc_urls() {
let mainnet_urls = grpc_urls(false);
assert_eq!(mainnet_urls.len(), 3);
assert_eq!(mainnet_urls[0], "https://dydx-grpc.publicnode.com:443");
assert_eq!(mainnet_urls[1], "https://dydx-ops-grpc.kingnodes.com:443");
assert_eq!(
mainnet_urls[2],
"https://dydx-mainnet-grpc.autostake.com:443"
);

let testnet_urls = grpc_urls(true);
assert_eq!(testnet_urls.len(), 2);
assert_eq!(
testnet_urls[0],
"https://dydx-testnet-grpc.publicnode.com:443"
);
assert_eq!(testnet_urls[1], "https://test-dydx-grpc.kingnodes.com:443");
}

#[rstest]
fn test_grpc_url() {
assert_eq!(grpc_url(false), "https://dydx-grpc.publicnode.com:443");
assert_eq!(
grpc_url(true),
"https://dydx-testnet-grpc.publicnode.com:443"
);
}
}
148 changes: 147 additions & 1 deletion crates/adapters/dydx/src/grpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub struct Height(pub u32);
/// - Transaction signing and broadcasting.
/// - Account query operations.
/// - Order placement and management via Cosmos SDK messages.
/// - Connection management and automatic failover to fallback nodes.
#[derive(Debug, Clone)]
pub struct DydxGrpcClient {
channel: Channel,
Expand All @@ -75,6 +76,7 @@ pub struct DydxGrpcClient {
clob: ClobClient<Channel>,
perpetuals: PerpetualsClient<Channel>,
subaccounts: SubaccountsClient<Channel>,
current_url: String,
}

impl DydxGrpcClient {
Expand All @@ -84,7 +86,7 @@ impl DydxGrpcClient {
///
/// Returns an error if the gRPC connection cannot be established.
pub async fn new(grpc_url: String) -> Result<Self, DydxError> {
let channel = Channel::from_shared(grpc_url)
let channel = Channel::from_shared(grpc_url.clone())
.map_err(|e| DydxError::Config(format!("Invalid gRPC URL: {e}")))?
.connect()
.await
Expand All @@ -103,6 +105,7 @@ impl DydxGrpcClient {
perpetuals: PerpetualsClient::new(channel.clone()),
subaccounts: SubaccountsClient::new(channel.clone()),
channel,
current_url: grpc_url,
})
}

Expand Down Expand Up @@ -149,6 +152,89 @@ impl DydxGrpcClient {
}))
}

/// Reconnect to a different gRPC node from the fallback list.
///
/// Attempts to establish a new connection to each URL in the provided list
/// until successful. This is useful when the current node fails and you need
/// to failover to a different validator node.
///
/// # Errors
///
/// Returns an error if none of the provided URLs can establish a connection.
pub async fn reconnect_with_fallback(
&mut self,
grpc_urls: &[impl AsRef<str>],
) -> Result<(), DydxError> {
if grpc_urls.is_empty() {
return Err(DydxError::Config("No gRPC URLs provided".to_string()));
}

let mut last_error = None;

for (idx, url) in grpc_urls.iter().enumerate() {
let url_str = url.as_ref();

// Skip if it's the same URL we're currently connected to
if url_str == self.current_url {
tracing::debug!("Skipping current URL: {url_str}");
continue;
}

tracing::debug!(
"Attempting to reconnect to gRPC node: {url_str} (attempt {}/{})",
idx + 1,
grpc_urls.len()
);

let channel = match Channel::from_shared(url_str.to_string())
.map_err(|e| DydxError::Config(format!("Invalid gRPC URL: {e}")))
{
Ok(ch) => ch,
Err(e) => {
last_error = Some(e);
continue;
}
};

match channel.connect().await {
Ok(connected_channel) => {
tracing::info!("Successfully reconnected to gRPC node: {url_str}");

// Update all service clients with the new channel
self.channel = connected_channel.clone();
self.auth = AuthClient::new(connected_channel.clone());
self.bank = BankClient::new(connected_channel.clone());
self.base = BaseClient::new(connected_channel.clone());
self.tx = TxClient::new(connected_channel.clone());
self.clob = ClobClient::new(connected_channel.clone());
self.perpetuals = PerpetualsClient::new(connected_channel.clone());
self.subaccounts = SubaccountsClient::new(connected_channel);
self.current_url = url_str.to_string();

return Ok(());
}
Err(e) => {
tracing::warn!("Failed to reconnect to gRPC node {url_str}: {e}");
last_error = Some(DydxError::Grpc(tonic::Status::unavailable(format!(
"Connection failed: {e}"
))));
}
}
}

Err(last_error.unwrap_or_else(|| {
DydxError::Grpc(tonic::Status::unavailable(
"All gRPC reconnection attempts failed".to_string(),
))
}))
}

/// Get the currently connected gRPC node URL.
#[must_use]
pub fn current_url(&self) -> &str {
&self.current_url
}

/// Get the underlying gRPC channel.
///
/// This can be used to create custom gRPC service clients.
Expand Down Expand Up @@ -381,3 +467,63 @@ impl DydxGrpcClient {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_current_url_tracked() {
// Test that we can track the current URL
let url = "https://example.com:9090".to_string();
let client = DydxGrpcClient {
channel: Channel::from_static("https://example.com:9090"),
auth: AuthClient::new(Channel::from_static("https://example.com:9090")),
bank: BankClient::new(Channel::from_static("https://example.com:9090")),
base: BaseClient::new(Channel::from_static("https://example.com:9090")),
tx: TxClient::new(Channel::from_static("https://example.com:9090")),
clob: ClobClient::new(Channel::from_static("https://example.com:9090")),
perpetuals: PerpetualsClient::new(Channel::from_static("https://example.com:9090")),
subaccounts: SubaccountsClient::new(Channel::from_static("https://example.com:9090")),
current_url: url.clone(),
};

assert_eq!(client.current_url(), &url);
}

#[tokio::test]
async fn test_new_with_fallback_empty_urls() {
let urls: Vec<String> = vec![];
let result = DydxGrpcClient::new_with_fallback(&urls).await;

assert!(result.is_err());
match result.unwrap_err() {
DydxError::Config(msg) => assert_eq!(msg, "No gRPC URLs provided"),
_ => panic!("Expected Config error"),
}
}

#[tokio::test]
async fn test_reconnect_with_fallback_empty_urls() {
let mut client = DydxGrpcClient {
channel: Channel::from_static("https://example.com:9090"),
auth: AuthClient::new(Channel::from_static("https://example.com:9090")),
bank: BankClient::new(Channel::from_static("https://example.com:9090")),
base: BaseClient::new(Channel::from_static("https://example.com:9090")),
tx: TxClient::new(Channel::from_static("https://example.com:9090")),
clob: ClobClient::new(Channel::from_static("https://example.com:9090")),
perpetuals: PerpetualsClient::new(Channel::from_static("https://example.com:9090")),
subaccounts: SubaccountsClient::new(Channel::from_static("https://example.com:9090")),
current_url: "https://example.com:9090".to_string(),
};

let urls: Vec<String> = vec![];
let result = client.reconnect_with_fallback(&urls).await;

assert!(result.is_err());
match result.unwrap_err() {
DydxError::Config(msg) => assert_eq!(msg, "No gRPC URLs provided"),
_ => panic!("Expected Config error"),
}
}
}
Loading