Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions catalog/glue/glue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1148,11 +1148,7 @@ func TestAlterTableIntegration(t *testing.T) {
}
newFields := append(currentSchema.Fields(), addField) // add column 'new_col'
newFields = append(newFields[:1], newFields[2:]...) // drop column 'bar'
updateColumns := table.NewAddSchemaUpdate(
iceberg.NewSchemaWithIdentifiers(newSchemaId, currentSchema.IdentifierFieldIDs, newFields...),
addField.ID,
false,
)
updateColumns := table.NewAddSchemaUpdate(iceberg.NewSchemaWithIdentifiers(newSchemaId, currentSchema.IdentifierFieldIDs, newFields...))
setSchema := table.NewSetCurrentSchemaUpdate(newSchemaId)

_, _, err = ctlg.CommitTable(
Expand Down
16 changes: 13 additions & 3 deletions table/arrow_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,14 @@ func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta *MetadataBuilde
}
}()

currentSchema, currentSpec := meta.CurrentSchema(), meta.CurrentSpec()
partitionSpec, err := meta.CurrentSpec()
if err != nil || partitionSpec == nil {
yield(nil, fmt.Errorf("%w: cannot add files without a current spec", err))

return
}

currentSchema, currentSpec := meta.CurrentSchema(), *partitionSpec

