diff --git a/apps/kdot-port-reverse/Dockerfile b/apps/kdot-port-reverse/Dockerfile new file mode 100644 index 00000000..d830bfd6 --- /dev/null +++ b/apps/kdot-port-reverse/Dockerfile @@ -0,0 +1,18 @@ +FROM node:15.10-buster + +ENV NODE_ENV production + +# Create and set the working directory. +RUN mkdir -p /opt/kdot-port-reverse +WORKDIR /opt/kdot-port-reverse + +# Install dependencies. +COPY yarn.lock . +COPY package.json . +COPY apps/kdot-port-reverse/package.json . +RUN yarn + +# Copy all of the other files over. +COPY apps/kdot-port-reverse . + +CMD ["yarn", "serve"] diff --git a/apps/kdot-port-reverse/LICENSE b/apps/kdot-port-reverse/LICENSE new file mode 100644 index 00000000..4187b692 --- /dev/null +++ b/apps/kdot-port-reverse/LICENSE @@ -0,0 +1,35 @@ +Copyright Ian Walter (https://ianwalter.dev) + +Licensed under the Apache License, Version 2.0 (the "License") modified with +Commons Clause Restriction; you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software distributed +under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. + +"Commons Clause" License Condition v1.0 + +The Software is provided to you by the Licensor under the License, as defined +below, subject to the following condition. + +Without limiting other conditions in the License, the grant of rights under the +License will not include, and the License does not grant to you, the right to +Sell the Software. + +For purposes of the foregoing, "Sell" means practicing any or all of the rights +granted to you under the License to provide to third parties, for a fee or other +consideration (including without limitation fees for hosting or +consulting/support services related to the Software), a product or service whose +value derives, entirely or substantially, from the functionality of the +Software. Any license notice or attribution required by the License must also +include this Commons Clause License Condition notice. + +Software: kdot-port-reverse + +License: Apache 2.0 + +Licensor: Ian Walter diff --git a/apps/kdot-port-reverse/index.js b/apps/kdot-port-reverse/index.js new file mode 100644 index 00000000..2eb560ac --- /dev/null +++ b/apps/kdot-port-reverse/index.js @@ -0,0 +1,132 @@ +import net from 'net' +import { createLogger } from '@generates/logger' +import WebSocket from 'ws' +import { nanoid } from 'nanoid' +import { stripIndent } from 'common-tags' + +const level = process.env.LOG_LEVEL || 'info' +const logger = createLogger({ level, namespace: 'kdot-port-reverse' }) + +// Create a server that will listen for incoming HTTP requests that can be +// relayed to the client via websocket after which the client will relay to the +// local server. +const server = net.createServer() + +// Keep track of connections by the ID thats generated on the initial +// connection. +const connections = {} + +// Create a websocket server on the configured port so that the client can +// connect and communicate with this server. +const port = process.env.TUNNEL_PORT || '28199' +const wss = new WebSocket.Server({ port }) + +// Store the websocket connection when a connection is received so it can be +// used when receiving HTTP requests. +let ws +wss.on('connection', websocket => { + logger.info('Websocket connection on port', port) + ws = websocket + + ws.on('message', message => { + const { id, event, data, err } = JSON.parse(message) + const info = { id, event, err } + logger.debug('Client message', info) + + const socket = connections[id] + if (socket) { + if (event === 'end') { + logger.debug('Server data end', info) + + // + socket.end() + } else if (event === 'close') { + // The client has indicated that the connection has been closed. + logger.debug('Closing requester connection', info) + + // Remove the connection from the connection store. + delete connections[id] + + // Close the requester's connection. + socket.destroy(err) + } else { + // + logger.debug('Server data', info) + socket.write(Buffer.from(data)) + } + } else { + logger.warn('Received event or unknown connection', info) + } + }) +}) + +server.on('connection', socket => { + // Create a unique ID for the connection. + const id = nanoid() + logger.info(`Connection ${id}`) + + // Store the socket as a "connection" so it can be retrieved when theres a + // response received through the websocket. + if (ws) connections[id] = socket + + // Handle request data sent by the requester over the socket. + socket.on('data', requestData => { + let dataSlice + if (level === 'debug') dataSlice = requestData.slice(0, 2048).toString() + logger.debug('Request data', { id, dataSlice }) + + if (ws) { + // If a websocket connection has been established, relay the request data + // to the client. + ws.send(JSON.stringify({ id, event: 'data', data: requestData })) + } else { + // If a websocket connection hasn't been established, return a response to + // the requester explaining the situation. + const message = 'Server connection with no client websocket connection' + logger.warn(message, { id }) + + const buf = Buffer.from(message, 'utf8') + socket.write(stripIndent` + HTTP/1.1 500 Internal Server Error + Accept-Ranges: bytes + Content-Length: ${buf.length} + Connection: close + Content-Type: text/plain + + ${buf} + `) + socket.destroy() + } + }) + + // + socket.on('end', () => { + logger.debug('Requester data end:', id) + ws.send(JSON.stringify({ id, event: 'end' })) + }) + + socket.on('close', hadError => { + // If the requester has closed the connection, send a message to the client + // so it can clean up the corresponding connection to the server if it has + // established one. + if (ws && connections[id]) { + if (hadError) { + logger.warn('Requester closed connection and had an error:', id) + } else { + logger.debug('Requester closed connection:', id) + } + ws.send(JSON.stringify({ id, event: 'close' })) + } + + // Remove the connection from the store if it exists. + delete connections[id] + }) +}) + +// Listen on the configured port so that the server can receive HTTP requests. +server.listen(process.env.PORT, () => { + logger.info('kdot-port-reverse started', { + port: process.env.PORT, + tunnelPort: port + }) +}) diff --git a/apps/kdot-port-reverse/package.json b/apps/kdot-port-reverse/package.json new file mode 100644 index 00000000..8c2ae86e --- /dev/null +++ b/apps/kdot-port-reverse/package.json @@ -0,0 +1,16 @@ +{ + "private": true, + "name": "kdot-port-reverse", + "version": "0.0.0", + "license": "SEE LICENSE IN LICENSE", + "type": "module", + "scripts": { + "serve": "node index.js" + }, + "dependencies": { + "@generates/logger": "^0.1.2", + "common-tags": "^1.8.0", + "nanoid": "^3.1.22", + "ws": "^7.4.4" + } +} diff --git a/packages/kdot-port-reverse/index.js b/packages/kdot-port-reverse/index.js new file mode 100644 index 00000000..50e0eeb2 --- /dev/null +++ b/packages/kdot-port-reverse/index.js @@ -0,0 +1,10 @@ +export default function kdotPortReverse (config = {}) { + const { port = 4000 } = config + return { + image: 'generates/kdot-port-reverse', + ports: { + app: { port } + }, + env: { PORT_REVERSE_TARGET: config.target, PORT: port } + } +} diff --git a/packages/kdot/lib/applyResource.js b/packages/kdot/lib/applyResource.js index abb87f67..cc3e6bd6 100644 --- a/packages/kdot/lib/applyResource.js +++ b/packages/kdot/lib/applyResource.js @@ -27,7 +27,19 @@ export default async function applyResource ({ app, ...resource }, input = {}) { } } catch (err) { const msg = `Failed to apply ${resource.kind}:` + + let req + if (err.response?.request?.body) { + try { + req = JSON.parse(err.response?.request?.body) + } catch (err) { + // Ignore JSON parse error. + } + } + logger[logLevel](msg, name, err.response?.body || err) + logger.debug('Failed request body', req) + if (input.failFast) process.exit(1) } } diff --git a/packages/kdot/lib/commands/fwd.js b/packages/kdot/lib/commands/fwd.js index 3aac5cfe..ace26caa 100644 --- a/packages/kdot/lib/commands/fwd.js +++ b/packages/kdot/lib/commands/fwd.js @@ -5,6 +5,7 @@ import { oneLine } from 'common-tags' import { PortForward, kc } from '../k8s.js' import getRunningPods from '../getRunningPods.js' import configure from '../configure/index.js' +import reversePort from '../reversePort.js' const logger = createLogger({ namespace: 'kdot.fwd', level: 'info' }) const pollConfig = { interval: 1000, timeout: 300000, limit: 1 } @@ -96,7 +97,27 @@ export default async function fwd (input) { const pod = await getRunningPods(namespace, app.name, pollConfig) for (const [name, portConfig] of Object.entries(app.ports)) { portConfig.name = name - await forwardPort(app, pod, portConfig) + + // + const localPort = portConfig.localPort || portConfig.port + if (localPort !== portConfig.reversePort) { + await forwardPort(app, pod, portConfig) + } + + // + if (portConfig.reversePort) { + setTimeout( + () => { + reversePort({ + app: app.name, + port: portConfig.port, + reversePort: portConfig.reversePort, + kprPort: app.ports.kpr?.port + }) + }, + 5000 + ) + } } } catch (err) { logger.error(err) diff --git a/packages/kdot/lib/configure/index.js b/packages/kdot/lib/configure/index.js index 8856f199..e4b83bfd 100644 --- a/packages/kdot/lib/configure/index.js +++ b/packages/kdot/lib/configure/index.js @@ -19,6 +19,9 @@ const logger = createLogger({ namespace: 'kdot.configure', level: 'info' }) const labels = { managedBy: 'kdot' } const containerAttrs = V1Container.attributeTypeMap.map(a => a.name) +// +let kprPort = 28199 + function toContainerPorts (ports) { if (ports) return Object.values(ports).map(p => ({ containerPort: p.port })) } @@ -122,6 +125,15 @@ export default async function configure ({ ext, ...input }) { Object.defineProperty(app, 'taggedImage', { get: taggedImage }) Object.defineProperty(app, 'taggedImages', { get: taggedImages }) + // + if (Object.values(app.ports).find(p => p.reversePort)) { + kprPort++ + app.image = 'generates/kdot-port-reverse:v0.0.3' + app.ports.kpr = { port: kprPort } + app.env.push({ name: 'TUNNEL_PORT', value: `${kprPort}` }) + app.env.push({ name: 'LOG_LEVEL', value: 'debug' }) + } + cfg.resources.push({ app, apiVersion: 'apps/v1', diff --git a/packages/kdot/lib/reversePort.js b/packages/kdot/lib/reversePort.js new file mode 100644 index 00000000..6844b925 --- /dev/null +++ b/packages/kdot/lib/reversePort.js @@ -0,0 +1,93 @@ +import net from 'net' +import { createLogger } from '@generates/logger' +import WebSocket from 'ws' + +const logger = createLogger({ level: 'info', namespace: 'kdot.fwd' }) + +export default function reversePort (config) { + const { kprPort = 28199, app } = config + + // Create a websocket connection the the kdot-port-reverse server running in + // the cluster. + const url = `ws://localhost:${kprPort}/` + logger.info('Attempting reverse port websocket connection:', url) + const ws = new WebSocket(url) + + // Store connections to the local server so that subsequent messages can be + // routed to them. + const connections = {} + + // Listen for messages from the kdot-port-reverse server. + ws.on('message', message => { + const { id, event, data } = JSON.parse(message) + logger.debug('Port reverse message', { app, id, event }) + + // Retrieve the connection from the store by ID if it exists. + const conn = connections[id] + const info = { app, id } + + if (conn && event === 'end') { + // + logger.debug('Reqeuster data end', info) + + // + conn.end() + } else if (conn && event === 'close') { + // The server has indicated that the connection has been closed by the + // requester. + logger.debug('Requester connection closed', info) + + // Remove the connection from the store. + delete connections[id] + + // Close the connection to the local server. + conn.close() + } else if (conn && event === 'data') { + // Relay data to the local server over the existing connection. + conn.write(Buffer.from(data)) + } else if (event === 'data') { + // Create a new connection to the local server so that the request from + // the cluster can be relayed to it. + const conn = net.createConnection({ port: config.reversePort }, () => { + logger.debug('Connected to server', { app, id }) + + // Forward the data received from kdot-port-reverse to the local server. + conn.write(Buffer.from(data), err => { + logger.debug('Server request', { app, id }) + if (err) { + logger.error('Server request error', { app, id }, '\n', err) + ws.send(JSON.stringify({ id, event: 'close', err: err.message })) + } + }) + + // Pass data returned by the local server to kdot-port-reverse. + conn.on('data', data => { + logger.debug('Server response data', { app, id }) + ws.send(JSON.stringify({ id, event: 'data', data })) + }) + + // + conn.on('end', () => { + logger.debug('Server data end', { app, id }) + ws.send(JSON.stringify({ id, event: 'end' })) + }) + + conn.on('close', () => { + if (connections[id]) { + // When the local server closes the connection, tell + // kdot-port-reverse to close it's connection to the requesting + // client. + logger.debug('Server response close', { app, id }) + ws.send(JSON.stringify({ id, event: 'close' })) + + // Remove the connection from the store. + delete connections[id] + } + }) + }) + + // Store the new connection by it's ID. + connections[id] = conn + } + }) +} diff --git a/packages/kdot/package.json b/packages/kdot/package.json index 6490f967..a7777ede 100644 --- a/packages/kdot/package.json +++ b/packages/kdot/package.json @@ -61,7 +61,8 @@ "got": "^11.8.1", "p-reduce": "^2.1.0", "p-timeout": "^4.1.0", - "server-destroy": "^1.0.1" + "server-destroy": "^1.0.1", + "ws": "^7.4.4" }, "kdot": { "config": [ @@ -72,5 +73,9 @@ "test": { "v": "0.0.1" } + }, + "optionalDependencies": { + "bufferutil": "^4.0.3", + "utf-8-validate": "^5.0.4" } } diff --git a/yarn.lock b/yarn.lock index ceb9facc..60d27ad0 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3034,6 +3034,13 @@ buffer@^5.1.0: base64-js "^1.3.1" ieee754 "^1.1.13" +bufferutil@^4.0.3: + version "4.0.3" + resolved "https://registry.yarnpkg.com/bufferutil/-/bufferutil-4.0.3.tgz#66724b756bed23cd7c28c4d306d7994f9943cc6b" + integrity sha512-yEYTwGndELGvfXsImMBLop58eaGW+YdONi1fNjTINSY98tmMmFijBG6WXgdkfuLNt4imzQNtIE+eBp1PVpMCSw== + dependencies: + node-gyp-build "^4.2.0" + builtin-modules@^3.1.0: version "3.2.0" resolved "https://registry.yarnpkg.com/builtin-modules/-/builtin-modules-3.2.0.tgz#45d5db99e7ee5e6bc4f362e008bf917ab5049887" @@ -7167,6 +7174,11 @@ nanoid@^3.1.20: resolved "https://registry.yarnpkg.com/nanoid/-/nanoid-3.1.20.tgz#badc263c6b1dcf14b71efaa85f6ab4c1d6cfc788" integrity sha512-a1cQNyczgKbLX9jwbS/+d7W8fX/RfgYR7lVWwWOGIPNgK2m0MWvrGF6/m4kk6U3QcFMnZf3RIhL0v2Jgh/0Uxw== +nanoid@^3.1.22: + version "3.1.22" + resolved "https://registry.yarnpkg.com/nanoid/-/nanoid-3.1.22.tgz#b35f8fb7d151990a8aebd5aa5015c03cf726f844" + integrity sha512-/2ZUaJX2ANuLtTvqTlgqBQNJoQO398KyJgZloL0PZkC0dpysjncRUPsFe3DUPzz/y3h+u7C46np8RMuvF3jsSQ== + nanomatch@^1.2.9: version "1.2.13" resolved "https://registry.yarnpkg.com/nanomatch/-/nanomatch-1.2.13.tgz#b87a8aa4fc0de8fe6be88895b38983ff265bd119" @@ -7228,7 +7240,7 @@ node-emoji@^1.10.0: dependencies: lodash.toarray "^4.4.0" -node-gyp-build@^4.2.2: +node-gyp-build@^4.2.0, node-gyp-build@^4.2.2: version "4.2.3" resolved "https://registry.yarnpkg.com/node-gyp-build/-/node-gyp-build-4.2.3.tgz#ce6277f853835f718829efb47db20f3e4d9c4739" integrity sha512-MN6ZpzmfNCRM+3t57PTJHgHyw/h4OWnZ6mR8P5j/uZtqQr46RRuDE/P+g3n0YR/AiYXeWixZZzaip77gdICfRg== @@ -10045,6 +10057,13 @@ use@^3.1.0: resolved "https://registry.yarnpkg.com/use/-/use-3.1.1.tgz#d50c8cac79a19fbc20f2911f56eb973f4e10070f" integrity sha512-cwESVXlO3url9YWlFW/TA9cshCEhtu7IKJ/p5soJ/gGpj7vbvFrAY/eIioQ6Dw23KjZhYgiIo8HOs1nQ2vr/oQ== +utf-8-validate@^5.0.4: + version "5.0.4" + resolved "https://registry.yarnpkg.com/utf-8-validate/-/utf-8-validate-5.0.4.tgz#72a1735983ddf7a05a43a9c6b67c5ce1c910f9b8" + integrity sha512-MEF05cPSq3AwJ2C7B7sHAA6i53vONoZbMGX8My5auEVm6W+dJ2Jd/TZPyGJ5CH42V2XtbI5FD28HeHeqlPzZ3Q== + dependencies: + node-gyp-build "^4.2.0" + util-deprecate@^1.0.1, util-deprecate@~1.0.1: version "1.0.2" resolved "https://registry.yarnpkg.com/util-deprecate/-/util-deprecate-1.0.2.tgz#450d4dc9fa70de732762fbd2d4a28981419a0ccf" @@ -10320,6 +10339,11 @@ ws@^7.3.1: resolved "https://registry.yarnpkg.com/ws/-/ws-7.4.2.tgz#782100048e54eb36fe9843363ab1c68672b261dd" integrity sha512-T4tewALS3+qsrpGI/8dqNMLIVdq/g/85U98HPMa6F0m6xTbvhXU6RCQLqPH3+SlomNV/LdY6RXEbBpMH6EOJnA== +ws@^7.4.4: + version "7.4.4" + resolved "https://registry.yarnpkg.com/ws/-/ws-7.4.4.tgz#383bc9742cb202292c9077ceab6f6047b17f2d59" + integrity sha512-Qm8k8ojNQIMx7S+Zp8u/uHOx7Qazv3Yv4q68MiWWWOJhiwG5W3x7iqmRtJo8xxrciZUY4vRxUTJCKuRnF28ZZw== + xml-name-validator@^3.0.0: version "3.0.0" resolved "https://registry.yarnpkg.com/xml-name-validator/-/xml-name-validator-3.0.0.tgz#6ae73e06de4d8c6e47f9fb181f78d648ad457c6a"