-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathplugin.ts
More file actions
136 lines (118 loc) · 4.12 KB
/
plugin.ts
File metadata and controls
136 lines (118 loc) · 4.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import type { Logger } from 'winston';
import { UnimplementedError, InitializationError } from '../errors/errors.js';
import type { SyncStream, ReadStream, WriteStream } from '../grpc/plugin.js';
import type { Table } from '../schema/table.js';
export type BackendOptions = {
tableName: string;
connection: string;
};
export type TableOptions = {
tables: string[];
skipTables: string[];
skipDependentTables: boolean;
};
export type SyncOptions = {
tables: string[];
skipTables: string[];
skipDependentTables: boolean;
deterministicCQId: boolean;
backendOptions: BackendOptions;
stream: SyncStream;
};
export type NewClientOptions = {
noConnection: boolean;
};
export type NewClientFunction = (logger: Logger, spec: string, options: NewClientOptions) => Promise<Client>;
export type PluginKind = 'source' | 'destination';
export type BuildTarget = {
os: 'linux' | 'darwin' | 'windows';
arch: 'amd64' | 'arm64';
};
export type PluginOptions = {
team: string;
kind: PluginKind;
dockerFile?: string;
buildTargets?: BuildTarget[];
jsonSchema?: string;
};
export interface SourceClient {
tables: (options: TableOptions) => Promise<Table[]>;
sync: (options: SyncOptions) => void;
}
export interface DestinationClient {
read: (stream: ReadStream) => void;
write: (stream: WriteStream) => Promise<void>;
}
export interface Client extends SourceClient, DestinationClient {
close: () => Promise<void>;
}
export interface Plugin extends Client {
getLogger: () => Logger;
setLogger: (logger: Logger) => void;
name: () => string;
version: () => string;
team: () => string | undefined;
kind: () => PluginKind | undefined;
jsonSchema: () => string | undefined;
dockerFile: () => string;
buildTargets: () => BuildTarget[];
init: (spec: string, options: NewClientOptions) => Promise<void>;
testConnection?: (spec: string) => Promise<{ success?: boolean; failureCode?: string; failureDescription?: string }>;
}
export const newUnimplementedSource = (): SourceClient => {
return {
tables: () => Promise.reject(new UnimplementedError('unimplemented', { props: { method: 'tables' } })),
sync: () => Promise.reject(new UnimplementedError('unimplemented', { props: { method: 'sync' } })),
};
};
export const newUnimplementedDestination = (): DestinationClient => {
return {
read: () => Promise.reject(new UnimplementedError('unimplemented', { props: { method: 'read' } })),
write: () => Promise.reject(new UnimplementedError('unimplemented', { props: { method: 'write' } })),
};
};
const defaultBuildTargets: BuildTarget[] = [
{ os: 'linux', arch: 'amd64' },
{ os: 'linux', arch: 'arm64' },
];
export const newPlugin = (
name: string,
version: string,
newClient: NewClientFunction,
options?: PluginOptions,
): Plugin => {
const plugin = {
client: undefined as Client | undefined,
logger: undefined as Logger | undefined,
name: () => name,
version: () => version,
team: () => options?.team,
kind: () => options?.kind,
jsonSchema: () => options?.jsonSchema,
dockerFile: () => options?.dockerFile || 'Dockerfile',
buildTargets: () => options?.buildTargets || defaultBuildTargets,
write: (stream: WriteStream) => {
return plugin.client?.write(stream) ?? Promise.reject(new InitializationError('client not initialized'));
},
read: (stream: ReadStream) => {
return plugin.client?.read(stream) ?? new InitializationError('client not initialized');
},
getLogger: () => {
return plugin.logger!;
},
setLogger: (logger: Logger) => {
plugin.logger = logger;
},
sync: (options: SyncOptions) => {
return plugin.client?.sync(options) ?? new InitializationError('client not initialized');
},
tables: (options: TableOptions) => {
return plugin.client?.tables(options) ?? Promise.reject(new InitializationError('client not initialized'));
},
init: async (spec: string, options: NewClientOptions) => {
plugin.client = await newClient(plugin.logger!, spec, options);
},
close: () => plugin.client?.close() ?? Promise.reject(new InitializationError('client not initialized')),
};
return plugin;
};