Add Stream Source
Stream source interface
- Optional<Offset> getLatestOffset()
  - Returns the latest Offset, which maps to a FileMetadata entry. Note that the StreamSource does NOT guarantee that the Offset maps to unread files.
  - Returns empty if there is no data in the stream source.
```java
// read the stream from the file data source
Set<Files> allFiles = fileDataSource.listAllObjects();
// get the unread files, i.e. files not seen by earlier calls
Set<Files> unreadFiles = Sets.difference(allFiles, seenFiles);
// update seenFiles
seenFiles = allFiles;
// latest batchId in fileMetadataLog, or -1 if the log is empty
long latestBatchId = fileMetadataLog.getLatest().map(Pair::getKey).orElse(-1L);
if (!unreadFiles.isEmpty()) {
  // there are unread files:
  // increment the batchId, keeping it monotonically increasing
  latestBatchId += 1;
  // record the new batch in fileMetadataLog
  fileMetadataLog.add(latestBatchId, new FileMetadata(unreadFiles, latestBatchId));
  return Optional.of(new Offset(latestBatchId));
} else {
  // no unread files: return the latest Offset, or empty if no batch has been recorded yet
  return latestBatchId == -1 ? Optional.empty() : Optional.of(new Offset(latestBatchId));
}
```
- Batch getBatch(Optional<Offset> start, Offset end)
  - Returns the Batch from the stream source for the Offset range (start, end], i.e. excluding start and including end.
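The interface and the getLatestOffset() pseudocode can be exercised end to end with a small in-memory sketch. It assumes files are identified by String names, a HashSet stands in for the file data source, and a TreeMap stands in for FileMetadataLog. Offset, FileMetadata, Batch, StreamSource, and InMemoryFileStreamSource are hypothetical definitions for illustration, not the project's actual classes. getBatch() uses TreeMap.subMap with an exclusive lower bound and an inclusive upper bound to match the (start, end] range; treating an empty start as "from the first batch" is an assumption.

```java
import java.util.*;

// Hypothetical stand-ins for the design's types; files are plain String names.
record Offset(long batchId) {}
record FileMetadata(Set<String> files, long batchId) {}
record Batch(List<String> files) {}

interface StreamSource {
  Optional<Offset> getLatestOffset();
  Batch getBatch(Optional<Offset> start, Offset end);
}

// Hypothetical in-memory sketch: a mutable set of file names plays the file data
// source, and a TreeMap plays FileMetadataLog (batchId -> FileMetadata).
class InMemoryFileStreamSource implements StreamSource {
  private final Set<String> dataSource = new HashSet<>();
  private Set<String> seenFiles = new HashSet<>();
  private final TreeMap<Long, FileMetadata> fileMetadataLog = new TreeMap<>();

  // Simulates a new file landing in the data source.
  void addFile(String name) { dataSource.add(name); }

  @Override
  public Optional<Offset> getLatestOffset() {
    Set<String> allFiles = new HashSet<>(dataSource);
    // unread = allFiles minus seenFiles (same contents as Guava's Sets.difference)
    Set<String> unreadFiles = new HashSet<>(allFiles);
    unreadFiles.removeAll(seenFiles);
    seenFiles = allFiles;
    // -1 means no batch has been recorded yet
    long latestBatchId = fileMetadataLog.isEmpty() ? -1L : fileMetadataLog.lastKey();
    if (!unreadFiles.isEmpty()) {
      latestBatchId += 1; // keep batch ids monotonically increasing
      fileMetadataLog.put(latestBatchId, new FileMetadata(unreadFiles, latestBatchId));
      return Optional.of(new Offset(latestBatchId));
    }
    return latestBatchId == -1 ? Optional.empty() : Optional.of(new Offset(latestBatchId));
  }

  // Collects the files of every batch with start < batchId <= end.
  // Assumption: an empty start means "from the first recorded batch".
  @Override
  public Batch getBatch(Optional<Offset> start, Offset end) {
    NavigableMap<Long, FileMetadata> range = start.isPresent()
        ? fileMetadataLog.subMap(start.get().batchId(), false, end.batchId(), true)
        : fileMetadataLog.headMap(end.batchId(), true);
    List<String> files = new ArrayList<>();
    for (FileMetadata metadata : range.values()) {
      List<String> sorted = new ArrayList<>(metadata.files());
      Collections.sort(sorted); // deterministic order within a batch
      files.addAll(sorted);
    }
    return new Batch(files);
  }
}
```

Calling getLatestOffset() twice with no new files returns the same Offset both times, which is why the interface does not promise that an Offset points at unread files.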
Stream source state maintenance
- FileMetadataLog maintains the mapping between Offset and FileMetadata. Callers of FileMetadataLog MUST add Offsets in monotonically increasing order.
  - Optional<Pair<Long, FileMetadata>> getLatest(): returns the latest Offset and its FileMetadata.
  - List<FileMetadata> get(Optional<Long> start, Optional<Long> end): returns the list of FileMetadata for Offsets in the range [start, end].
  - boolean add(Long offset, T metadata): adds an Offset and its FileMetadata.
- SeenFiles: the set of files seen from the stream source so far.
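A FileMetadataLog with the three operations above could be sketched in memory with a TreeMap keyed by Offset. This is a minimal sketch under assumptions: the class name InMemoryMetadataLog is hypothetical, Map.Entry stands in for the Pair type in getLatest(), and an empty start or end in get() is treated as unbounded on that side, which the design does not specify. As a design choice, add() here rejects an offset that is not greater than the latest one instead of relying only on callers to keep Offsets monotonic.

```java
import java.util.*;

// Hypothetical in-memory sketch of FileMetadataLog; T plays the role of FileMetadata.
// Map.Entry stands in for the Pair type named in the design (assumption).
class InMemoryMetadataLog<T> {
  private final TreeMap<Long, T> entries = new TreeMap<>();

  // Latest offset and its metadata, or empty if nothing was added yet.
  Optional<Map.Entry<Long, T>> getLatest() {
    return Optional.ofNullable(entries.lastEntry());
  }

  // Metadata for offsets in [start, end], both inclusive.
  // Assumption: an empty bound means unbounded on that side.
  List<T> get(Optional<Long> start, Optional<Long> end) {
    long from = start.orElse(Long.MIN_VALUE);
    long to = end.orElse(Long.MAX_VALUE);
    if (from > to) {
      return List.of(); // empty range; TreeMap.subMap would throw here
    }
    return new ArrayList<>(entries.subMap(from, true, to, true).values());
  }

  // Adds an entry; returns false for an offset that would break monotonic increase.
  boolean add(Long offset, T metadata) {
    if (!entries.isEmpty() && offset <= entries.lastKey()) {
      return false;
    }
    entries.put(offset, metadata);
    return true;
  }
}
```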