Dynamic datastore
Initial checklist
- [x] I understand this is a feature request and questions should be posted in the Community Forum
- [x] I searched issues and couldn’t find anything (or linked relevant results below)
Problem
Hi,
Unfortunately, an endpoint is currently attached to a single datastore, which is a problem for us: we cannot serve multiple S3 buckets from a single endpoint. In our case, every client has a private S3 bucket, but our server exposes only one endpoint.
I've tried to find a solution by tinkering, but it seems impossible.
Solution
The problem would be solved if we could select the s3Client dynamically during the upload request, for example in the `onIncomingRequest` or `namingFunction` hook.
Alternatives
No alternative solutions
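For context, the kind of routing being asked for can be shown in miniature as a thin delegating layer that picks a per-client store for each upload. This is a self-contained sketch of the idea only; `DataStoreLike`, `FakeS3Store`, and `RoutingStore` are illustrative stand-ins, not real @tus/server or @tus/s3-store types:

```typescript
// Minimal stand-in for the one datastore operation we care about here.
// In a real setup these would be S3Store instances from @tus/s3-store.
interface DataStoreLike {
  create(uploadId: string): string;
}

class FakeS3Store implements DataStoreLike {
  constructor(private bucket: string) {}
  create(uploadId: string): string {
    return `s3://${this.bucket}/${uploadId}`;
  }
}

// Delegating store: routes each upload to a client-specific store,
// keyed by the first path segment of the upload id (e.g. "clientA/file.bin").
class RoutingStore implements DataStoreLike {
  constructor(
    private stores: Map<string, DataStoreLike>,
    private fallback: DataStoreLike,
  ) {}
  create(uploadId: string): string {
    const tenant = uploadId.split("/")[0];
    const store = this.stores.get(tenant) ?? this.fallback;
    return store.create(uploadId);
  }
}

const routing = new RoutingStore(
  new Map<string, DataStoreLike>([
    ["clientA", new FakeS3Store("bucket-a")],
    ["clientB", new FakeS3Store("bucket-b")],
  ]),
  new FakeS3Store("bucket-default"),
);

console.log(routing.create("clientA/doc.pdf")); // s3://bucket-a/clientA/doc.pdf
console.log(routing.create("unknown/doc.pdf")); // s3://bucket-default/unknown/doc.pdf
```

The comments below take the other available route: subclassing S3Store itself and swapping its client/bucket per request.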
I actually have a similar question. I was wondering if I can initialize different datastores based on a metadata value or something else.
For what it's worth, I implemented it as a separate class extending S3Store, basically injecting the correct S3 config in each of the "public" functions of S3Store:
import {
type StorageConfig,
getStorageConfig,
} from "@/ee/features/storage/config";
import { S3 } from "@aws-sdk/client-s3";
import { S3Store } from "@tus/s3-store";
import type { Upload } from "@tus/server";
import type { Readable } from "stream";
import { getFeatureFlags } from "@/lib/featureFlags";
/**
* Multi-region S3Store that routes uploads to different S3 buckets
* based on team storage preferences. Extends S3Store and dynamically
* switches the S3 client and bucket based on team feature flags.
*/
export class MultiRegionS3Store extends S3Store {
private euConfig: StorageConfig;
private usConfig: StorageConfig;
private euClient: S3;
private usClient: S3;
private teamStorageCache = new Map<string, boolean>(); // teamId -> useUSStorage
constructor() {
// Initialize with EU config as default
const euConfig = getStorageConfig();
// S3 client config for the super() call (@tus/s3-store expects the bucket inside s3ClientConfig)
const superS3Config = {
bucket: euConfig.bucket,
region: euConfig.region,
credentials: {
accessKeyId: euConfig.accessKeyId,
secretAccessKey: euConfig.secretAccessKey,
},
};
super({
partSize: 8 * 1024 * 1024, // 8MiB parts
s3ClientConfig: superS3Config,
});
// Store configurations
this.euConfig = euConfig;
// EU S3 client (the client config only needs region + credentials; the bucket is tracked separately)
this.euClient = new S3({
region: euConfig.region,
credentials: {
accessKeyId: euConfig.accessKeyId,
secretAccessKey: euConfig.secretAccessKey,
},
});
// Initialize US configuration and client
try {
this.usConfig = getStorageConfig("us-east-2");
// US S3 client
this.usClient = new S3({
region: this.usConfig.region,
credentials: {
accessKeyId: this.usConfig.accessKeyId,
secretAccessKey: this.usConfig.secretAccessKey,
},
});
} catch (error) {
// US storage is not configured; fall back to the EU client
this.usConfig = euConfig;
this.usClient = this.euClient;
}
}
/**
* Extracts teamId from upload ID (format: teamId/docId/filename)
*/
private extractTeamIdFromUploadId(uploadId: string): string | null {
const parts = uploadId.split("/");
return parts.length > 0 ? parts[0] : null;
}
/**
* Determines if team should use US storage, with caching
*/
private async shouldUseUSStorage(teamId: string): Promise<boolean> {
// Check the cache first
const cached = this.teamStorageCache.get(teamId);
if (cached !== undefined) {
return cached;
}
try {
const features = await getFeatureFlags({ teamId });
const useUS = features.usStorage || false;
// Cache the result for 5 minutes
this.teamStorageCache.set(teamId, useUS);
setTimeout(() => this.teamStorageCache.delete(teamId), 5 * 60 * 1000);
return useUS;
} catch (error) {
return false; // Default to EU
}
}
/**
* Sets the S3 client and bucket for the appropriate region
*/
private async ensureCorrectRegion(uploadId: string): Promise<void> {
const teamId = this.extractTeamIdFromUploadId(uploadId);
if (!teamId) {
// Default to EU - ensure we're using EU client and bucket
this.client = this.euClient;
this.bucket = this.euConfig.bucket;
return;
}
const useUS = await this.shouldUseUSStorage(teamId);
// Switch client and bucket to the selected region
this.client = useUS ? this.usClient : this.euClient;
this.bucket = useUS ? this.usConfig.bucket : this.euConfig.bucket;
}
// Override key S3Store methods so every operation runs against the correct region
async create(upload: Upload): Promise<Upload> {
await this.ensureCorrectRegion(upload.id);
return super.create(upload);
}
async write(stream: Readable, id: string, offset: number): Promise<number> {
await this.ensureCorrectRegion(id);
return super.write(stream, id, offset);
}
async getUpload(id: string): Promise<Upload> {
await this.ensureCorrectRegion(id);
return super.getUpload(id);
}
async remove(id: string): Promise<void> {
await this.ensureCorrectRegion(id);
// Drop the cached flag for this team as well
const teamId = this.extractTeamIdFromUploadId(id);
if (teamId) {
this.teamStorageCache.delete(teamId);
}
await super.remove(id);
}
async declareUploadLength(id: string, length: number): Promise<void> {
await this.ensureCorrectRegion(id);
await super.declareUploadLength(id, length);
}
async read(id: string): Promise<Readable> {
await this.ensureCorrectRegion(id);
return super.read(id);
}
}
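The routing pieces above (parsing the teamId out of the upload id, plus the 5-minute flag cache) can be exercised in isolation. Here is a self-contained sketch, using a timestamp-based TTL instead of setTimeout so it is easy to test; the helper names are illustrative, not part of @tus/s3-store:

```typescript
// Standalone sketch of the two helpers MultiRegionS3Store relies on.
function extractTeamIdFromUploadId(uploadId: string): string | null {
  const teamId = uploadId.split("/")[0];
  return teamId ? teamId : null;
}

class TtlCache<V> {
  private entries = new Map<string, { value: V; expiresAt: number }>();
  constructor(private ttlMs: number, private now: () => number = Date.now) {}
  get(key: string): V | undefined {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    if (this.now() > entry.expiresAt) {
      this.entries.delete(key); // expired: drop the entry and report a miss
      return undefined;
    }
    return entry.value;
  }
  set(key: string, value: V): void {
    this.entries.set(key, { value, expiresAt: this.now() + this.ttlMs });
  }
}

// Usage: cache a team's storage flag for 5 minutes
const cache = new TtlCache<boolean>(5 * 60 * 1000);
cache.set("team-123", true);
console.log(extractTeamIdFromUploadId("team-123/doc-1/report.pdf")); // "team-123"
console.log(cache.get("team-123")); // true
```

Note that returning `null` for an empty id also avoids the subtle case where `"".split("/")` yields `[""]` and an empty-string teamId would slip through.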
My solution is to override some protected methods of S3Store in a custom class. `S3Store.bucket` is also a protected property, so you have to override it in your own class to use a custom bucket name. On the client I use Uppy, and pass the bucket name and prefix in the metadata when initializing Uppy. My code is an example of how it can work, but you still need to think about how to avoid keeping `DynamicBucketS3Store.targetBucket` as mutable state on the class; I'll do that later.
import { S3Client } from "@aws-sdk/client-s3"
import { IncomingMessage } from "http"
import { s3Config } from "./config.ts"
import { ERRORS, Upload } from "@tus/server"
// note: declare MetadataValue locally if your @tus/s3-store version does not export it
import { S3Store, type MetadataValue } from "@tus/s3-store"
import type AWS from "@aws-sdk/client-s3"
const defaultBucket = process.env.DEFAULT_BUCKET
export class DynamicBucketS3Store extends S3Store {
private targetBucket: string = defaultBucket ?? ""
override async create(upload: Upload): Promise<Upload> {
// the client sends the target bucket in the upload metadata
const bucketName = upload.metadata?.bucket ?? defaultBucket
if (!bucketName) {
throw new Error("No bucket in upload metadata and no DEFAULT_BUCKET set")
}
upload.metadata = { ...upload.metadata, "tus-bucket": bucketName }
delete upload.metadata.bucket
this.bucket = bucketName
this.targetBucket = bucketName
return await super.create(upload)
}
// the write method goes here (omitted for brevity)
async getUpload(id: string): Promise<Upload> {
let metadata: MetadataValue
try {
metadata = await this.getMetadata(id)
} catch (error) {
throw ERRORS.FILE_NOT_FOUND
}
let offset = 0
try {
const parts = await this.retrieveParts(id)
offset = calcOffsetFromParts(parts)
} catch (error: any) {
// Check if the error is caused by the upload not being found. This happens
// when the multipart upload has already been completed or aborted. Since
// we already found the info object, we know that the upload has been
// completed and can therefore ensure that the offset equals the size.
// AWS S3 returns NoSuchUpload, but other implementations, such as DigitalOcean
// Spaces, can also return NoSuchKey.
if (error.Code === "NoSuchUpload" || error.Code === "NoSuchKey") {
return new Upload({
...metadata.file,
offset: metadata.file.size as number,
size: metadata.file.size,
metadata: metadata.file.metadata,
storage: metadata.file.storage,
})
}
throw error
}
const incompletePartSize = await this.getIncompletePartSize(id)
return new Upload({
...metadata.file,
offset: offset + (incompletePartSize ?? 0),
size: metadata.file.size,
storage: metadata.file.storage,
})
}
async getMetadata(id: string): Promise<MetadataValue> {
const cached = await this.cache.get(id)
if (cached) {
return cached
}
const { Metadata, Body } = await this.client.getObject({
Bucket: this.targetBucket,
Key: this.infoKey(id),
})
const file = JSON.parse((await Body?.transformToString()) as string)
const metadata: MetadataValue = {
"tus-version": Metadata?.["tus-version"] as string,
"upload-id": Metadata?.["upload-id"] as string,
file: new Upload({
id,
size: Number.isFinite(file.size) ? Number.parseInt(file.size, 10) : undefined,
offset: Number.parseInt(file.offset, 10),
metadata: file.metadata,
creation_date: file.creation_date,
storage: file.storage,
}),
}
await this.cache.set(id, metadata)
return metadata
}
async retrieveParts(id: string, partNumberMarker?: string): Promise<Array<AWS.Part>> {
const metadata = await this.getMetadata(id)
const params: AWS.ListPartsCommandInput = {
Bucket: this.targetBucket,
Key: id,
UploadId: metadata["upload-id"],
PartNumberMarker: partNumberMarker,
}
const data = await this.client.listParts(params)
let parts = data.Parts ?? []
if (data.IsTruncated) {
const rest = await this.retrieveParts(id, data.NextPartNumberMarker)
parts = [...parts, ...rest]
}
if (!partNumberMarker) {
// biome-ignore lint/style/noNonNullAssertion: it's fine
parts.sort((a, b) => a.PartNumber! - b.PartNumber!)
}
return parts
}
async remove(id: string): Promise<void> {
await this.getUpload(id) // warm the metadata cache before deleting
const bucketName = this.targetBucket
try {
return await super.remove(id)
} finally {
this.bucket = bucketName
}
}
}
function calcOffsetFromParts(parts?: AWS.Part[]): number {
return parts?.reduce((acc, part) => acc + (part.Size ?? 0), 0) ?? 0
}