tus-node-server icon indicating copy to clipboard operation
tus-node-server copied to clipboard

Datastore dynamic

Open Droppix opened this issue 1 year ago • 3 comments

Initial checklist

  • [x] I understand this is a feature request and questions should be posted in the Community Forum
  • [x] I searched issues and couldn’t find anything (or linked relevant results below)

Problem

Hi,

Unfortunately, an endpoint is currently attached to 'a single datastore', which poses a problem, as we cannot have multiple S3 stores behind a single endpoint. In our case, every client has a private S3 bucket, but our server exposes only one endpoint.

I've tried to find a solution by tinkering, but it seems impossible.

Solution

The problem would be solved if we could allocate an s3Client dynamically during the upload request, for example, using the ‘onIncomingRequest’ or ‘namingFunction’ method.

Alternatives

No alternative solutions

Droppix avatar Mar 16 '25 16:03 Droppix

I actually have a similar question. I was wondering if I can initialize different datastores based on a metadata value or something else.

mfts avatar Jun 25 '25 10:06 mfts

For what it's worth, I implemented it as a separate class extending S3Store

basically injecting the correct S3 config in each of the "public" functions of S3Store

import {
  type StorageConfig,
  getStorageConfig,
} from "@/ee/features/storage/config";
import { S3 } from "@aws-sdk/client-s3";
import { S3Store } from "@tus/s3-store";
import type { Upload } from "@tus/server";
import type { Readable } from "stream";

import { getFeatureFlags } from "@/lib/featureFlags";

/**
 * Multi-region S3Store that routes uploads to different S3 buckets
 * based on team storage preferences. Extends S3Store and dynamically
 * switches the S3 client and bucket based on team feature flags.
 *
 * NOTE(review): every operation mutates the shared `this.client` /
 * `this.bucket` before delegating, so two concurrent uploads for teams
 * in different regions can race on the same store instance — confirm
 * the server serializes operations on this store, or keep one store
 * instance per region instead.
 */
export class MultiRegionS3Store extends S3Store {
  private euConfig: StorageConfig;
  private usConfig: StorageConfig;
  private euClient: S3;
  private usClient: S3;
  // teamId -> useUSStorage; entries self-expire after 5 minutes.
  private teamStorageCache = new Map<string, boolean>();

  constructor() {
    // Initialize with EU config as the default region.
    const euConfig = getStorageConfig();

    super({
      partSize: 8 * 1024 * 1024, // 8MiB parts
      s3ClientConfig: MultiRegionS3Store.buildS3Config(euConfig),
    });

    this.euConfig = euConfig;
    this.euClient = new S3(MultiRegionS3Store.buildS3Config(euConfig));

    // The US region is optional: if its config cannot be resolved,
    // fall back to the EU client/config so the store still works.
    try {
      this.usConfig = getStorageConfig("us-east-2");
      this.usClient = new S3(MultiRegionS3Store.buildS3Config(this.usConfig));
    } catch {
      this.usConfig = euConfig;
      this.usClient = this.euClient;
    }
  }

  /**
   * Builds the S3 client configuration shared by the super() call and
   * the per-region clients (the endpoint is intentionally omitted when
   * empty/undefined). Replaces three hand-built `any`-typed copies.
   */
  private static buildS3Config(config: StorageConfig) {
    return {
      bucket: config.bucket,
      region: config.region,
      credentials: {
        accessKeyId: config.accessKeyId,
        secretAccessKey: config.secretAccessKey,
      },
    };
  }

  /**
   * Extracts the teamId from an upload ID (format: teamId/docId/filename).
   * Returns null when no team segment exists. (The original checked
   * `parts.length > 0`, which is always true after split(), so an empty
   * upload ID leaked "" through as a team ID.)
   */
  private extractTeamIdFromUploadId(uploadId: string): string | null {
    const [teamId] = uploadId.split("/");
    return teamId || null;
  }

  /**
   * Determines whether the team should use US storage, caching the
   * feature-flag lookup for 5 minutes. Defaults to EU on any error.
   */
  private async shouldUseUSStorage(teamId: string): Promise<boolean> {
    const cached = this.teamStorageCache.get(teamId);
    if (cached !== undefined) {
      return cached;
    }

    try {
      const features = await getFeatureFlags({ teamId });
      const useUS = features.usStorage || false;

      // Cache the result and expire the entry after 5 minutes.
      this.teamStorageCache.set(teamId, useUS);
      setTimeout(() => this.teamStorageCache.delete(teamId), 5 * 60 * 1000);

      return useUS;
    } catch (error) {
      return false; // Default to EU
    }
  }

