Skip to content

Commit 0df3213

Browse files
authored
Various small v0.2 improvements (paritytech#367)
* Make telemetry less susceptible to flakey wifi * Update readme * Staging shouldn't autoconnect to telemetry * Don't try to output more than 1KB of hex to Display * Better logging of transactions * Grumbles * off-by-one
1 parent 08f9568 commit 0df3213

File tree

5 files changed

+68
-21
lines changed

5 files changed

+68
-21
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ one. First, get Rust (1.26.1 or later) and the support software if you don't alr
99

1010
```
1111
curl https://sh.rustup.rs -sSf | sh
12-
sudo apt install make clang
12+
sudo apt install make clang pkg-config libssl-dev
1313
```
1414

1515
Then, install Polkadot PoC-2:

polkadot/cli/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ fn load_spec(matches: &clap::ArgMatches) -> Result<(service::ChainSpec, bool), S
115115
.map(ChainSpec::from)
116116
.unwrap_or_else(|| if matches.is_present("dev") { ChainSpec::Development } else { ChainSpec::KrummeLanke });
117117
let is_global = match chain_spec {
118-
ChainSpec::KrummeLanke | ChainSpec::StagingTestnet => true,
118+
ChainSpec::KrummeLanke => true,
119119
_ => false,
120120
};
121121
let spec = chain_spec.load()?;

polkadot/transaction-pool/src/lib.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ extern crate ed25519;
1818
extern crate substrate_client as client;
1919
extern crate substrate_codec as codec;
2020
extern crate substrate_extrinsic_pool as extrinsic_pool;
21-
extern crate substrate_primitives as substrate_primitives;
21+
extern crate substrate_primitives;
2222
extern crate substrate_runtime_primitives;
2323
extern crate polkadot_runtime as runtime;
2424
extern crate polkadot_primitives as primitives;
@@ -279,13 +279,16 @@ impl<'a, A> txpool::Verifier<UncheckedExtrinsic> for Verifier<'a, A> where
279279
type Error = Error;
280280

281281
fn verify_transaction(&self, uxt: UncheckedExtrinsic) -> Result<Self::VerifiedTransaction> {
282-
info!("Extrinsic Submitted: {:?}", uxt);
283282

284283
if !uxt.is_signed() {
285284
bail!(ErrorKind::IsInherent(uxt))
286285
}
287286

288-
let (encoded_size, hash) = uxt.using_encoded(|e| (e.len(), BlakeTwo256::hash(e)));
287+
let encoded = uxt.encode();
288+
let (encoded_size, hash) = (encoded.len(), BlakeTwo256::hash(&encoded));
289+
290+
debug!(target: "transaction-pool", "Transaction submitted: {}", ::substrate_primitives::hexdisplay::HexDisplay::from(&encoded));
291+
289292
let inner = match uxt.clone().check_with(|a| self.lookup(a)) {
290293
Ok(xt) => Some(xt),
291294
// keep the transaction around in the future pool and attempt to promote it later.
@@ -294,6 +297,12 @@ impl<'a, A> txpool::Verifier<UncheckedExtrinsic> for Verifier<'a, A> where
294297
};
295298
let sender = inner.as_ref().map(|x| x.signed.clone());
296299

300+
if encoded_size < 1024 {
301+
info!(target: "transaction-pool", "Transaction verified: {} => {:?}", hash, uxt);
302+
} else {
303+
info!(target: "transaction-pool", "Transaction verified: {} ({} bytes is too large to display)", hash, encoded_size);
304+
}
305+
297306
Ok(VerifiedTransaction {
298307
original: uxt,
299308
inner,

substrate/primitives/src/hexdisplay.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,18 @@ impl<'a> HexDisplay<'a> {
2626

2727
impl<'a> ::core::fmt::Display for HexDisplay<'a> {
2828
fn fmt(&self, fmtr: &mut ::core::fmt::Formatter) -> Result<(), ::core::fmt::Error> {
29-
for byte in self.0 {
30-
try!( fmtr.write_fmt(format_args!("{:02x}", byte)));
29+
if self.0.len() < 1027 {
30+
for byte in self.0 {
31+
fmtr.write_fmt(format_args!("{:02x}", byte))?;
32+
}
33+
} else {
34+
for byte in &self.0[0..512] {
35+
fmtr.write_fmt(format_args!("{:02x}", byte))?;
36+
}
37+
fmtr.write_str("...")?;
38+
for byte in &self.0[self.0.len() - 512..] {
39+
fmtr.write_fmt(format_args!("{:02x}", byte))?;
40+
}
3141
}
3242
Ok(())
3343
}

substrate/telemetry/src/lib.rs

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ extern crate log;
3131
extern crate slog;
3232
extern crate slog_scope;
3333

34-
use std::io;
34+
use std::{io, time};
3535
use parking_lot::Mutex;
3636
use slog::Drain;
3737
pub use slog_scope::with_logger;
@@ -49,16 +49,15 @@ const CHANNEL_SIZE: usize = 262144;
4949

5050
/// Initialise telemetry.
5151
pub fn init_telemetry(config: TelemetryConfig) -> slog_scope::GlobalLoggerGuard {
52+
let client = ws::ClientBuilder::new(&config.url).ok().and_then(|mut x| x.connect(None).ok());
5253
let log = slog::Logger::root(
5354
slog_async::Async::new(
5455
slog_json::Json::default(
5556
TelemetryWriter {
5657
buffer: vec![],
57-
out: Mutex::new(
58-
ws::ClientBuilder::new(&config.url).ok().and_then(|mut x| x.connect(None).ok())
59-
),
58+
out: Mutex::new(client),
6059
config,
61-
first_time: true, // ensures that on_connect will be called.
60+
last_time: None, // ensures that on_connect will be called.
6261
}
6362
).fuse()
6463
).chan_size(CHANNEL_SIZE)
@@ -78,20 +77,47 @@ struct TelemetryWriter {
7877
buffer: Vec<u8>,
7978
out: Mutex<Option<ws::sync::Client<Box<ws::stream::sync::NetworkStream + Send>>>>,
8079
config: TelemetryConfig,
81-
first_time: bool,
80+
last_time: Option<time::Instant>,
8281
}
8382

83+
/// Every two minutes we reconnect to the telemetry server otherwise we don't get notified
84+
/// of a flakey connection that has been dropped and needs to be reconnected. We can remove
85+
/// this once we introduce a keepalive ping/pong.
86+
const RECONNECT_PERIOD: u64 = 120;
87+
8488
impl TelemetryWriter {
8589
fn ensure_connected(&mut self) {
86-
if self.first_time {
87-
info!("Connected to telemetry server: {}", self.config.url);
88-
(self.config.on_connect)();
89-
self.first_time = false;
90-
}
9190
let mut client = self.out.lock();
92-
if client.is_none() {
91+
92+
let controlled_disconnect = if let Some(t) = self.last_time {
93+
if t.elapsed().as_secs() > RECONNECT_PERIOD && client.is_some() {
94+
trace!(target: "telemetry", "Performing controlled drop of the telemetry connection.");
95+
let _ = client.as_mut().and_then(|socket|
96+
socket.send_message(&ws::Message::text("{\"msg\":\"system.reconnect\"}")).ok()
97+
);
98+
*client = None;
99+
true
100+
} else {
101+
false
102+
}
103+
} else {
104+
false
105+
};
106+
107+
let just_connected = if client.is_none() {
108+
if !controlled_disconnect {
109+
info!(target: "telemetry", "Connection dropped unexpectedly. Reconnecting to telemetry server...");
110+
}
93111
*client = ws::ClientBuilder::new(&self.config.url).ok().and_then(|mut x| x.connect(None).ok());
94-
drop(client);
112+
client.is_some()
113+
} else {
114+
self.last_time.is_none()
115+
};
116+
117+
drop(client);
118+
if just_connected && !controlled_disconnect {
119+
self.last_time = Some(time::Instant::now());
120+
info!("Reconnected to telemetry server: {}", self.config.url);
95121
(self.config.on_connect)();
96122
}
97123
}
@@ -113,7 +139,9 @@ impl io::Write for TelemetryWriter {
113139
let mut l = self.out.lock();
114140
let socket_closed = if let Some(ref mut socket) = *l {
115141
if let Ok(s) = ::std::str::from_utf8(&self.buffer[..]) {
116-
socket.send_message(&ws::Message::text(s)).is_err()
142+
let r = socket.send_message(&ws::Message::text(s));
143+
trace!(target: "telemetry", "Sent to telemetry: {} -> {:?}", s, r);
144+
r.is_err()
117145
} else { false }
118146
} else { false };
119147
if socket_closed {

0 commit comments

Comments
 (0)