texera
Amber Fault Tolerance: Logging
This PR covers the full lifecycle of log-based fault tolerance:
- Added `LogStorage`, an abstraction over where the log is persisted. LocalFS and HDFS backends are currently implemented.
- Added `AsyncLogWriter`, a thread that pushes serialized log records into `LogStorage`.
- Added `SerializationManager`, which compresses determinants and converts them from their in-memory format to byte arrays. It is driven by `AsyncLogWriter`.
- Added `LogManager`, which handles logging for both data and control determinants and manages the lifecycle of `AsyncLogWriter` and `LogStorage`.
- Added `TimeService` and `OperatorContext`. An operator can access the time service through the context in its own logic to obtain timestamps, which the log manager records. The same context could also hold the pause manager if we want an operator to be able to pause itself. Right now, due to some Scala/Java incompatibility issues with `trait`, I cannot put a member variable inside `IOperatorExecutor`; this requires a code refactoring.
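The write path described in the list above can be sketched as follows. This is a minimal, self-contained illustration, not the PR's code: the determinant types (`StepDelta`, `TimeStamp`), the in-memory `InMemoryLogStorage` standing in for LocalFS/HDFS, the use of Deflate for compression, and all method names (`append`, `flush`, `enqueue`, `shutdown`, `logDeterminant`, `terminate`, `now`) are hypothetical assumptions chosen for the example.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.util.concurrent.LinkedBlockingQueue
import java.util.zip.{DeflaterOutputStream, InflaterInputStream}
import scala.collection.mutable.ArrayBuffer

// Hypothetical determinant types; Amber's real determinants differ.
sealed trait Determinant
final case class StepDelta(from: String, steps: Long) extends Determinant
final case class TimeStamp(value: Long) extends Determinant

// Where serialized records are persisted (LocalFS/HDFS in the PR; in-memory here).
trait LogStorage {
  def append(record: Array[Byte]): Unit
  def flush(): Unit
}

final class InMemoryLogStorage extends LogStorage {
  val records: ArrayBuffer[Array[Byte]] = ArrayBuffer.empty
  override def append(record: Array[Byte]): Unit = records += record
  override def flush(): Unit = ()
}

// Converts a determinant to a compact byte array (tagged fields, then Deflate).
object SerializationManager {
  def serialize(d: Determinant): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(new DeflaterOutputStream(bytes))
    d match {
      case StepDelta(from, steps) => out.writeByte(0); out.writeUTF(from); out.writeLong(steps)
      case TimeStamp(value)       => out.writeByte(1); out.writeLong(value)
    }
    out.close() // closing the DeflaterOutputStream finishes the compressed stream
    bytes.toByteArray
  }

  def deserialize(record: Array[Byte]): Determinant = {
    val in = new DataInputStream(new InflaterInputStream(new ByteArrayInputStream(record)))
    try {
      in.readByte().toInt match {
        case 0     => StepDelta(in.readUTF(), in.readLong())
        case 1     => TimeStamp(in.readLong())
        case other => throw new IllegalArgumentException(s"unknown determinant tag $other")
      }
    } finally in.close()
  }
}

// Background thread: drains queued determinants, serializes them, appends to storage.
final class AsyncLogWriter(storage: LogStorage) extends Thread("async-log-writer") {
  private val queue = new LinkedBlockingQueue[Option[Determinant]]()

  def enqueue(d: Determinant): Unit = queue.put(Some(d))

  // None acts as a poison pill: the writer flushes storage and run() returns.
  def shutdown(): Unit = { queue.put(None); join() }

  override def run(): Unit = {
    var running = true
    while (running) {
      queue.take() match {
        case Some(d) => storage.append(SerializationManager.serialize(d))
        case None    => storage.flush(); running = false
      }
    }
  }
}

// Entry point for data and control determinants; owns the writer/storage lifecycle.
final class LogManager(storage: LogStorage) {
  private val writer = new AsyncLogWriter(storage)
  writer.start()

  def logDeterminant(d: Determinant): Unit = writer.enqueue(d)
  def terminate(): Unit = writer.shutdown()
}

// Operators read the clock through this service so every value they see is logged.
final class TimeService(logManager: LogManager, clock: () => Long) {
  def now(): Long = {
    val t = clock()
    logManager.logDeterminant(TimeStamp(t))
    t
  }
}
```

In this sketch a single writer thread consumes one FIFO queue, so records reach storage in the order they were logged, and serialization and I/O stay off the operator's processing path. A recovery path (not shown) would read the records back with `deserialize` and feed the logged timestamps to `TimeService` instead of the live clock.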
Current Design: (diagram not available)
Proposed Design: (diagram not available)
Logging-related TODOs:
- ~~For better integration with the Python side, serializing control messages transforms the Scala control payloads into protobuf objects. However, not all control messages can be transformed, and this needs to be addressed. I'm thinking of letting the Scala side use protobuf objects directly as well.~~ Doing so would require refactoring many controller objects into protobuf, which is not our focus right now, so I decided to leave this integration aside for now.
Future TODOs:
- zuozhi and I decided that a sender keeps all the data it has sent in memory until a checkpoint is made. This part will be added in the checkpoint implementation.
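The sender-side retention described in the last TODO could look roughly like this. It is a sketch under hypothetical assumptions: each output channel numbers its batches with a sequence number, and a checkpoint reports the highest sequence number it covers. `SentDataBuffer`, `send`, `onCheckpoint`, and `replayFrom` are illustrative names, not the planned checkpoint API.

```scala
import scala.collection.mutable

// Hypothetical sender-side buffer: every sent batch is retained, keyed by a
// per-channel sequence number, until a checkpoint covers it.
final class SentDataBuffer[T] {
  private val retained = mutable.Queue.empty[(Long, T)]
  private var nextSeq = 0L

  // Records a batch as sent and returns the sequence number assigned to it.
  def send(batch: T): Long = {
    val seq = nextSeq
    retained.enqueue(seq -> batch)
    nextSeq += 1
    seq
  }

  // Drops every batch with sequence number <= upTo once a checkpoint covers it.
  def onCheckpoint(upTo: Long): Unit =
    while (retained.nonEmpty && retained.head._1 <= upTo) retained.dequeue()

  // Batches to resend to a recovering receiver, starting at fromSeq.
  def replayFrom(fromSeq: Long): List[T] =
    retained.iterator.filter(_._1 >= fromSeq).map(_._2).toList

  def size: Int = retained.size
}
```

Because sequence numbers grow monotonically, the retained batches stay sorted, so a checkpoint only needs to trim from the head of the queue. The trade-off is memory: everything sent since the last checkpoint stays resident on the sender.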