  /**
   * Points the inherited client/bucket at the region appropriate for
   * the upload's team (EU whenever no team can be determined).
   */
  private async ensureCorrectRegion(uploadId: string): Promise<void> {
    const teamId = this.extractTeamIdFromUploadId(uploadId);
    const useUS = teamId !== null && (await this.shouldUseUSStorage(teamId));

    this.client = useUS ? this.usClient : this.euClient;
    this.bucket = useUS ? this.usConfig.bucket : this.euConfig.bucket;
  }

  // Overridden S3Store entry points: select the correct region first,
  // then delegate. (The original wrapped each call in a try/catch that
  // only rethrew — removed as a no-op.)

  async create(upload: Upload): Promise<Upload> {
    await this.ensureCorrectRegion(upload.id);
    return super.create(upload);
  }

  async write(stream: Readable, id: string, offset: number): Promise<number> {
    await this.ensureCorrectRegion(id);
    return super.write(stream, id, offset);
  }

  async getUpload(id: string): Promise<Upload> {
    await this.ensureCorrectRegion(id);
    return super.getUpload(id);
  }

  async remove(id: string): Promise<void> {
    await this.ensureCorrectRegion(id);
    // Drop the cached region decision so a later upload re-reads the flag.
    const teamId = this.extractTeamIdFromUploadId(id);
    if (teamId) {
      this.teamStorageCache.delete(teamId);
    }
    await super.remove(id);
  }

  async declareUploadLength(id: string, length: number): Promise<void> {
    await this.ensureCorrectRegion(id);
    await super.declareUploadLength(id, length);
  }

  async read(id: string): Promise<Readable> {
    await this.ensureCorrectRegion(id);
    return super.read(id);
  }
}

mfts avatar Jun 25 '25 15:06 mfts

My solution is to override some protected methods of S3Store in a custom class. `s3Store.bucket` is also a protected variable, so we must override it in our own class to use a custom bucket name. On the client I use Uppy, and pass the bucket name and prefix in the metadata when initializing Uppy. My code is an example of how it can work, but you still need to think about how to dispose of DynamicBucketS3Store.targetBucket held in that class's state. I'll do it later.

import { S3Client } from "@aws-sdk/client-s3"
import { IncomingMessage } from "http"
import { s3Config } from "./config.ts"
import { ERRORS, Upload } from "@tus/server"
import type AWS from "@aws-sdk/client-s3"

const defaultBucket = process.env.DEFAULT_BUCKET

/**
 * S3Store variant that resolves the target bucket per upload from the
 * upload's `bucket` metadata entry (supplied by the client, e.g. Uppy),
 * instead of using the single bucket configured at construction time.
 *
 * NOTE(review): `targetBucket` is mutable instance state shared across
 * requests — concurrent uploads aimed at different buckets can race on
 * it. The author acknowledges a dispose/cleanup strategy is still needed.
 */
export class DynamicBucketS3Store extends S3Store {
    // Bucket chosen by the most recent create(); see concurrency note above.
    private targetBucket: string = ""
    override async create(upload: Upload): Promise<Upload> {
        // The client sends the destination bucket in metadata.bucket;
        // assumes it is always present — TODO confirm/validate upstream.
        const bucketName = upload.metadata!.bucket!

        // Persist the bucket under a reserved key, remove the client-facing
        // one, and point the inherited (protected) bucket at it.
        upload.metadata!["tus-bucket"] = bucketName
        delete upload.metadata!.bucket
        this.bucket = bucketName

        this.targetBucket = bucketName

        return await super.create(upload)
    }

    // the write() method goes here (omitted by the author)

