Skip to content

Commit 4dbb136

Browse files
authored
feat!: streaming blockstores (#358)
Changes the blockstore interface to return (and accept) streams of `Uint8Array`s instead of individual `Uint8Array`s. This enables future work on large blocks (for streaming hashes) and also reduces memory usage, since we can stream from the filesystem in smaller chunks (typically 64KiB) rather than 256KiB at a time.

BREAKING CHANGE: `blockstore.get` and similar methods now return streams of bytes
1 parent edb5a1f commit 4dbb136

File tree

41 files changed

+391
-237
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+391
-237
lines changed

packages/blockstore-core/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,15 +187,16 @@
187187
"@libp2p/logger": "^6.0.0",
188188
"interface-blockstore": "^5.0.0",
189189
"interface-store": "^6.0.0",
190+
"it-all": "^3.0.9",
190191
"it-filter": "^3.1.3",
191192
"it-merge": "^3.0.11",
192193
"multiformats": "^13.3.6"
193194
},
194195
"devDependencies": {
195196
"aegir": "^47.0.16",
196197
"interface-blockstore-tests": "^7.0.0",
197-
"it-all": "^3.0.8",
198198
"it-drain": "^3.0.9",
199+
"it-to-buffer": "^4.0.10",
199200
"uint8arrays": "^5.1.0"
200201
}
201202
}
packages/blockstore-core/src/base.ts

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,32 @@
1-
import type { Blockstore, Pair } from 'interface-blockstore'
2-
import type { AbortOptions, Await, AwaitIterable } from 'interface-store'
1+
import type { Blockstore, InputPair, Pair } from 'interface-blockstore'
2+
import type { AbortOptions, Await, AwaitGenerator, AwaitIterable } from 'interface-store'
33
import type { CID } from 'multiformats/cid'
44

