[Feature] Agent on InLong Transform
Motivation
- Agent (File collection) needs the ability to filter and collect valid data content.
- Agent (Pulsar collection) requires PB protocol parsing and data extraction capabilities.
Solution
- Agent integrates Transform as an SDK. Manager also integrates Transform so that it can validate transformation SQL up front, when users configure it.
- Before performing any transformation, the Agent must register the transformation configuration it pulled from Manager with Transform. When that configuration changes, the Agent must re-register it with Transform under the same key, StreamSourceId.
- Agent-Sink passes StreamSourceId and RawData into Transform, and Transform returns zero or more FormalData. Agent-Sink sends the final FormalData to DataProxy.
- Transform keeps one set of registered configurations per StreamSourceId, and each StreamSourceId belongs to exactly one InLong data stream, identified by its GroupId and StreamId.
- Transform's transformation configuration includes three parts: transformation Source, transformation SQL, and transformation Sink.
- Transformation SQL will initially support basic field filtering and field cropping. Date/time conversion functions and string conversion functions will be added later, modeled on Flink's built-in functions.
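The Agent-side flow described above (register on configuration change, then pass each raw record through Transform and forward zero or more results) can be sketched roughly as follows. This is only an illustration: `TransformSdkView`, `FormalDataSender`, `AgentSinkSketch`, and the toy SDK that treats the "SQL" as a plain keyword are all hypothetical names and simplifications, not part of InLong.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified view of the Transform SDK as seen by Agent-Sink. */
interface TransformSdkView {
    void register(String streamSourceId, String transformSql);
    List<byte[]> transform(String streamSourceId, byte[] rawData);
}

/** Hypothetical sender standing in for the DataProxy client. */
interface FormalDataSender {
    void send(byte[] formalData);
}

class AgentSinkSketch {
    private final TransformSdkView transform;
    private final FormalDataSender sender;

    AgentSinkSketch(TransformSdkView transform, FormalDataSender sender) {
        this.transform = transform;
        this.sender = sender;
    }

    /** Called when a new or changed configuration arrives; re-registers under the same key. */
    void onConfigChanged(String streamSourceId, String transformSql) {
        transform.register(streamSourceId, transformSql);
    }

    /** Transforms one raw record, forwards every result, and returns how many were sent. */
    int onRawData(String streamSourceId, byte[] rawData) {
        List<byte[]> formalData = transform.transform(streamSourceId, rawData);
        for (byte[] record : formalData) {
            sender.send(record);
        }
        return formalData.size();
    }
}

class AgentSinkSketchDemo {
    /** Toy SDK (assumption): the "SQL" is just a keyword; records containing it pass, others are dropped. */
    static TransformSdkView toySdk() {
        final Map<String, String> keywords = new HashMap<>();
        return new TransformSdkView() {
            @Override
            public void register(String streamSourceId, String transformSql) {
                keywords.put(streamSourceId, transformSql);
            }

            @Override
            public List<byte[]> transform(String streamSourceId, byte[] rawData) {
                String keyword = keywords.get(streamSourceId);
                String text = new String(rawData, StandardCharsets.UTF_8);
                if (keyword != null && text.contains(keyword)) {
                    return Collections.singletonList(rawData);
                }
                return Collections.<byte[]>emptyList();
            }
        };
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        AgentSinkSketch sink = new AgentSinkSketch(toySdk(),
                data -> sent.add(new String(data, StandardCharsets.UTF_8)));

        sink.onConfigChanged("source-1", "ERROR");
        sink.onRawData("source-1", "ERROR disk full".getBytes(StandardCharsets.UTF_8)); // forwarded
        sink.onRawData("source-1", "INFO ok".getBytes(StandardCharsets.UTF_8));         // dropped

        sink.onConfigChanged("source-1", "INFO"); // configuration change: same key, new rule
        sink.onRawData("source-1", "INFO ok".getBytes(StandardCharsets.UTF_8));         // forwarded

        System.out.println(sent); // [ERROR disk full, INFO ok]
    }
}
```

The point of the sketch is the shape of the call sequence: the sink never inspects the configuration itself, it only hands StreamSourceId and RawData to Transform and forwards whatever comes back, which may be nothing.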
Configuration Model
Interface API of Transform SDK
- TransformConfig register(String streamSourceId, TransformConfig config) throws TransformException;
  - The configuration is validated for legality and non-null values; if validation fails, an exception is thrown.
  - If the transformation SQL fails to compile, an exception is thrown.
  - Code is generated automatically from ProtoDefine and compiled with a dynamic classloader; if this fails, an exception is thrown.
  - If validation succeeds, the previously registered TransformConfig for this streamSourceId is returned; if the configuration is new, null is returned.
- TransformConfig unregister(String streamSourceId);
  - Unregisters the configuration and returns the previous TransformConfig; if none exists, returns null.
- List<byte[]> transform(String streamSourceId, byte[] rawdata) throws TransformException;
  - Synchronous interface
  - Thread-safe
  - Processing logic:
    - Based on the SourceInfo configuration, parse rawdata into a Map<String, byte[]> or a ProtoObject/JsonObject.
    - Interpret the syntax tree generated from the transformation SQL to produce the result set List<Row>.
    - Based on the SinkInfo configuration, convert the result set List<Row> into List<byte[]> and return it.
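As a rough illustration of the contract above, here is a minimal in-memory sketch. It is not the proposed implementation: `TransformConfig` is reduced to three plain functions standing in for SourceInfo, the compiled SQL, and SinkInfo (the real configuration model is not shown in this issue), `InMemoryTransform` and `TransformSketchDemo` are hypothetical names, and throwing when no configuration is registered is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Stand-in for the checked exception named in the API above. */
class TransformException extends Exception {
    TransformException(String message) { super(message); }
}

/** Hypothetical stand-in: each of the three configuration parts is a plain function. */
class TransformConfig {
    final Function<byte[], Map<String, String>> sourceDecoder;          // stands in for SourceInfo
    final Function<Map<String, String>, List<Map<String, String>>> sql; // stands in for compiled SQL
    final Function<Map<String, String>, byte[]> sinkEncoder;            // stands in for SinkInfo

    TransformConfig(Function<byte[], Map<String, String>> sourceDecoder,
                    Function<Map<String, String>, List<Map<String, String>>> sql,
                    Function<Map<String, String>, byte[]> sinkEncoder) {
        this.sourceDecoder = sourceDecoder;
        this.sql = sql;
        this.sinkEncoder = sinkEncoder;
    }
}

class InMemoryTransform {
    // ConcurrentHashMap keeps register/unregister/transform safe to call from several threads.
    private final ConcurrentHashMap<String, TransformConfig> configs = new ConcurrentHashMap<>();

    /** Validates, then stores; returns the previous config for this key, or null if new. */
    TransformConfig register(String streamSourceId, TransformConfig config) throws TransformException {
        if (streamSourceId == null || config == null || config.sourceDecoder == null
                || config.sql == null || config.sinkEncoder == null) {
            throw new TransformException("invalid transform configuration");
        }
        return configs.put(streamSourceId, config);
    }

    /** Returns the removed config, or null if none was registered. */
    TransformConfig unregister(String streamSourceId) {
        return streamSourceId == null ? null : configs.remove(streamSourceId);
    }

    /** Parse with the source part, evaluate the SQL part, encode each row with the sink part. */
    List<byte[]> transform(String streamSourceId, byte[] rawdata) throws TransformException {
        TransformConfig config = streamSourceId == null ? null : configs.get(streamSourceId);
        if (config == null) {
            // Assumption: the issue does not say what happens for an unknown streamSourceId.
            throw new TransformException("no configuration registered for " + streamSourceId);
        }
        Map<String, String> parsed = config.sourceDecoder.apply(rawdata);
        List<Map<String, String>> rows = config.sql.apply(parsed);
        List<byte[]> result = new ArrayList<>(rows.size());
        for (Map<String, String> row : rows) {
            result.add(config.sinkEncoder.apply(row));
        }
        return result;
    }
}

class TransformSketchDemo {
    /** Demo config: input "id,level,msg"; keep rows with level ERROR; emit "id|msg". */
    static TransformConfig demoConfig() {
        return new TransformConfig(
                raw -> {
                    String[] parts = new String(raw, StandardCharsets.UTF_8).split(",", 3);
                    Map<String, String> fields = new HashMap<>();
                    fields.put("id", parts[0]);
                    fields.put("level", parts[1]);
                    fields.put("msg", parts[2]);
                    return fields;
                },
                fields -> "ERROR".equals(fields.get("level"))
                        ? Collections.singletonList(fields)
                        : Collections.<Map<String, String>>emptyList(),
                row -> (row.get("id") + "|" + row.get("msg")).getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws TransformException {
        InMemoryTransform transform = new InMemoryTransform();
        transform.register("source-1", demoConfig());
        for (byte[] out : transform.transform("source-1",
                "7,ERROR,disk full".getBytes(StandardCharsets.UTF_8))) {
            System.out.println(new String(out, StandardCharsets.UTF_8)); // 7|disk full
        }
    }
}
```

The return values of `register` and `unregister` fall out of `ConcurrentHashMap.put` and `remove`, which already return the previous mapping or null. The demo configuration assumes well-formed three-field input and does no error handling for malformed records.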
Task list
Use case
No response
Are you willing to submit PR?
- [X] Yes, I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct