diff --git a/Cargo.lock b/Cargo.lock index 3831fbcf46d..1a566d8e412 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3396,6 +3396,17 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" +[[package]] +name = "md-5" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5a279bb9607f9f53c22d496eade00d138d1bdcccd07d74650387cf94942a15" +dependencies = [ + "block-buffer", + "digest", + "opaque-debug", +] + [[package]] name = "memchr" version = "2.4.1" @@ -4021,6 +4032,24 @@ dependencies = [ "indexmap", ] +[[package]] +name = "phf" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9fc3db1018c4b59d7d582a739436478b6035138b6aecbce989fc91c3e98409f" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_shared" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6796ad771acdc0123d2a88dc428b5e38ef24456743ddb1744ed628f9815c096" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project" version = "0.4.28" @@ -4152,6 +4181,35 @@ dependencies = [ "universal-hash", ] +[[package]] +name = "postgres-protocol" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b145e6a4ed52cb316a27787fc20fe8a25221cb476479f61e4e0327c15b98d91a" +dependencies = [ + "base64 0.13.0", + "byteorder", + "bytes", + "fallible-iterator", + "hmac 0.11.0", + "md-5", + "memchr", + "rand 0.8.4", + "sha2", + "stringprep", +] + +[[package]] +name = "postgres-types" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04619f94ba0cc80999f4fc7073607cb825bc739a883cb6d20900fc5e009d6b0d" +dependencies = [ + "bytes", + "fallible-iterator", + "postgres-protocol", +] + [[package]] name = "ppv-lite86" version = "0.2.15" @@ -5217,6 +5275,12 @@ dependencies = [ "types", ] +[[package]] +name = "siphasher" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "533494a8f9b724d33625ab53c6c4800f7cc445895924a8ef649222dcb76e938b" + [[package]] name = "slab" version = "0.4.5" @@ -5564,6 +5628,16 @@ dependencies = [ "types", ] +[[package]] +name = "stringprep" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "strsim" version = "0.8.0" @@ -5925,6 +5999,29 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-postgres" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b6c8b33df661b548dcd8f9bf87debb8c56c05657ed291122e1188698c2ece95" +dependencies = [ + "async-trait", + "byteorder", + "bytes", + "fallible-iterator", + "futures", + "log", + "parking_lot", + "percent-encoding", + "phf", + "pin-project-lite 0.2.7", + "postgres-protocol", + "postgres-types", + "socket2 0.4.2", + "tokio", + "tokio-util", +] + [[package]] name = "tokio-rustls" version = "0.22.0" @@ -6645,6 +6742,29 @@ dependencies = [ "quote", ] +[[package]] +name = "watch" +version = "0.1.0" +dependencies = [ + "beacon_chain", + "clap", + "env_logger 0.9.0", + "eth2", + "http_api", + "log", + "network", + "rand 0.7.3", + "reqwest", + "serde", + "serde_json", + "tokio", + "tokio-postgres", + "types", + "url", + "warp", + "warp_utils", +] + [[package]] name = "web-sys" version = "0.3.55" diff --git a/Cargo.toml b/Cargo.toml index a7789fa0fdd..bfeefd81154 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,6 +81,8 @@ members = [ "validator_client", "validator_client/slashing_protection", + + "watch", ] [patch] diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index 9afbf159720..d67e24a016c 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -27,12 +27,12 @@ slot_clock = { path = "../../common/slot_clock" } eth2_ssz = "0.4.0" bs58 = "0.4.0" futures = "0.3.8" +sensitive_url = { path = "../../common/sensitive_url" } [dev-dependencies] store = { path = "../store" } environment = { path = "../../lighthouse/environment" } tree_hash = "0.4.0" -sensitive_url = { path = "../../common/sensitive_url" } [[test]] name = "bn_http_api_tests" diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 64d9b9e841c..1ca7a5d1631 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -12,6 +12,7 @@ mod metrics; mod proposer_duties; mod state_id; mod sync_committees; +pub mod test_utils; mod validator_inclusion; mod version; diff --git a/beacon_node/http_api/tests/common.rs b/beacon_node/http_api/src/test_utils.rs similarity index 97% rename from beacon_node/http_api/tests/common.rs rename to beacon_node/http_api/src/test_utils.rs index dd2a40efa63..723133433bf 100644 --- a/beacon_node/http_api/tests/common.rs +++ b/beacon_node/http_api/src/test_utils.rs @@ -1,9 +1,9 @@ +use crate::{Config, Context}; use beacon_chain::{ test_utils::{BeaconChainHarness, EphemeralHarnessType}, BeaconChain, BeaconChainTypes, }; use eth2::{BeaconNodeHttpClient, Timeouts}; -use http_api::{Config, Context}; use lighthouse_network::{ discv5::enr::{CombinedKey, EnrBuilder}, libp2p::{core::connection::ConnectionId, swarm::NetworkBehaviour}, @@ -146,7 +146,7 @@ pub async fn create_api_server( // It's not really interesting why this triggered, just that it happened. let _ = shutdown_rx.await; }; - let (listening_socket, server) = http_api::serve(ctx, server_shutdown).unwrap(); + let (listening_socket, server) = crate::serve(ctx, server_shutdown).unwrap(); ApiServer { server, diff --git a/beacon_node/http_api/tests/fork_tests.rs b/beacon_node/http_api/tests/fork_tests.rs index 6b4f79fa5d5..c5bdc36c69e 100644 --- a/beacon_node/http_api/tests/fork_tests.rs +++ b/beacon_node/http_api/tests/fork_tests.rs @@ -1,7 +1,7 @@ //! Tests for API behaviour across fork boundaries. -use crate::common::*; use beacon_chain::{test_utils::RelativeSyncCommittee, StateSkipConfig}; use eth2::types::{StateId, SyncSubcommittee}; +use http_api::test_utils::*; use types::{ChainSpec, Epoch, EthSpec, MinimalEthSpec, Slot}; type E = MinimalEthSpec; diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index 64ce3b6566c..90af1070fc6 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -1,6 +1,6 @@ //! Generic tests that make use of the (newer) `InteractiveApiTester` -use crate::common::*; use eth2::types::DepositContractData; +use http_api::test_utils::*; use types::{EthSpec, MainnetEthSpec}; type E = MainnetEthSpec; diff --git a/beacon_node/http_api/tests/main.rs b/beacon_node/http_api/tests/main.rs index ca6a27530a6..161b8d29e46 100644 --- a/beacon_node/http_api/tests/main.rs +++ b/beacon_node/http_api/tests/main.rs @@ -1,7 +1,6 @@ #![cfg(not(debug_assertions))] // Tests are too slow in debug. #![recursion_limit = "256"] -pub mod common; pub mod fork_tests; pub mod interactive_tests; pub mod tests; diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 95f08713014..b9e05fcea08 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1,4 +1,3 @@ -use crate::common::{create_api_server, ApiServer}; use beacon_chain::test_utils::RelativeSyncCommittee; use beacon_chain::{ test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType}, @@ -13,6 +12,7 @@ use eth2::{ }; use futures::stream::{Stream, StreamExt}; use futures::FutureExt; +use http_api::test_utils::{create_api_server, ApiServer}; use lighthouse_network::{Enr, EnrExt, PeerId}; use network::NetworkMessage; use sensitive_url::SensitiveUrl; diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index bdad6728667..b31e6c99a88 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -22,7 +22,7 @@ use lighthouse_network::PeerId; pub use reqwest; use reqwest::{IntoUrl, RequestBuilder, Response}; pub use reqwest::{StatusCode, Url}; -use sensitive_url::SensitiveUrl; +pub use sensitive_url::{SensitiveError, SensitiveUrl}; use serde::{de::DeserializeOwned, Serialize}; use std::convert::TryFrom; use std::fmt; diff --git a/watch/Cargo.toml b/watch/Cargo.toml new file mode 100644 index 00000000000..8ef001ba19d --- /dev/null +++ b/watch/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "watch" +version = "0.1.0" +edition = "2018" + +[lib] +name = "watch" +path = "src/lib.rs" + +[[bin]] +name = "watch" +path = "src/main.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tokio-postgres = "0.7.5" +clap = "2.33.3" +log = "0.4.14" +env_logger = "0.9.0" +types = { path = "../consensus/types" } +eth2 = { path = "../common/eth2" } +tokio = { version = "1.14.0", features = ["time"] } +warp_utils = { path = "../common/warp_utils" } +warp = "0.3.2" +serde = "1.0.116" +serde_json = "1.0.58" +reqwest = { version = "0.11.0", features = ["json","stream"] } +url = "2.2.2" +rand = "0.7.3" + +[dev-dependencies] +http_api = { path = "../beacon_node/http_api" } +beacon_chain = { path = "../beacon_node/beacon_chain" } +network = { path = "../beacon_node/network" } diff --git a/watch/postgres_docker_compose/compose.yml b/watch/postgres_docker_compose/compose.yml new file mode 100644 index 00000000000..0cbddcaaa9b --- /dev/null +++ b/watch/postgres_docker_compose/compose.yml @@ -0,0 +1,30 @@ +version: "3" + +services: + postgres: + image: postgres:12.3-alpine + restart: always + environment: + POSTGRES_PASSWORD: postgres + POSTGRES_USER: postgres + volumes: + - postgres:/var/lib/postgresql/data + ports: + - 5432:5432 + + pgadmin: + image: dpage/pgadmin4:4.23 + environment: + PGADMIN_DEFAULT_EMAIL: admin@pgadmin.com + PGADMIN_DEFAULT_PASSWORD: password + PGADMIN_LISTEN_PORT: 80 + ports: + - 15432:80 + volumes: + - pgadmin:/var/lib/pgadmin + depends_on: + - postgres + +volumes: + postgres: + pgadmin: diff --git a/watch/src/cli.rs b/watch/src/cli.rs new file mode 100644 index 00000000000..d8cb0961b8d --- /dev/null +++ b/watch/src/cli.rs @@ -0,0 +1,87 @@ +use crate::{ + database::{Config, Database}, + server, update_service, +}; +use clap::{App, Arg}; +use tokio::sync::oneshot; +use types::MainnetEthSpec; + +pub const SERVE: &'static str = "serve"; +pub const START_DAEMON: &'static str = "start-daemon"; +pub const INIT_DB: &'static str = "init-db"; +pub const CONFIG: &'static str = "config"; +pub const DEFAULT_CONFIG: &'static str = "default-config"; +pub const DROP: &'static str = "drop"; + +fn init_db<'a, 'b>() -> App<'a, 'b> { + App::new(INIT_DB) + .setting(clap::AppSettings::ColoredHelp) + .arg( + Arg::with_name(DROP) + .long(DROP) + .help("Drop the database before creating. DESTRUCTIVE ACTION."), + ) +} + +fn start_daemon<'a, 'b>() -> App<'a, 'b> { + App::new(START_DAEMON).setting(clap::AppSettings::ColoredHelp) +} + +fn serve<'a, 'b>() -> App<'a, 'b> { + App::new(SERVE).setting(clap::AppSettings::ColoredHelp) +} + +pub fn app<'a, 'b>() -> App<'a, 'b> { + App::new("beacon_watch_daemon") + .author("Sigma Prime ") + .setting(clap::AppSettings::ColoredHelp) + .arg( + Arg::with_name(CONFIG) + .long(CONFIG) + .value_name("PATH_TO_CONFIG") + .help("Path to configuration file") + .takes_value(true), + ) + .arg( + Arg::with_name(DEFAULT_CONFIG) + .long(DEFAULT_CONFIG) + .help("Load a default configuration. Used only for testing.") + .conflicts_with("config"), + ) + .subcommand(init_db()) + .subcommand(start_daemon()) + .subcommand(serve()) +} + +pub async fn run() -> Result<(), String> { + let matches = app().get_matches(); + + let mut config = if matches.is_present(DEFAULT_CONFIG) { + Config::default() + } else { + unimplemented!("parsing config from a file"); + }; + + match matches.subcommand() { + (INIT_DB, Some(submatches)) => { + if submatches.is_present(DROP) { + config.drop_dbname = true; + } + + Database::create(&config) + .await + .map_err(|e| format!("Failure: {:?}", e)) + .map(|_| ()) + } + (START_DAEMON, Some(_)) => update_service::run_once::(&config) + .await + .map_err(|e| format!("Failure: {:?}", e)), + (SERVE, Some(_)) => { + let (_shutdown_tx, shutdown_rx) = oneshot::channel(); + server::serve::(config, shutdown_rx) + .await + .map_err(|e| format!("Failure: {:?}", e)) + } + _ => Err("Unsupported subcommand. See --help".into()), + } +} diff --git a/watch/src/client.rs b/watch/src/client.rs new file mode 100644 index 00000000000..020a5bd1e0d --- /dev/null +++ b/watch/src/client.rs @@ -0,0 +1,78 @@ +use crate::database::WatchBeaconBlock; +use eth2::types::BlockId; +use reqwest::Client; +use serde::de::DeserializeOwned; +use types::Slot; +use url::Url; + +#[derive(Debug)] +pub enum Error { + Reqwest(reqwest::Error), + Url(url::ParseError), +} + +impl From for Error { + fn from(e: reqwest::Error) -> Self { + Error::Reqwest(e) + } +} + +impl From for Error { + fn from(e: url::ParseError) -> Self { + Error::Url(e) + } +} + +pub struct WatchHttpClient { + pub client: Client, + pub server: Url, +} + +impl WatchHttpClient { + async fn get_opt(&self, url: Url) -> Result, Error> { + let response = self.client.get(url).send().await?; + + if response.status() == 404 { + Ok(None) + } else { + response + .error_for_status()? + .json() + .await + .map_err(Into::into) + } + } + + pub async fn get_beacon_blocks( + &self, + block_id: BlockId, + ) -> Result, Error> { + let url = self + .server + .join("v1/")? + .join("beacon_blocks/")? + .join(&block_id.to_string())?; + + self.get_opt(url).await + } + + pub async fn get_lowest_canonical_slot(&self) -> Result, Error> { + let url = self + .server + .join("v1/")? + .join("canonical_slots/")? + .join("lowest")?; + + self.get_opt(url).await + } + + pub async fn get_highest_canonical_slot(&self) -> Result, Error> { + let url = self + .server + .join("v1/")? + .join("canonical_slots/")? + .join("highest")?; + + self.get_opt(url).await + } +} diff --git a/watch/src/database/config.rs b/watch/src/database/config.rs new file mode 100644 index 00000000000..77cd75d40ed --- /dev/null +++ b/watch/src/database/config.rs @@ -0,0 +1,34 @@ +use std::net::Ipv4Addr; + +#[derive(Debug, Clone)] +pub struct Config { + pub user: String, + pub password: String, + pub dbname: String, + pub default_dbname: String, + pub host: String, + pub port: u16, + pub connect_timeout_millis: u64, + pub drop_dbname: bool, + pub beacon_node_url: String, + pub server_listen_addr: Ipv4Addr, + pub server_listen_port: u16, +} + +impl Default for Config { + fn default() -> Self { + Self { + user: "postgres".to_string(), + password: "postgres".to_string(), + dbname: "dev".to_string(), + default_dbname: "postgres".to_string(), + host: "localhost".to_string(), + port: 5432, + connect_timeout_millis: 2_000, // 2s + drop_dbname: false, + beacon_node_url: "http://localhost:5052".to_string(), + server_listen_addr: Ipv4Addr::new(127, 0, 0, 1), + server_listen_port: 5059, + } + } +} diff --git a/watch/src/database/error.rs b/watch/src/database/error.rs new file mode 100644 index 00000000000..1286be26015 --- /dev/null +++ b/watch/src/database/error.rs @@ -0,0 +1,21 @@ +#[derive(Debug)] +pub enum Error { + Postgres(tokio_postgres::Error), + InvalidSlot, + InvalidRoot, + SensitiveUrl(eth2::SensitiveError), + BeaconNode(eth2::Error), + RemoteHeadUnknown, +} + +impl From for Error { + fn from(e: tokio_postgres::Error) -> Self { + Error::Postgres(e) + } +} + +impl From for Error { + fn from(e: eth2::Error) -> Self { + Error::BeaconNode(e) + } +} diff --git a/watch/src/database/mod.rs b/watch/src/database/mod.rs new file mode 100644 index 00000000000..123c9cd5925 --- /dev/null +++ b/watch/src/database/mod.rs @@ -0,0 +1,334 @@ +use log::{error, info}; +use serde::{Deserialize, Serialize}; +use std::convert::TryInto; +use std::time::Duration; +use tokio::{runtime, task::JoinHandle}; +use tokio_postgres::{config::Config as PostgresConfig, Client, NoTls, Row}; +use types::{BeaconBlockHeader, EthSpec, Hash256, Slot}; + +pub use config::Config; +pub use error::Error; +pub use tokio_postgres::Transaction; + +mod config; +mod error; + +#[derive(Debug, Serialize, Deserialize)] +pub struct WatchBeaconBlock { + pub slot: Slot, + pub root: Hash256, + pub parent_root: Hash256, +} + +pub struct Database { + client: Client, + _connection: JoinHandle<()>, +} + +impl Database { + pub async fn connect(config: &Config) -> Result { + let (client, connection) = Self::postgres_config(&config).connect(NoTls).await?; + let connection = runtime::Handle::current().spawn(async move { + if let Err(e) = connection.await { + error!("Connection error: {:?}", e); + } + }); + + Ok(Self { + client, + _connection: connection, + }) + } + + /// Open an existing database at the given `path`, or create one if none exists. + fn postgres_config(config: &Config) -> PostgresConfig { + let mut postgres_config = PostgresConfig::new(); + postgres_config + .user(&config.user) + .password(config.password.clone()) + .dbname(&config.dbname) + .host(&config.host) + .port(config.port) + .connect_timeout(Duration::from_millis(config.connect_timeout_millis)); + postgres_config + } + + /// Create a slashing database at the given path. + /// + /// Error if a database (or any file) already exists at `path`. + pub async fn create(config: &Config) -> Result { + Self::create_database(&config).await?; + + let db = Self::connect(&config).await?; + + db.client + .execute( + "CREATE TABLE validators ( + id integer PRIMARY KEY, + validator_index integer NOT NULL, + public_key char(98) NOT NULL + )", + &[], + ) + .await?; + + db.client + .execute( + "CREATE TABLE beacon_blocks ( + root char(66) PRIMARY KEY, + parent_root char(66) NOT NULL, + slot integer NOT NULL + )", + &[], + ) + .await?; + + db.client + .execute( + "CREATE TABLE canonical_slots ( + slot integer PRIMARY KEY, + root char(66), + beacon_block char(66) REFERENCES beacon_blocks(root) + )", + &[], + ) + .await?; + + Ok(db) + } + + /// Sets `config.dbname` to `config.default_dbname` and returns `(new_config, old_dbname)`. + /// + /// This is useful for creating or dropping databases, since these actions must be done by + /// logging into another database. + fn get_config_using_default_db(config: &Config) -> (Config, String) { + let mut config = config.clone(); + let new_dbname = std::mem::replace(&mut config.dbname, config.default_dbname.clone()); + (config, new_dbname) + } + + async fn create_database(config: &Config) -> Result<(), Error> { + let (default_config, new_dbname) = Self::get_config_using_default_db(config); + let db = Self::connect(&default_config).await?; + + if config.drop_dbname { + Self::drop_database(&config).await?; + } + + info!("Creating {} database", new_dbname); + + db.client + .execute(&format!("CREATE DATABASE {};", new_dbname), &[]) + .await?; + Ok(()) + } + + pub async fn drop_database(config: &Config) -> Result<(), Error> { + let (default_config, new_dbname) = Self::get_config_using_default_db(config); + let db = Self::connect(&default_config).await?; + + info!("Dropping {} database", new_dbname); + + db.client + .execute(&format!("DROP DATABASE IF EXISTS {};", new_dbname), &[]) + .await?; + + Ok(()) + } + + pub async fn transaction<'a>(&'a mut self) -> Result, Error> { + self.client.transaction().await.map_err(Into::into) + } + + pub async fn delete_canonical_roots_above<'a, T: EthSpec>( + tx: &'a Transaction<'a>, + slot: Slot, + ) -> Result<(), Error> { + tx.execute( + "DELETE FROM canonical_slots + WHERE slot > $1", + &[&encode_slot(slot)?], + ) + .await?; + + Ok(()) + } + + pub async fn insert_canonical_root<'a, T: EthSpec>( + tx: &'a Transaction<'a>, + slot: Slot, + root: Hash256, + ) -> Result<(), Error> { + let root = encode_hash256(root); + + tx.execute( + "INSERT INTO canonical_slots (slot, root) + VALUES ($1, $2)", + &[&encode_slot(slot)?, &root], + ) + .await + .map_err(Into::into) + .map(|_| ()) + } + + pub async fn get_root_at_canonical_slot<'a>( + tx: &'a Transaction<'a>, + slot: Slot, + ) -> Result, Error> { + let row_opt = tx + .query_opt( + "SELECT root + FROM canonical_slots + WHERE slot = $1;", + &[&encode_slot(slot)?], + ) + .await?; + + if let Some(row) = row_opt { + Ok(Some(row_to_root(&row, 0)?)) + } else { + Ok(None) + } + } + + pub async fn lowest_canonical_slot<'a>(tx: &'a Transaction<'a>) -> Result, Error> { + let row_opt = tx + .query_opt( + "SELECT MIN(slot) + FROM canonical_slots", + &[], + ) + .await?; + + if let Some(row) = row_opt { + if let Ok(slot) = row.try_get::<_, i32>("min") { + let slot: u64 = slot.try_into().map_err(|_| Error::InvalidSlot)?; + Ok(Some(slot.into())) + } else { + Ok(None) + } + } else { + Ok(None) + } + } + + pub async fn highest_canonical_slot<'a>( + tx: &'a Transaction<'a>, + ) -> Result, Error> { + let row_opt = tx + .query_opt( + "SELECT MAX(slot) + FROM canonical_slots", + &[], + ) + .await?; + + if let Some(row) = row_opt { + if let Ok(slot) = row.try_get::<_, i32>("max") { + let slot: u64 = slot.try_into().map_err(|_| Error::InvalidSlot)?; + Ok(Some(slot.into())) + } else { + Ok(None) + } + } else { + Ok(None) + } + } + + pub async fn unknown_canonical_blocks<'a>( + tx: &'a Transaction<'a>, + count: i64, + ) -> Result, Error> { + let rows = tx + .query( + "SELECT root + FROM canonical_slots + WHERE beacon_block IS NULL + ORDER BY slot DESC + LIMIT $1", + &[&count], + ) + .await?; + + rows.into_iter() + .map(|row| row_to_root(&row, 0)) + .collect::>() + } + + pub async fn insert_canonical_header_if_not_exists<'a>( + tx: &'a Transaction<'a>, + header: &BeaconBlockHeader, + header_root: Hash256, + ) -> Result<(), Error> { + let slot = encode_slot(header.slot)?; + let header_root = encode_hash256(header_root); + + tx.execute( + "INSERT INTO beacon_blocks (slot, root, parent_root) + VALUES ($1, $2, $3) + ON CONFLICT DO NOTHING", + &[&slot, &header_root, &encode_hash256(header.parent_root)], + ) + .await?; + + tx.execute( + "UPDATE canonical_slots + SET beacon_block = $1 + WHERE slot = $2", + &[&header_root, &slot], + ) + .await?; + + Ok(()) + } + + pub async fn get_beacon_block<'a>( + tx: &'a Transaction<'a>, + root: Hash256, + ) -> Result, Error> { + let row = tx + .query_opt( + "SELECT slot, root, parent_root + FROM beacon_blocks + WHERE root = $1", + &[&encode_hash256(root)], + ) + .await?; + + let block_opt = if let Some(row) = row { + let block = WatchBeaconBlock { + slot: row_to_slot(&row, 0)?, + root: row_to_root(&row, 1)?, + parent_root: row_to_root(&row, 2)?, + }; + + Some(block) + } else { + None + }; + + Ok(block_opt) + } +} + +fn row_to_root(row: &Row, index: usize) -> Result { + row.try_get::<_, String>(index)? + .parse() + .map_err(|_| Error::InvalidRoot) +} + +fn row_to_slot(row: &Row, index: usize) -> Result { + row.try_get::<_, i32>(index) + .map_err(|_| Error::InvalidSlot)? + .try_into() + .map_err(|_| Error::InvalidSlot) + .map(|slot: u64| slot.into()) +} + +fn encode_hash256(h: Hash256) -> String { + format!("{:?}", h) +} + +fn encode_slot(s: Slot) -> Result { + s.as_u64().try_into().map_err(|_| Error::InvalidSlot) +} diff --git a/watch/src/lib.rs b/watch/src/lib.rs new file mode 100644 index 00000000000..e47fa331ee0 --- /dev/null +++ b/watch/src/lib.rs @@ -0,0 +1,5 @@ +pub mod cli; +pub mod client; +pub mod database; +pub mod server; +pub mod update_service; diff --git a/watch/src/main.rs b/watch/src/main.rs new file mode 100644 index 00000000000..6eb5c6f7ee5 --- /dev/null +++ b/watch/src/main.rs @@ -0,0 +1,23 @@ +use env_logger::Builder; +use log::error; +use std::process; + +mod cli; +mod database; +mod server; +mod update_service; + +#[tokio::main] +async fn main() { + Builder::from_default_env().init(); + + match cli::run().await { + Ok(()) => process::exit(0), + Err(e) => { + error!("Command failed: {}", e); + eprintln!("{}", e); + drop(e); + process::exit(1) + } + } +} diff --git a/watch/src/server/handler.rs b/watch/src/server/handler.rs new file mode 100644 index 00000000000..c0448ae56bf --- /dev/null +++ b/watch/src/server/handler.rs @@ -0,0 +1,46 @@ +use super::Error; +use crate::database::{Config, Database, WatchBeaconBlock}; +use eth2::types::BlockId; +use std::future::Future; +use types::Slot; + +pub async fn get_db(config: &Config) -> Result { + Database::connect(&config).await.map_err(Into::into) +} + +pub async fn with_db(config: &Config, func: F) -> Result +where + F: Fn(Database) -> R, + R: Future>, +{ + let db = get_db(config).await?; + func(db).await +} + +pub async fn get_block( + db: &mut Database, + block_id: BlockId, +) -> Result, Error> { + let tx = db.transaction().await?; + + let block_opt = match block_id { + BlockId::Root(root) => Database::get_beacon_block(&tx, root).await?, + _ => unimplemented!(), + }; + + Ok(block_opt) +} + +pub async fn get_lowest_slot(db: &mut Database) -> Result, Error> { + let tx = db.transaction().await?; + Database::lowest_canonical_slot(&tx) + .await + .map_err(Into::into) +} + +pub async fn get_highest_slot(db: &mut Database) -> Result, Error> { + let tx = db.transaction().await?; + Database::highest_canonical_slot(&tx) + .await + .map_err(Into::into) +} diff --git a/watch/src/server/mod.rs b/watch/src/server/mod.rs new file mode 100644 index 00000000000..07305cc57dd --- /dev/null +++ b/watch/src/server/mod.rs @@ -0,0 +1,182 @@ +use crate::database::{self, Config}; +use eth2::types::BlockId; +use log::{debug, info}; +use serde::Serialize; +use serde_json; +use std::future::Future; +use std::marker::PhantomData; +use std::net::{SocketAddr, SocketAddrV4}; +use std::sync::Arc; +use tokio::sync::oneshot; +use types::EthSpec; +use warp::{ + http::{self}, + reject, reply, Filter, +}; + +mod handler; + +#[derive(Debug)] +pub enum Error { + Warp(warp::Error), + Serde(serde_json::Error), + Database(database::Error), + Http(http::Error), + Other(String), +} + +impl warp::reject::Reject for Error {} + +impl From for Error { + fn from(e: warp::Error) -> Self { + Error::Warp(e) + } +} + +impl From for Error { + fn from(e: serde_json::Error) -> Self { + Error::Serde(e) + } +} + +impl From for Error { + fn from(e: database::Error) -> Self { + Error::Database(e) + } +} + +impl From for Error { + fn from(e: http::Error) -> Self { + Error::Http(e) + } +} + +impl From for Error { + fn from(e: String) -> Self { + Error::Other(e) + } +} + +#[derive(Debug)] +struct MissingIdField; + +impl warp::reject::Reject for MissingIdField {} + +/// A wrapper around all the items required to spawn the HTTP server. +/// +/// The server will gracefully handle the case where any fields are `None`. +pub struct Context { + pub config: Config, + pub _phantom: PhantomData, +} + +pub async fn serve( + config: Config, + shutdown: oneshot::Receiver<()>, +) -> Result<(), Error> { + let ctx: Context = Context { + config, + _phantom: <_>::default(), + }; + + let (_socket_addr, server) = start_server(Arc::new(ctx), async { + let _ = shutdown.await; + })?; + + server.await; + + Ok(()) +} + +/// Creates a server that will serve requests using information from `ctx`. +/// +/// The server will shut down gracefully when the `shutdown` future resolves. +/// +/// ## Returns +/// +/// This function will bind the server to the provided address and then return a tuple of: +/// +/// - `SocketAddr`: the address that the HTTP server will listen on. +/// - `Future`: the actual server future that will need to be awaited. +/// +/// ## Errors +/// +/// Returns an error if the server is unable to bind or there is another error during +/// configuration. +pub fn start_server( + ctx: Arc>, + shutdown: impl Future + Send + Sync + 'static, +) -> Result<(SocketAddr, impl Future), Error> { + let config = &ctx.config; + + let inner_ctx = ctx.clone(); + let ctx_filter = warp::any().map(move || inner_ctx.clone()); + + let beacon_blocks = warp::path("v1") + .and(warp::path("beacon_blocks")) + .and(warp::path::param::()) + .and(ctx_filter.clone()) + .and_then(|block_id, ctx: Arc>| async move { + let response = handler::with_db(&ctx.config, |mut db| async move { + handler::get_block(&mut db, block_id).await + }) + .await; + respond_opt(response) + }); + + let lowest_slot = warp::path("v1") + .and(warp::path("canonical_slots")) + .and(warp::path("lowest")) + .and(ctx_filter.clone()) + .and_then(|ctx: Arc>| async move { + let response = handler::with_db(&ctx.config, |mut db| async move { + handler::get_lowest_slot(&mut db).await + }) + .await; + respond_opt(response) + }); + + let highest_slot = warp::path("v1") + .and(warp::path("canonical_slots")) + .and(warp::path("highest")) + .and(ctx_filter.clone()) + .and_then(|ctx: Arc>| async move { + let response = handler::with_db(&ctx.config, |mut db| async move { + handler::get_highest_slot(&mut db).await + }) + .await; + respond_opt(response) + }); + + let routes = warp::get() + .and(beacon_blocks.or(lowest_slot).or(highest_slot)) + // Add a `Server` header. + .map(|reply| warp::reply::with_header(reply, "Server", "lighthouse-watch")); + + let (listening_socket, server) = warp::serve(routes).try_bind_with_graceful_shutdown( + SocketAddrV4::new(config.server_listen_addr, config.server_listen_port), + async { + shutdown.await; + }, + )?; + + info!("HTTP server listening on {}", listening_socket); + + Ok((listening_socket, server)) +} + +fn respond_opt( + result: Result, Error>, +) -> Result { + match result { + Ok(Some(t)) => Ok(reply::json(&t)), + Ok(None) => Err(reject::not_found()), + Err(e) => { + #[cfg(test)] // If testing, print errors for troubleshooting. + dbg!(&e); + + debug!("Request returned error: {:?}", e); + Err(reject::custom(e)) + } + } +} diff --git a/watch/src/update_service.rs b/watch/src/update_service.rs new file mode 100644 index 00000000000..d1dc494478a --- /dev/null +++ b/watch/src/update_service.rs @@ -0,0 +1,202 @@ +use crate::database::{Config, Database, Error, Transaction}; +use eth2::{types::BlockId, BeaconNodeHttpClient, SensitiveUrl, Timeouts}; +use log::{debug, info}; +use std::time::Duration; +use types::{BeaconBlockHeader, EthSpec, Hash256, Slot}; + +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5); +pub const BACKFILL_SLOT_COUNT: usize = 64; + +pub async fn run_once(config: &Config) -> Result<(), Error> { + let bn = get_beacon_client(config)?; + let mut db = get_db_connection(config).await?; + + // TODO(paul): lock the canonical slots table? + + perform_head_update::(&mut db, &bn).await?; + perform_backfill::(&mut db, &bn, BACKFILL_SLOT_COUNT).await?; + update_unknown_blocks(&mut db, &bn, BACKFILL_SLOT_COUNT as i64).await?; + + Ok(()) +} + +pub fn get_beacon_client(config: &Config) -> Result { + let beacon_node_url = + SensitiveUrl::parse(&config.beacon_node_url).map_err(Error::SensitiveUrl)?; + Ok(BeaconNodeHttpClient::new( + beacon_node_url, + Timeouts::set_all(DEFAULT_TIMEOUT), + )) +} + +pub async fn get_db_connection(config: &Config) -> Result { + Database::connect(&config).await +} + +pub async fn update_unknown_blocks<'a>( + db: &mut Database, + bn: &BeaconNodeHttpClient, + max_blocks: i64, +) -> Result<(), Error> { + let tx = db.transaction().await?; + + let roots = Database::unknown_canonical_blocks(&tx, max_blocks).await?; + for root in roots { + if let Some(header) = get_header(bn, BlockId::Root(root)).await? { + Database::insert_canonical_header_if_not_exists(&tx, &header, root).await?; + } + } + + tx.commit().await?; + + Ok(()) +} + +pub async fn perform_head_update<'a, T: EthSpec>( + db: &mut Database, + bn: &BeaconNodeHttpClient, +) -> Result<(), Error> { + // Load the head from the beacon node. + let head = get_header(&bn, BlockId::Head) + .await? + .ok_or(Error::RemoteHeadUnknown)?; + let head_root = head.canonical_root(); + + debug!("Starting head update with head slot {}", head.slot); + + let tx = db.transaction().await?; + + // Remove all canonical roots with a slot higher than the head. This removes prunes + // non-canonical blocks when there is a re-org to a block with a lower slot. + if let Some(root) = Database::get_root_at_canonical_slot(&tx, head.slot).await? { + if root != head.canonical_root() { + Database::delete_canonical_roots_above::(&tx, head.slot).await?; + } + } + + // Assume that the slot after the head will not be a skip slot. + let next_non_skipped_block = head.slot + 1; + // Don't backfill more than minimally required. + let backfill_block_count = 1; + + // Replace all conflicting ancestors. Perform partial backfill. + reverse_fill_canonical_slots::( + &tx, + &bn, + next_non_skipped_block, + head, + head_root, + backfill_block_count, + ) + .await?; + tx.commit().await?; + + Ok(()) +} + +pub async fn perform_backfill<'a, T: EthSpec>( + db: &mut Database, + bn: &BeaconNodeHttpClient, + max_backfill_slots: usize, +) -> Result<(), Error> { + let tx = db.transaction().await?; + + if let Some(lowest_slot) = Database::lowest_canonical_slot(&tx) + .await? + .filter(|lowest_slot| *lowest_slot != 0) + { + if let Some(header) = get_header(&bn, BlockId::Slot(lowest_slot - 1)).await? { + let header_root = header.canonical_root(); + reverse_fill_canonical_slots::( + &tx, + &bn, + lowest_slot, + header, + header_root, + max_backfill_slots, + ) + .await?; + } + } + + tx.commit().await?; + + Ok(()) +} + +/// Fills the `canonical_slots` table. +/// +/// It is asssumed that the `header` and `header_root` are at the head of the chain. The +/// `next_non_skipped_block` is used to ensure any skip slots between that value and `header.slot` +/// are filled with `header_root`. +/// +/// The `max_count` value determines how many slots should be filled between the lowest canonical +/// slot in the database and the 0 slot (i.e., genesis slot). +/// +/// ## Notes +/// +/// The `max_count` value is not respsected when there is a gap in the canonical slots. This means +/// that if there is some distance between the highest slot in the DB and the `header.slot`, then +/// this function will *always* ensure there is a contigious chain (although that chain may not +/// always go back to genesis). +pub async fn reverse_fill_canonical_slots<'a, T: EthSpec>( + tx: &'a Transaction<'a>, + bn: &BeaconNodeHttpClient, + mut next_non_skipped_block: Slot, + mut header: BeaconBlockHeader, + mut header_root: Hash256, + max_count: usize, +) -> Result<(), Error> { + let mut count = 0; + + loop { + if let Some(known_root) = Database::get_root_at_canonical_slot(&tx, header.slot).await? { + if known_root == header_root { + info!("Reverse fill completed at canonical slot {}", header.slot); + break; + } + // If the lowest slot in the database is the slot of the header, then start to enforce the + // max_count rule. + // + // *Not* applying the max_count rule until we're at the lowest slot ensures that any gaps + // between slots are filled. + } else if Database::lowest_canonical_slot(&tx) + .await? + .map_or(false, |slot| slot >= header.slot) + { + if count >= max_count { + info!( + "Reverse fill stopped at canonical slot {} with {} slots updated", + header.slot, count + ); + break; + } + } + + for slot in header.slot.as_u64()..next_non_skipped_block.as_u64() { + Database::insert_canonical_root::(&tx, slot.into(), header_root).await?; + count += 1; + } + + next_non_skipped_block = header.slot; + header = if let Some(header) = get_header(bn, BlockId::Root(header.parent_root)).await? { + header_root = header.canonical_root(); + header + } else { + info!("Reverse fill exhausted at canonical slot {}", header.slot); + break; + }; + } + + Ok(()) +} + +pub async fn get_header( + bn: &BeaconNodeHttpClient, + block_id: BlockId, +) -> Result, Error> { + Ok(bn + .get_beacon_headers_block_id(block_id) + .await? + .map(|resp| resp.data.header.message)) +} diff --git a/watch/tests/tests.rs b/watch/tests/tests.rs new file mode 100644 index 00000000000..825da00c285 --- /dev/null +++ b/watch/tests/tests.rs @@ -0,0 +1,314 @@ +#![recursion_limit = "256"] + +use beacon_chain::test_utils::{ + AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, +}; +use eth2::{types::BlockId, BeaconNodeHttpClient}; +use http_api::test_utils::{create_api_server, ApiServer}; +use network::NetworkMessage; +use rand::distributions::Alphanumeric; +use rand::{thread_rng, Rng}; +use std::sync::Arc; +use tokio::sync::{mpsc, oneshot}; +use types::{Hash256, MainnetEthSpec, Slot}; +use url::Url; +use watch::{ + client::WatchHttpClient, + database::{Config, Database}, + server::{start_server, Context}, + update_service::{self, *}, +}; + +type E = MainnetEthSpec; + +const VALIDATOR_COUNT: usize = 32; + +struct Tester { + pub harness: BeaconChainHarness>, + pub client: WatchHttpClient, + pub config: Config, + _bn_network_rx: mpsc::UnboundedReceiver>, + _bn_api_shutdown_tx: oneshot::Sender<()>, + _watch_shutdown_tx: oneshot::Sender<()>, +} + +impl Tester { + pub async fn new() -> Self { + let harness = BeaconChainHarness::builder(E::default()) + .default_spec() + .deterministic_keypairs(VALIDATOR_COUNT) + .fresh_ephemeral_store() + .build(); + + /* + * Spawn a Beacon Node HTTP API. + */ + + let ApiServer { + server, + listening_socket: bn_api_listening_socket, + shutdown_tx: _bn_api_shutdown_tx, + network_rx: _bn_network_rx, + .. + } = create_api_server(harness.chain.clone(), harness.logger().clone()).await; + tokio::spawn(server); + + /* + * Create a watch configuration + */ + + let mut config = Config::default(); + config.server_listen_port = 0; + // Use a random string for a database name. It's not ideal to introduce entropy into tests, + // but I think this should be fine since it shouldn't impact functionality. + config.dbname = random_dbname(); + // Drop the database if it exists, to ensure a clean slate. + config.drop_dbname = true; + config.beacon_node_url = format!( + "http://{}:{}", + bn_api_listening_socket.ip(), + bn_api_listening_socket.port() + ); + + /* + * Create a temporary postgres db + */ + + Database::create(&config).await.unwrap(); + + /* + * Spawn a Watch HTTP API. + */ + + let (_watch_shutdown_tx, watch_shutdown_rx) = oneshot::channel(); + let ctx: Context = Context { + config: config.clone(), + _phantom: <_>::default(), + }; + let (watch_listening_socket, watch_server) = start_server(Arc::new(ctx), async { + let _ = watch_shutdown_rx.await; + }) + .unwrap(); + tokio::spawn(watch_server); + + /* + * Create a HTTP client to talk to the watch HTTP API. + */ + + let client = WatchHttpClient { + client: reqwest::Client::new(), + server: Url::parse(&format!( + "http://{}:{}", + watch_listening_socket.ip(), + watch_listening_socket.port() + )) + .unwrap(), + }; + + Self { + harness, + client, + config, + _bn_network_rx, + _bn_api_shutdown_tx, + _watch_shutdown_tx, + } + } + + /// Extend the chain on the beacon chain harness. Do not update watch. + pub fn extend_chain(self, num_blocks: usize) -> Self { + self.harness.advance_slot(); + self.harness.extend_chain( + num_blocks, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + self + } + + /// Run the watch updater service. + pub async fn run_update_service(self, num_runs: usize) -> Self { + for _ in 0..num_runs { + update_service::run_once::(&self.config).await.unwrap(); + } + self + } + + async fn get_db_and_bn(&self) -> (Database, BeaconNodeHttpClient) { + let db = get_db_connection(&self.config).await.unwrap(); + let bn = get_beacon_client(&self.config).unwrap(); + (db, bn) + } + + pub async fn perform_head_update(self) -> Self { + let (mut db, bn) = self.get_db_and_bn().await; + perform_head_update::(&mut db, &bn).await.unwrap(); + self + } + + pub async fn perform_backfill(self, max_slots: usize) -> Self { + let (mut db, bn) = self.get_db_and_bn().await; + perform_backfill::(&mut db, &bn, max_slots) + .await + .unwrap(); + self + } + + pub async fn update_unknown_blocks(self, max_blocks: i64) -> Self { + let (mut db, bn) = self.get_db_and_bn().await; + update_unknown_blocks(&mut db, &bn, max_blocks) + .await + .unwrap(); + self + } + + pub async fn assert_canonical_slots_empty(self) -> Self { + let lowest_slot = self.client.get_lowest_canonical_slot().await.unwrap(); + + assert_eq!(lowest_slot, None); + + self + } + + pub async fn assert_lowest_canonical_slot(self, expected: Slot) -> Self { + let slot = self + .client + .get_lowest_canonical_slot() + .await + .unwrap() + .unwrap(); + + assert_eq!(slot, expected); + + self + } + + pub async fn assert_highest_canonical_slot(self, expected: Slot) -> Self { + let slot = self + .client + .get_highest_canonical_slot() + .await + .unwrap() + .unwrap(); + + assert_eq!(slot, expected); + + self + } + + pub async fn assert_canonical_slots_not_empty(self) -> Self { + self.client + .get_lowest_canonical_slot() + .await + .unwrap() + .unwrap(); + + self + } + + /// Check that the canonical chain in watch matches that of the harness. Also check that all + /// canonical blocks can be retrieved. + pub async fn assert_canonical_chain_consistent(self) -> Self { + let head_root = self.harness.chain.head_info().unwrap().block_root; + let chain: Vec<(Hash256, Slot)> = self + .harness + .chain + .rev_iter_block_roots_from(head_root) + .unwrap() + .map(Result::unwrap) + .collect(); + + for (root, slot) in &chain { + let block = self + .client + .get_beacon_blocks(BlockId::Root(*root)) + .await + .unwrap() + .unwrap(); + assert_eq!(block.slot, *slot); + } + + self + } +} + +impl Drop for Tester { + fn drop(&mut self) { + let config = self.config.clone(); + tokio::spawn(async move { Database::drop_database(&config).await }); + } +} + +pub fn random_dbname() -> String { + let mut s: String = thread_rng() + .sample_iter(&Alphanumeric) + .take(8) + .map(char::from) + .collect(); + // Postgres gets weird about capitals in database names. + s.make_ascii_lowercase(); + format!("test_{}", s) +} + +#[tokio::test] +async fn short_chain() { + Tester::new() + .await + .extend_chain(BACKFILL_SLOT_COUNT / 2) + .assert_canonical_slots_empty() + .await + .run_update_service(1) + .await + .assert_canonical_slots_not_empty() + .await + .assert_canonical_chain_consistent() + .await; +} + +#[tokio::test] +async fn chain_grows() { + Tester::new() + .await + // Apply four blocks to the chain. + .extend_chain(4) + .perform_head_update() + .await + // Head update should insert one block. + .assert_lowest_canonical_slot(Slot::new(4)) + .await + .assert_highest_canonical_slot(Slot::new(4)) + .await + // Fill back to genesis. + .perform_backfill(4) + .await + // All blocks should be present. + .assert_lowest_canonical_slot(Slot::new(0)) + .await + .assert_highest_canonical_slot(Slot::new(4)) + .await + // Apply one block to the chain. + .extend_chain(1) + .perform_head_update() + .await + // All blocks should be present. + .assert_lowest_canonical_slot(Slot::new(0)) + .await + .assert_highest_canonical_slot(Slot::new(5)) + .await + // Apply two blocks to the chain. + .extend_chain(2) + // Update the head. + .perform_head_update() + .await + // All blocks should be present. + .assert_lowest_canonical_slot(Slot::new(0)) + .await + .assert_highest_canonical_slot(Slot::new(7)) + .await + // Insert all blocks. + .update_unknown_blocks(i64::max_value()) + .await + // Check the chain is consistent + .assert_canonical_chain_consistent() + .await; +}