27 changes: 20 additions & 7 deletions src/http/routes/object/createObject.ts
@@ -3,6 +3,7 @@ import { FromSchema } from 'json-schema-to-ts'
import { createDefaultSchema } from '../../routes-helper'
import { ROUTE_OPERATIONS } from '../operations'
import fastifyMultipart from '@fastify/multipart'
import { fileUploadFromRequest } from '@storage/uploader'

const createObjectParamsSchema = {
type: 'object',
@@ -74,14 +75,26 @@ export default async function routes(fastify: FastifyInstance) {
const isUpsert = request.headers['x-upsert'] === 'true'
const owner = request.owner

const { objectMetadata, path, id } = await request.storage
.from(bucketName)
.uploadFromRequest(request, {
// Get bucket information once for better error context
const bucket = await request.storage
.asSuperUser()
.findBucket(bucketName, 'id, name, file_size_limit, allowed_mime_types')

const { objectMetadata, path, id } = await request.storage.from(bucketName).uploadNewObject({
file: await fileUploadFromRequest(request, {
objectName,
signal: request.signals.body.signal,
owner: owner,
isUpsert,
})
fileSizeLimit: bucket.file_size_limit,
allowedMimeTypes: bucket.allowed_mime_types || [],
}),
objectName,
signal: request.signals.body.signal,
owner: owner,
isUpsert,
bucketContext: {
name: bucket.name,
fileSizeLimit: bucket.file_size_limit,
},
})

return response.status(objectMetadata?.httpStatusCode ?? 200).send({
Id: id,
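Review note: this hunk, and the matching hunks in updateObject.ts and uploadSignedObject.ts below, replace the one-shot `uploadFromRequest` with an explicit three-step flow. A minimal sketch of the resulting shape, assuming the decorated Fastify request this codebase builds (`storage`, `owner`, and `signals` are added by plugins, so the request is typed loosely here):

```ts
import { fileUploadFromRequest } from '@storage/uploader'

// Sketch only; identifiers mirror the diff above, and the exact request
// type is assumed rather than imported.
async function uploadViaRoute(request: any, bucketName: string, objectName: string) {
  // 1. Resolve the bucket once, as super user, so its limits and name are
  //    available for error reporting.
  const bucket = await request.storage
    .asSuperUser()
    .findBucket(bucketName, 'id, name, file_size_limit, allowed_mime_types')

  // 2. Parse the incoming body with the bucket's limits enforced up front.
  const file = await fileUploadFromRequest(request, {
    objectName,
    fileSizeLimit: bucket.file_size_limit,
    allowedMimeTypes: bucket.allowed_mime_types || [],
  })

  // 3. Upload, passing bucketContext so a size failure can name the bucket.
  return request.storage.from(bucketName).uploadNewObject({
    file,
    objectName,
    owner: request.owner,
    isUpsert: request.headers['x-upsert'] === 'true',
    signal: request.signals.body.signal,
    bucketContext: { name: bucket.name, fileSizeLimit: bucket.file_size_limit },
  })
}
```

The trade-off: each route now performs the bucket lookup itself, but `ObjectStorage` no longer depends on Fastify at all (see the src/storage/object.ts hunk below).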
27 changes: 20 additions & 7 deletions src/http/routes/object/updateObject.ts
@@ -3,6 +3,7 @@ import { FromSchema } from 'json-schema-to-ts'
import { createDefaultSchema } from '../../routes-helper'
import { ROUTE_OPERATIONS } from '../operations'
import fastifyMultipart from '@fastify/multipart'
import { fileUploadFromRequest } from '@storage/uploader'

const updateObjectParamsSchema = {
type: 'object',
@@ -69,14 +70,26 @@ export default async function routes(fastify: FastifyInstance) {
const objectName = request.params['*']
const owner = request.owner as string

const { objectMetadata, path, id } = await request.storage
.from(bucketName)
.uploadFromRequest(request, {
// Get bucket information once for better error context
const bucket = await request.storage
.asSuperUser()
.findBucket(bucketName, 'id, name, file_size_limit, allowed_mime_types')

const { objectMetadata, path, id } = await request.storage.from(bucketName).uploadNewObject({
file: await fileUploadFromRequest(request, {
objectName,
signal: request.signals.body.signal,
owner: owner,
isUpsert: true,
})
fileSizeLimit: bucket.file_size_limit,
allowedMimeTypes: bucket.allowed_mime_types || [],
}),
objectName,
signal: request.signals.body.signal,
owner: owner,
isUpsert: true,
bucketContext: {
name: bucket.name,
fileSizeLimit: bucket.file_size_limit,
},
})

return response.status(objectMetadata?.httpStatusCode ?? 200).send({
Id: id,
19 changes: 17 additions & 2 deletions src/http/routes/object/uploadSignedObject.ts
@@ -2,6 +2,7 @@ import { FastifyInstance } from 'fastify'
import { FromSchema } from 'json-schema-to-ts'
import { ROUTE_OPERATIONS } from '../operations'
import fastifyMultipart from '@fastify/multipart'
import { fileUploadFromRequest } from '@storage/uploader'

const uploadSignedObjectParamsSchema = {
type: 'object',
@@ -87,14 +88,28 @@ export default async function routes(fastify: FastifyInstance) {
.from(bucketName)
.verifyObjectSignature(token, objectName)

// Get bucket information once for better error context
const bucket = await request.storage
.asSuperUser()
.findBucket(bucketName, 'id, name, file_size_limit, allowed_mime_types')

const { objectMetadata, path } = await request.storage
.asSuperUser()
.from(bucketName)
.uploadFromRequest(request, {
owner,
.uploadNewObject({
file: await fileUploadFromRequest(request, {
objectName,
fileSizeLimit: bucket.file_size_limit,
allowedMimeTypes: bucket.allowed_mime_types || [],
}),
objectName,
owner,
isUpsert: upsert,
signal: request.signals.body.signal,
bucketContext: {
name: bucket.name,
fileSizeLimit: bucket.file_size_limit,
},
})

return response.status(objectMetadata?.httpStatusCode ?? 200).send({
70 changes: 49 additions & 21 deletions src/http/routes/s3/commands/put-object.ts
@@ -127,7 +127,7 @@ export default function PutObject(s3Router: S3Router) {

const bucket = await ctx.storage
.asSuperUser()
.findBucket(req.Params.Bucket, 'id,file_size_limit,allowed_mime_types')
.findBucket(req.Params.Bucket, 'id,name,file_size_limit,allowed_mime_types')

const uploadRequest = await fileUploadFromRequest(ctx.req, {
objectName: key,
@@ -137,7 +137,11 @@

return pipeline(
uploadRequest.body,
new ByteLimitTransformStream(uploadRequest.maxFileSize),
new ByteLimitTransformStream(uploadRequest.maxFileSize, {
name: bucket.name,
fileSizeLimit: bucket.file_size_limit,
globalLimit: uploadRequest.globalLimit,
}),
ctx.req.streamingSignatureV4 || new PassThrough(),
async (fileStream) => {
return s3Protocol.putObject(
@@ -151,7 +155,15 @@
ContentEncoding: req.Headers?.['content-encoding'],
Metadata: metadata,
},
{ signal: ctx.signals.body, isTruncated: uploadRequest.isTruncated }
{
signal: ctx.signals.body,
isTruncated: uploadRequest.isTruncated,
bucketContext: {
name: bucket.name,
fileSizeLimit: bucket.file_size_limit,
globalLimit: uploadRequest.globalLimit,
},
}
)
}
)
@@ -176,29 +188,45 @@

const bucket = await ctx.storage
.asSuperUser()
.findBucket(req.Params.Bucket, 'id,file_size_limit,allowed_mime_types')
.findBucket(req.Params.Bucket, 'id,name,file_size_limit,allowed_mime_types')

const fieldsObject = fieldsToObject(file?.fields || {})
const metadata = s3Protocol.parseMetadataHeaders(fieldsObject)
const expiresField = fieldsObject.expires

const maxFileSize = await getStandardMaxFileSizeLimit(ctx.tenantId, bucket.file_size_limit)

return pipeline(file.file, new ByteLimitTransformStream(maxFileSize), async (fileStream) => {
return s3Protocol.putObject(
{
Body: fileStream as stream.Readable,
Bucket: req.Params.Bucket,
Key: fieldsObject.key as string,
CacheControl: fieldsObject['cache-control'] as string,
ContentType: fieldsObject['content-type'] as string,
Expires: expiresField ? new Date(expiresField) : undefined,
ContentEncoding: fieldsObject['content-encoding'] as string,
Metadata: metadata,
},
{ signal: ctx.signals.body, isTruncated: () => file.file.truncated }
)
})
const limits = await getStandardMaxFileSizeLimit(ctx.tenantId, bucket.file_size_limit)

return pipeline(
file.file,
new ByteLimitTransformStream(limits.maxFileSize, {
name: bucket.name,
fileSizeLimit: bucket.file_size_limit,
globalLimit: limits.globalLimit,
}),
async (fileStream) => {
return s3Protocol.putObject(
{
Body: fileStream as stream.Readable,
Bucket: req.Params.Bucket,
Key: fieldsObject.key as string,
CacheControl: fieldsObject['cache-control'] as string,
ContentType: fieldsObject['content-type'] as string,
Expires: expiresField ? new Date(expiresField) : undefined,
ContentEncoding: fieldsObject['content-encoding'] as string,
Metadata: metadata,
},
{
signal: ctx.signals.body,
isTruncated: () => file.file.truncated,
bucketContext: {
name: bucket.name,
fileSizeLimit: bucket.file_size_limit,
globalLimit: limits.globalLimit,
},
}
)
}
)
}
)
}
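Both PutObject paths now read two values off `getStandardMaxFileSizeLimit`, so the helper presumably changed from returning a bare number to returning the effective cap plus the tenant-wide limit (its own diff is not part of this section). A hedged sketch of that shape, assuming the usual rule that a bucket limit may only tighten the global one; `getTenantFileSizeLimit` is a hypothetical lookup, not an API from the PR:

```ts
declare function getTenantFileSizeLimit(tenantId: string): Promise<number> // hypothetical

// Assumed shape only: the call sites above read `maxFileSize` and
// `globalLimit`; the body here is illustrative.
async function getStandardMaxFileSizeLimit(
  tenantId: string,
  bucketSizeLimit?: number | null
): Promise<{ maxFileSize: number; globalLimit: number }> {
  const globalLimit = await getTenantFileSizeLimit(tenantId)
  // A bucket-level limit can lower the cap but never raise it past the global setting.
  const maxFileSize =
    typeof bucketSizeLimit === 'number' && bucketSizeLimit > 0
      ? Math.min(bucketSizeLimit, globalLimit)
      : globalLimit
  return { maxFileSize, globalLimit }
}
```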
35 changes: 32 additions & 3 deletions src/internal/errors/codes.ts
@@ -1,5 +1,15 @@
import { StorageBackendError } from './storage-error'

function formatBytes(bytes: number): string {
if (bytes === 0) return '0 B'

const k = 1024
const sizes = ['B', 'KB', 'MB', 'GB', 'TB']
const i = Math.floor(Math.log(bytes) / Math.log(k))

return parseFloat((bytes / Math.pow(k, i)).toFixed(1)) + ' ' + sizes[i]
}

export enum ErrorCode {
NoSuchBucket = 'NoSuchBucket',
NoSuchKey = 'NoSuchKey',
@@ -232,7 +242,7 @@ export const ERRORS = {
error: 'invalid_mime_type',
code: ErrorCode.InvalidMimeType,
httpStatusCode: 415,
message: `mime type ${mimeType} is not supported`,
message: `MIME type ${mimeType} is not supported`,
}),

InvalidRange: () =>
@@ -243,12 +253,31 @@
message: `invalid range provided`,
}),

EntityTooLarge: (e?: Error, entity = 'object') =>
EntityTooLarge: (
e?: Error,
entity = 'object',
context?: { bucketName?: string; bucketLimit?: number; globalLimit?: number }
) =>
new StorageBackendError({
error: 'Payload too large',
code: ErrorCode.EntityTooLarge,
httpStatusCode: 413,
message: `The ${entity} exceeded the maximum allowed size`,
message:
context?.bucketName && context?.bucketLimit
? `The ${entity} exceeded the maximum allowed size for bucket "${
context.bucketName
}" (${formatBytes(context.bucketLimit)}). ${
context.globalLimit && context.bucketLimit < context.globalLimit
? `This bucket has a lower limit than your global setting (${formatBytes(
context.globalLimit
)}). You can increase the bucket limit in your Storage settings.`
: ''
}`
: context?.globalLimit
? `The ${entity} exceeded the maximum allowed size in your global settings (${formatBytes(
context.globalLimit
)})`
: `The ${entity} exceeded the maximum allowed size`,
originalError: e,
}),

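For review, the three `EntityTooLarge` branches produce messages like the following. Sizes are illustrative: `formatBytes(52428800)` is `'50 MB'` and `formatBytes(104857600)` is `'100 MB'`; the actual strings are single lines, wrapped here for readability.

```ts
import { ERRORS } from '@internal/errors'

// Bucket limit present and lower than the global setting:
ERRORS.EntityTooLarge(undefined, 'object', {
  bucketName: 'avatars',
  bucketLimit: 52428800,
  globalLimit: 104857600,
}).message
// => 'The object exceeded the maximum allowed size for bucket "avatars" (50 MB).
//     This bucket has a lower limit than your global setting (100 MB). You can
//     increase the bucket limit in your Storage settings.'

// Only the global limit is known:
ERRORS.EntityTooLarge(undefined, 'object', { globalLimit: 104857600 }).message
// => 'The object exceeded the maximum allowed size in your global settings (100 MB)'

// No context: the original generic message is unchanged:
ERRORS.EntityTooLarge().message
// => 'The object exceeded the maximum allowed size'
```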
41 changes: 10 additions & 31 deletions src/storage/object.ts
@@ -6,7 +6,7 @@ import { getJwtSecret } from '@internal/database'
import { ObjectMetadata, StorageBackendAdapter } from './backend'
import { Database, FindObjectFilters, SearchObjectOption } from './database'
import { mustBeValidKey } from './limits'
import { fileUploadFromRequest, Uploader, UploadRequest } from './uploader'
import { Uploader, UploadRequest } from './uploader'
import { getConfig } from '../config'
import {
ObjectAdminDelete,
@@ -16,7 +16,6 @@ import {
ObjectRemovedMove,
ObjectUpdatedMetadata,
} from './events'
import { FastifyRequest } from 'fastify/types/request'
import { Obj } from '@storage/schemas'
import { StorageObjectLocator } from '@storage/locator'

@@ -66,39 +65,18 @@ export class ObjectStorage {
return new ObjectStorage(this.backend, this.db.asSuperUser(), this.location, this.bucketId)
}

async uploadFromRequest(
request: FastifyRequest,
file: {
objectName: string
owner?: string
isUpsert: boolean
signal?: AbortSignal
}
) {
const bucket = await this.db
.asSuperUser()
.findBucketById(this.bucketId, 'id, file_size_limit, allowed_mime_types')

const uploadRequest = await fileUploadFromRequest(request, {
objectName: file.objectName,
fileSizeLimit: bucket.file_size_limit,
allowedMimeTypes: bucket.allowed_mime_types || [],
})

return this.uploadNewObject({
file: uploadRequest,
objectName: file.objectName,
owner: file.owner,
isUpsert: Boolean(file.isUpsert),
signal: file.signal,
})
}

/**
* Upload a new object to a storage
* @param request
*/
async uploadNewObject(request: Omit<UploadRequest, 'bucketId' | 'uploadType'>) {
async uploadNewObject(
request: Omit<UploadRequest, 'bucketId' | 'uploadType'> & {
bucketContext?: {
name: string
fileSizeLimit?: number | null
}
}
) {
mustBeValidKey(request.objectName)

const path = `${this.bucketId}/${request.objectName}`
Expand All @@ -107,6 +85,7 @@ export class ObjectStorage {
...request,
bucketId: this.bucketId,
uploadType: 'standard',
bucketContext: request.bucketContext,
})

return { objectMetadata: metadata, path, id: obj.id }
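Net effect on ObjectStorage's public surface: `uploadFromRequest` (and with it the Fastify dependency) is gone, and `uploadNewObject` gains one optional, purely informational field. Restated as a type, per the hunk above:

```ts
import { UploadRequest } from '@storage/uploader'

// What uploadNewObject accepts after this change; bucketContext is optional,
// so any caller without bucket metadata keeps compiling.
type UploadNewObjectArgs = Omit<UploadRequest, 'bucketId' | 'uploadType'> & {
  bucketContext?: {
    name: string
    fileSizeLimit?: number | null
  }
}
```

Strictly, the explicit `bucketContext: request.bucketContext` in the uploader call is already covered by the `...request` spread on the line above it.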
18 changes: 16 additions & 2 deletions src/storage/protocols/s3/byte-limit-stream.ts
@@ -4,15 +4,29 @@ import { ERRORS } from '@internal/errors'
export class ByteLimitTransformStream extends Transform {
bytesProcessed = 0

constructor(private readonly limit: number) {
constructor(
private readonly limit: number,
private readonly bucketContext?: {
name: string
fileSizeLimit?: number | null
globalLimit?: number
}
) {
super()
}

_transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback) {
this.bytesProcessed += chunk.length

if (this.bytesProcessed > this.limit) {
callback(ERRORS.EntityTooLarge())
const context = this.bucketContext
? {
bucketName: this.bucketContext.name,
bucketLimit: this.bucketContext.fileSizeLimit || undefined,
globalLimit: this.bucketContext.globalLimit,
}
: undefined
callback(ERRORS.EntityTooLarge(undefined, 'object', context))
} else {
callback(null, chunk)
}
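A self-contained usage sketch of the extended stream: exceeding the cap destroys the pipeline with the bucket-aware `EntityTooLarge` error built in codes.ts above. File paths, bucket name, and limits are illustrative, and the import path is assumed from the `@storage` alias used elsewhere in this PR.

```ts
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { ByteLimitTransformStream } from '@storage/protocols/s3/byte-limit-stream'

try {
  // Cap the stream at 50 MB while reporting a 100 MB global setting.
  await pipeline(
    createReadStream('./upload.bin'),
    new ByteLimitTransformStream(52_428_800, {
      name: 'avatars',
      fileSizeLimit: 52_428_800,
      globalLimit: 104_857_600,
    }),
    createWriteStream('/tmp/out.bin')
  )
} catch (err) {
  // On overflow this is the StorageBackendError produced by
  // ERRORS.EntityTooLarge(undefined, 'object', { bucketName, bucketLimit, globalLimit }).
  console.error((err as Error).message)
}
```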