Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

110 changes: 3 additions & 107 deletions crates/bin/pd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ use metrics_util::layers::Stack;

use anyhow::Context;
use cnidarium::{StateDelta, Storage};
use ibc_proto::ibc::core::channel::v1::query_server::QueryServer as ChannelQueryServer;
use ibc_proto::ibc::core::client::v1::query_server::QueryServer as ClientQueryServer;
use ibc_proto::ibc::core::connection::v1::query_server::QueryServer as ConnectionQueryServer;
use metrics_exporter_prometheus::PrometheusBuilder;
use pd::{
cli::{Opt, RootCommand, TestnetCommand},
Expand All @@ -23,16 +20,11 @@ use pd::{
join::testnet_join,
},
};
use penumbra_app::{PenumbraHost, SUBSTORE_PREFIXES};
use penumbra_proto::core::component::dex::v1::simulation_service_server::SimulationServiceServer;
use penumbra_proto::util::tendermint_proxy::v1::tendermint_proxy_service_server::TendermintProxyServiceServer;
use penumbra_tendermint_proxy::TendermintProxy;
use penumbra_tower_trace::remote_addr;
use penumbra_app::SUBSTORE_PREFIXES;
use rand::Rng;
use rand_core::OsRng;
use tendermint_config::net::Address as TendermintAddress;
use tokio::runtime;
use tonic::transport::Server;
use tower_http::cors::CorsLayer;
use tracing_subscriber::{prelude::*, EnvFilter};
use url::Url;
Expand Down Expand Up @@ -128,109 +120,13 @@ async fn main() -> anyhow::Result<()> {
"starting pd"
);

let tm_proxy = TendermintProxy::new(cometbft_addr);
let abci_server = tokio::task::Builder::new()
.name("abci_server")
.spawn(penumbra_app::server::new(storage.clone()).listen_tcp(abci_bind))
.expect("failed to spawn abci server");

let ibc = penumbra_ibc::component::rpc::IbcQuery::<PenumbraHost>::new(storage.clone());

// TODO: Once we migrate to Tonic 0.10.0, we'll be able to use the
// `Routes` structure to have each component define a method that
// returns a `Routes` with all of its query services bundled inside.
//
// This means we won't have to import all this shit and recite every
// single service -- we can e.g., have the app crate assemble all of
// its components' query services into a single `Routes` and then
// just add that to the gRPC server.

use cnidarium::rpc::proto::v1::query_service_server::QueryServiceServer as StorageQueryServiceServer;
use penumbra_proto::core::{
app::v1::query_service_server::QueryServiceServer as AppQueryServiceServer,
component::{
compact_block::v1::query_service_server::QueryServiceServer as CompactBlockQueryServiceServer,
dex::v1::query_service_server::QueryServiceServer as DexQueryServiceServer,
fee::v1::query_service_server::QueryServiceServer as FeeQueryServiceServer,
governance::v1::query_service_server::QueryServiceServer as GovernanceQueryServiceServer,
sct::v1::query_service_server::QueryServiceServer as SctQueryServiceServer,
shielded_pool::v1::query_service_server::QueryServiceServer as ShieldedPoolQueryServiceServer,
stake::v1::query_service_server::QueryServiceServer as StakeQueryServiceServer,
},
};
use tonic_web::enable as we;

use cnidarium::rpc::Server as StorageServer;
use penumbra_app::rpc::Server as AppServer;
use penumbra_compact_block::component::rpc::Server as CompactBlockServer;
use penumbra_dex::component::rpc::Server as DexServer;
use penumbra_fee::component::rpc::Server as FeeServer;
use penumbra_governance::component::rpc::Server as GovernanceServer;
use penumbra_sct::component::rpc::Server as SctServer;
use penumbra_shielded_pool::component::rpc::Server as ShieldedPoolServer;
use penumbra_stake::component::rpc::Server as StakeServer;

