
[RFC] Dynamic Fields in PPL Calcite Implementation

ykmr1224 opened this issue · 6 comments

RFC: Dynamic Fields in PPL Calcite Implementation

Abstract

This RFC proposes an implementation approach for dynamic fields support in PPL Calcite integration and evaluates the MAP-based approach against alternative solutions. Dynamic fields enable PPL commands like spath and timechart to handle semi-structured data where column names and types are determined at runtime rather than query planning time.

Related issue: https://github.com/opensearch-project/sql/issues/4112

Background

Problem Statement

OpenSearch documents often contain nested JSON structures and semi-structured data where field names and types are not known until query execution. Traditional SQL systems assume fixed schemas at planning time, creating challenges for:

  1. spath Command: Extracting arbitrary JSON paths from documents
  2. timechart Command: Creating pivot-style results where fields depend on data values
  3. Schema Evolution: Handling documents with varying structures
  4. Type Preservation: Maintaining original JSON types (int, boolean, double)

Requirements

  • Runtime Field Generation: Support fields determined by data content
  • Calcite Integration: Work within Apache Calcite's type system and optimization framework
  • Compatibility: Work with existing (and future) PPL commands
  • Performance: Minimal overhead compared to static field access
  • Extensibility: Foundation for additional dynamic-field commands
  • Type Preservation: Maintain original JSON data types

Proposed Solution: MAP-Based Approach

Architecture Overview

The proposed solution uses a MAP-based storage approach to handle dynamic fields in PPL commands. Instead of trying to predict unknown field names at planning time, this approach stores dynamic fields in a structured MAP container that can be accessed efficiently during query execution.

Core Principles

1. Structured Dynamic Storage

Dynamic fields are stored in a MAP<STRING, ANY> structure where:

  • Keys: Field names determined at runtime (e.g., JSON paths, pivot column names)
  • Values: Actual field values with preserved types
  • Container: Well-defined MAP type that Calcite can optimize

2. Type System Integration

  • MAP Type Support: Leverages Calcite's native MAP type support
  • Type Preservation: Uses ANY type to maintain original JSON data types
  • Schema Visibility: MAP field appears in schema metadata for tools and optimization

3. Two-Phase Field Resolution

  • Static Fields: Resolved normally through existing schema
  • Dynamic Fields: Resolved to MAP access operations when a field is not found in the static schema
  • Fallback Strategy: Clear error handling when fields don't exist in either location

Implementation Details

Schema Extension

// Row type with dynamic fields support
RelDataType rowType = typeFactory.createStructType(
    List.of(
        // Static fields
        typeFactory.createSqlType(SqlTypeName.INTEGER),  // id
        typeFactory.createSqlType(SqlTypeName.VARCHAR),  // name
        // Dynamic fields MAP
        typeFactory.createMapType(
            typeFactory.createSqlType(SqlTypeName.VARCHAR), // keys
            typeFactory.createSqlType(SqlTypeName.ANY)      // values
        )
    ),
    List.of("id", "name", "_MAP")
);

Field Resolution

// Enhanced field resolution for dynamic columns
public RexNode resolveField(String fieldName, RelDataType rowType) {
    // Try static fields first
    if (hasStaticField(fieldName, rowType)) {
        return createStaticFieldAccess(fieldName, rowType);
    }
    
    // Check for dynamic columns in MAP
    if (hasDynamicColumns(rowType)) {
        return createMapAccess(fieldName, rowType);
    }
    
    throw new FieldNotFoundException(fieldName);
}

Keep fields unique

To avoid inconsistency, we need to keep field names unique across static and dynamic fields. When a new static field is assigned, we need to remove that field from _MAP. When new dynamic fields are added, we need to merge them with the static fields.

frame0 = [attr1, attr2, _MAP[attr3, attr4]]
  # _MAP is updated at runtime (this could include field additions, depending on the data)
frame1 = [attr1, attr2, _MAP[attr3, attr4, ?]]
  # dedupe: merge dynamic values into the colliding static fields and drop them from _MAP
frame2 = [attr1=coalesce(_MAP[attr1], attr1), attr2=coalesce(_MAP[attr2], attr2), _MAP=_MAP.remove(attr1, attr2)]
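
As a rough sketch of how the frame2 projection could be built with Calcite's RexBuilder (the helper name and the mapRef/staticRef parameters are illustrative, and this assumes the ITEM and CASE operators can be used directly from RexBuilder in this context):

import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;

// attr1 = CASE WHEN _MAP['attr1'] IS NOT NULL THEN _MAP['attr1'] ELSE attr1 END
public RexNode mergeDynamicIntoStatic(RexBuilder rexBuilder, RexNode mapRef,
                                      RexNode staticRef, String fieldName) {
    // _MAP['attr1'] — map access via the ITEM operator
    RexNode mapAccess = rexBuilder.makeCall(
        SqlStdOperatorTable.ITEM, mapRef, rexBuilder.makeLiteral(fieldName));
    // prefer the dynamic value when present, otherwise keep the static column
    return rexBuilder.makeCall(SqlStdOperatorTable.CASE,
        rexBuilder.makeCall(SqlStdOperatorTable.IS_NOT_NULL, mapAccess),
        mapAccess,
        staticRef);
}
// _MAP.remove(attr1, attr2) would be a custom map function registered in the PPL
// operator table; Calcite does not ship a built-in "remove keys" operator.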

Join

A join requires merging the two schemas. This includes merging the maps and deduplicating between static and dynamic fields.

left = [attr1, attr2, _MAP[attr3, attr4]]
right = [attr1, attr3, attr5, _MAP[attr2, attr4, attr6]]

result = [attr1, attr2, attr3, attr5, _MAP[attr4, attr6]]
where:
attr1 = right.attr1
attr2 = coalesce(right._MAP[attr2], left.attr2)
attr3 = right.attr3
attr5 = right.attr5
_MAP = merge(left._MAP, right._MAP).remove(attr1, attr2, attr3, attr5)

Types can be inferred for static/static field merges (attr1, attr3, and attr5 derive the same type from the right side). Static/dynamic or dynamic/dynamic field merges (attr2, attr4, attr6) result in the ANY type.
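
On the execution side, the merge(left._MAP, right._MAP).remove(...) step could look roughly like the helper below, assuming dynamic fields are carried as a plain Map<String, Object> at runtime (the helper and its collision policy are illustrative, not an existing API):

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Merge the two dynamic-field maps, then drop keys that are now represented
// as static fields in the joined schema (attr1, attr2, attr3, attr5 above).
public static Map<String, Object> mergeDynamicFields(Map<String, Object> left,
                                                     Map<String, Object> right,
                                                     Set<String> promotedToStatic) {
    Map<String, Object> merged = new HashMap<>(left);
    merged.putAll(right); // right side wins on key collisions, mirroring the coalesce order
    merged.keySet().removeAll(promotedToStatic);
    return merged;
}

An mvappend-style merge that keeps both values as a multivalue is another option, as discussed in the comments below.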

Wildcard pattern expansion

The fields command allows wildcard patterns to specify field names and their order, e.g. fields foo*, bar, *. This requires special handling because we cannot store field-order information within _MAP. We can only do the field selection and leave the ordering until it is actually needed, which could cause problems when a join is done after fields (it might be better to make this a limitation, though there are workarounds to handle it).

Field name filtering itself is feasible with _MAP (simply remove keys based on the name), as sketched below.
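
As a rough illustration (the helper below is hypothetical), a wildcard pattern such as foo* can be translated to a regex and applied to the _MAP keys; note this only selects fields and cannot preserve the requested order:

import java.util.Map;
import java.util.regex.Pattern;

// Keep only the dynamic fields whose names match a wildcard pattern such as "foo*".
public static void retainMatchingKeys(Map<String, Object> dynamicFields, String wildcard) {
    // foo* -> foo.* (a real implementation would also quote regex metacharacters)
    Pattern pattern = Pattern.compile(wildcard.replace("*", ".*"));
    dynamicFields.keySet().removeIf(key -> !pattern.matcher(key).matches());
}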

Type Preservation

The implementation preserves original JSON types through the ANY type system:

{"age": 30} → age: 30 (integer)
{"active": true} → active: true (boolean)
{"price": 99.99} → price: 99.99 (double)

Applying type-specific functions requires an explicit cast. One option is to provide automatic casting for ease of use. Users can also remap a field to a specific type with eval, e.g.: eval age=cast(age as integer)

Concerns

  • Complexity: The resulting query plan becomes complex due to map attribute references and map merges
  • Stability: Each command can easily overlook the _MAP field, which could lead to incorrect results

Alternative Implementation: MAP-Only Schema

An alternative approach within the MAP-based solution is to use only the _MAP field when dynamic fields are needed, eliminating the hybrid static/dynamic schema complexity. To address type information loss, we can create a custom RelDataType that maintains known field types during planning while using a single _MAP field at runtime:

// Custom RelDataType that tracks known fields while using MAP storage
public class MapOnlyRelDataType extends RelRecordType {
    private final Map<String, RelDataType> knownFieldTypes;
    private final RelDataType mapFieldType;
    
    public MapOnlyRelDataType(RelDataTypeFactory typeFactory, 
                              Map<String, RelDataType> knownFields) {
        // Physical schema: single MAP field
        super(List.of(
            new RelDataTypeFieldImpl("_MAP", 0, 
                typeFactory.createMapType(
                    typeFactory.createSqlType(SqlTypeName.VARCHAR),
                    typeFactory.createSqlType(SqlTypeName.ANY)
                ))
        ));
        
        // Logical schema: track known field types for planning
        this.knownFieldTypes = new HashMap<>(knownFields);
        this.mapFieldType = getFieldList().get(0).getType();
    }
    
    public RelDataType getKnownFieldType(String fieldName) {
        return knownFieldTypes.get(fieldName);
    }
    
    public boolean hasKnownField(String fieldName) {
        return knownFieldTypes.containsKey(fieldName);
    }
}

Type-Aware Field Resolution

// Field resolution with type information preservation
public RexNode resolveField(String fieldName, RelDataType rowType) {
    if (rowType instanceof MapOnlyRelDataType mapOnlyType) {
        RexNode mapAccess = createMapAccess(fieldName, 0);
        
        // Apply known type information for optimization
        if (mapOnlyType.hasKnownField(fieldName)) {
            RelDataType knownType = mapOnlyType.getKnownFieldType(fieldName);
            return rexBuilder.makeCast(knownType, mapAccess);
        }
        
        return mapAccess; // Unknown field, keep as ANY
    }
    
    return resolve(fieldName, rowType);
}

Benefits of MAP-Only

  • Runtime Simplicity: Single MAP field eliminates schema complexity
  • Type Information Preserved: Known fields retain type information for optimization
  • Planning-Time Optimization: Calcite can optimize based on known field types
  • Flexible Evolution: New fields can be added without schema changes

Trade-offs

  • Implementation Complexity: Any command that projects fields needs to take the _MAP field into account to reflect updates
  • Memory Overhead: All fields are stored as key-value pairs instead of direct struct fields
  • Cast Overhead: Known fields require runtime casting from ANY to specific types
  • Optimization Complexity: It is harder to detect opportunities to convert to a fixed-schema plan

Alternative Approach: DynamicRecordType

Overview

Apache Calcite provides DynamicRecordTypeImpl for handling dynamic schemas. However, this approach is designed for Calcite's SQL implementation workflow, which differs significantly from the current PPL implementation architecture.

How DynamicRecordType Works in Calcite SQL

In Calcite's SQL implementation, DynamicRecordType follows a specific workflow that relies on the query validation phase:

1. Query Validation Phase

During SQL query validation, DynamicRecordTypeImpl automatically collects all field names that are referenced in the query. When a field is accessed on a DynamicRecordType, it tracks these field references internally.

2. Static Schema Generation

After validation completes, the DynamicRecordType has collected all the field names that were referenced in the query. This information is used to generate a static record type that includes all the referenced fields with appropriate types.

3. Planning with Static Schema

The planning phase then works with this static schema where all field references are known at planning time. This enables Calcite's full optimization capabilities since the schema is fixed during planning.
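
For illustration, a minimal sketch of this behavior using Calcite's DynamicRecordTypeImpl (the exact API and the type inferred for registered fields may vary by Calcite version):

import org.apache.calcite.rel.type.DynamicRecordTypeImpl;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;

RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
DynamicRecordTypeImpl rowType = new DynamicRecordTypeImpl(typeFactory);

// During validation, each field reference registers the field on the dynamic row type.
RelDataTypeField age = rowType.getField("age", /* caseSensitive= */ false, /* elideRecord= */ false);

// After validation, the collected fields become the static schema used for planning.
System.out.println(rowType.getFieldNames()); // [age]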

PPL Implementation Challenges

1. Lack of validation phase in Calcite PPL

The current PPL implementation lacks the validation phase that Calcite SQL relies on. To use DynamicRecordType in PPL, we would need one of the following:

  • Option A: Validation Phase
    • This would require significant effort to separate field-reference resolution from the current single phase that builds the logical plan from the AST (effectively a full refactoring of CalciteRelNodeVisitor/CalciteRexNodeVisitor)
  • Option B: Planning-Time Analysis
    • A possible workaround is to introduce planning-time analysis that walks the parent nodes of the AST to derive the list of fields to be used (this approach would not actually use DynamicRecordType)

2. DynamicRecordType cannot handle field listing

Some PPL commands require listing all fields, which DynamicRecordType cannot provide.

  • fillnull: applies to all fields when no field list is specified
  • join: uses field names to perform a natural join
  • fields / table: when a wildcard pattern (e.g. foo*, *foo) is used

Alternative Approach: RelMetadata-Based Schema Probing

Overview

Another alternative approach is to utilize Calcite's RelMetadata system to perform probing queries that determine the actual schema during RelNode tree construction. This approach would query the data source to discover available fields before finalizing the schema.

Basic Concept

// Custom RelMetadata for dynamic field discovery
public interface DynamicFieldMetadata extends Metadata {
    MetadataDef<DynamicFieldMetadata> DEF = MetadataDef.of(DynamicFieldMetadata.class);
    
    // Probe data source to discover available fields
    Set<String> getAvailableFields(RelNode rel);
    
    // Get field type information from probing
    Map<String, RelDataType> getFieldTypes(RelNode rel);
}

// Schema determination during planning
public RelDataType buildDynamicSchema(RelNode input) {
    DynamicFieldMetadata metadata = input.getCluster().getMetadataQuery()
        .metadata(DynamicFieldMetadata.class, input);
    
    Set<String> availableFields = metadata.getAvailableFields(input);
    Map<String, RelDataType> fieldTypes = metadata.getFieldTypes(input);
    
    // Build schema based on discovered fields
    return createSchemaFromDiscoveredFields(availableFields, fieldTypes);
}

Benefits

  • Static Schema: Results in traditional static schema that Calcite optimizes well
  • Data-Driven: Schema based on actual data content rather than predictions
  • Full Optimization: Enables all Calcite optimization capabilities

Limitations

  • Performance Overhead: Requires additional queries to probe schema
  • Data Dependency: Schema determination depends on current data state, and could be inaccurate/unstable
  • Complexity: Requires implementing custom RelMetadata providers

This approach is mentioned for completeness but is not recommended due to the performance overhead and complexity involved.

ykmr1224 · Oct 03 '25

How to combine spath output with existing output? e.g.

t: msg="{\"age\":\"10\", \"b\": 1}"

source=t | eval b=2 | spath

expected result:
age=10, b=[1,2]

penghuo · Oct 03 '25

How to combine spath output with existing output? e.g.

  1. MAP (mixed) approach
eval: [msg, b]
spath(step1): [msg, b, _MAP=json_extract_all(msg)]
spath(step2): [msg=mvappend(msg, _MAP['msg']), b=mvappend(b, _MAP['b']), _MAP=remove(_MAP, 'msg', 'b')]
result: [msg, b, _MAP{age}]   # type for msg and b is tracked by schema even at runtime

(mvappend is a function that appends values and creates a multivalue as needed)

  2. MAP-only approach
eval: [msg, b]
spath: [_MAP=map_merge({'msg': msg, 'b': b}, json_extract_all(msg))]
result: [_MAP{msg, b, age}]   # type for msg and b is tracked by RelDataType only during planning

(map_merge will merge maps, and internally does mvappend when keys overlap)

ykmr1224 · Oct 03 '25

Do we have any benchmarks that compare the three approaches?

I know there was a bit of implementation work on the alternatives, but not sure how testable they are.

Swiddis · Oct 03 '25

Based on offline discussion with @penghuo, we prefer the MAP-based approach (mixed), considering we can use it to automatically handle extra fields not defined in the schema. Action item: verify that we can automatically eliminate the _MAP-related operations when we don't need to handle extra fields.

ykmr1224 · Oct 03 '25

Do we have any benchmarks that compare the three approaches?

Not so far. In my understanding, the DynamicRecordType approach would not cause much performance degradation at execution time, and we can measure how much overhead the MAP-based approach has.

ykmr1224 · Oct 03 '25

Could the current dynamic fields feature resolve issue https://github.com/opensearch-project/sql/issues/4173? @ykmr1224

LantaoJin · Dec 05 '25