Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion commonspace/headsync/headstorage/headstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
DeletedStatusKey = "d"
derivedStatusKey = "r"
parentIdKey = "p"
lastAddSeqKey = "q"
HeadsCollectionName = "heads"
)

Expand All @@ -37,6 +38,7 @@ type HeadsEntry struct {
DeletedStatus DeletedStatus
IsDerived bool
ParentId string
LastAddSeq uint64
}

type HeadsUpdate struct {
Expand All @@ -46,6 +48,7 @@ type HeadsUpdate struct {
DeletedStatus *DeletedStatus
IsDerived *bool
ParentId *string
LastAddSeq *uint64
}

type EntryIterator func(entry HeadsEntry) (bool, error)
Expand All @@ -59,6 +62,7 @@ type HeadStorage interface {
IterateEntries(ctx context.Context, iterOpts IterOpts, iter EntryIterator) error
GetEntry(ctx context.Context, id string) (HeadsEntry, error)
GetEntriesByParentId(ctx context.Context, parentId string) ([]HeadsEntry, error)
MaxLastAddSeq(ctx context.Context) (uint64, error)
DeleteEntry(ctx context.Context, id string) error
UpdateEntry(ctx context.Context, update HeadsUpdate) error
}
Expand Down Expand Up @@ -94,10 +98,17 @@ func New(ctx context.Context, store anystore.DB) (HeadStorage, error) {
Fields: []string{parentIdKey},
Sparse: true,
}
lastAddSeqIdx := anystore.IndexInfo{
Name: lastAddSeqKey,
Fields: []string{lastAddSeqKey},
}
if err := st.headsColl.EnsureIndex(ctx, deletedIdx); err != nil {
return nil, err
}
return st, st.headsColl.EnsureIndex(ctx, parentIdx)
if err := st.headsColl.EnsureIndex(ctx, parentIdx); err != nil {
return nil, err
}
return st, st.headsColl.EnsureIndex(ctx, lastAddSeqIdx)
}

func (h *headStorage) AddObserver(observer Observer) {
Expand Down Expand Up @@ -189,13 +200,35 @@ func (h *headStorage) UpdateEntry(ctx context.Context, update HeadsUpdate) (err
}
}

if update.LastAddSeq != nil {
if storeutil.ModifyKey(v, lastAddSeqKey, a.NewNumberInt(int(*update.LastAddSeq))) {
modified = true
}
}

resultEntry = entryFromVal(v)
return v, modified, nil
})
modResult, err = h.headsColl.UpsertId(ctx, update.Id, mod)
return
}

func (h *headStorage) MaxLastAddSeq(ctx context.Context) (uint64, error) {
iter, err := h.headsColl.Find(nil).Sort("-" + lastAddSeqKey).Limit(1).Iter(ctx)
if err != nil {
return 0, err
}
defer iter.Close()
if iter.Next() {
doc, err := iter.Doc()
if err != nil {
return 0, err
}
return uint64(doc.Value().GetInt(lastAddSeqKey)), nil
}
return 0, nil
}

