Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ledger-tool/src/bigtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ async fn get_bigtable(
credential_type: CredentialType::Filepath(Some(args.crediential_path.unwrap())),
instance_name: args.instance_name,
app_profile_id: args.app_profile_id,
max_message_size: solana_storage_bigtable::DEFAULT_MAX_MESSAGE_SIZE,
},
)
.await
Expand Down
2 changes: 2 additions & 0 deletions rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ pub struct RpcBigtableConfig {
pub bigtable_instance_name: String,
pub bigtable_app_profile_id: String,
pub timeout: Option<Duration>,
pub max_message_size: usize,
}

impl Default for RpcBigtableConfig {
Expand All @@ -181,6 +182,7 @@ impl Default for RpcBigtableConfig {
bigtable_instance_name,
bigtable_app_profile_id,
timeout: None,
max_message_size: solana_storage_bigtable::DEFAULT_MAX_MESSAGE_SIZE,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions rpc/src/rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ impl JsonRpcService {
ref bigtable_instance_name,
ref bigtable_app_profile_id,
timeout,
max_message_size,
}) = config.rpc_bigtable_config
{
let bigtable_config = solana_storage_bigtable::LedgerStorageConfig {
Expand All @@ -414,6 +415,7 @@ impl JsonRpcService {
credential_type: CredentialType::Filepath(None),
instance_name: bigtable_instance_name.clone(),
app_profile_id: bigtable_app_profile_id.clone(),
max_message_size,
};
runtime
.block_on(solana_storage_bigtable::LedgerStorage::new_with_config(
Expand Down
1,047 changes: 972 additions & 75 deletions storage-bigtable/proto/google.api.rs

Large diffs are not rendered by default.

788 changes: 751 additions & 37 deletions storage-bigtable/proto/google.bigtable.v2.rs

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions storage-bigtable/proto/google.rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Status {
/// The status code, which should be an enum value of \[google.rpc.Code][google.rpc.Code\].
/// The status code, which should be an enum value of
/// \[google.rpc.Code][google.rpc.Code\].
#[prost(int32, tag = "1")]
pub code: i32,
/// A developer-facing error message, which should be in English. Any
/// user-facing error message should be localized and sent in the
/// \[google.rpc.Status.details][google.rpc.Status.details\] field, or localized by the client.
/// \[google.rpc.Status.details][google.rpc.Status.details\] field, or localized
/// by the client.
#[prost(string, tag = "2")]
pub message: ::prost::alloc::string::String,
/// A list of messages that carry the error details. There is a common set of
Expand Down
27 changes: 25 additions & 2 deletions storage-bigtable/src/bigtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ pub struct BigTableConnection {
table_prefix: String,
app_profile_id: String,
timeout: Option<Duration>,
max_message_size: usize,
}

impl BigTableConnection {
Expand All @@ -141,11 +142,18 @@ impl BigTableConnection {
read_only: bool,
timeout: Option<Duration>,
credential_type: CredentialType,
max_message_size: usize,
) -> Result<Self> {
match std::env::var("BIGTABLE_EMULATOR_HOST") {
Ok(endpoint) => {
info!("Connecting to bigtable emulator at {}", endpoint);
Self::new_for_emulator(instance_name, app_profile_id, &endpoint, timeout)
Self::new_for_emulator(
instance_name,
app_profile_id,
&endpoint,
timeout,
max_message_size,
)
}

Err(_) => {
Expand Down Expand Up @@ -210,6 +218,7 @@ impl BigTableConnection {
table_prefix,
app_profile_id: app_profile_id.to_string(),
timeout,
max_message_size,
})
}
}
Expand All @@ -220,6 +229,7 @@ impl BigTableConnection {
app_profile_id: &str,
endpoint: &str,
timeout: Option<Duration>,
max_message_size: usize,
) -> Result<Self> {
Ok(Self {
access_token: None,
Expand All @@ -229,6 +239,7 @@ impl BigTableConnection {
table_prefix: format!("projects/emulator/instances/{instance_name}/tables/"),
app_profile_id: app_profile_id.to_string(),
timeout,
max_message_size,
})
}

Expand All @@ -254,7 +265,9 @@ impl BigTableConnection {
}
Ok(req)
},
);
)
.max_decoding_message_size(self.max_message_size)
.max_encoding_message_size(self.max_message_size);
BigTable {
access_token: self.access_token.clone(),
client,
Expand Down Expand Up @@ -469,6 +482,8 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
],
})),
}),
request_stats_view: 0,
reversed: false,
})
.await?
.into_inner();
Expand All @@ -494,6 +509,8 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
filter: Some(RowFilter {
filter: Some(row_filter::Filter::StripValueTransformer(true)),
}),
request_stats_view: 0,
reversed: false,
})
.await?
.into_inner();
Expand Down Expand Up @@ -545,6 +562,8 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
// Only return the latest version of each cell
filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
}),
request_stats_view: 0,
reversed: false,
})
.await?
.into_inner();
Expand Down Expand Up @@ -577,6 +596,8 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
// Only return the latest version of each cell
filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
}),
request_stats_view: 0,
reversed: false,
})
.await?
.into_inner();
Expand Down Expand Up @@ -610,6 +631,8 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
// Only return the latest version of each cell
filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
}),
request_stats_view: 0,
reversed: false,
})
.await?
.into_inner();
Expand Down
6 changes: 6 additions & 0 deletions storage-bigtable/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ impl From<LegacyTransactionByAddrInfo> for TransactionByAddrInfo {

pub const DEFAULT_INSTANCE_NAME: &str = "solana-ledger";
pub const DEFAULT_APP_PROFILE_ID: &str = "default";
pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; // 64MB

#[derive(Debug)]
pub enum CredentialType {
Expand All @@ -395,6 +396,7 @@ pub struct LedgerStorageConfig {
pub credential_type: CredentialType,
pub instance_name: String,
pub app_profile_id: String,
pub max_message_size: usize,
}

impl Default for LedgerStorageConfig {
Expand All @@ -405,6 +407,7 @@ impl Default for LedgerStorageConfig {
credential_type: CredentialType::Filepath(None),
instance_name: DEFAULT_INSTANCE_NAME.to_string(),
app_profile_id: DEFAULT_APP_PROFILE_ID.to_string(),
max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
}
}
}
Expand Down Expand Up @@ -471,6 +474,7 @@ impl LedgerStorage {
app_profile_id,
endpoint,
timeout,
LedgerStorageConfig::default().max_message_size,
)?,
stats,
})
Expand All @@ -484,13 +488,15 @@ impl LedgerStorage {
instance_name,
app_profile_id,
credential_type,
max_message_size,
} = config;
let connection = bigtable::BigTableConnection::new(
instance_name.as_str(),
app_profile_id.as_str(),
read_only,
timeout,
credential_type,
max_message_size,
)
.await?;
Ok(Self { stats, connection })
Expand Down
1 change: 1 addition & 0 deletions validator/src/bin/solana-test-validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ fn main() {
String
),
timeout: None,
..RpcBigtableConfig::default()
})
} else {
None
Expand Down
12 changes: 12 additions & 0 deletions validator/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,15 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
.default_value(&default_args.rpc_bigtable_app_profile_id)
.help("Bigtable application profile id to use in requests")
)
.arg(
Arg::with_name("rpc_bigtable_max_message_size")
.long("rpc-bigtable-max-message-size")
.value_name("BYTES")
.validator(is_parsable::<usize>)
.takes_value(true)
.default_value(&default_args.rpc_bigtable_max_message_size)
.help("Max encoding and decoding message size used in Bigtable Grpc client"),
)
.arg(
Arg::with_name("rpc_pubsub_worker_threads")
.long("rpc-pubsub-worker-threads")
Expand Down Expand Up @@ -1925,6 +1934,7 @@ pub struct DefaultArgs {
pub rpc_bigtable_timeout: String,
pub rpc_bigtable_instance_name: String,
pub rpc_bigtable_app_profile_id: String,
pub rpc_bigtable_max_message_size: String,
pub rpc_max_request_body_size: String,
pub rpc_pubsub_worker_threads: String,

Expand Down Expand Up @@ -2010,6 +2020,8 @@ impl DefaultArgs {
rpc_bigtable_instance_name: solana_storage_bigtable::DEFAULT_INSTANCE_NAME.to_string(),
rpc_bigtable_app_profile_id: solana_storage_bigtable::DEFAULT_APP_PROFILE_ID
.to_string(),
rpc_bigtable_max_message_size: solana_storage_bigtable::DEFAULT_MAX_MESSAGE_SIZE
.to_string(),
rpc_pubsub_worker_threads: "4".to_string(),
accountsdb_repl_threads: num_cpus::get().to_string(),
maximum_full_snapshot_archives_to_retain: DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN
Expand Down
1 change: 1 addition & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1230,6 +1230,7 @@ pub fn main() {
timeout: value_t!(matches, "rpc_bigtable_timeout", u64)
.ok()
.map(Duration::from_secs),
max_message_size: value_t_or_exit!(matches, "rpc_bigtable_max_message_size", usize),
})
} else {
None
Expand Down