
[Feature] Agent on InLong Transform

Status: Open. luchunliang opened this issue 1 year ago; 0 comments.

Motivation

  • Agent (file collection) needs to filter collected data and keep only valid content.
  • Agent (Pulsar collection) needs to parse Protocol Buffers (PB) messages and extract data from them.

Solution

  • Agent integrates Transform as an SDK. Manager also integrates Transform so that it can validate the transformation SQL when users configure it, before any data is transformed.
  • Before processing data, the Agent registers with Transform the transformation configuration it pulls from Manager. When that configuration changes, the Agent re-registers it with Transform, keyed by StreamSourceId.
  • Agent-Sink passes the StreamSourceId and RawData to Transform, which returns zero or more FormalData records. Agent-Sink then sends the resulting FormalData to DataProxy.
  • Transform holds one registered configuration per StreamSourceId, and each StreamSourceId belongs to exactly one InLong data stream, identified by its GroupId and StreamId.
  • A transformation configuration has three parts: the transformation Source, the transformation SQL, and the transformation Sink.
  • Initially, the transformation SQL supports basic field filtering and field cropping (projection). Date/time and string conversion functions will be added later, based on Flink's built-in functions.
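A minimal sketch of the Agent-side flow described above. `TransformSdk` and `AgentTransformBinding` are hypothetical names, the configuration is modeled as an opaque `String`, and exception handling is omitted. The sketch only shows two things from the list: re-registering by StreamSourceId when the configuration pulled from Manager changes, and forwarding zero or more FormalData records to DataProxy (represented here by a `Consumer<byte[]>`).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for the Transform SDK (config as an opaque String).
interface TransformSdk {
    String register(String streamSourceId, String config); // previous config, or null
    List<byte[]> transform(String streamSourceId, byte[] rawData);
}

// Hypothetical Agent-side helper; not part of InLong.
class AgentTransformBinding {
    private final TransformSdk sdk;
    // Last configuration registered with Transform, keyed by StreamSourceId.
    private final Map<String, String> lastRegistered = new HashMap<>();

    AgentTransformBinding(TransformSdk sdk) {
        this.sdk = sdk;
    }

    /** Called after each pull from Manager; returns true if the config was (re-)registered. */
    synchronized boolean onConfigPulled(String streamSourceId, String config) {
        if (Objects.equals(lastRegistered.get(streamSourceId), config)) {
            return false; // unchanged: skip re-registration
        }
        sdk.register(streamSourceId, config);
        lastRegistered.put(streamSourceId, config);
        return true;
    }

    /** Agent-Sink path: RawData in, zero or more FormalData records out to DataProxy. */
    int sink(String streamSourceId, byte[] rawData, Consumer<byte[]> dataProxy) {
        List<byte[]> formalData = sdk.transform(streamSourceId, rawData);
        formalData.forEach(dataProxy);
        return formalData.size();
    }
}
```

Comparing against the last registered value avoids recompiling the SQL on every pull, and making `onConfigPulled` `synchronized` keeps the check-then-register step from interleaving across threads.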

Configuration Model

[Image: configuration model diagram]
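A hypothetical Java model of a transformation configuration, following the three parts listed under Solution (transformation Source, transformation SQL, transformation Sink). Only `SourceInfo`, `SinkInfo`, and `TransformConfig` are named in the issue; the fields (`type`, `charset`, `delimiter`) and the `utf8Csv` factories are assumptions for illustration. The compact constructors show the kind of non-null check that `register` is described as performing.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

/** How to decode RawData (e.g. "csv", "kv", "pb"). Fields are illustrative assumptions. */
record SourceInfo(String type, Charset charset, char delimiter) {
    SourceInfo {
        Objects.requireNonNull(type, "source type");
        Objects.requireNonNull(charset, "source charset");
    }

    // Hypothetical convenience factory for UTF-8 delimited text.
    static SourceInfo utf8Csv(char delimiter) {
        return new SourceInfo("csv", StandardCharsets.UTF_8, delimiter);
    }
}

/** How to encode each result row as FormalData. Fields are illustrative assumptions. */
record SinkInfo(String type, Charset charset, char delimiter) {
    SinkInfo {
        Objects.requireNonNull(type, "sink type");
        Objects.requireNonNull(charset, "sink charset");
    }

    // Hypothetical convenience factory for UTF-8 delimited text.
    static SinkInfo utf8Csv(char delimiter) {
        return new SinkInfo("csv", StandardCharsets.UTF_8, delimiter);
    }
}

/** The three parts of a transformation configuration: Source, SQL, Sink. */
record TransformConfig(SourceInfo source, String transformSql, SinkInfo sink) {
    TransformConfig {
        Objects.requireNonNull(source, "source");
        Objects.requireNonNull(sink, "sink");
        if (transformSql == null || transformSql.isBlank()) {
            throw new IllegalArgumentException("transformSql must not be blank");
        }
    }
}
```

For example, `new TransformConfig(SourceInfo.utf8Csv('|'), "SELECT ftime, extinfo FROM source WHERE extinfo = 'ok'", SinkInfo.utf8Csv(','))` describes a '|'-delimited source filtered and projected into ','-delimited output; the SQL text is only an illustration, not a confirmed InLong dialect.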

Interface API of Transform SDK

  • TransformConfig register(String streamSourceId, TransformConfig config) throws TransformException;
    • Throws TransformException if the transformation SQL fails to compile.
    • Throws TransformException if the configuration is invalid or a required field is null.
    • Generates code from the ProtoDefine and compiles it with a dynamic class loader; throws TransformException if this fails.
    • If validation succeeds, returns the previously registered TransformConfig, or null if the configuration is new.
  • TransformConfig unregister(String streamSourceId);
    • Unregisters the configuration and returns the previous TransformConfig, or null if none exists.
  • List<byte[]> transform(String streamSourceId, byte[] rawdata) throws TransformException;
    • Synchronous.
    • Thread-safe.
    • Processing logic:
      • Parses rawdata according to the SourceInfo configuration into a Map<String, byte[]> or a ProtoObject/JsonObject.
      • Interprets the syntax tree generated from the transformation SQL to produce the result set List<Row>.
      • Converts the result set List<Row> into a List<byte[]> according to the SinkInfo configuration and returns it.
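The register/unregister/transform contract above can be sketched as follows. This is not the InLong implementation: SQL compilation is replaced by a hand-built `Plan` (a `WHERE` predicate plus a column projection), the source is assumed to be '|'-delimited UTF-8 text with one record per line, the sink writes ','-delimited UTF-8, and `TransformException` is declared unchecked for brevity (the issue does not say which it is). `Plan`, `TransformSdkSketch`, and this simplified `TransformConfig` are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Declared unchecked here for brevity; an assumption, not the real SDK's choice.
class TransformException extends RuntimeException {
    TransformException(String message) { super(message); }
}

/** Hypothetical stand-in for compiled SQL: a WHERE filter plus selected column indexes. */
record Plan(Predicate<String[]> where, int[] selectColumns) {}

/** Hypothetical simplified config: source delimiter, compiled plan, sink delimiter. */
record TransformConfig(char sourceDelimiter, Plan plan, char sinkDelimiter) {}

class TransformSdkSketch {
    // One configuration per StreamSourceId; safe for concurrent register/transform calls.
    private final Map<String, TransformConfig> configs = new ConcurrentHashMap<>();

    TransformConfig register(String streamSourceId, TransformConfig config) throws TransformException {
        if (streamSourceId == null || config == null || config.plan() == null) {
            throw new TransformException("streamSourceId, config and plan must be non-null");
        }
        // A real SDK would compile the SQL and any ProtoDefine here, throwing on failure.
        return configs.put(streamSourceId, config); // previous config, or null if new
    }

    TransformConfig unregister(String streamSourceId) {
        return configs.remove(streamSourceId); // previous config, or null if absent
    }

    List<byte[]> transform(String streamSourceId, byte[] rawData) throws TransformException {
        TransformConfig config = configs.get(streamSourceId); // one snapshot per call
        if (config == null) {
            throw new TransformException("no config registered for " + streamSourceId);
        }
        String fieldSeparator = Pattern.quote(String.valueOf(config.sourceDelimiter()));
        List<byte[]> out = new ArrayList<>();
        // 1. Parse rawData per the source settings (assumed: one record per line).
        for (String line : new String(rawData, StandardCharsets.UTF_8).split("\n")) {
            if (line.isEmpty()) {
                continue;
            }
            String[] fields = line.split(fieldSeparator, -1);
            // 2. Evaluate the plan: drop rows failing the filter, then project columns.
            if (!config.plan().where().test(fields)) {
                continue;
            }
            int[] columns = config.plan().selectColumns();
            StringBuilder row = new StringBuilder();
            for (int i = 0; i < columns.length; i++) {
                if (i > 0) {
                    row.append(config.sinkDelimiter());
                }
                row.append(columns[i] < fields.length ? fields[columns[i]] : "");
            }
            // 3. Serialize each row per the sink settings.
            out.add(row.toString().getBytes(StandardCharsets.UTF_8));
        }
        return out;
    }
}
```

`ConcurrentHashMap.put` and `remove` return the previous mapping (or null), which matches the return semantics described for `register` and `unregister`. Because `transform` reads a single config snapshot per call, it can be called concurrently as long as the `where` predicate itself is thread-safe.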

Task list

Use case

No response

Are you willing to submit PR?

  • [X] Yes, I am willing to submit a PR!

Code of Conduct

luchunliang, Apr 20 '24 09:04