Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: compute makeCacheItemFromFilePath via worker pool
  • Loading branch information
tstelzer committed Jul 8, 2022
commit a53243eb899e0f916291514842f6c9698ff22319
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { FetchDataError } from '../errors/index.js'
import type { Flags } from '../index.js'
import type { ContentTypeMap, FilePathPatternMap } from '../types.js'
import { DocumentTypeMap, provideDocumentTypeMapState } from './DocumentTypeMap.js'
import { makeCacheItemFromFilePath } from './makeCacheItemFromFilePath.js'
import { fromWorkerPool } from './makeCacheItemFromFilePath.worker.js'
// import {createPool} from './makeCacheItemFromFilePath.worker.js'

export const fetchAllDocuments = ({
Expand Down Expand Up @@ -45,7 +45,7 @@ export const fetchAllDocuments = ({

const concurrencyLimit = os.cpus().length

// TODO createPool
const makeCacheItemFromFilePath = fromWorkerPool()
const { dataErrors, documents, documentTypeMap } = yield* $(
pipe(
allRelativeFilePaths,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import { FetchDataError } from '../errors/index.js'
import type { ContentTypeMap, FilePathPatternMap } from '../types.js'
import { makeAndProvideDocumentContext } from './DocumentContext.js'
import type { DocumentTypeName, HasDocumentTypeMapState } from './DocumentTypeMap.js'
import { DocumentTypeMapState } from './DocumentTypeMap.js'
import { makeDocument } from './mapping.js'
import type { RawContent, RawContentJSON, RawContentMarkdown, RawContentMDX, RawContentYAML } from './types.js'
import { validateDocumentData } from './validateDocumentData.js'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,128 +1,59 @@
import type * as core from '@contentlayer/core'
import type { PosixFilePath } from '@contentlayer/utils'
import { provideDummyTracing } from '@contentlayer/utils'
import type { HasConsole, OT, These } from '@contentlayer/utils/effect'
import { pipe, provideConsole, T } from '@contentlayer/utils/effect'
import type { _A, _E } from '@effect-ts/core/Utils'
import Pool from 'piscina'

import type { FetchDataError } from '../errors/index.js'
import type { ContentTypeMap, FilePathPatternMap } from '../types.js'
import type { HasDocumentTypeMapState } from './DocumentTypeMap.js'
import { provideDocumentTypeMapState } from './DocumentTypeMap.js'
import * as _ from './makeCacheItemFromFilePath.js'

export type Input = {
relativeFilePath: PosixFilePath
filePathPatternMap: FilePathPatternMap
coreSchemaDef: core.SchemaDef
contentDirPath: PosixFilePath
options: core.PluginOptions
previousCache: core.DataCache.Cache | undefined
contentTypeMap: ContentTypeMap
}
type F = typeof _.makeCacheItemFromFilePath

export type Left<E, A> = { _tag: 'left'; value: E }
export type Right<E, A> = { _tag: 'right'; value: A }
export type Either<E, A> = Left<E, A> | Right<E, A>

type DTO = Either<_E<ReturnType<F>>, _A<ReturnType<F>>>

// FIXME: naming
// This runs on the host, what is passed into the worker at `pool.run` has to
// be serializable.
export function createPool() {
export function fromWorkerPool(): F {
// I believe, by default, #workers = #cpu cores, which is probably what we want?
const pool = new Pool({
// FIXME: get path dynamically
filename:
'/home/ts/dev/code/contentlayer/packages/@contentlayer/source-files/dist/fetchData/makeCacheItemFromFilePath.worker.js',
})

return (
payload: Input,
): T.Effect<
OT.HasTracer & HasConsole & HasDocumentTypeMapState,
never,
These.These<FetchDataError.FetchDataError, core.DataCache.CacheItem>
> =>
return (payload) =>
pipe(
T.promise(() => pool.run(payload, { name: 'makeCacheItemFromFilePath' })),
T.succeedWith(() => JSON.stringify(payload)),
T.chain((value) => T.promise<string>(() => pool.run(value, { name: 'makeCacheItemFromFilePath' }))),
T.chain((value) => T.succeedWith<DTO>(() => JSON.parse(value))),
T.chain(({ _tag, value }) =>
T.if_(
_tag === 'right',
() => T.succeed(value),
// FIXME: effect
() => T.succeedWith(() => value),
// FIXME: Signature claims it doesn't fail.
() => T.die(value),
),
),
)
}

// This runs in the worker, with the input coming via the "wire" from the host,
// the return value has to be serializable.
export function makeCacheItemFromFilePath(payload: Input) {
// This runs IN THE WORKER, with the input coming via "wire" from the host.
// Since input and output cross a process boundary, input has to be de-serialized, output has to be serialized.
export function makeCacheItemFromFilePath(payload: string): Promise<string> {
return pipe(
_.makeCacheItemFromFilePath(payload),
// T.fold(
// value => ({ _tag: 'left', value } as const),
// value => ({ _tag: 'right', value } as const)
// ),
provideDocumentTypeMapState,
T.succeedWith<Parameters<F>[0]>(() => JSON.parse(payload)),
T.chain(_.makeCacheItemFromFilePath),
T.fold(
(value) => ({ _tag: 'left', value } as const),
(value) => ({ _tag: 'right', value } as const),
),
provideConsole,
provideDummyTracing,
T.runPromise,
(p) => p.then((value) => JSON.stringify(value)),
)
}

// const serialized = yield* $(
// pipe(
// SF.DocumentTypeMapState.update((_) => _.add('foo', posixFilePath('bar'))),
// T.chain(() => DocumentTypeMapState.get),
// T.tap((map) => {
// console.log('start', map)
// return T.unit
// }),
// T.chain((map) =>
// T.succeedWith(() => {
// const r: Record<string, PosixFilePath[]> = {}
// for (const [k, v] of map.map) {
// r[k] = v
// }
// return JSON.stringify(r)
// }),
// ),
// T.tap((serialized) => {
// console.log('serialized', serialized)
// return T.unit
// }),
// SF.provideDocumentTypeMapState,
// ),
// )
//
// // worker input
// const layer = yield* $(
// pipe(
// T.succeedWith(() => {
// const i: Record<string, PosixFilePath[]> = JSON.parse(serialized)
// return Object.entries(i).reduce((map, [key, value]) => {
// return HashMap.set(key, value)(map)
// }, HashMap.make<string, PosixFilePath[]>())
// }),
// T.tap((deserialized) => {
// console.log('deserialized', deserialized)
// return T.unit
// }),
// T.map((map) => new SF.DocumentTypeMap({ map })),
// T.tap((instantiated) => {
// console.log('instantiated', instantiated)
// return T.unit
// }),
// ),
// )
//
// // worker -> host
// yield* $(
// pipe(
// SF.DocumentTypeMapState.update((_) => _.add('qux', posixFilePath('baz'))),
// T.chain(() => SF.DocumentTypeMapState.get),
// T.tap((end) => {
// console.log('end', end)
// return T.unit
// }),
// T.provideSomeLayer(DocumentTypeMapState.Live(layer)),
// ),
// )