Skip to content

Commit d57418b

Browse files
committed
shim: add ConnectionType
1 parent 7d360f1 commit d57418b

File tree

1 file changed

+62
-0
lines changed
  • sugondat/shim/src/sugondat_rpc

1 file changed

+62
-0
lines changed

sugondat/shim/src/sugondat_rpc/conn.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,68 @@ use subxt::backend::rpc::RpcClient;
99
use sugondat_subxt::sugondat::is_codegen_valid_for;
1010
use tokio::sync::{oneshot, Mutex};
1111

12+
// Keeping the connection and connector inside an Arc is useful because
13+
// the same connection will be used for all the docks, and thus
14+
// it's shared among all of them
15+
#[derive(Clone)]
16+
pub enum ConnectionType {
17+
Persistent(Arc<Connector>),
18+
Single(Arc<Conn>),
19+
}
20+
21+
impl ConnectionType {
22+
pub async fn new(rpc_url: String, no_retry: bool) -> anyhow::Result<Self> {
23+
tracing::info!("connecting to sugondat node: {}", rpc_url);
24+
25+
if no_retry {
26+
// TODO: conn_id is required here, but with the no_retry flag,
27+
// only one connection attempt will be made
28+
let conn = Conn::connect(0, &rpc_url).await.map_err(|e| {
29+
tracing::error!("failed to connect to sugondat node: {}\n", e);
30+
e
31+
})?;
32+
return Ok(ConnectionType::Single(conn));
33+
}
34+
35+
let rpc_url = Arc::new(rpc_url);
36+
let connector = Arc::new(Connector::new(rpc_url));
37+
connector.ensure_connected().await;
38+
39+
Ok(ConnectionType::Persistent(connector))
40+
}
41+
42+
// Execute the given closure, if the connection type
43+
// is Persistent then the connection will be reset and
44+
// the closure re executed
45+
pub async fn run<T, Fut: futures::future::Future<Output = anyhow::Result<T>>>(
46+
&self,
47+
action: impl Fn(Arc<Conn>) -> Fut,
48+
) -> T {
49+
match self {
50+
ConnectionType::Persistent(connector) => loop {
51+
let conn = connector.ensure_connected().await;
52+
match action(conn).await {
53+
Ok(res) => break res,
54+
Err(e) => {
55+
tracing::error!("{}\n", e);
56+
// Reset the connection and retry
57+
connector.reset().await;
58+
}
59+
};
60+
},
61+
ConnectionType::Single(conn) => {
62+
action(conn.clone()).await.unwrap_or_else(|e| {
63+
tracing::error!("{}\n", e);
64+
// TODO: Is it ok to panic here?
65+
// I don't see a more elegant solution without
66+
// changing a lot of function signatures to support errors
67+
panic!("connection to sugondat node interruptedh\n")
68+
})
69+
}
70+
}
71+
}
72+
}
73+
1274
// Contains the RPC client structures that are assumed to be connected.
1375
pub struct Conn {
1476
/// Connection id. For diagnostics purposes only.

0 commit comments

Comments
 (0)