55
export class BaseBlockstore implements Blockstore {
66
has (key: CID, options?: AbortOptions): Await<boolean> {
77
return Promise.reject(new Error('.has is not implemented'))
88
}
99

10-
put (key: CID, val: Uint8Array, options?: AbortOptions): Await<CID> {
10+
put (key: CID, val: Uint8Array | AwaitIterable<Uint8Array>, options?: AbortOptions): Await<CID> {
1111
return Promise.reject(new Error('.put is not implemented'))
1212
}
1313

14-
async * putMany (source: AwaitIterable<Pair>, options?: AbortOptions): AwaitIterable<CID> {
15-
for await (const { cid, block } of source) {
16-
await this.put(cid, block, options)
14+
async * putMany (source: AwaitIterable<InputPair>, options?: AbortOptions): AwaitGenerator<CID> {
15+
for await (const { cid, bytes } of source) {
16+
await this.put(cid, bytes, options)
1717
yield cid
1818
}
1919
}
2020

21-
get (key: CID, options?: AbortOptions): Await<Uint8Array> {
22-
return Promise.reject(new Error('.get is not implemented'))
21+
get (key: CID, options?: AbortOptions): AwaitGenerator<Uint8Array> {
22+
throw new Error('.get is not implemented')
2323
}
2424

25-
async * getMany (source: AwaitIterable<CID>, options?: AbortOptions): AwaitIterable<Pair> {
25+
async * getMany (source: AwaitIterable<CID>, options?: AbortOptions): AwaitGenerator<Pair> {
2626
for await (const key of source) {
2727
yield {
2828
cid: key,
29-
block: await this.get(key, options)
29+
bytes: this.get(key, options)
3030
}
3131
}
3232
}
@@ -35,7 +35,7 @@ export class BaseBlockstore implements Blockstore {
3535
return Promise.reject(new Error('.delete is not implemented'))
3636
}
3737

38-
async * deleteMany (source: AwaitIterable<CID>, options?: AbortOptions): AwaitIterable<CID> {
38+
async * deleteMany (source: AwaitIterable<CID>, options?: AbortOptions): AwaitGenerator<CID> {
3939
for await (const key of source) {
4040
await this.delete(key, options)
4141
yield key
@@ -45,7 +45,7 @@ export class BaseBlockstore implements Blockstore {
4545
/**
4646
* Extending classes should override `query` or implement this method
4747
*/
48-
async * getAll (options?: AbortOptions): AwaitIterable<Pair> { // eslint-disable-line require-yield
48+
async * getAll (options?: AbortOptions): AwaitGenerator<Pair> { // eslint-disable-line require-yield
4949
throw new Error('.getAll is not implemented')
5050
}
5151
}

packages/blockstore-core/src/black-hole.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
import { NotFoundError } from 'interface-store'
22
import { BaseBlockstore } from './base.js'
33
import type { Pair } from 'interface-blockstore'
4-
import type { AbortOptions, Await, AwaitIterable } from 'interface-store'
4+
import type { AbortOptions, Await, AwaitGenerator, AwaitIterable } from 'interface-store'
55
import type { CID } from 'multiformats/cid'
66

77
export class BlackHoleBlockstore extends BaseBlockstore {
8-
put (key: CID, value: Uint8Array, options?: AbortOptions): Await<CID> {
8+
put (key: CID, value: Uint8Array | AwaitIterable<Uint8Array>, options?: AbortOptions): Await<CID> {
99
options?.signal?.throwIfAborted()
1010
return key
1111
}
1212

13-
get (key: CID, options?: AbortOptions): Await<Uint8Array> {
13+
get (key: CID, options?: AbortOptions): AwaitGenerator<Uint8Array> {
1414
options?.signal?.throwIfAborted()
1515
throw new NotFoundError()
1616
}
@@ -25,7 +25,7 @@ export class BlackHoleBlockstore extends BaseBlockstore {
2525
}
2626

2727
// eslint-disable-next-line require-yield
28-
async * getAll (options?: AbortOptions): AwaitIterable<Pair> {
28+
async * getAll (options?: AbortOptions): AwaitGenerator<Pair> {
2929
options?.signal?.throwIfAborted()
3030
}
3131
}

packages/blockstore-core/src/identity.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { NotFoundError } from 'interface-store'
22
import { BaseBlockstore } from './base.js'
33
import type { Blockstore, Pair } from 'interface-blockstore'
4-
import type { AbortOptions, Await, AwaitIterable } from 'interface-store'
4+
import type { AbortOptions, Await, AwaitGenerator, AwaitIterable } from 'interface-store'
55
import type { CID } from 'multiformats/cid'
66

77
// https://github.com/multiformats/multicodec/blob/d06fc6194710e8909bac64273c43f16b56ca4c34/table.csv#L2
@@ -16,7 +16,7 @@ export class IdentityBlockstore extends BaseBlockstore {
1616
this.child = child
1717
}
1818

19-
put (key: CID, block: Uint8Array, options?: AbortOptions): Await<CID> {
19+
put (key: CID, block: Uint8Array | AwaitIterable<Uint8Array>, options?: AbortOptions): Await<CID> {
2020
if (key.multihash.code === IDENTITY_CODEC) {
2121
options?.signal?.throwIfAborted()
2222
return key
@@ -30,18 +30,19 @@ export class IdentityBlockstore extends BaseBlockstore {
3030
return this.child.put(key, block, options)
3131
}
3232

33-
get (key: CID, options?: AbortOptions): Await<Uint8Array> {
33+
* get (key: CID, options?: AbortOptions): AwaitGenerator<Uint8Array> {
3434
if (key.multihash.code === IDENTITY_CODEC) {
3535
options?.signal?.throwIfAborted()
36-
return key.multihash.digest
36+
yield key.multihash.digest
37+
return
3738
}
3839

3940
if (this.child == null) {
4041
options?.signal?.throwIfAborted()
4142
throw new NotFoundError()
4243
}
4344

44-
return this.child.get(key, options)
45+
yield * this.child.get(key, options)
4546
}
4647

4748
has (key: CID, options?: AbortOptions): Await<boolean> {
@@ -69,12 +70,11 @@ export class IdentityBlockstore extends BaseBlockstore {
6970
}
7071
}
7172

72-
getAll (options?: AbortOptions): AwaitIterable<Pair> {
73+
* getAll (options?: AbortOptions): AwaitGenerator<Pair> {
7374
if (this.child != null) {
74-
return this.child.getAll(options)
75+
yield * this.child.getAll(options)
7576
}
7677

7778
options?.signal?.throwIfAborted()
78-
return []
7979
}
8080
}

packages/blockstore-core/src/memory.ts

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,65 @@
11
import { NotFoundError } from 'interface-store'
2+
import all from 'it-all'
23
import { base32 } from 'multiformats/bases/base32'
34
import { CID } from 'multiformats/cid'
45
import * as raw from 'multiformats/codecs/raw'
56
import * as Digest from 'multiformats/hashes/digest'
67
import { BaseBlockstore } from './base.js'
78
import type { Pair } from 'interface-blockstore'
8-
import type { AbortOptions, Await, AwaitIterable } from 'interface-store'
9+
import type { AbortOptions, Await, AwaitGenerator, AwaitIterable } from 'interface-store'
10+
11+
function isPromise <T> (p?: any): p is Promise<T> {
12+
return typeof p?.then === 'function'
13+
}
914

1015
export class MemoryBlockstore extends BaseBlockstore {
11-
private readonly data: Map<string, Uint8Array>
16+
private readonly data: Map<string, Uint8Array[]>
1217

1318
constructor () {
1419
super()
1520

1621
this.data = new Map()
1722
}
1823

19-
put (key: CID, val: Uint8Array, options?: AbortOptions): Await<CID> {
24+
put (key: CID, val: Uint8Array | AwaitIterable<Uint8Array>, options?: AbortOptions): Await<CID> {
25+
options?.signal?.throwIfAborted()
26+
27+
let buf: Uint8Array[]
28+
29+
if (val instanceof Uint8Array) {
30+
buf = [val]
31+
} else {
32+
const result = all(val)
33+
34+
if (isPromise<Uint8Array[]>(result)) {
35+
return result.then(val => {
36+
return this._put(key, val, options)
37+
})
38+
} else {
39+
buf = result
40+
}
41+
}
42+
43+
return this._put(key, buf, options)
44+
}
45+
46+
private _put (key: CID, val: Uint8Array[], options?: AbortOptions): Await<CID> {
2047
options?.signal?.throwIfAborted()
48+
2149
this.data.set(base32.encode(key.multihash.bytes), val)
2250

2351
return key
2452
}
2553

26-
get (key: CID, options?: AbortOptions): Await<Uint8Array> {
54+
* get (key: CID, options?: AbortOptions): AwaitGenerator<Uint8Array> {
2755
options?.signal?.throwIfAborted()
2856
const buf = this.data.get(base32.encode(key.multihash.bytes))
2957

3058
if (buf == null) {
3159
throw new NotFoundError()
3260
}
3361

34-
return buf
62+
yield * buf
3563
}
3664

3765
has (key: CID, options?: AbortOptions): Await<boolean> {
@@ -44,13 +72,15 @@ export class MemoryBlockstore extends BaseBlockstore {
4472
this.data.delete(base32.encode(key.multihash.bytes))
4573
}
4674

47-
async * getAll (options?: AbortOptions): AwaitIterable<Pair> {
75+
* getAll (options?: AbortOptions): AwaitGenerator<Pair> {
4876
options?.signal?.throwIfAborted()
4977

5078
for (const [key, value] of this.data.entries()) {
5179
yield {
5280
cid: CID.createV1(raw.code, Digest.decode(base32.decode(key))),
53-
block: value
81+
bytes: (async function * () {
82+
yield * value
83+
})()
5484
}
5585
options?.signal?.throwIfAborted()
5686
}

packages/blockstore-core/src/tiered.ts

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ import { NotFoundError } from 'interface-store'
33
import filter from 'it-filter'
44
import merge from 'it-merge'
55
import { BaseBlockstore } from './base.js'
6-
import type { Blockstore, Pair } from 'interface-blockstore'
7-
import type { AbortOptions, AwaitIterable } from 'interface-store'
6+
import type { Blockstore, InputPair, Pair } from 'interface-blockstore'
7+
import type { AbortOptions, AwaitGenerator, AwaitIterable } from 'interface-store'
88
import type { CID } from 'multiformats/cid'
99

1010
const log = logger('blockstore:core:tiered')
@@ -24,7 +24,7 @@ export class TieredBlockstore extends BaseBlockstore {
2424
this.stores = stores.slice()
2525
}
2626

27-
async put (key: CID, value: Uint8Array, options?: AbortOptions): Promise<CID> {
27+
async put (key: CID, value: Uint8Array | AwaitIterable<Uint8Array>, options?: AbortOptions): Promise<CID> {
2828
await Promise.all(
2929
this.stores.map(async store => {
3030
await store.put(key, value, options)
@@ -34,16 +34,13 @@ export class TieredBlockstore extends BaseBlockstore {
3434
return key
3535
}
3636

37-
async get (key: CID, options?: AbortOptions): Promise<Uint8Array> {
37+
async * get (key: CID, options?: AbortOptions): AwaitGenerator<Uint8Array> {
3838
let error: Error | undefined
3939

4040
for (const store of this.stores) {
4141
try {
42-
const res = await store.get(key, options)
43-
44-
if (res != null) {
45-
return res
46-
}
42+
yield * store.get(key, options)
43+
return
4744
} catch (err: any) {
4845
error = err
4946
log.error(err)
@@ -71,21 +68,21 @@ export class TieredBlockstore extends BaseBlockstore {
7168
)
7269
}
7370

74-
async * putMany (source: AwaitIterable<Pair>, options: AbortOptions = {}): AsyncIterable<CID> {
71+
async * putMany (source: AwaitIterable<InputPair>, options: AbortOptions = {}): AwaitGenerator<CID> {
7572
for await (const pair of source) {
76-
await this.put(pair.cid, pair.block, options)
73+
await this.put(pair.cid, pair.bytes, options)
7774
yield pair.cid
7875
}
7976
}
8077

81-
async * deleteMany (source: AwaitIterable<CID>, options: AbortOptions = {}): AsyncIterable<CID> {
78+
async * deleteMany (source: AwaitIterable<CID>, options: AbortOptions = {}): AwaitGenerator<CID> {
8279
for await (const cid of source) {
8380
await this.delete(cid, options)
8481
yield cid
8582
}
8683
}
8784

88-
async * getAll (options?: AbortOptions): AwaitIterable<Pair> {
85+
async * getAll (options?: AbortOptions): AwaitGenerator<Pair> {
8986
// deduplicate yielded pairs
9087
const seen = new Set<string>()
9188

packages/blockstore-core/test/identity.spec.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import { expect } from 'aegir/chai'
44
import all from 'it-all'
55
import drain from 'it-drain'
6+
import toBuffer from 'it-to-buffer'
67
import { CID } from 'multiformats/cid'
78
import * as raw from 'multiformats/codecs/raw'
89
import { identity } from 'multiformats/hashes/identity'
@@ -20,13 +21,13 @@ describe('identity', () => {
2021
child = new MemoryBlockstore()
2122
})
2223

23-
it('has an identity CID', () => {
24+
it('has an identity CID', async () => {
2425
const block = Uint8Array.from([0, 1, 2, 3, 4])
2526
const multihash = identity.digest(block)
2627
const cid = CID.createV1(identity.code, multihash)
2728

2829
expect(blockstore.has(cid)).to.be.true()
29-
expect(blockstore.get(cid)).to.equalBytes(block)
30+
expect(toBuffer(await all(blockstore.get(cid)))).to.equalBytes(block)
3031
})
3132

3233
it('does not have a non-identity CID', async () => {
@@ -70,7 +71,7 @@ describe('identity', () => {
7071

7172
await blockstore.put(cid, block)
7273
expect(child.has(cid)).to.be.true()
73-
expect(child.get(cid)).to.equalBytes(block)
74+
expect(toBuffer(await all(child.get(cid)))).to.equalBytes(block)
7475
})
7576

7677
it('gets CIDs from child', async () => {
@@ -82,7 +83,7 @@ describe('identity', () => {
8283

8384
blockstore = new IdentityBlockstore(child)
8485
expect(blockstore.has(cid)).to.be.true()
85-
expect(blockstore.get(cid)).to.equalBytes(block)
86+
expect(toBuffer(await all(blockstore.get(cid)))).to.equalBytes(block)
8687
})
8788

8889
it('has CIDs from child', async () => {

0 commit comments

Comments
 (0)