func (h *headStorage) DeleteEntry(ctx context.Context, id string) error {
return h.headsColl.DeleteId(ctx, id)
}
Expand Down Expand Up @@ -227,5 +260,6 @@ func entryFromVal(val *anyenc.Value) HeadsEntry {
DeletedStatus: DeletedStatus(val.GetInt(DeletedStatusKey)),
IsDerived: val.GetBool(derivedStatusKey),
ParentId: val.GetString(parentIdKey),
LastAddSeq: uint64(val.GetInt(lastAddSeqKey)),
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions commonspace/object/tree/objecttree/change.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Change struct {
IsNew bool
ParentId string
OrderId string
AddSeq uint64 // Space-global apply sequence from storage

// SnapshotCounter is the number of previous snapshots in the current branch. For the first snapshot it's zero, for all next changes
// it increases every time a new snapshot is created. You can think of it as depth of the node in a snapshots tree.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions commonspace/object/tree/objecttree/objecttree.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ func (ot *objectTree) AddContentWithValidator(ctx context.Context, content Signa
if err != nil {
panic(err)
}
err = ot.storage.AddAll(ctx, []StorageChange{storageChange}, ot.Heads(), ot.tree.root.Id)
added := []StorageChange{storageChange}
err = ot.storage.AddAll(ctx, added, ot.Heads(), ot.tree.root.Id)
if err != nil {
return
}
Expand All @@ -301,7 +302,7 @@ func (ot *objectTree) AddContentWithValidator(ctx context.Context, content Signa
res = AddResult{
OldHeads: oldHeads,
Heads: []string{objChange.Id},
Added: []StorageChange{storageChange},
Added: added,
Mode: mode,
}
log.With("treeId", ot.id).With("head", objChange.Id).
Expand Down
11 changes: 11 additions & 0 deletions commonspace/object/tree/objecttree/objecttree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ func TestObjectTree(t *testing.T) {
require.NoError(t, err)
aStore, err := CreateStorage(ctx, root, aHeadsStorage, store)
require.NoError(t, err)
initTestAddSeq(aStore)
aTree, err := BuildKeyFilterableObjectTree(aStore, aAccount.Acl)
require.NoError(t, err)
err = aTree.Delete()
Expand Down Expand Up @@ -298,6 +299,7 @@ func TestObjectTree(t *testing.T) {
require.NoError(t, err)
aStore, err := CreateStorage(ctx, root, aHeadsStorage, storeA)
require.NoError(t, err)
initTestAddSeq(aStore)
aTree, err := BuildKeyFilterableObjectTree(aStore, aAccount.Acl)
require.NoError(t, err)
_, err = aTree.AddContent(ctx, SignableChangeContent{
Expand All @@ -313,6 +315,7 @@ func TestObjectTree(t *testing.T) {
require.NoError(t, err)
bStore, err := NewStorage(ctx, root.Id, bHeadsStorage, storeB)
require.NoError(t, err)
initTestAddSeq(bStore)
bTree, err := BuildKeyFilterableObjectTree(bStore, bAccount.Acl)
require.NoError(t, err)
err = exec.Execute("a.remove::b")
Expand Down Expand Up @@ -408,6 +411,7 @@ func TestObjectTree(t *testing.T) {
require.NoError(t, err)
treeStorage, err := CreateStorage(ctx, root, headStorage, store)
require.NoError(t, err)
initTestAddSeq(treeStorage)
_, err = BuildKeyFilterableObjectTree(treeStorage, beforeAcl)
require.True(t, errors.Is(err, list.ErrNoSuchRecord))
})
Expand Down Expand Up @@ -443,6 +447,7 @@ func TestObjectTree(t *testing.T) {
require.NoError(t, err)
aStore, err := CreateStorage(ctx, root, aHeadsStorage, storeA)
require.NoError(t, err)
initTestAddSeq(aStore)
aTree, err := BuildKeyFilterableObjectTree(aStore, aAccount.Acl)
require.NoError(t, err)
_, err = aTree.AddContent(ctx, SignableChangeContent{
Expand All @@ -458,6 +463,7 @@ func TestObjectTree(t *testing.T) {
require.NoError(t, err)
bStore, err := NewStorage(ctx, root.Id, bHeadsStorage, storeB)
require.NoError(t, err)
initTestAddSeq(bStore)
// copying old version of storage
prevAclRecs, err := bAccount.Acl.RecordsAfter(ctx, "")
require.NoError(t, err)
Expand Down Expand Up @@ -510,6 +516,7 @@ func TestObjectTree(t *testing.T) {
require.NoError(t, err)
storage, err := CreateStorage(ctx, root, headsStorage, store)
require.NoError(t, err)
initTestAddSeq(storage)
oTree, err := BuildObjectTree(storage, aclList)
require.NoError(t, err)

Expand Down Expand Up @@ -583,6 +590,7 @@ func TestObjectTree(t *testing.T) {
require.NoError(t, err)
storage, err := CreateStorage(ctx, root, headsStorage, store)
require.NoError(t, err)
initTestAddSeq(storage)
oTree, err := BuildObjectTree(storage, aclList)
require.NoError(t, err)
emptyDataTreeDeps = nonVerifiableTreeDeps
Expand All @@ -608,6 +616,7 @@ func TestObjectTree(t *testing.T) {
require.NoError(t, err)
storage, err := CreateStorage(ctx, root, headsStorage, store)
require.NoError(t, err)
initTestAddSeq(storage)
oTree, err := BuildObjectTree(storage, aclList)
require.NoError(t, err)
validateStore := createStore(ctx, t)
Expand Down Expand Up @@ -645,6 +654,7 @@ func TestObjectTree(t *testing.T) {
require.NoError(t, err)
storage, err := CreateStorage(ctx, root, headsStorage, store)
require.NoError(t, err)
initTestAddSeq(storage)
oTree, err := BuildObjectTree(storage, aclList)
require.NoError(t, err)
_, err = oTree.AddContent(ctx, SignableChangeContent{
Expand Down Expand Up @@ -695,6 +705,7 @@ func TestObjectTree(t *testing.T) {
require.NoError(t, err)
storage, err := CreateStorage(ctx, root, headsStorage, store)
require.NoError(t, err)
initTestAddSeq(storage)
oTree, err := BuildObjectTree(storage, aclList)
require.NoError(t, err)
_, err = oTree.AddContent(ctx, SignableChangeContent{
Expand Down
19 changes: 17 additions & 2 deletions commonspace/object/tree/objecttree/objecttreevalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package objecttree
import (
"context"
"fmt"
"sync/atomic"

anystore "github.com/anyproto/any-store"

Expand All @@ -21,20 +22,34 @@ type tempTreeStorageCreator struct {
store anystore.DB
}

// noopAddSeq is a zero-valued counter for ephemeral validation storages
// that don't participate in incremental sync.
var noopAddSeq atomic.Uint64

func (t *tempTreeStorageCreator) CreateStorageWithDeferredCreation(ctx context.Context, payload treestorage.TreeStorageCreatePayload) (Storage, error) {
headStorage, err := headstorage.New(ctx, t.store)
if err != nil {
return nil, err
}
return CreateStorageWithDeferredCreation(ctx, payload.RootRawChange, headStorage, t.store)
st, err := CreateStorageWithDeferredCreation(ctx, payload.RootRawChange, headStorage, t.store)
if err != nil {
return nil, err
}
st.(*storageDeferredCreation).SetAddSeq(&noopAddSeq)
return st, nil
}

func (t *tempTreeStorageCreator) CreateTreeStorage(ctx context.Context, payload treestorage.TreeStorageCreatePayload) (Storage, error) {
headStorage, err := headstorage.New(ctx, t.store)
if err != nil {
return nil, err
}
return CreateStorage(ctx, payload.RootRawChange, headStorage, t.store)
st, err := CreateStorage(ctx, payload.RootRawChange, headStorage, t.store)
if err != nil {
return nil, err
}
st.(*storage).SetAddSeq(&noopAddSeq)
return st, nil
}

type ValidatorFunc func(payload treestorage.TreeStorageCreatePayload, storageCreator TreeStorageCreator, aclList list.AclList) (ret ObjectTree, err error)
Expand Down
Loading
Loading