
Add Stream Source

Open penghuo opened this issue 3 years ago • 0 comments

Stream source interface

  • Optional<Offset> getLatestOffset()
    • Returns the latest Offset, which maps to an S3Metadata entry. Note that the StreamSource does NOT guarantee that the Offset maps to unread files.
    • Returns empty if there is no data in the stream source.
// List all files currently in the file data source
Set<Files> allFiles = fileDataSource.listAllObjects();

// Get unread files: everything listed that has not been seen before
Set<Files> unreadFiles = Sets.difference(allFiles, seenFiles);

// Update seenFiles
seenFiles = allFiles;

// Latest logged batch id, or -1 if the log is empty.
// getLatest() is declared as Optional<Pair<Long, FileMetadata>>; Pair::getKey is assumed here.
Long latestBatchId = fileMetadataLog.getLatest().map(Pair::getKey).orElse(-1L);

if (!unreadFiles.isEmpty()) {
    // Has unread files: bump batchId, keeping it monotonically increasing
    latestBatchId += 1;
    // Update fileMetadataLog
    fileMetadataLog.add(latestBatchId, new S3Metadata(unreadFiles, latestBatchId));
    return Optional.of(new Offset(latestBatchId));
} else {
    // No unread files: return the previous Offset, or empty if nothing was ever logged
    return latestBatchId == -1 ? Optional.empty() : Optional.of(new Offset(latestBatchId));
}
  • Batch getBatch(Optional<Offset> start, Offset end)
    • Returns the Batch from the stream source for Offsets in (start, end]: start is exclusive, end is inclusive.
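
To make the (start, end] range concrete, here is a minimal sketch of how a batch's files could be resolved from a sorted log. The class and method names (BatchRange, filesBetween) are hypothetical, files are represented as plain strings, and treating an absent start as "read from the beginning" is an assumption that the issue does not state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Optional;

/** Hypothetical helper, not part of the proposed interface. */
class BatchRange {
    /** Collects the files logged under offsets in (start, end]. */
    static List<String> filesBetween(
            NavigableMap<Long, List<String>> log, Optional<Long> start, long end) {
        // A start past the end selects nothing (subMap would throw otherwise).
        if (start.isPresent() && start.get() > end) {
            return new ArrayList<>();
        }
        // Assumption: an absent start means "from the first logged offset".
        NavigableMap<Long, List<String>> range = start.isPresent()
                ? log.subMap(start.get(), false, end, true) // exclusive start, inclusive end
                : log.headMap(end, true);
        List<String> files = new ArrayList<>();
        for (List<String> batchFiles : range.values()) {
            files.addAll(batchFiles);
        }
        return files;
    }
}
```

Because start is exclusive, a caller can pass the last committed Offset as start and the value from getLatestOffset() as end without re-reading files it has already processed.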

Stream source state maintenance

  • FileMetadataLog maintains the mapping between Offset and FileMetadata. The user of FileMetadataLog MUST keep Offsets monotonically increasing.
    • Optional<Pair<Long, FileMetadata>> getLatest(). Returns the latest Offset and its FileMetadata.
    • List<FileMetadata> get(Optional<Long> start, Optional<Long> end). Returns the list of FileMetadata for Offsets in the range [start, end].
    • boolean add(Long offset, T metadata). Adds an Offset and its FileMetadata.
  • SeenFiles maintains the set of files seen from the stream source so far.
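
As an illustration only, the three FileMetadataLog operations could be backed by a sorted in-memory map. The class name InMemoryMetadataLog is hypothetical, Map.Entry stands in for the unspecified Pair type, and treating an absent bound in get as unbounded is an assumption. The sketch also rejects non-increasing offsets in add as a defensive choice, whereas the description above leaves that responsibility to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical in-memory metadata log; T is the metadata type. */
class InMemoryMetadataLog<T> {
    // TreeMap keeps offsets sorted, so "latest" and range reads are direct lookups.
    private final TreeMap<Long, T> log = new TreeMap<>();

    /** Returns the highest offset and its metadata, or empty if nothing was added. */
    public Optional<Map.Entry<Long, T>> getLatest() {
        return Optional.ofNullable(log.lastEntry());
    }

    /** Returns metadata for offsets in [start, end]; an absent bound is treated as unbounded. */
    public List<T> get(Optional<Long> start, Optional<Long> end) {
        long from = start.orElse(Long.MIN_VALUE);
        long to = end.orElse(Long.MAX_VALUE);
        if (from > to) {
            return new ArrayList<>();
        }
        return new ArrayList<>(log.subMap(from, true, to, true).values());
    }

    /** Adds an entry; returns false if the offset is not greater than the latest one. */
    public boolean add(Long offset, T metadata) {
        if (!log.isEmpty() && offset <= log.lastKey()) {
            return false;
        }
        log.put(offset, metadata);
        return true;
    }
}
```

A durable implementation would need to persist entries so that offsets survive restarts; this sketch only shows the read and write semantics.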

penghuo avatar Oct 26 '22 18:10 penghuo