Skip to content
Closed

WIP #5280

Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
WIP
  • Loading branch information
scottsand-db committed Sep 30, 2025
commit 4b5dd1cec83f66a4d7ae70461d9ad479b3263944
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.delta.kernel.internal.annotation.VisibleForTesting;
import io.delta.kernel.internal.checksum.CRCInfo;
import io.delta.kernel.internal.clustering.ClusteringMetadataDomain;
import io.delta.kernel.internal.files.ParsedDeltaData;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.lang.Lazy;
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
Expand All @@ -51,6 +52,117 @@

/** Implementation of {@link Snapshot}. */
public class SnapshotImpl implements Snapshot {

//////////////////////////////////////////
// Static factory methods and constants //
//////////////////////////////////////////

/**
 * Builds the {@link SnapshotImpl} that represents the table right after a transaction commits.
 *
 * <p>The log segment is produced by {@code createLogSegmentForPostCommit}: when a previous
 * snapshot is supplied its log segment is extended with the newly committed deltas, otherwise a
 * single-delta segment is created. The version of the returned snapshot is the version reported
 * by that log segment, while protocol, metadata and in-commit timestamp are taken as-is from the
 * arguments.
 *
 * @param engine The engine handed to the {@link LogReplay} backing the new snapshot
 * @param dataPath The path to the table
 * @param previousSnapshot Optional previous snapshot (empty for CREATE transactions)
 * @param newlyCommittedDeltaFiles The delta files written by the transaction; they are either
 *     appended to the previous snapshot's log segment or, when there is no previous snapshot,
 *     must consist of exactly one delta at version 0
 * @param committer The committer for the snapshot
 * @param txnEffectiveProtocol The effective protocol after the transaction
 * @param txnEffectiveMetadata The effective metadata after the transaction
 * @param txnInCommitTimestampOpt The in-commit timestamp from the transaction (if available)
 * @return A new post-commit Snapshot
 */
public static SnapshotImpl createPostCommitSnapshot(
    Engine engine,
    Path dataPath,
    Optional<SnapshotImpl> previousSnapshot,
    List<ParsedDeltaData> newlyCommittedDeltaFiles,
    Committer committer,
    Protocol txnEffectiveProtocol,
    Metadata txnEffectiveMetadata,
    Optional<Long> txnInCommitTimestampOpt) {
  // TODO: plumb through CRCInfo

  // TODO: Create SnapshotQueryContext.forPostCommitSnapshot
  final String tablePathString = dataPath.toString();
  final SnapshotQueryContext queryContext =
      SnapshotQueryContext.forLatestSnapshot(tablePathString);

  final LogSegment postCommitSegment =
      createLogSegmentForPostCommit(dataPath, previousSnapshot, newlyCommittedDeltaFiles);

  // TODO: We should make post-commit Snapshots be fully incremental. That is, we replay state
  // using any previously-computed state from the previous Snapshot / LogReplay.
  final Lazy<LogSegment> segmentForReplay = new Lazy<>(() -> postCommitSegment);
  final LogReplay replay =
      new LogReplay(
          dataPath,
          engine,
          segmentForReplay,
          Optional.empty(), // snapshotHint
          queryContext.getSnapshotMetrics());

  // The segment is already materialized, so both Lazy wrappers simply hand it back.
  final long postCommitVersion = postCommitSegment.getVersion();
  final Lazy<LogSegment> segmentForSnapshot = new Lazy<>(() -> postCommitSegment);

  return new SnapshotImpl(
      dataPath,
      postCommitVersion,
      segmentForSnapshot,
      replay,
      txnEffectiveProtocol,
      txnEffectiveMetadata,
      committer,
      queryContext,
      txnInCommitTimestampOpt);
}

/**
 * Creates an initial Snapshot for a table at a specific version.
 *
 * <p>All arguments are forwarded unchanged to the {@link SnapshotImpl} constructor; the
 * post-commit in-commit timestamp is always passed as empty.
 *
 * @param dataPath The path to the table
 * @param version The version of the snapshot
 * @param lazyLogSegment Lazily-evaluated log segment for the snapshot
 * @param logReplay The log replay used by the snapshot
 * @param protocol The protocol of the table at this version
 * @param metadata The metadata of the table at this version
 * @param committer The committer for the snapshot
 * @param snapshotContext The query context associated with loading this snapshot
 * @return A new Snapshot with no post-commit in-commit timestamp
 */
public static SnapshotImpl createInitialSnapshot(
    Path dataPath,
    long version,
    Lazy<LogSegment> lazyLogSegment,
    LogReplay logReplay,
    Protocol protocol,
    Metadata metadata,
    Committer committer,
    SnapshotQueryContext snapshotContext) {
  // No transaction produced this snapshot, so there is no post-commit timestamp to carry over.
  final Optional<Long> noPostCommitInCommitTimestamp = Optional.empty();

  return new SnapshotImpl(
      dataPath,
      version,
      lazyLogSegment,
      logReplay,
      protocol,
      metadata,
      committer,
      snapshotContext,
      noPostCommitInCommitTimestamp);
}

/**
 * Builds the {@link LogSegment} backing a post-commit snapshot.
 *
 * <p>Without a previous snapshot (CREATE), the commit must consist of exactly one delta at
 * version 0 and the segment is built from that single delta under {@code <dataPath>/_delta_log}.
 * With a previous snapshot (UPDATE/REPLACE), the previous snapshot's segment is copied with the
 * new deltas added.
 *
 * @param dataPath The path to the table
 * @param previousSnapshot Optional previous snapshot (empty for CREATE transactions)
 * @param newlyCommittedDeltaFiles The delta files written by the transaction
 * @return The log segment for the post-commit snapshot
 * @throws IllegalArgumentException in the CREATE case, if there is not exactly one delta or that
 *     delta is not at version 0 (presumably raised by {@code checkArgument}; confirm against its
 *     definition)
 */
private static LogSegment createLogSegmentForPostCommit(
    Path dataPath,
    Optional<SnapshotImpl> previousSnapshot,
    List<ParsedDeltaData> newlyCommittedDeltaFiles) {
  if (!previousSnapshot.isPresent()) {
    // CREATE case: single delta at version 0
    final int numDeltas = newlyCommittedDeltaFiles.size();
    checkArgument(
        numDeltas == 1, "CREATE transaction must have exactly one delta, got %d", numDeltas);

    final ParsedDeltaData onlyDelta = newlyCommittedDeltaFiles.get(0);
    checkArgument(
        onlyDelta.getVersion() == 0,
        "CREATE transaction requires version 0, got %s",
        onlyDelta.getVersion());

    final Path deltaLogDir = new Path(dataPath, "_delta_log");
    return LogSegment.fromSingleDelta(deltaLogDir, onlyDelta);
  }

  // UPDATE/REPLACE case: extend existing LogSegment
  final LogSegment previousSegment = previousSnapshot.get().getLogSegment();
  return previousSegment.copyWithAdditionalDeltas(newlyCommittedDeltaFiles);
}

//////////////////////////////////
// Member methods and variables //
//////////////////////////////////

private final Path logPath;
private final Path dataPath;
private final long version;
Expand All @@ -71,7 +183,8 @@ public SnapshotImpl(
Protocol protocol,
Metadata metadata,
Committer committer,
SnapshotQueryContext snapshotContext) {
SnapshotQueryContext snapshotContext,
Optional<Long> postCommitInCommitTimestampOpt) {
checkArgument(version >= 0, "A snapshot cannot have version < 0");
this.logPath = new Path(dataPath, "_delta_log");
this.dataPath = dataPath;
Expand All @@ -81,7 +194,7 @@ public SnapshotImpl(
this.protocol = requireNonNull(protocol);
this.metadata = requireNonNull(metadata);
this.committer = committer;
this.inCommitTimestampOpt = Optional.empty();
this.inCommitTimestampOpt = postCommitInCommitTimestampOpt;

// We create the actual Snapshot report lazily (on first access) instead of eagerly in this
// constructor because some Snapshot metrics, like {@link
Expand Down Expand Up @@ -265,4 +378,5 @@ public CreateCheckpointIterator getCreateCheckpointIterator(Engine engine) {
public Optional<Long> getLatestTransactionVersion(Engine engine, String applicationId) {
  // Pure delegation: the lookup for the given application id is answered by this snapshot's
  // LogReplay.
  final Optional<Long> latestTxnVersion =
      logReplay.getLatestTransactionIdentifier(engine, applicationId);
  return latestTxnVersion;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ private SnapshotImpl createSnapshot(
snapshotContext.getSnapshotMetrics());

final SnapshotImpl snapshot =
new SnapshotImpl(
SnapshotImpl.createInitialSnapshot(
tablePath,
initSegment.getVersion(),
new Lazy<>(() -> initSegment),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private SnapshotImpl createSnapshot(Engine engine, SnapshotQueryContext snapshot
final Lazy<LogSegment> lazyLogSegment = getLazyLogSegment(engine, snapshotCtx, versionToLoad);
final LogReplay logReplay = getLogReplay(engine, lazyLogSegment, snapshotCtx);

return new SnapshotImpl(
return SnapshotImpl.createInitialSnapshot(
tablePath,
versionToLoad.orElseGet(() -> lazyLogSegment.get().getVersion()),
lazyLogSegment,
Expand Down
Loading