    // Mirrors S3Store.getUpload, but metadata/parts lookups below read
    // from targetBucket rather than the statically configured bucket.
    async getUpload(id: string): Promise<Upload> {
        let metadata: MetadataValue
        try {
            metadata = await this.getMetadata(id)
        } catch (error) {
            throw ERRORS.FILE_NOT_FOUND
        }

        let offset = 0

        try {
            const parts = await this.retrieveParts(id)
            offset = calcOffsetFromParts(parts)
        } catch (error: any) {
            // Check if the error is caused by the upload not being found. This happens
            // when the multipart upload has already been completed or aborted. Since
            // we already found the info object, we know that the upload has been
            // completed and therefore can ensure the offset is the size.
            // AWS S3 returns NoSuchUpload, but other implementations, such as DigitalOcean
            // Spaces, can also return NoSuchKey.
            if (error.Code === "NoSuchUpload" || error.Code === "NoSuchKey") {
                return new Upload({
                    ...metadata.file,
                    offset: metadata.file.size as number,
                    size: metadata.file.size,
                    metadata: metadata.file.metadata,
                    storage: metadata.file.storage,
                })
            }

            throw error
        }

        // Account for a buffered incomplete part not yet pushed to S3.
        const incompletePartSize = await this.getIncompletePartSize(id)

        return new Upload({
            ...metadata.file,
            offset: offset + (incompletePartSize ?? 0),
            size: metadata.file.size,
            storage: metadata.file.storage,
        })
    }
    // Like S3Store.getMetadata, but fetches the .info object from
    // targetBucket (the bucket chosen at create() time).
    async getMetadata(id: string): Promise<MetadataValue> {
        const cached = await this.cache.get(id)
        if (cached) {
            return cached
        }

        const { Metadata, Body } = await this.client.getObject({
            Bucket: this.targetBucket,
            Key: this.infoKey(id),
        })
        const file = JSON.parse((await Body?.transformToString()) as string)
        const metadata: MetadataValue = {
            "tus-version": Metadata?.["tus-version"] as string,
            "upload-id": Metadata?.["upload-id"] as string,
            file: new Upload({
                id,
                size: Number.isFinite(file.size) ? Number.parseInt(file.size, 10) : undefined,
                offset: Number.parseInt(file.offset, 10),
                metadata: file.metadata,
                creation_date: file.creation_date,
                storage: file.storage,
            }),
        }
        await this.cache.set(id, metadata)
        return metadata
    }
    // Like S3Store.retrieveParts, but lists parts in targetBucket.
    // Recurses to follow pagination when the listing is truncated.
    async retrieveParts(id: string, partNumberMarker?: string): Promise<Array<AWS.Part>> {
        const metadata = await this.getMetadata(id)

        const params: AWS.ListPartsCommandInput = {
            Bucket: this.targetBucket,
            Key: id,
            UploadId: metadata["upload-id"],
            PartNumberMarker: partNumberMarker,
        }

        const data = await this.client.listParts(params)

        let parts = data.Parts ?? []

        if (data.IsTruncated) {
            const rest = await this.retrieveParts(id, data.NextPartNumberMarker)
            parts = [...parts, ...rest]
        }

        // Only the outermost call (no marker) sorts the accumulated parts.
        if (!partNumberMarker) {
            // biome-ignore lint/style/noNonNullAssertion: it's fine
            parts.sort((a, b) => a.PartNumber! - b.PartNumber!)
        }

        return parts
    }

    async remove(id: string): Promise<void> {
        // getUpload is called for its side effects/validation; its result
        // is unused — presumably ensures targetBucket is resolvable first.
        const upload = await this.getUpload(id)
        const bucketName = this.targetBucket

        try {
            return await super.remove(id)
        } finally {
            // Restore bucket in case super.remove changed it — TODO confirm
            // whether S3Store.remove actually mutates this.bucket.
            this.bucket = bucketName
        }
    }
}

/**
 * Sums the sizes of the uploaded S3 parts to derive the current byte
 * offset of a multipart upload.
 *
 * @param parts - S3 part descriptors (each with an optional numeric `Size`),
 *   as returned by ListParts; may be undefined or empty.
 * @returns Total bytes across all parts, or 0 when there are no parts.
 *   Parts with a missing `Size` count as 0 (the original `any`-typed
 *   version with its `@ts-expect-error` would have produced NaN).
 */
function calcOffsetFromParts(parts?: Array<{ Size?: number }>): number {
    return parts?.reduce((total, part) => total + (part.Size ?? 0), 0) ?? 0
}

vyachglukhov-misis avatar Aug 15 '25 09:08 vyachglukhov-misis