Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 100 additions & 51 deletions hyperdrive/src/eth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,12 @@ async fn handle_network_error(wrapped_error: WrappedSendError, state: &ModuleSta
}

// forward error to response channel if it exists
if let Some(chan) = state.response_channels.get(&wrapped_error.id) {
let chan_to_send = state
.response_channels
.get(&wrapped_error.id)
.map(|chan| chan.clone());

if let Some(chan) = chan_to_send {
// don't close channel here, as channel holder will wish to try other providers.
verbose_print(
&state.print_tx,
Expand All @@ -494,7 +499,9 @@ async fn handle_message(
match &km.message {
Message::Response(_) => {
// map response to the correct channel
if let Some(chan) = state.response_channels.get(&km.id) {
let chan_to_send = state.response_channels.get(&km.id).map(|chan| chan.clone());

if let Some(chan) = chan_to_send {
// can't close channel here, as response may be an error
// and fulfill_request may wish to try other providers.
let _ = chan.send(Ok(km)).await;
Expand Down Expand Up @@ -544,30 +551,69 @@ async fn handle_message(
Ok(EthSub { id, .. }) => id,
Err(EthSubError { id, .. }) => id,
};
if let Some(mut sub_map) = state.active_subscriptions.get_mut(&rsvp) {
if let Some(sub) = sub_map.get(&sub_id) {
if let ActiveSub::Remote {
provider_node,
sender,
..
} = sub
{
if provider_node == &km.source.node {
if let Ok(()) = sender.send(eth_sub_result).await {
// successfully sent a subscription update from a
// remote provider to one of our processes
return Ok(());
// Extract the subscription and check if we need to close it
let (sender_to_use, needs_close) = {
if let Some(sub_map) = state.active_subscriptions.get_mut(&rsvp) {
if let Some(sub) = sub_map.get(&sub_id) {
if let ActiveSub::Remote {
provider_node,
sender,
..
} = sub
{
if provider_node == &km.source.node {
// Clone sender to avoid holding guard across await
(Some(sender.clone()), false)
} else {
// Provider node doesn't match
(None, true)
}
} else {
// Not a remote subscription
(None, false)
}
// failed to send subscription update to process,
// unsubscribe from provider and close
verbose_print(
&state.print_tx,
"eth: got eth_sub_result but provider node did not match or local sub was already closed",
)
.await;
} else {
// Subscription not found in map
(None, false)
}
} else {
// No subscription map for this rsvp
(None, false)
}
}; // Drop the guard here before awaiting

if let Some(sender) = sender_to_use {
if let Ok(()) = sender.send(eth_sub_result).await {
// successfully sent a subscription update from a
// remote provider to one of our processes
return Ok(());
}
// failed to send subscription update to process,
// need to unsubscribe from provider and close
verbose_print(
&state.print_tx,
"eth: failed to send subscription update to process",
)
.await;
// Now remove and close the subscription
if let Some(mut sub_map) = state.active_subscriptions.get_mut(&rsvp) {
if let Some(sub) = sub_map.remove(&sub_id) {
drop(sub_map); // Release guard before awaiting
sub.close(sub_id, state).await;
return Ok(());
}
}
} else if needs_close {
// Provider node didn't match, close the subscription
verbose_print(
&state.print_tx,
"eth: got eth_sub_result but provider node did not match",
)
.await;
if let Some(mut sub_map) = state.active_subscriptions.get_mut(&rsvp) {
if let Some(sub) = sub_map.remove(&sub_id) {
drop(sub_map); // Release guard before awaiting
sub.close(sub_id, state).await;
sub_map.remove(&sub_id);
return Ok(());
}
}
Expand Down Expand Up @@ -694,26 +740,32 @@ async fn handle_eth_action(
.await;
}
EthAction::UnsubscribeLogs(sub_id) => {
let Some(mut sub_map) = state.active_subscriptions.get_mut(&km.source) else {
verbose_print(
&state.print_tx,
&format!(
"eth: got unsubscribe from {} but no subscription found",
km.source
),
)
.await;
error_message(
&state.our,
km.id,
km.source,
EthError::MalformedRequest,
&state.send_to_loop,
)
.await;
return Ok(());
};
if let Some(sub) = sub_map.remove(&sub_id) {
// Remove the subscription from the map first, releasing the guard
let sub = {
let Some(mut sub_map) = state.active_subscriptions.get_mut(&km.source) else {
verbose_print(
&state.print_tx,
&format!(
"eth: got unsubscribe from {} but no subscription found",
km.source
),
)
.await;
error_message(
&state.our,
km.id,
km.source,
EthError::MalformedRequest,
&state.send_to_loop,
)
.await;
return Ok(());
};
sub_map.remove(&sub_id)
}; // Guard is released here

if let Some(sub) = sub {
// Now we can safely call close without holding the guard
sub.close(sub_id, state).await;
verbose_print(
&state.print_tx,
Expand Down Expand Up @@ -750,10 +802,9 @@ async fn handle_eth_action(
.await;
}
// if sub_map is now empty, remove the source from the active_subscriptions map
if sub_map.is_empty() {
drop(sub_map);
state.active_subscriptions.remove(&km.source);
}
state
.active_subscriptions
.retain(|_, sub_map| !sub_map.is_empty());
}
EthAction::Request { .. } => {
let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
Expand Down Expand Up @@ -1170,8 +1221,7 @@ async fn fulfill_request(

// Spawn method retry task if this is a new failure
if should_spawn_retry && method != "eth_sendRawTransaction" {
use crate::eth::utils::spawn_method_retry_for_node_provider;
spawn_method_retry_for_node_provider(
crate::eth::utils::spawn_method_retry_for_node_provider(
our.to_string(),
providers.clone(),
chain_id.clone(),
Expand Down Expand Up @@ -1216,8 +1266,7 @@ async fn fulfill_request(

// Spawn health check task if needed
if spawn_health_check {
use crate::eth::utils::spawn_health_check_for_node_provider;
spawn_health_check_for_node_provider(
crate::eth::utils::spawn_health_check_for_node_provider(
our.to_string(),
providers.clone(),
chain_id.clone(),
Expand Down
Loading