let mut grpc_server = Server::builder()
.trace_fn(|req| match remote_addr(req) {
Some(remote_addr) => {
tracing::error_span!("grpc", ?remote_addr)
}
None => tracing::error_span!("grpc"),
})
// Allow HTTP/1, which will be used by grpc-web connections.
// This is particularly important when running locally, as gRPC
// typically uses HTTP/2, which requires HTTPS. Accepting HTTP/2
// allows local applications such as web browsers to talk to pd.
.accept_http1(true)
// As part of #2932, we are disabling all timeouts until we circle back to our
// performance story.
// Sets a timeout for all gRPC requests, but note that in the case of streaming
// requests, the timeout is only applied to the initial request. This means that
// this does not prevent long lived streams, for example to allow clients to obtain
// new blocks.
// .timeout(std::time::Duration::from_secs(7))
// Wrap each of the gRPC services in a tonic-web proxy:
.add_service(we(StorageQueryServiceServer::new(StorageServer::new(
storage.clone(),
))))
.add_service(we(AppQueryServiceServer::new(AppServer::new(
storage.clone(),
))))
.add_service(we(CompactBlockQueryServiceServer::new(
CompactBlockServer::new(storage.clone()),
)))
.add_service(we(DexQueryServiceServer::new(DexServer::new(
storage.clone(),
))))
.add_service(we(FeeQueryServiceServer::new(FeeServer::new(
storage.clone(),
))))
.add_service(we(GovernanceQueryServiceServer::new(
GovernanceServer::new(storage.clone()),
)))
.add_service(we(SctQueryServiceServer::new(SctServer::new(
storage.clone(),
))))
.add_service(we(ShieldedPoolQueryServiceServer::new(
ShieldedPoolServer::new(storage.clone()),
)))
.add_service(we(StakeQueryServiceServer::new(StakeServer::new(
storage.clone(),
))))
.add_service(we(ClientQueryServer::new(ibc.clone())))
.add_service(we(ChannelQueryServer::new(ibc.clone())))
.add_service(we(ConnectionQueryServer::new(ibc.clone())))
.add_service(we(TendermintProxyServiceServer::new(tm_proxy.clone())))
.add_service(we(tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(penumbra_proto::FILE_DESCRIPTOR_SET)
.build()
.with_context(|| "could not configure grpc reflection service")?));

if enable_expensive_rpc {
grpc_server = grpc_server.add_service(we(SimulationServiceServer::new(
DexServer::new(storage.clone()),
)));
}
let grpc_server =
penumbra_app::rpc::router(&storage, cometbft_addr, enable_expensive_rpc)?;

// Create Axum routes for the frontend app.
let frontend = pd::zipserve::router("/app/", pd::MINIFRONT_ARCHIVE_BYTES);
Expand Down
8 changes: 6 additions & 2 deletions crates/core/app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ bech32 = { workspace = true }
bincode = { workspace = true }
bitvec = { workspace = true }
blake2b_simd = { workspace = true }
cnidarium = { workspace = true, default-features = true }
cnidarium = { workspace = true, features = ["migration", "rpc"], default-features = true }
cnidarium-component = { workspace = true, default-features = true }
decaf377 = { workspace = true, default-features = true }
decaf377-rdsa = { workspace = true }
Expand All @@ -44,7 +44,7 @@ penumbra-fee = { workspace = true, default-features = true }
penumbra-funding = { workspace = true, default-features = true }
penumbra-genesis = { workspace = true }
penumbra-governance = { workspace = true, default-features = true }
penumbra-ibc = { workspace = true, features = ["component"], default-features = true }
penumbra-ibc = { workspace = true, features = ["component", "rpc"], default-features = true }
penumbra-keys = { workspace = true, default-features = true }
penumbra-num = { workspace = true, default-features = true }
penumbra-proof-params = { workspace = true, default-features = true }
Expand All @@ -53,6 +53,7 @@ penumbra-sct = { workspace = true, default-features = true }
penumbra-shielded-pool = { workspace = true, features = ["component"], default-features = true }
penumbra-stake = { workspace = true, default-features = true }
penumbra-tct = { workspace = true, default-features = true }
penumbra-tendermint-proxy = { path = "../../util/tendermint-proxy" }
penumbra-tower-trace = { path = "../../util/tower-trace" }
penumbra-transaction = { workspace = true, features = ["parallel"], default-features = true }
penumbra-txhash = { workspace = true, default-features = true }
Expand All @@ -71,11 +72,14 @@ tendermint-proto = { workspace = true }
tokio = { workspace = true, features = ["full", "tracing"] }
tokio-util = { workspace = true }
tonic = { workspace = true }
tonic-reflection = { workspace = true }
tonic-web = { workspace = true }
tower = { workspace = true, features = ["full"] }
tower-abci = "0.11"
tower-actor = "0.1.0"
tower-service = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }

