
Partial Put and Get for the object store

Qhodok opened this issue on Jul 26 '22 · 11 comments

Feature Request

  • Append to an object with a byte slice
  • Watch or get an object along with its total byte count

Use Case:

Proposed Change:

stream.PutBytes("key", []byte{}, ObjectOpt.Append, ObjectOpt.Any)
stream.PutBytes("key", []byte{}, ObjectOpt.Append, ObjectOpt.EOF)
stream.GetBytes("key", ObjectOpt.From(0), ObjectOpt.Total(3200))

type ObjectInfo struct {
	ObjectMeta
	Bucket  string    `json:"bucket"`
	NUID    string    `json:"nuid"`
	Size    uint64    `json:"size"`
	ModTime time.Time `json:"mtime"`
	Chunks  uint32    `json:"chunks"`
	Digest  string    `json:"digest,omitempty"`
	Deleted bool      `json:"deleted,omitempty"`
	EOF     bool      `json:"eof"`
}

Who Benefits From The Change(s)?

Anyone who needs a streaming method with minimal latency. We could get or put small byte ranges, which prevents failed uploads of huge objects over poor connections. A client could also read partial bytes before the producer has finished sending the object.

Alternative Approaches

Qhodok · Jul 26 '22

I am playing with the object store at the moment, trying to implement an rclone backend for it. Everything seems to work (copy, sync, ls, etc.), but I think it is not possible to fulfill seek or range requests at the moment: https://github.com/HeavyHorst/rclone/blob/master/backend/nats/nats.go#L331

It would be nice to be able to specify an offset and limit to the Get methods.
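
For illustration, a client-side workaround is possible under the current API by skipping bytes on the reader. This is a sketch assuming the legacy nats.go ObjectStore interface (obs.Get returning an io.ReadCloser); readRange is a hypothetical helper, and the object is still replayed from the beginning, so nothing is saved on the wire:

package objectrange

import (
	"io"

	"github.com/nats-io/nats.go"
)

// readRange fakes a ranged Get: it discards "offset" bytes and then
// reads at most "limit" bytes. The server still replays the object
// from the start, so this only helps the caller, not the network.
func readRange(obs nats.ObjectStore, name string, offset, limit int64) ([]byte, error) {
	res, err := obs.Get(name)
	if err != nil {
		return nil, err
	}
	defer res.Close()

	// Skip everything before the requested offset.
	if _, err := io.CopyN(io.Discard, res, offset); err != nil {
		return nil, err
	}
	// Read at most "limit" bytes from there.
	return io.ReadAll(io.LimitReader(res, limit))
}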

HeavyHorst · Dec 29 '22

Any update on this?

Qhodok · Apr 06 '23

This is a fairly different design from the current implementation. One thing to realize is that the object store is a client abstraction built on top of JetStream, so the server doesn't have any logic beyond serving data from a stream.

Currently the entry (metadata) captures the number of bytes, checksum, etc. But when the client gets the object, it simply plays back the stored chunks for the object. If we updated the metadata with each put, the client could receive more data than initially expected, since an append would add data after the read started. The same concern applies to ranged requests, but perhaps we can address it with additional metadata that describes the size and checksum of each stored chunk.
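
As a sketch, such per-chunk metadata might look like the following; the ChunkInfo name and its fields are illustrative only and not part of nats.go:

type ChunkInfo struct {
	Sequence uint64 `json:"seq"`    // stream sequence of the message carrying this chunk
	Size     uint32 `json:"size"`   // number of bytes in this chunk
	Digest   string `json:"digest"` // checksum of this chunk alone, e.g. SHA-256
}

With that, a ranged Get could locate the chunks covering the requested byte range and verify each one independently of the whole-object digest.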

With that said, there's something we should be able to do here. We are about to revisit the current implementations of the object store, and feedback such as this is incredibly useful in guiding the implementation.

aricart · Apr 17 '23

I would also really like to have this functionality. I'm experimenting with implementing an OCI registry on top of JetStream Object Store, and the ability to append data through an io.Writer interface seems to be the only missing piece.

With the current API, I am forced to buffer large uploads to files. Resuming uploads also requires filesystem workarounds.

I would be happy to share code, if you like.

robinkb · Mar 22 '24

I've decided to take a crack at providing this functionality by implementing an ObjectWriter that satisfies the io.WriteCloser interface, based heavily on the code in ObjectStore.Put. Based on that work, I can try to implement appending functionality to the ObjectWriter. I will publish the code when it's ready.
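
A minimal sketch of what such a writer could look like, assuming the legacy nats.go ObjectStore API (Put consumes an io.Reader) and using io.Pipe to stream writes into Put running in a goroutine; this is illustrative only, not the actual implementation:

package objectwriter

import (
	"io"

	"github.com/nats-io/nats.go"
)

// objectWriter adapts ObjectStore.Put to io.WriteCloser via a pipe.
type objectWriter struct {
	pw   *io.PipeWriter
	done chan error
}

// newObjectWriter starts Put in the background; bytes written to the
// returned writer are streamed into the object store as they arrive.
func newObjectWriter(obs nats.ObjectStore, name string) *objectWriter {
	pr, pw := io.Pipe()
	w := &objectWriter{pw: pw, done: make(chan error, 1)}
	go func() {
		_, err := obs.Put(&nats.ObjectMeta{Name: name}, pr)
		pr.CloseWithError(err)
		w.done <- err
	}()
	return w
}

func (w *objectWriter) Write(p []byte) (int, error) { return w.pw.Write(p) }

// Close signals EOF to Put and waits for it to finish.
func (w *objectWriter) Close() error {
	if err := w.pw.Close(); err != nil {
		return err
	}
	return <-w.done
}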

robinkb · Mar 25 '24

Appending means that the metadata will be updated (and will have a widely differing timestamp from the other chunks). It also means that additional chunks will have to be added and the checksum updated. I'm not sure this is a better strategy than having multiple objects represent the required contents. @robinkb, could you explain a bit more about the data you are storing? On other platforms, you would be required to read the entire contents, modify them, and then re-put.

aricart · Mar 26 '24

I'm looking into this as part of a personal experiment implementing an OCI registry on top of NATS JetStream's Object Store. Basically, I'm taking the CNCF Distribution project and implementing a storage driver for it, using Distribution's test suite to validate it.

For quick reference, the driver has to implement the following interface:

// StorageDriver defines methods that a Storage Driver must implement for a
// filesystem-like key/value object storage. Storage Drivers are automatically
// registered via an internal registration mechanism, and generally created
// via the StorageDriverFactory interface (https://godoc.org/github.com/distribution/distribution/registry/storage/driver/factory).
// Please see the aforementioned factory package for example code showing how to get an instance
// of a StorageDriver
type StorageDriver interface {
	// Name returns the human-readable "name" of the driver, useful in error
	// messages and logging. By convention, this will just be the registration
	// name, but drivers may provide other information here.
	Name() string

	// GetContent retrieves the content stored at "path" as a []byte.
	// This should primarily be used for small objects.
	GetContent(ctx context.Context, path string) ([]byte, error)

	// PutContent stores the []byte content at a location designated by "path".
	// This should primarily be used for small objects.
	PutContent(ctx context.Context, path string, content []byte) error

	// Reader retrieves an io.ReadCloser for the content stored at "path"
	// with a given byte offset.
	// May be used to resume reading a stream by providing a nonzero offset.
	Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error)

	// Writer returns a FileWriter which will store the content written to it
	// at the location designated by "path" after the call to Commit.
	// A path may be appended to if it has not been committed, or if the
	// existing committed content is zero length.
	//
	// The behaviour of appending to paths with non-empty committed content is
	// undefined. Specific implementations may document their own behavior.
	Writer(ctx context.Context, path string, append bool) (FileWriter, error)

	// Stat retrieves the FileInfo for the given path, including the current
	// size in bytes and the creation time.
	Stat(ctx context.Context, path string) (FileInfo, error)

	// List returns a list of the objects that are direct descendants of the
	// given path.
	List(ctx context.Context, path string) ([]string, error)

	// Move moves an object stored at sourcePath to destPath, removing the
	// original object.
	// Note: This may be no more efficient than a copy followed by a delete for
	// many implementations.
	Move(ctx context.Context, sourcePath string, destPath string) error

	// Delete recursively deletes all objects stored at "path" and its subpaths.
	Delete(ctx context.Context, path string) error

	// RedirectURL returns a URL which the client of the request r may use
	// to retrieve the content stored at path. Returning the empty string
	// signals that the request may not be redirected.
	RedirectURL(r *http.Request, path string) (string, error)

	// Walk traverses a filesystem defined within driver, starting
	// from the given path, calling f on each file.
	// If the returned error from the WalkFn is ErrSkipDir and fileInfo refers
	// to a directory, the directory will not be entered and Walk
	// will continue the traversal.
	// If the returned error from the WalkFn is ErrFilledBuffer, processing stops.
	Walk(ctx context.Context, path string, f WalkFn, options ...func(*WalkOptions)) error
}

Along with the FileWriter:

// FileWriter provides an abstraction for an opened writable file-like object in
// the storage backend. The FileWriter must flush all content written to it on
// the call to Close, but is only required to make its content readable on a
// call to Commit.
type FileWriter interface {
	io.WriteCloser

	// Size returns the number of bytes written to this FileWriter.
	Size() int64

	// Cancel removes any written content from this FileWriter.
	Cancel(context.Context) error

	// Commit flushes all content written to this FileWriter and makes it
	// available for future calls to StorageDriver.GetContent and
	// StorageDriver.Reader.
	Commit(context.Context) error
}

The source code for my experiment is published at robinkb/cascade-registry. It fully implements the StorageDriver interface and passes all of the tests, using my customizations in nats.go.

All of this code is very much me hacking away, but I did get appending working. See AppendWriter. The trick is persisting the internal state of the sha256 hasher in the object's headers and restoring it when appending. Then, when the AppendWriter is closed, the info is updated again.
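
For reference, this trick works because the sha256 digest in Go's standard library implements encoding.BinaryMarshaler and encoding.BinaryUnmarshaler (since Go 1.10), so its running state can round-trip through a header. A minimal sketch:

package main

import (
	"bytes"
	"crypto/sha256"
	"encoding"
	"fmt"
	"hash"
)

// saveState extracts the running state of a hash so it can be stored,
// e.g. in the object's headers between appends.
func saveState(h hash.Hash) ([]byte, error) {
	m, ok := h.(encoding.BinaryMarshaler)
	if !ok {
		return nil, fmt.Errorf("hash state is not marshalable")
	}
	return m.MarshalBinary()
}

// restoreState rebuilds a sha256 hasher from previously saved state.
func restoreState(state []byte) (hash.Hash, error) {
	h := sha256.New()
	if err := h.(encoding.BinaryUnmarshaler).UnmarshalBinary(state); err != nil {
		return nil, err
	}
	return h, nil
}

func main() {
	h := sha256.New()
	h.Write([]byte("first chunk"))
	state, _ := saveState(h)

	// Later, possibly in a different process: restore and keep hashing.
	h2, _ := restoreState(state)
	h2.Write([]byte(", second chunk"))

	want := sha256.Sum256([]byte("first chunk, second chunk"))
	fmt.Println(bytes.Equal(h2.Sum(nil), want[:])) // prints true
}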

I probably did some ugly stuff along the way, because this is the first time that I am working with NATS at all :sweat_smile:

robinkb · Mar 26 '24

Ah, I just realized that persisting and restoring the state of the hasher is Go-specific. It is not compatible with other language clients, so it's not a good solution. Unless there is a way to make it language-agnostic, you are absolutely right.

I do think that the ObjectWriter is useful, though.

robinkb · Mar 27 '24

Correct, it adds complexity that is not necessary. Instead, I suggest thinking of the object store as a tree of objects. Nothing prevents entries from pointing to different objects, or from following some sequential order, for example /data/a.../data/z or a.1...a.23. You can even make one of the entries in the object store an index that specifies the entries that make up the set.
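
For illustration, this index-object approach might be sketched as follows, assuming the legacy nats.go PutBytes/GetBytes API; the appendSegment and readAll helpers, the ".index" suffix, and the segment naming scheme are all hypothetical:

package objectindex

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"

	"github.com/nats-io/nats.go"
)

// index is the entry that points at the objects making up the set.
type index struct {
	Segments []string `json:"segments"` // e.g. ["a.1", "a.2", ...]
}

// appendSegment stores data as a new object and records it in the
// index. Error handling is simplified: a missing index is treated as
// an empty one.
func appendSegment(obs nats.ObjectStore, name string, data []byte) error {
	var idx index
	if b, err := obs.GetBytes(name + ".index"); err == nil {
		if err := json.Unmarshal(b, &idx); err != nil {
			return err
		}
	}
	seg := fmt.Sprintf("%s.%d", name, len(idx.Segments)+1)
	if _, err := obs.PutBytes(seg, data); err != nil {
		return err
	}
	idx.Segments = append(idx.Segments, seg)
	b, err := json.Marshal(idx)
	if err != nil {
		return err
	}
	_, err = obs.PutBytes(name+".index", b)
	return err
}

// readAll concatenates the segments in index order.
func readAll(obs nats.ObjectStore, name string) (io.Reader, error) {
	b, err := obs.GetBytes(name + ".index")
	if err != nil {
		return nil, err
	}
	var idx index
	if err := json.Unmarshal(b, &idx); err != nil {
		return nil, err
	}
	var buf bytes.Buffer
	for _, seg := range idx.Segments {
		data, err := obs.GetBytes(seg)
		if err != nil {
			return nil, err
		}
		buf.Write(data)
	}
	return &buf, nil
}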

aricart · Mar 27 '24