Convert Compactor/Coordinator into generic distributed job execution service
Is your feature request related to a problem? Please describe.
As tablet management functions move from the TabletServer to the Manager in the elasticity branch, there is a concern that the Manager could become overloaded by tasks such as examining RFile indices to find split points and selecting files for compaction (#3526).
Describe the solution you'd like
Accumulo already contains a distributed job execution service, but it is hardcoded to only perform compactions. If you look at the API, you will see that the Compactor's API is:
tabletserver.TExternalCompactionJob getRunningCompaction
string getRunningCompactionId
list<tabletserver.ActiveCompaction> getActiveCompactions
void cancel
and the Coordinator's API is:
tabletserver.TExternalCompactionJob getCompactionJob
void compactionCompleted
void compactionFailed
void updateCompactionStatus
TExternalCompactionList getRunningCompactions
TExternalCompactionList getCompletedCompactions
void cancel
If you were to add additional job types to this API, you would end up with a bunch of similar methods that just use different object types, as Thrift doesn't support inheritance (only composition; FWIW, gRPC is the same with Protobuf). However, one approach would be to pass the job details (tabletserver.TExternalCompactionJob in this case) as a serialized Java object over the Thrift API. I think this work could be done in 3 steps:
- Make the Thrift API generic:
  a. Rename the methods to remove compaction from the name.
  b. Modify the data structures to make them more generic.
- Modify the Coordinator and Compactor to pass serialized Java objects over the Thrift API. The Java objects will be deserialized on the receiving end and the appropriate action taken. For example, the Compactor would call Coordinator.getJob, deserialize the response, and execute the logic for the job type (see the sketch after this list).
- Rename the components with more generic names.
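As a rough illustration of the second step, here is a minimal sketch of the serialize/deserialize round trip, using plain Java serialization for simplicity (the class and method names are hypothetical; the security concerns with this approach are discussed further down):

import java.io.*;

// Hypothetical codec for step 2: turn a job object into bytes that can be
// carried in a Thrift binary field, and back again on the receiving end.
public class JobCodec {

  public static byte[] serialize(Serializable job) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(job);
    }
    return bos.toByteArray();
  }

  // The receiving end deserializes and dispatches on the concrete job type.
  public static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return ois.readObject();
    }
  }
}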
Describe alternatives you've considered
I surveyed some of the solutions listed at https://github.com/meirwah/awesome-workflow-engines and while I think some of them could work, they seem like overkill.
Additional context
Java serialization is a bit iffy from a security perspective; thinking about this led to opening #3660. We could possibly use JSON for this with something like the following, which would let us limit the allowed classes.
{
  "class": "fully qualified name of class to deserialize",
  "data": { "...": "fields the above class knows how to deserialize using Gson" }
}
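A minimal sketch of how such an envelope could be decoded with Gson while limiting the allowed classes (the class name in the allow-list is a placeholder):

import java.util.Set;
import com.google.gson.Gson;
import com.google.gson.JsonObject;

// Hypothetical envelope decoder: the "class" field is checked against an
// allow-list before Gson is asked to deserialize the "data" payload.
public class MessageEnvelope {

  private static final Gson GSON = new Gson();

  // Only these classes may be deserialized from the wire (placeholder entry).
  private static final Set<String> ALLOWED =
      Set.of("org.apache.accumulo.core.tasks.CompactionTask");

  public static Object decode(String json) {
    JsonObject envelope = GSON.fromJson(json, JsonObject.class);
    String className = envelope.get("class").getAsString();
    if (!ALLOWED.contains(className)) {
      throw new IllegalArgumentException("Class not allowed: " + className);
    }
    try {
      Class<?> clazz = Class.forName(className);
      return GSON.fromJson(envelope.get("data"), clazz);
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException(e);
    }
  }
}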
We could also consider using this new generic system for walog sorting. We currently have DistributedWorkQueue that facilitates farming walog sort work out to tservers. It might be nice to move that functionality out of ZK as it would remove a source of ZK writes.
This code is where the manager actually opens up data files to find a split point.
Java serialization is a bit iffy from a security perspective
Right, so I didn't specify an implementation, but I was surely thinking of using Kryo to turn the Java object into a byte array and then pass that over Thrift using list<byte>. I don't have the API fully fleshed out, but if you passed the opid, Java class name, and serialized object over the wire, that might be enough to facilitate a generic API.
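A Kryo round trip along those lines might look like the following sketch (SplitPointJob is a made-up placeholder; registering classes up front also limits what can be deserialized):

import java.io.ByteArrayOutputStream;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

// Sketch of serializing a job with Kryo into a byte array that could be
// passed over Thrift. SplitPointJob is a hypothetical job class.
public class KryoJobCodec {

  public static class SplitPointJob {
    String tableId;
    String endRow;
  }

  private static Kryo newKryo() {
    Kryo kryo = new Kryo();
    kryo.register(SplitPointJob.class); // only registered classes can be read
    return kryo;
  }

  public static byte[] serialize(Object job) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (Output output = new Output(bos)) {
      newKryo().writeClassAndObject(output, job);
    }
    return bos.toByteArray();
  }

  public static Object deserialize(byte[] bytes) {
    try (Input input = new Input(bytes)) {
      return newKryo().readClassAndObject(input);
    }
  }
}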
Java serialization is a bit iffy from a security perspective
Java 17 records should be safer.
I have been doing research trying to find a cross-language solution so that others can write task workers in other languages or frameworks. The problem I'm finding is that I can't define a generic message type and still enforce specific attribute values in certain cases.
A Thrift example of this would be something like:
enum TaskMessageType {
  COMPACTION_JOB_REQUEST,
  COMPACTION_JOB_RESPONSE,
  COMPACTION_STATUS_MSG,
  COMPACTION_COMPLETED_MSG,
  COMPACTION_FAILED_MSG,
  COMPACTIONS_COMPLETED_RESPONSE
}
struct TaskMessage {
  1:TaskMessageType messageType
  2:binary serializedMessage
}
service TaskManager {
  TaskMessage getTask(
    1:client.TInfo tinfo
    2:security.TCredentials credentials
    3:TaskMessage message
    4:string taskID
  )
  void taskCompleted(
    1:client.TInfo tinfo
    2:security.TCredentials credentials
    3:TaskMessage message
  )
  void taskFailed(
    1:client.TInfo tinfo
    2:security.TCredentials credentials
    3:TaskMessage message
  )
}
With the structure above, there is no way to enforce that the TaskMessage passed to taskCompleted has a TaskMessageType of COMPACTION_COMPLETED_MSG. Even if we were to move the enum out of TaskMessage and add it as a field on taskFailed, there is no way to enforce that it has a particular value. In addition to Thrift, I looked at gRPC, which uses Protocol Buffers, and I don't see a way to do it there either.
Obviously, if we write the server component we can enforce the message type in the service method implementation (see the sketch below). But if someone were to write a worker implementation in Go, for example, it would not be clear to them from the Thrift IDL which message types are acceptable for the different service methods.
My initial thought is to abandon the idea of supporting cross-language or user-supplied workers in any easy manner. I think that I can use a structure like this for the manager and worker implementations that we write. Thoughts on this?
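That server-side check might look roughly like this (a sketch only; the handler signature follows the IDL above and the body is hypothetical):

// Sketch of enforcing the message type in the server-side handler, since the
// Thrift IDL above cannot express this constraint.
public void taskCompleted(TInfo tinfo, TCredentials credentials, TaskMessage message) {
  if (message.getMessageType() != TaskMessageType.COMPACTION_COMPLETED_MSG) {
    throw new IllegalArgumentException(
        "taskCompleted expects COMPACTION_COMPLETED_MSG, got " + message.getMessageType());
  }
  // ... handle the completed compaction ...
}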
I have been doing research trying to find a cross-language solution so that others can write task workers in other languages or frameworks.
Instead of trying to solve this at the RPC layer, it'd be better to just have a pluggable component, with an SPI. Users can plug in their own implementation, using whatever RPC they want for that. I don't think we need to provide a cross-language RPC for this. Cross-language RPC libraries seem to come with a lot of baggage and introduce a lot of complexity that we don't need to be tied to. Creating a pluggable SPI means the user can deploy as simple or as complex an implementation as they wish, and we just call the local interface. What it does after that is implementation details we don't need to be concerned with.
So, I abandoned the notion of a cross-language solution. I also did not go down the route of an SPI. What I have at [1] is a replacement for the existing CompactionCoordinator <-> Compactor Thrift APIs. The replacement is a generic TaskManager <-> TaskWorker API (see tasks.thrift at [1]). If a complex type needs to be passed across the API, it is of the Thrift type Task, which contains a messageType and a message. The messageType is one of the values from the enum TaskMessageType, and the message is a serialized JSON object (see TaskMessage.toThriftTask and TaskMessage.fromThriftTask). The serialized JSON objects are subclasses of TaskMessage, and they can contain Thrift objects that are currently used by the code (see CompactionTask). These Thrift objects are serialized using the TBinaryProtocol (see ThriftSerializers) and then Base64 encoded when the Java object is serialized to JSON.
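As an illustration of that serialization chain (this is not the code at [1], just a sketch of the described approach):

import java.util.Base64;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;

// Sketch of the chain described above: Thrift object -> TBinaryProtocol bytes
// -> Base64 string that can be embedded in the JSON form of a TaskMessage
// subclass (and the reverse on the receiving side).
public class ThriftJsonCodec {

  public static String toBase64(TBase<?,?> obj) throws TException {
    return Base64.getEncoder()
        .encodeToString(new TSerializer(new TBinaryProtocol.Factory()).serialize(obj));
  }

  public static <T extends TBase<?,?>> T fromBase64(String encoded, T empty) throws TException {
    new TDeserializer(new TBinaryProtocol.Factory())
        .deserialize(empty, Base64.getDecoder().decode(encoded));
    return empty;
  }
}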
I have created duplicate methods in Compactor and CompactionCoordinator for the generic API, and I think my next step will be to remove the compaction-specific methods. I should be able to prove pretty quickly that this can work using the existing ITs. I think the overall design here would be:
- A TaskWorker (aka Compactor) would call getTask on the TaskManager (aka CompactionCoordinator)
- The TaskManager retrieves the highest priority task from its Top-N PriorityQueue of work
- The TaskWorker executes that task (just like compactions are executed today)
This would allow any TaskWorker to get any Task, but I think we want to shape that a little with the resource groups. I need to think about that further.
[1] https://github.com/apache/accumulo/compare/elasticity...dlmarion:accumulo:generic-task-api?expand=1
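The pull loop described in the list above might look roughly like this sketch (the client interface and Task record are stand-ins for the Thrift-generated types, not the code at [1]):

// Hypothetical stand-ins for the Thrift-generated client and task types.
interface TaskManagerClient {
  Task getTask(String workerId) throws Exception;
  void taskCompleted(Task task) throws Exception;
  void taskFailed(Task task) throws Exception;
}

record Task(String taskId, byte[] message) {}

class TaskWorkerLoop implements Runnable {

  private final TaskManagerClient client;
  private final String workerId;
  private volatile boolean shutdown = false;

  TaskWorkerLoop(TaskManagerClient client, String workerId) {
    this.client = client;
    this.workerId = workerId;
  }

  @Override
  public void run() {
    while (!shutdown) {
      try {
        // Ask the TaskManager for the highest priority task.
        Task task = client.getTask(workerId);
        if (task == null) {
          Thread.sleep(1000); // no work available, back off briefly
          continue;
        }
        execute(task); // run the task, e.g. a compaction
        client.taskCompleted(task);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      } catch (Exception e) {
        // log and keep polling; a real implementation would report taskFailed
      }
    }
  }

  private void execute(Task task) {
    // deserialize task.message() and run the type-specific logic
  }
}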
I looked at the linked changes and the following are some things I was thinking about while looking.
Not every Task will have a corresponding fate op. Noticed TaskMessage had a fate tx id.
May not want to assume a TaskRunner will only run one task at a time. For example, a single TaskRunner process that works on finding split points could work on finding many split points concurrently. A task runner could request multiple tasks at a time and report on multiple tasks. Compactors would probably still just run one task at a time, but maybe not other task runners.
What is the field Task.taskManager expected to contain?
Wondering what the advantages and disadvantages are of using the Thrift enum type for the Task.messageType field.
Wondering if the Task thrift struct should have a string taskId field. Assuming this would be a UUID if following the compactor model where every task has a UUID.
Wondering if a class like CompactionTask could have a CompactionTask(Task task) constructor that knows how to deserialize that Task. When a compactor requests a task, it knows what type it requested and could directly call that constructor.
I've been thinking about this issue, and wondering about its scope. As written, this issue describes a "generic distributed job execution service". I would caution against being so generic that this capability subsumes Accumulo's purpose as a scalable database, or reinvents the wheel. We don't want Accumulo to sit alongside other distributed job frameworks, as though it were one of them. This server type should focus on performing Accumulo maintenance tasks, specifically, rather than generic jobs from anywhere (like fate is currently narrowly scoped to maintenance tasks, or how iterators are not general purpose execution services, but specifically process data in Accumulo). I'm sure the implementation is headed in the direction of serving Accumulo specifically, but I wanted to raise the point, just to check the scope of this. I think it's worth some caution and occasionally pressing the brakes on this, to keep the scope from being too broad.
I still think maybe it's worth writing a simple SPI for executing Accumulo maintenance tasks. Users could configure them to be run in a different service, but by default would use our own executor service implementation. I think that's how external compactions should have been written earlier, as well. I think generalizing that service allows us to revisit that idea of having this be pluggable, before it's finalized in another major release.
@ctubbsii said:
I've been thinking about this issue, and wondering about its scope. As written, this issue describes a "generic distributed job execution service". I would caution against being so generic that this capability subsumes Accumulo's purpose as a scalable database, or reinvents the wheel. We don't want Accumulo to sit alongside other distributed job frameworks, as though it were one of them. This server type should focus on performing Accumulo maintenance tasks, specifically, rather than generic jobs from anywhere (like fate is currently narrowly scoped to maintenance tasks, or how iterators are not general purpose execution services, but specifically process data in Accumulo). I'm sure the implementation is headed in the direction of serving Accumulo specifically, but I wanted to raise the point, just to check the scope of this. I think it's worth some caution and occasionally pressing the brakes on this, to keep the scope from being too broad.
Agreed, the intention here is to create a more generic API for running Accumulo maintenance tasks.
I still think maybe it's worth writing a simple SPI for executing Accumulo maintenance tasks. Users could configure them to be run in a different service, but by default would use our own executor service implementation. I think that's how external compactions should have been written earlier, as well. I think generalizing that service allows us to revisit that idea of having this be pluggable, before it's finalized in another major release.
I tried to wrap my head around how to do this / where to even start and couldn't come up with anything. It's possible that it's too early in the development stage to do this as the api is not stable. I'm punting on this for now. If someone else wants to take a crack at it, go for it.
@keith-turner said:
May not want to assume a TaskRunner will only run one task at a time. For example, a single TaskRunner process that works on finding split points could work on finding many split points concurrently. A task runner could request multiple tasks at a time and report on multiple tasks. Compactors would probably still just run one task at a time, but maybe not other task runners.
I don't think this design precludes that, but I'm wondering how that works in a partial failure scenario. I think for now it might make sense to punt on this and continue down the current path and see where that leads us. For the partial failure scenario, we might want to think about the task containing multiple instructions and they either all succeed or all fail as a unit of work.
What is the field Task.taskManager expected to contain?
The address of the TaskManager (CompactionCoordinator)
Wondering what the advantages and disadvantages of using the thrift enum type for the Task.messageType field.
So, I looked at this. We can create an enum for the message type, but then we can't build logic into the enum like I have done for TaskMessageType. What I have now is a generic API for passing units of work, but a tight coupling on the message type and message body. When a Task thrift object is received, TaskMessage.fromThriftTask is called to deserialize the message. The message class is derived from the message type; there is a tight coupling there for type enforcement.
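For illustration, that kind of coupling might look like this on the Java side (the class names are placeholders, not the actual code at [1]):

// Hypothetical sketch: each message type knows which TaskMessage subclass its
// body deserializes to, so the type drives the deserialization.
abstract class TaskMessage {}

class CompactionJobRequest extends TaskMessage {}

class CompactionCompletedMessage extends TaskMessage {}

enum TaskMessageType {
  COMPACTION_JOB_REQUEST(CompactionJobRequest.class),
  COMPACTION_COMPLETED_MSG(CompactionCompletedMessage.class);

  private final Class<? extends TaskMessage> messageClass;

  TaskMessageType(Class<? extends TaskMessage> messageClass) {
    this.messageClass = messageClass;
  }

  Class<? extends TaskMessage> getMessageClass() {
    return messageClass;
  }
}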
Wondering if the Task thrift struct should have a string taskId field. Assuming this would be a uuid if following the compactor model where every task has a uuid.
Easy change to make.
I would caution against being so generic that this capability subsumes Accumulo's purpose as a scalable database, or reinvents the wheel.
Yeah, I would agree we do not want to solve more use cases than we need. The goal of this work is to support a limited set of known internal use cases and share code within Accumulo. So it only needs to be generic enough to support those known internal use cases.
I still think maybe it's worth writing a simple SPI for executing Accumulo maintenance tasks
External compactions and log sort both farm out work to different processes, and they use completely different internal implementations to do this. Getting those to use the same internal code, and getting new elasticity use cases to use this new common internal code, is going to be a learning experience. Creating an SPI (if it's ever done) would be best deferred until we learn more from creating this internal implementation.
I think that's how external compactions should have been written earlier, as well.
I think it turned out best that this was not done. That proposed SPI would have been tightly fitted to how Accumulo used to work for compactions, and the SPI would probably have needed a major overhaul to adjust to the large changes in how compactions work now in elasticity. The SPIs for compactions that just do things like selecting which files to compact in which queue, and do not care how or where that is actually done, have turned out to be fairly resilient to the changes and will only need slight modifications.
Creating an SPI (if it's ever done) would be best deferred until we learn more from creating this internal implementation.
I'm definitely supportive of holding off on an SPI until we flesh out what we would even need it to contain. Mostly I just wanted to plant the seed, so it could be considered as the work progresses. Thinking about modularizing well enough to support an eventual SPI throughout the development is a great way to inject some discipline into our internal APIs and can result in a better code base in the end, I think.
@keith-turner, @ctubbsii, others:
I have pushed changes to my branch (diff here) that modify the Compactor and CompactionCoordinator to use the new TaskManager and TaskRunner Thrift APIs. The RPCs between these two components are now using the new API methods and objects. The External Compaction ITs all pass. Wanted to get any feedback you might have on the approach before moving forward. The next step would be to rename the Compactor and make it more generic so that it can handle different types of tasks.