[dev-dependencies]
ed25519-consensus = { workspace = true }
Expand Down
168 changes: 116 additions & 52 deletions crates/core/app/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,58 +1,122 @@
use cnidarium::Storage;
use penumbra_proto::core::app::v1::{
query_service_server::QueryService, AppParametersRequest, AppParametersResponse,
TransactionsByHeightRequest, TransactionsByHeightResponse,
};
use tonic::Status;
use tracing::instrument;

use crate::app::StateReadExt as _;

// TODO: Hide this and only expose a Router?
pub struct Server {
storage: Storage,
}
mod query;

impl Server {
pub fn new(storage: Storage) -> Self {
Self { storage }
}
}

#[tonic::async_trait]
impl QueryService for Server {
#[instrument(skip(self, request))]
async fn transactions_by_height(
&self,
request: tonic::Request<TransactionsByHeightRequest>,
) -> Result<tonic::Response<TransactionsByHeightResponse>, Status> {
let state = self.storage.latest_snapshot();
let request_inner = request.into_inner();
let block_height = request_inner.block_height;
// TODO: Once we migrate to Tonic 0.10.0, we'll be able to use the `Routes` structure to have each
// component define a method that returns a `Routes` with all of its query services bundled inside.
//
// This means we won't have to import all this shit and recite every single service -- we can e.g.,
// have the app crate assemble all of its components' query services into a single `Routes` and
// then just add that to the gRPC server.
use {
self::query::AppQueryServer,
crate::PenumbraHost,
anyhow::Context,
cnidarium::rpc::{
proto::v1::query_service_server::QueryServiceServer as StorageQueryServiceServer,
Server as StorageServer,
},
ibc_proto::ibc::core::{
channel::v1::query_server::QueryServer as ChannelQueryServer,
client::v1::query_server::QueryServer as ClientQueryServer,
connection::v1::query_server::QueryServer as ConnectionQueryServer,
},
penumbra_compact_block::component::rpc::Server as CompactBlockServer,
penumbra_dex::component::rpc::Server as DexServer,
penumbra_fee::component::rpc::Server as FeeServer,
penumbra_governance::component::rpc::Server as GovernanceServer,
penumbra_proto::{
core::{
app::v1::query_service_server::QueryServiceServer as AppQueryServiceServer,
component::{
compact_block::v1::query_service_server::QueryServiceServer as CompactBlockQueryServiceServer,
dex::v1::{
query_service_server::QueryServiceServer as DexQueryServiceServer,
simulation_service_server::SimulationServiceServer,
},
fee::v1::query_service_server::QueryServiceServer as FeeQueryServiceServer,
governance::v1::query_service_server::QueryServiceServer as GovernanceQueryServiceServer,
sct::v1::query_service_server::QueryServiceServer as SctQueryServiceServer,
shielded_pool::v1::query_service_server::QueryServiceServer as ShieldedPoolQueryServiceServer,
stake::v1::query_service_server::QueryServiceServer as StakeQueryServiceServer,
},
},
util::tendermint_proxy::v1::tendermint_proxy_service_server::TendermintProxyServiceServer,
},
penumbra_sct::component::rpc::Server as SctServer,
penumbra_shielded_pool::component::rpc::Server as ShieldedPoolServer,
penumbra_stake::component::rpc::Server as StakeServer,
penumbra_tendermint_proxy::TendermintProxy,
penumbra_tower_trace::remote_addr,
tonic_web::enable as we,
};

let tx_response = state
.transactions_by_height(block_height)
.await
.map_err(|e| tonic::Status::internal(format!("transaction response bad: {e}")))?;
pub fn router(
storage: &cnidarium::Storage,
cometbft_addr: url::Url,
enable_expensive_rpc: bool,
) -> anyhow::Result<tonic::transport::server::Router> {
let tm_proxy = TendermintProxy::new(cometbft_addr);
let ibc = penumbra_ibc::component::rpc::IbcQuery::<PenumbraHost>::new(storage.clone());
let mut grpc_server = tonic::transport::server::Server::builder()
.trace_fn(|req| match remote_addr(req) {
Some(remote_addr) => {
tracing::error_span!("grpc", ?remote_addr)
}
None => tracing::error_span!("grpc"),
})
// Allow HTTP/1, which will be used by grpc-web connections.
// This is particularly important when running locally, as gRPC
// typically uses HTTP/2, which requires HTTPS. Accepting HTTP/2
// allows local applications such as web browsers to talk to pd.
.accept_http1(true)
// As part of #2932, we are disabling all timeouts until we circle back to our
// performance story.
// Sets a timeout for all gRPC requests, but note that in the case of streaming
// requests, the timeout is only applied to the initial request. This means that
// this does not prevent long lived streams, for example to allow clients to obtain
// new blocks.
// .timeout(std::time::Duration::from_secs(7))
// Wrap each of the gRPC services in a tonic-web proxy:
.add_service(we(StorageQueryServiceServer::new(StorageServer::new(
storage.clone(),
))))
.add_service(we(AppQueryServiceServer::new(AppQueryServer::new(
storage.clone(),
))))
.add_service(we(CompactBlockQueryServiceServer::new(
CompactBlockServer::new(storage.clone()),
)))
.add_service(we(DexQueryServiceServer::new(DexServer::new(
storage.clone(),
))))
.add_service(we(FeeQueryServiceServer::new(FeeServer::new(
storage.clone(),
))))
.add_service(we(GovernanceQueryServiceServer::new(
GovernanceServer::new(storage.clone()),
)))
.add_service(we(SctQueryServiceServer::new(SctServer::new(
storage.clone(),
))))
.add_service(we(ShieldedPoolQueryServiceServer::new(
ShieldedPoolServer::new(storage.clone()),
)))
.add_service(we(StakeQueryServiceServer::new(StakeServer::new(
storage.clone(),
))))
.add_service(we(ClientQueryServer::new(ibc.clone())))
.add_service(we(ChannelQueryServer::new(ibc.clone())))
.add_service(we(ConnectionQueryServer::new(ibc.clone())))
.add_service(we(TendermintProxyServiceServer::new(tm_proxy.clone())))
.add_service(we(tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(penumbra_proto::FILE_DESCRIPTOR_SET)
.build()
.with_context(|| "could not configure grpc reflection service")?));

Ok(tonic::Response::new(tx_response))
if enable_expensive_rpc {
grpc_server = grpc_server.add_service(we(SimulationServiceServer::new(DexServer::new(
storage.clone(),
))));
}

#[instrument(skip(self, _request))]
async fn app_parameters(
&self,
_request: tonic::Request<AppParametersRequest>,
) -> Result<tonic::Response<AppParametersResponse>, Status> {
let state = self.storage.latest_snapshot();
// We map the error here to avoid including `tonic` as a dependency
// in the `chain` crate, to support its compilation to wasm.

let app_parameters = state.get_app_params().await.map_err(|e| {
tonic::Status::unavailable(format!("error getting app parameters: {e}"))
})?;

Ok(tonic::Response::new(AppParametersResponse {
app_parameters: Some(app_parameters.into()),
}))
}
Ok(grpc_server)
}
Loading