Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Prewarmer
  • Loading branch information
levkk committed May 11, 2023
commit 272d29c8c2a9f9d73948b5dca665977d665d52ea
8 changes: 7 additions & 1 deletion pgcat.toml
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,14 @@ connect_timeout = 3000

[plugins]

[plugins.query_logger]
[plugins.prewarmer]
enabled = false
queries = [
"SELECT pg_prewarm('pgbench_accounts')",
]

[plugins.query_logger]
enabled = true

[plugins.table_access]
enabled = false
Expand Down
7 changes: 7 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,7 @@ pub struct Plugins {
pub intercept: Option<Intercept>,
pub table_access: Option<TableAccess>,
pub query_logger: Option<QueryLogger>,
pub prewarmer: Option<Prewarmer>,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default)]
Expand All @@ -711,6 +712,12 @@ pub struct QueryLogger {
pub enabled: bool,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default)]
pub struct Prewarmer {
pub enabled: bool,
pub queries: Vec<String>,
}

impl Intercept {
pub fn substitute(&mut self, db: &str, user: &str) {
for (_, query) in self.queries.iter_mut() {
Expand Down
1 change: 1 addition & 0 deletions src/plugins/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
//!

pub mod intercept;
pub mod prewarmer;
pub mod query_logger;
pub mod table_access;

Expand Down
48 changes: 48 additions & 0 deletions src/plugins/prewarmer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//! Prewarm new connections before giving them to the client.
use crate::{errors::Error, server::Server};
use arc_swap::ArcSwap;
// use async_trait::async_trait;
use log::info;
use once_cell::sync::Lazy;
use std::sync::Arc;

static QUERIES: Lazy<ArcSwap<Vec<String>>> = Lazy::new(|| ArcSwap::from_pointee(Vec::new()));

pub struct Prewarmer;

pub fn setup(queries: &Vec<String>) {
QUERIES.store(Arc::new(queries.clone()));

info!("Query prewarmer enabled.");

for query in queries {
info!("Prewarming query: {}", query);
}
}

pub fn disable() {
QUERIES.store(Arc::new(vec![]));
}

pub fn enabled() -> bool {
!QUERIES.load().is_empty()
}

// TODO: come up with a decent interface for server plugins.

pub async fn run(server: &mut Server) -> Result<(), Error> {
let mut queries = Vec::new();

// Don't want to hold arcswap while we run these potentially slow queries.
{
for query in QUERIES.load().iter() {
queries.push(query.clone());
}
}

for query in queries {
server.query(&query).await?;
}

Ok(())
}
68 changes: 41 additions & 27 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::config::{get_config, Address, General, LoadBalancingMode, PoolMode, R
use crate::errors::Error;

use crate::auth_passthrough::AuthPassthrough;
use crate::plugins::prewarmer;
use crate::server::Server;
use crate::sharding::ShardingFunction;
use crate::stats::{AddressStats, ClientStats, PoolStats, ServerStats};
Expand Down Expand Up @@ -209,6 +210,41 @@ impl ConnectionPool {
let mut new_pools = HashMap::new();
let mut address_id: usize = 0;

// Enable plugins, if configured
if let Some(ref plugins) = config.plugins {
if let Some(ref intercept) = plugins.intercept {
if intercept.enabled {
crate::plugins::intercept::setup(intercept, &new_pools);
} else {
crate::plugins::intercept::disable();
}
}

if let Some(ref table_access) = plugins.table_access {
if table_access.enabled {
crate::plugins::table_access::setup(table_access);
} else {
crate::plugins::table_access::disable();
}
}

if let Some(ref query_logger) = plugins.query_logger {
if query_logger.enabled {
crate::plugins::query_logger::setup();
} else {
crate::plugins::query_logger::disable();
}
}

if let Some(ref prewarmer) = plugins.prewarmer {
if prewarmer.enabled {
crate::plugins::prewarmer::setup(&prewarmer.queries);
} else {
crate::plugins::prewarmer::disable();
}
}
}

for (pool_name, pool_config) in &config.pools {
let new_pool_hash_value = pool_config.hash_value();

Expand Down Expand Up @@ -471,32 +507,6 @@ impl ConnectionPool {
}
}

if let Some(ref plugins) = config.plugins {
if let Some(ref intercept) = plugins.intercept {
if intercept.enabled {
crate::plugins::intercept::setup(intercept, &new_pools);
} else {
crate::plugins::intercept::disable();
}
}

if let Some(ref table_access) = plugins.table_access {
if table_access.enabled {
crate::plugins::table_access::setup(table_access);
} else {
crate::plugins::table_access::disable();
}
}

if let Some(ref query_logger) = plugins.query_logger {
if query_logger.enabled {
crate::plugins::query_logger::setup();
} else {
crate::plugins::query_logger::disable();
}
}
}

POOLS.store(Arc::new(new_pools.clone()));
Ok(())
}
Expand Down Expand Up @@ -973,7 +983,11 @@ impl ManageConnection for ServerPool {
)
.await
{
Ok(conn) => {
Ok(mut conn) => {
if prewarmer::enabled() {
prewarmer::run(&mut conn).await?;
}

stats.idle();
Ok(conn)
}
Expand Down
2 changes: 2 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,8 @@ impl Server {
/// It will use the simple query protocol.
/// Result will not be returned, so this is useful for things like `SET` or `ROLLBACK`.
pub async fn query(&mut self, query: &str) -> Result<(), Error> {
debug!("Running `{}` on server {:?}", query, self.address);

let query = simple_query(query);

self.send(&query).await?;
Expand Down