for filePath := range paths {
format := tblutils.FormatFromFileName(filePath)
Expand Down Expand Up @@ -1287,9 +1294,12 @@ func recordsToDataFiles(ctx context.Context, rootLocation string, meta *Metadata
if err != nil {
panic(err)
}

currentSpec, err := meta.CurrentSpec()
if err != nil || currentSpec == nil {
panic(fmt.Errorf("%w: cannot write files without a current spec", err))
}
nextCount, stopCount := iter.Pull(args.counter)
if meta.CurrentSpec().IsUnpartitioned() {
if currentSpec.IsUnpartitioned() {
tasks := func(yield func(WriteTask) bool) {
defer stopCount()

Expand Down
78 changes: 54 additions & 24 deletions table/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ type MetadataBuilder struct {

// >v1 specific
lastSequenceNumber *int64
// update tracking
lastAddedSchemaID *int
}

func NewMetadataBuilder() (*MetadataBuilder, error) {
Expand Down Expand Up @@ -190,7 +192,8 @@ func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) {
b.schemaList = slices.Clone(metadata.Schemas())
b.currentSchemaID = metadata.CurrentSchema().ID
b.specs = slices.Clone(metadata.PartitionSpecs())
b.defaultSpecID = metadata.DefaultPartitionSpec()
defaultSpecID := metadata.DefaultPartitionSpec()
b.defaultSpecID = defaultSpecID
b.lastPartitionID = metadata.LastPartitionSpecID()
b.props = maps.Clone(metadata.Properties())
b.snapshotList = slices.Clone(metadata.Snapshots())
Expand All @@ -214,8 +217,8 @@ func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) {

func (b *MetadataBuilder) HasChanges() bool { return len(b.updates) > 0 }

func (b *MetadataBuilder) CurrentSpec() iceberg.PartitionSpec {
return b.specs[b.defaultSpecID]
func (b *MetadataBuilder) CurrentSpec() (*iceberg.PartitionSpec, error) {
return b.GetSpecByID(b.defaultSpecID)
}

func (b *MetadataBuilder) CurrentSchema() *iceberg.Schema {
Expand Down Expand Up @@ -257,21 +260,30 @@ func (b *MetadataBuilder) currentSnapshot() *Snapshot {
return s
}

func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema, newLastColumnID int, initial bool) (*MetadataBuilder, error) {
func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema) (*MetadataBuilder, error) {
newLastColumnID := schema.HighestFieldID()
if newLastColumnID < b.lastColumnId {
return nil, fmt.Errorf("%w: newLastColumnID %d, must be >= %d", iceberg.ErrInvalidArgument, newLastColumnID, b.lastColumnId)
}

var schemas []*iceberg.Schema
if initial {
schemas = []*iceberg.Schema{schema}
} else {
schemas = append(b.schemaList, schema)
newSchemaID := b.reuseOrCreateNewSchemaID(schema)

if _, err := b.GetSchemaByID(newSchemaID); err == nil {
if b.lastAddedSchemaID == nil || *b.lastAddedSchemaID != newSchemaID {
b.updates = append(b.updates, NewAddSchemaUpdate(schema))
b.lastAddedSchemaID = &newSchemaID
}

return b, nil
}

b.lastColumnId = newLastColumnID
b.schemaList = schemas
b.updates = append(b.updates, NewAddSchemaUpdate(schema, newLastColumnID, initial))
b.lastColumnId = max(b.lastColumnId, schema.HighestFieldID())

schema.ID = newSchemaID

b.schemaList = append(b.schemaList, schema)
b.updates = append(b.updates, NewAddSchemaUpdate(schema))
b.lastAddedSchemaID = &newSchemaID

return b, nil
}
Expand Down Expand Up @@ -384,14 +396,10 @@ func (b *MetadataBuilder) RemoveProperties(keys []string) (*MetadataBuilder, err

func (b *MetadataBuilder) SetCurrentSchemaID(currentSchemaID int) (*MetadataBuilder, error) {
if currentSchemaID == -1 {
currentSchemaID = maxBy(b.schemaList, func(s *iceberg.Schema) int {
return s.ID
})
if !slices.ContainsFunc(b.updates, func(u Update) bool {
return u.Action() == UpdateAddSchema && u.(*addSchemaUpdate).Schema.ID == currentSchemaID
}) {
if b.lastAddedSchemaID == nil {
return nil, errors.New("can't set current schema to last added schema, no schema has been added")
}
currentSchemaID = *b.lastAddedSchemaID
}

if currentSchemaID == b.currentSchemaID {
Expand Down Expand Up @@ -641,7 +649,12 @@ func (b *MetadataBuilder) SetLastUpdatedMS() *MetadataBuilder {
return b
}

func (b *MetadataBuilder) buildCommonMetadata() *commonMetadata {
func (b *MetadataBuilder) buildCommonMetadata() (*commonMetadata, error) {
if _, err := b.GetSpecByID(b.defaultSpecID); err != nil {
return nil, fmt.Errorf("defaultSpecID is invalid: %w", err)
}
defaultSpecID := b.defaultSpecID

if b.lastUpdatedMS == 0 {
b.lastUpdatedMS = time.Now().UnixMilli()
}
Expand All @@ -655,7 +668,7 @@ func (b *MetadataBuilder) buildCommonMetadata() *commonMetadata {
SchemaList: b.schemaList,
CurrentSchemaID: b.currentSchemaID,
Specs: b.specs,
DefaultSpecID: b.defaultSpecID,
DefaultSpecID: defaultSpecID,
LastPartitionID: b.lastPartitionID,
Props: b.props,
SnapshotList: b.snapshotList,
Expand All @@ -665,7 +678,7 @@ func (b *MetadataBuilder) buildCommonMetadata() *commonMetadata {
SortOrderList: b.sortOrderList,
DefaultSortOrderID: b.defaultSortOrderID,
SnapshotRefs: b.refs,
}
}, nil
}

func (b *MetadataBuilder) GetSchemaByID(id int) (*iceberg.Schema, error) {
Expand Down Expand Up @@ -736,7 +749,10 @@ func (b *MetadataBuilder) AppendMetadataLog(entry MetadataLogEntry) *MetadataBui
}

func (b *MetadataBuilder) Build() (Metadata, error) {
common := b.buildCommonMetadata()
common, err := b.buildCommonMetadata()
if err != nil {
return nil, err
}
if err := common.validate(); err != nil {
return nil, err
}
Expand All @@ -748,7 +764,7 @@ func (b *MetadataBuilder) Build() (Metadata, error) {
return nil, fmt.Errorf("can't build metadata, missing schema for schema ID %d: %w", b.currentSchemaID, err)
}

partition, err := b.GetSpecByID(b.defaultSpecID)
partition, err := b.GetSpecByID(common.DefaultSpecID)
if err != nil {
return nil, fmt.Errorf("can't build metadata, missing partition spec for spec ID %d: %w", b.defaultSpecID, err)
}
Expand Down Expand Up @@ -777,10 +793,24 @@ func (b *MetadataBuilder) Build() (Metadata, error) {
}, nil

default:
panic("unreachable: invalid format version")
return nil, fmt.Errorf("unknown format version %d", b.formatVersion)
}
}

// reuseOrCreateNewSchemaID returns the ID under which newSchema should be
// registered: the ID of an already-present structurally-equal schema when one
// exists, otherwise an ID that is at least newSchema.ID and strictly greater
// than every schema ID currently held by the builder.
func (b *MetadataBuilder) reuseOrCreateNewSchemaID(newSchema *iceberg.Schema) int {
	candidateID := newSchema.ID
	for _, existing := range b.schemaList {
		// An identical schema is already registered; reuse its ID as-is.
		if newSchema.Equals(existing) {
			return existing.ID
		}
		// Keep the candidate above every ID seen so far so numbering
		// is strictly increasing and never collides.
		if existing.ID >= candidateID {
			candidateID = existing.ID + 1
		}
	}

	return candidateID
}

// maxBy returns the maximum value of extract(e) for all e in elems.
// If elems is empty, returns 0.
func maxBy[S ~[]E, E any](elems S, extract func(e E) int) int {
Expand Down
71 changes: 71 additions & 0 deletions table/metadata_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,77 @@ func TestMetadataBuilderSetDefaultSpecIDLastPartition(t *testing.T) {
assert.Equal(t, 0, builder.defaultSpecID)
}

// TestMetadataBuilderSetLastAddedSchema verifies that SetCurrentSchemaID(-1)
// resolves to the most recently added schema, and that the metadata produced
// by Build exposes that schema as the current one.
func TestMetadataBuilderSetLastAddedSchema(t *testing.T) {
	b, err := NewMetadataBuilder()
	assert.NoError(t, err)

	_, err = b.SetFormatVersion(2)
	assert.NoError(t, err)

	schema := iceberg.NewSchema(1,
		iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.StringType{}, Required: true},
	)
	_, err = b.AddSchema(schema)
	assert.NoError(t, err)

	// -1 is the sentinel for "use the schema that was added last".
	_, err = b.SetCurrentSchemaID(-1)
	assert.NoError(t, err)

	spec := iceberg.NewPartitionSpecID(0)
	_, err = b.AddPartitionSpec(&spec, false)
	assert.NoError(t, err)

	_, err = b.SetDefaultSpecID(-1)
	assert.NoError(t, err)

	meta, err := b.Build()
	assert.NoError(t, err)
	assert.Equal(t, schema.ID, meta.CurrentSchema().ID)
	assert.True(t, schema.Equals(meta.CurrentSchema()))
}

// TestMetadataBuilderSchemaIncreasingNumbering verifies that AddSchema keeps
// schema IDs strictly increasing: a requested ID above the current maximum is
// honored, while a requested ID at or below it is bumped past the maximum.
func TestMetadataBuilderSchemaIncreasingNumbering(t *testing.T) {
	b, err := NewMetadataBuilder()
	assert.NoError(t, err)
	_, err = b.SetFormatVersion(2)
	assert.NoError(t, err)

	// First schema: requested ID 1 is kept.
	_, err = b.AddSchema(iceberg.NewSchema(1,
		iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.StringType{}, Required: true},
	))
	assert.NoError(t, err)

	// Second schema: requested ID 3 exceeds the current max, so it is kept.
	_, err = b.AddSchema(iceberg.NewSchema(3,
		iceberg.NestedField{ID: 3, Name: "foo", Type: iceberg.StringType{}, Required: true},
	))
	assert.NoError(t, err)

	// Third schema: requested ID 2 is below the current max (3), so the
	// builder assigns the next free ID, 4, instead.
	_, err = b.AddSchema(iceberg.NewSchema(2,
		iceberg.NestedField{ID: 4, Name: "foo", Type: iceberg.StringType{}, Required: true},
	))
	assert.NoError(t, err)

	for i, want := range []int{1, 3, 4} {
		assert.Equal(t, want, b.schemaList[i].ID)
	}
}

// TestMetadataBuilderReuseSchema verifies that adding a schema structurally
// identical to an existing one does not append a duplicate: the schema list
// stays at one entry and the existing schema's ID is recorded as last-added.
func TestMetadataBuilderReuseSchema(t *testing.T) {
	builder, err := NewMetadataBuilder()
	assert.NoError(t, err)

	schema := iceberg.NewSchema(1,
		iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.StringType{}, Required: true},
	)
	_, err = builder.AddSchema(schema)
	assert.NoError(t, err)

	// Same structure as the first schema, different requested ID: must be
	// deduplicated rather than appended.
	schema2 := iceberg.NewSchema(15,
		iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.StringType{}, Required: true},
	)
	_, err = builder.AddSchema(schema2)
	assert.NoError(t, err)

	// testify convention is (t, expected, actual); the original had the
	// arguments swapped, which produces misleading failure messages.
	assert.Len(t, builder.schemaList, 1)
	// Guard the dereference: assert does not stop on failure, so a nil
	// lastAddedSchemaID would otherwise panic the test binary.
	if assert.NotNil(t, builder.lastAddedSchemaID) {
		assert.Equal(t, 1, *builder.lastAddedSchemaID)
	}
}

func TestMetadataV1Validation(t *testing.T) {
// Test case 1: JSON with no last-column-id field
noColumnID := `{
Expand Down
14 changes: 10 additions & 4 deletions table/snapshot_producers.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,9 +525,12 @@ func (sp *snapshotProducer) manifests() ([]iceberg.ManifestFile, error) {
defer out.Close()

counter := &internal.CountingWriter{W: out}

currentSpec, err := sp.txn.meta.CurrentSpec()
if err != nil || currentSpec == nil {
return fmt.Errorf("could not get current partition spec: %w", err)
}
wr, err := iceberg.NewManifestWriter(sp.txn.meta.formatVersion, counter,
sp.txn.meta.CurrentSpec(), sp.txn.meta.CurrentSchema(),
*currentSpec, sp.txn.meta.CurrentSchema(),
sp.snapshotID)
if err != nil {
return err
Expand Down Expand Up @@ -615,9 +618,12 @@ func (sp *snapshotProducer) summary(props iceberg.Properties) (Summary, error) {
ssc.setPartitionSummaryLimit(partitionSummaryLimit)

currentSchema := sp.txn.meta.CurrentSchema()
partitionSpec := sp.txn.meta.CurrentSpec()
partitionSpec, err := sp.txn.meta.CurrentSpec()
if err != nil || partitionSpec == nil {
return Summary{}, fmt.Errorf("could not get current partition spec: %w", err)
}
for _, df := range sp.addedFiles {
ssc.addFile(df, currentSchema, partitionSpec)
ssc.addFile(df, currentSchema, *partitionSpec)
}

if len(sp.deletedFiles) > 0 {
Expand Down
19 changes: 7 additions & 12 deletions table/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,25 +180,20 @@ func (u *upgradeFormatVersionUpdate) Apply(builder *MetadataBuilder) error {

type addSchemaUpdate struct {
baseUpdate
Schema *iceberg.Schema `json:"schema"`
LastColumnID int `json:"last-column-id"`
initial bool
Schema *iceberg.Schema `json:"schema"`
initial bool
}

// NewAddSchemaUpdate creates a new update that adds the given schema and last column ID to
// the table metadata. If the initial flag is set to true, the schema is considered the initial
// schema of the table, and all previously added schemas in the metadata builder are removed.
func NewAddSchemaUpdate(schema *iceberg.Schema, lastColumnID int, initial bool) *addSchemaUpdate {
// NewAddSchemaUpdate creates a new update that adds the given schema and updates the lastColumnID based on the schema.
func NewAddSchemaUpdate(schema *iceberg.Schema) *addSchemaUpdate {
return &addSchemaUpdate{
baseUpdate: baseUpdate{ActionName: UpdateAddSchema},
Schema: schema,
LastColumnID: lastColumnID,
initial: initial,
baseUpdate: baseUpdate{ActionName: UpdateAddSchema},
Schema: schema,
}
}

func (u *addSchemaUpdate) Apply(builder *MetadataBuilder) error {
_, err := builder.AddSchema(u.Schema, u.LastColumnID, u.initial)
_, err := builder.AddSchema(u.Schema)

return err
}
Expand Down
9 changes: 3 additions & 6 deletions table/updates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,9 @@ func TestUnmarshalUpdates(t *testing.T) {
}
]`),
expected: Updates{
NewAddSchemaUpdate(
iceberg.NewSchema(1,
iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.StringType{}, Required: true},
), 1, false,
),
NewAddSchemaUpdate(iceberg.NewSchema(1,
iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.StringType{}, Required: true},
)),
NewAddPartitionSpecUpdate(
&spec, false),
NewAddSortOrderUpdate(&sortOrder, false),
Expand Down Expand Up @@ -202,7 +200,6 @@ func TestUnmarshalUpdates(t *testing.T) {
actualAddSchema := actual[idx].(*addSchemaUpdate)
assert.True(t, expectedAddSchema.Schema.Equals(actualAddSchema.Schema))
assert.Equal(t, actualAddSchema.initial, expectedAddSchema.initial)
assert.Equal(t, actualAddSchema.LastColumnID, expectedAddSchema.LastColumnID)
case "add-partition-spec":
expectedAddPartitionSpec := u.(*addPartitionSpecUpdate)
actualAddPartitionSpec := actual[idx].(*addPartitionSpecUpdate)
Expand Down
Loading