diff --git a/libs/postgres_ffi/src/lib.rs b/libs/postgres_ffi/src/lib.rs
index c9e5df9f0407..d0009d0dc9f5 100644
--- a/libs/postgres_ffi/src/lib.rs
+++ b/libs/postgres_ffi/src/lib.rs
@@ -131,6 +131,7 @@ pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;
 
 // Export some version independent functions that are used outside of this mod
 pub use v14::xlog_utils::encode_logical_message;
+pub use v14::xlog_utils::from_pg_timestamp;
 pub use v14::xlog_utils::get_current_timestamp;
 pub use v14::xlog_utils::to_pg_timestamp;
 pub use v14::xlog_utils::XLogFileName;
diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs
index 61a9c38a84a2..0ca9bd8b4582 100644
--- a/libs/postgres_ffi/src/xlog_utils.rs
+++ b/libs/postgres_ffi/src/xlog_utils.rs
@@ -136,21 +136,42 @@ pub fn get_current_timestamp() -> TimestampTz {
     to_pg_timestamp(SystemTime::now())
 }
 
-pub fn to_pg_timestamp(time: SystemTime) -> TimestampTz {
-    const UNIX_EPOCH_JDATE: u64 = 2440588; /* == date2j(1970, 1, 1) */
-    const POSTGRES_EPOCH_JDATE: u64 = 2451545; /* == date2j(2000, 1, 1) */
+// Module to reduce the scope of the constants
+mod timestamp_conversions {
+    use std::time::Duration;
+
+    use super::*;
+
+    const UNIX_EPOCH_JDATE: u64 = 2440588; // == date2j(1970, 1, 1)
+    const POSTGRES_EPOCH_JDATE: u64 = 2451545; // == date2j(2000, 1, 1)
     const SECS_PER_DAY: u64 = 86400;
     const USECS_PER_SEC: u64 = 1000000;
-    match time.duration_since(SystemTime::UNIX_EPOCH) {
-        Ok(n) => {
-            ((n.as_secs() - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY))
-                * USECS_PER_SEC
-                + n.subsec_micros() as u64) as i64
+    const SECS_DIFF_UNIX_TO_POSTGRES_EPOCH: u64 =
+        (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY;
+
+    pub fn to_pg_timestamp(time: SystemTime) -> TimestampTz {
+        match time.duration_since(SystemTime::UNIX_EPOCH) {
+            Ok(n) => {
+                ((n.as_secs() - SECS_DIFF_UNIX_TO_POSTGRES_EPOCH) * USECS_PER_SEC
+                    + n.subsec_micros() as u64) as i64
+            }
+            Err(_) => panic!("SystemTime before UNIX EPOCH!"),
         }
-        Err(_) => panic!("SystemTime before UNIX EPOCH!"),
+    }
+
+    pub fn from_pg_timestamp(time: TimestampTz) -> SystemTime {
+        let time: u64 = time
+            .try_into()
+            .expect("timestamp before millennium (postgres epoch)");
+        let since_unix_epoch = time + SECS_DIFF_UNIX_TO_POSTGRES_EPOCH * USECS_PER_SEC;
+        SystemTime::UNIX_EPOCH
+            .checked_add(Duration::from_micros(since_unix_epoch))
+            .expect("SystemTime overflow")
+    }
 }
 
+pub use timestamp_conversions::{from_pg_timestamp, to_pg_timestamp};
+
 // Returns (aligned) end_lsn of the last record in data_dir with WAL segments.
 // start_lsn must point to some previously known record boundary (beginning of
 // the next record). If no valid record after is found, start_lsn is returned
@@ -481,4 +502,24 @@ pub fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> {
     wal
 }
 
-// If you need to craft WAL and write tests for this module, put it at wal_craft crate.
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_ts_conversion() {
+        let now = SystemTime::now();
+        let round_trip = from_pg_timestamp(to_pg_timestamp(now));
+
+        let now_since = now.duration_since(SystemTime::UNIX_EPOCH).unwrap();
+        let round_trip_since = round_trip.duration_since(SystemTime::UNIX_EPOCH).unwrap();
+        assert_eq!(now_since.as_micros(), round_trip_since.as_micros());
+
+        let now_pg = get_current_timestamp();
+        let round_trip_pg = to_pg_timestamp(from_pg_timestamp(now_pg));
+
+        assert_eq!(now_pg, round_trip_pg);
+    }
+
+    // If you need to craft WAL and write tests for this module, put it at wal_craft crate.
+}
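The arithmetic behind this conversion pair is easy to check outside the patch: Postgres timestamps count microseconds from 2000-01-01, while `SystemTime` is anchored at the Unix epoch of 1970-01-01, so both functions just shift by a constant 10,957 days (946,684,800 seconds). Below is a self-contained sketch of that round trip with the constants copied from the diff; it is an illustration only, not the crate's module, and a bare `i64` stands in for the `TimestampTz` alias.

```rust
// Standalone sketch of the epoch shift performed by to_pg_timestamp /
// from_pg_timestamp in the patch above; i64 stands in for TimestampTz.
use std::time::{Duration, SystemTime};

const UNIX_EPOCH_JDATE: u64 = 2440588; // == date2j(1970, 1, 1)
const POSTGRES_EPOCH_JDATE: u64 = 2451545; // == date2j(2000, 1, 1)
const SECS_PER_DAY: u64 = 86400;
const USECS_PER_SEC: u64 = 1_000_000;
const SECS_DIFF: u64 = (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY;

fn to_pg(time: SystemTime) -> i64 {
    // Assumes a time at or after 2000-01-01, like the patched function.
    let n = time.duration_since(SystemTime::UNIX_EPOCH).unwrap();
    ((n.as_secs() - SECS_DIFF) * USECS_PER_SEC + n.subsec_micros() as u64) as i64
}

fn from_pg(ts: i64) -> SystemTime {
    let ts: u64 = ts.try_into().expect("timestamp before the postgres epoch");
    SystemTime::UNIX_EPOCH + Duration::from_micros(ts + SECS_DIFF * USECS_PER_SEC)
}

fn main() {
    // The shift is exactly 10957 days of 86400 seconds.
    assert_eq!(SECS_DIFF, 946_684_800);

    // The round trip truncates to microseconds, so compare at that precision,
    // the same way the new test_ts_conversion does.
    let micros =
        |t: SystemTime| t.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_micros();
    let now = SystemTime::now();
    assert_eq!(micros(from_pg(to_pg(now))), micros(now));
    println!("round trip ok at {} us since the unix epoch", micros(now));
}
```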
diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml
index 477a2d378d65..f0bf2666a7bf 100644
--- a/pageserver/src/http/openapi_spec.yml
+++ b/pageserver/src/http/openapi_spec.yml
@@ -306,6 +306,67 @@ paths:
             schema:
               $ref: "#/components/schemas/ServiceUnavailableError"
 
+  /v1/tenant/{tenant_id}/timeline/{timeline_id}/get_timestamp_of_lsn:
+    parameters:
+      - name: tenant_id
+        in: path
+        required: true
+        schema:
+          type: string
+          format: hex
+      - name: timeline_id
+        in: path
+        required: true
+        schema:
+          type: string
+          format: hex
+    get:
+      description: Get timestamp for a given LSN
+      parameters:
+        - name: lsn
+          in: query
+          required: true
+          schema:
+            type: string
+          description: The LSN to get the timestamp for
+      responses:
+        "200":
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: string
+                format: date-time
+        "400":
+          description: Error when no tenant id found in path, no timeline id, or invalid LSN
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/Error"
+        "401":
+          description: Unauthorized Error
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/UnauthorizedError"
+        "403":
+          description: Forbidden Error
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/ForbiddenError"
+        "404":
+          description: Timeline not found, or there is no timestamp information for the given LSN
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/NotFoundError"
+        "500":
+          description: Generic operation error
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/Error"
+
   /v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp:
     parameters:
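For manual testing, the new endpoint can be exercised with any HTTP client once a pageserver is running. The sketch below uses only the standard library; the port (9898, the usual local pageserver HTTP port), the tenant and timeline IDs, and the LSN are all hypothetical placeholders. Per the schema above, the happy path returns a JSON-encoded RFC 3339 string.

```rust
// Minimal smoke test for GET .../get_timestamp_of_lsn using std only.
// All concrete values below (port, tenant, timeline, lsn) are placeholders.
use std::io::{Read, Write};
use std::net::TcpStream;

fn main() -> std::io::Result<()> {
    let tenant_id = "0123456789abcdef0123456789abcdef"; // hypothetical
    let timeline_id = "fedcba9876543210fedcba9876543210"; // hypothetical
    let lsn = "0/169C3C8"; // hypothetical LSN in postgres X/Y notation

    let path = format!(
        "/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_timestamp_of_lsn?lsn={lsn}"
    );
    let mut stream = TcpStream::connect("127.0.0.1:9898")?;
    write!(
        stream,
        "GET {path} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"
    )?;

    let mut response = String::new();
    stream.read_to_string(&mut response)?;
    // On success the body is a JSON string such as "2023-05-05T12:34:56.789012000Z";
    // an unknown lsn yields 404 and an unparsable lsn yields 400, per the spec above.
    println!("{response}");
    Ok(())
}
```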
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index c00dad50ace4..142e85d2034d 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -2,10 +2,12 @@
 //! Management HTTP API
 //!
 use std::collections::HashMap;
+use std::str::FromStr;
 use std::sync::Arc;
 
 use anyhow::{anyhow, Context, Result};
 use futures::TryFutureExt;
+use humantime::format_rfc3339;
 use hyper::header::CONTENT_TYPE;
 use hyper::StatusCode;
 use hyper::{Body, Request, Response, Uri};
@@ -502,6 +504,33 @@ async fn get_lsn_by_timestamp_handler(
     json_response(StatusCode::OK, result)
 }
 
+async fn get_timestamp_of_lsn_handler(
+    request: Request<Body>,
+    _cancel: CancellationToken,
+) -> Result<Response<Body>, ApiError> {
+    let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
+    check_permission(&request, Some(tenant_id))?;
+
+    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
+
+    let lsn_str = must_get_query_param(&request, "lsn")?;
+    let lsn = Lsn::from_str(&lsn_str)
+        .with_context(|| format!("Invalid LSN: {lsn_str:?}"))
+        .map_err(ApiError::BadRequest)?;
+
+    let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
+    let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
+    let result = timeline.get_timestamp_for_lsn(lsn, &ctx).await?;
+
+    match result {
+        Some(time) => {
+            let time = format_rfc3339(postgres_ffi::from_pg_timestamp(time)).to_string();
+            json_response(StatusCode::OK, time)
+        }
+        None => json_response(StatusCode::NOT_FOUND, ()),
+    }
+}
+
 async fn tenant_attach_handler(
     mut request: Request<Body>,
     _cancel: CancellationToken,
@@ -1680,6 +1709,10 @@ pub fn make_router(
             "/v1/tenant/:tenant_id/timeline/:timeline_id/get_lsn_by_timestamp",
             |r| api_handler(r, get_lsn_by_timestamp_handler),
         )
+        .get(
+            "/v1/tenant/:tenant_id/timeline/:timeline_id/get_timestamp_of_lsn",
+            |r| api_handler(r, get_timestamp_of_lsn_handler),
+        )
         .put("/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc", |r| {
             api_handler(r, timeline_gc_handler)
         })
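One detail of the handler worth spelling out: `humantime::format_rfc3339` prints nanosecond precision when the sub-second part is non-zero, while Postgres timestamps only carry microseconds, so clients will always see three trailing zeros in the fraction. A small sketch of the `Some(time)` arm's conversion chain, without the pageserver types; it assumes the `humantime` crate, and the 946,684,800-second constant is the Unix-to-Postgres epoch difference from the `postgres_ffi` change above:

```rust
// Sketch of what the handler produces for a found timestamp.
use std::time::{Duration, SystemTime};

fn main() {
    // A postgres TimestampTz is microseconds since 2000-01-01T00:00:00Z;
    // here: 10.123456 seconds after that epoch.
    let ts: i64 = 10_123_456;

    // from_pg_timestamp equivalent: shift onto the unix epoch.
    let pg_epoch = SystemTime::UNIX_EPOCH + Duration::from_secs(946_684_800);
    let time = pg_epoch + Duration::from_micros(ts as u64);

    // Microsecond input, nanosecond output: note the trailing "000" that the
    // python test later matches with its "%f000Z" strptime format.
    let formatted = humantime::format_rfc3339(time).to_string();
    assert_eq!(formatted, "2000-01-01T00:00:10.123456000Z");
    println!("{formatted}");
}
```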
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index ebba3d857950..d27c8a3d5d5e 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -19,6 +19,7 @@ use postgres_ffi::BLCKSZ;
 use postgres_ffi::{Oid, TimestampTz, TransactionId};
 use serde::{Deserialize, Serialize};
 use std::collections::{hash_map, HashMap, HashSet};
+use std::ops::ControlFlow;
 use std::ops::Range;
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, trace, warn};
@@ -370,7 +371,6 @@ impl Timeline {
         }
     }
 
-    ///
     /// Subroutine of find_lsn_for_timestamp(). Returns true, if there are any
     /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
     ///
@@ -385,6 +385,50 @@ impl Timeline {
         found_larger: &mut bool,
         ctx: &RequestContext,
     ) -> Result<bool, PageReconstructError> {
+        self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
+            if timestamp >= search_timestamp {
+                *found_larger = true;
+                return ControlFlow::Break(true);
+            } else {
+                *found_smaller = true;
+            }
+            ControlFlow::Continue(())
+        })
+        .await
+    }
+
+    /// Obtain the timestamp for the given lsn.
+    ///
+    /// Returns `None` if the lsn has no timestamps; otherwise returns the latest timestamp found.
+    pub async fn get_timestamp_for_lsn(
+        &self,
+        probe_lsn: Lsn,
+        ctx: &RequestContext,
+    ) -> Result<Option<TimestampTz>, PageReconstructError> {
+        let mut max: Option<TimestampTz> = None;
+        self.map_all_timestamps::<()>(probe_lsn, ctx, |timestamp| {
+            if let Some(max_prev) = max {
+                max = Some(max_prev.max(timestamp));
+            } else {
+                max = Some(timestamp);
+            }
+            ControlFlow::Continue(())
+        })
+        .await?;
+
+        Ok(max)
+    }
+
+    /// Runs the given function on all the timestamps for a given lsn
+    ///
+    /// The return value is either given by the closure, or set to the `Default`
+    /// impl's output.
+    async fn map_all_timestamps<T: Default>(
+        &self,
+        probe_lsn: Lsn,
+        ctx: &RequestContext,
+        mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
+    ) -> Result<T, PageReconstructError> {
         for segno in self
             .list_slru_segments(SlruKind::Clog, probe_lsn, ctx)
             .await?
@@ -402,16 +446,14 @@ impl Timeline {
                 timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
                 let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);
 
-                if timestamp >= search_timestamp {
-                    *found_larger = true;
-                    return Ok(true);
-                } else {
-                    *found_smaller = true;
+                match f(timestamp) {
+                    ControlFlow::Break(b) => return Ok(b),
+                    ControlFlow::Continue(()) => (),
                 }
             }
         }
-        Ok(false)
+        Ok(Default::default())
     }
 
     /// Get a list of SLRU segments
diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py
index 460a30ad56ad..598b48b56fa3 100644
--- a/test_runner/fixtures/pageserver/http.py
+++ b/test_runner/fixtures/pageserver/http.py
@@ -453,6 +453,15 @@ def timeline_get_lsn_by_timestamp(
         res_json = res.json()
         return res_json
 
+    def timeline_get_timestamp_of_lsn(self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn):
+        log.info(f"Requesting timestamp of lsn {lsn}, tenant {tenant_id}, timeline {timeline_id}")
+        res = self.get(
+            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_timestamp_of_lsn?lsn={lsn}",
+        )
+        self.verbose_error(res)
+        res_json = res.json()
+        return res_json
+
     def timeline_checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId):
         self.is_testing_enabled_or_skip()
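The refactor in `pgdatadir_mapping.rs` above folds two traversals into one generic visitor: `ControlFlow::Break(v)` short-circuits the CLOG scan with `v`, and a closure that never breaks lets the scan run to completion and fall back to `T::default()`. Here is a standalone sketch of that same contract, with plain integers standing in for the CLOG commit timestamps (the neon types and SLRU reading are omitted):

```rust
// Illustration of the map_all_timestamps contract: visit every item,
// let the closure short-circuit, otherwise return T::default().
use std::ops::ControlFlow;

fn visit_all<T: Default>(items: &[i64], mut f: impl FnMut(i64) -> ControlFlow<T>) -> T {
    for &item in items {
        match f(item) {
            ControlFlow::Break(v) => return v,
            ControlFlow::Continue(()) => (),
        }
    }
    T::default()
}

fn main() {
    let timestamps = [3, 1, 4, 1, 5];

    // is_latest_commit_timestamp_ge_than-style use: early exit on a hit,
    // falling back to bool::default() (false) if nothing matched.
    let any_ge_4 = visit_all(&timestamps, |t| {
        if t >= 4 {
            ControlFlow::Break(true)
        } else {
            ControlFlow::Continue(())
        }
    });
    assert!(any_ge_4);

    // get_timestamp_for_lsn-style use: never break, accumulate the max on
    // the side; nothing pins the break type, hence the ::<()> turbofish.
    let mut max: Option<i64> = None;
    visit_all::<()>(&timestamps, |t| {
        max = Some(max.map_or(t, |m| m.max(t)));
        ControlFlow::Continue(())
    });
    assert_eq!(max, Some(5));
    println!("any_ge_4 = {any_ge_4}, max = {max:?}");
}
```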
diff --git a/test_runner/regress/test_lsn_mapping.py b/test_runner/regress/test_lsn_mapping.py
index 8ccfc21cf794..03606e3c1cbf 100644
--- a/test_runner/regress/test_lsn_mapping.py
+++ b/test_runner/regress/test_lsn_mapping.py
@@ -1,7 +1,10 @@
-from datetime import timedelta
+import time
+from datetime import datetime, timedelta, timezone
 
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_last_flush_lsn
+from fixtures.pageserver.http import PageserverApiException
+from fixtures.types import Lsn
 from fixtures.utils import query_scalar
 
 
@@ -25,13 +28,14 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
         cur.execute("CREATE TABLE foo (x integer)")
         tbl = []
         for i in range(1000):
-            cur.execute(f"INSERT INTO foo VALUES({i})")
+            cur.execute("INSERT INTO foo VALUES(%s)", (i,))
             # Get the timestamp at UTC
             after_timestamp = query_scalar(cur, "SELECT clock_timestamp()").replace(tzinfo=None)
             tbl.append([i, after_timestamp])
 
         # Execute one more transaction with synchronous_commit enabled, to flush
         # all the previous transactions
+        cur.execute("SET synchronous_commit=on")
        cur.execute("INSERT INTO foo VALUES (-1)")
 
         # Wait until WAL is received by pageserver
@@ -67,3 +71,100 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
             assert endpoint_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i
 
             endpoint_here.stop_and_destroy()
+
+
+# Test pageserver get_timestamp_of_lsn API
+def test_ts_of_lsn_api(neon_env_builder: NeonEnvBuilder):
+    env = neon_env_builder.init_start()
+
+    new_timeline_id = env.neon_cli.create_branch("test_ts_of_lsn_api")
+    endpoint_main = env.endpoints.create_start("test_ts_of_lsn_api")
+    log.info("postgres is running on 'test_ts_of_lsn_api' branch")
+
+    cur = endpoint_main.connect().cursor()
+    # Create table, and insert rows, each in a separate transaction.
+    # Disable synchronous_commit to make this initialization go faster.
+    #
+    # Each row records the current insert LSN and the wall-clock timestamp
+    # taken right after the row was inserted.
+    cur.execute("SET synchronous_commit=off")
+    cur.execute("CREATE TABLE foo (x integer)")
+    tbl = []
+    for i in range(1000):
+        cur.execute("INSERT INTO foo VALUES(%s)", (i,))
+        # Get the timestamp at UTC
+        after_timestamp = query_scalar(cur, "SELECT clock_timestamp()").replace(tzinfo=timezone.utc)
+        after_lsn = query_scalar(cur, "SELECT pg_current_wal_lsn()")
+        tbl.append([i, after_timestamp, after_lsn])
+        time.sleep(0.005)
+
+    # Execute one more transaction with synchronous_commit enabled, to flush
+    # all the previous transactions
+    cur.execute("SET synchronous_commit=on")
+    cur.execute("INSERT INTO foo VALUES (-1)")
+
+    # Wait until WAL is received by pageserver
+    last_flush_lsn = wait_for_last_flush_lsn(
+        env, endpoint_main, env.initial_tenant, new_timeline_id
+    )
+
+    with env.pageserver.http_client() as client:
+        # Check edge cases: an lsn larger than the last flush lsn
+        probe_lsn = Lsn(int(last_flush_lsn) * 20 + 80_000)
+        result = client.timeline_get_timestamp_of_lsn(
+            env.initial_tenant,
+            new_timeline_id,
+            probe_lsn,
+        )
+
+        # An lsn of zero
+        try:
+            probe_lsn = Lsn(0)
+            result = client.timeline_get_timestamp_of_lsn(
+                env.initial_tenant,
+                new_timeline_id,
+                probe_lsn,
+            )
+            # There should always be an error here.
+            raise RuntimeError("there should have been an 'Invalid LSN' error")
+        except PageserverApiException as error:
+            assert error.status_code == 500
+            assert str(error) == "Invalid LSN"
+            env.pageserver.allowed_errors.append(".*Invalid LSN.*")
+
+        # A small lsn before initdb_lsn
+        try:
+            probe_lsn = Lsn(64)
+            result = client.timeline_get_timestamp_of_lsn(
+                env.initial_tenant,
+                new_timeline_id,
+                probe_lsn,
+            )
+            # There should always be an error here.
+            raise RuntimeError("there should have been a 'could not find data for key' error")
+        except PageserverApiException as error:
+            assert error.status_code == 500
+            assert str(error).startswith("could not find data for key")
+            env.pageserver.allowed_errors.append(".*could not find data for key.*")
+
+        # Probe a bunch of timestamps in the valid range
+        step_size = 100
+        for i in range(step_size, len(tbl), step_size):
+            after_timestamp = tbl[i][1]
+            after_lsn = tbl[i][2]
+            result = client.timeline_get_timestamp_of_lsn(
+                env.initial_tenant,
+                new_timeline_id,
+                after_lsn,
+            )
+            log.info("result: %s, after_ts: %s", result, after_timestamp)
+
+            # TODO: use fromisoformat once we are on Python 3.11+,
+            # which has https://github.com/python/cpython/pull/92177
+            timestamp = datetime.strptime(result, "%Y-%m-%dT%H:%M:%S.%f000Z").replace(
+                tzinfo=timezone.utc
+            )
+            assert timestamp < after_timestamp, "returned timestamp should precede after_timestamp"
+            if i > 1:
+                before_timestamp = tbl[i - step_size][1]
+                assert timestamp >= before_timestamp, "returned timestamp should not precede before_timestamp"
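As a closing note on the wire format: the strptime workaround in the test exists because pre-3.11 `datetime.fromisoformat` cannot parse the `Z` suffix or a nine-digit fraction. On the Rust side the response string round-trips through `humantime` directly, since that crate both produced and can re-parse it. A quick sanity check, with a made-up response payload:

```rust
// The handler formats with humantime::format_rfc3339; the same crate
// parses the string back losslessly. The payload below is hypothetical.
fn main() {
    let body = "2023-05-05T12:34:56.789012000Z"; // hypothetical response value
    let time = humantime::parse_rfc3339(body).expect("valid rfc3339");
    assert_eq!(humantime::format_rfc3339(time).to_string(), body);
    println!("parsed and re-formatted: {}", humantime::format_rfc3339(time));
}
```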