[RFC] JSON IDL
Tracking issue
https://github.com/flyteorg/flyte/issues/5318
A while ago we discussed this issue which is also related to how dataclasses etc. are serialized. @eapolinario remarked:

> We're in the process of revisiting the dataclass transformer and will include this in the process. A good point raised during today's contributor meeting is that simply adding the tags to the struct message is not enough as that would break inheritance.

Is this RFC the mentioned process to revisit the dataclass transformer?
> A while ago we discussed this issue which is also related to how dataclasses etc. are serialized. @eapolinario remarked:
>
> > We're in the process of revisiting the dataclass transformer and will include this in the process. A good point raised during today's contributor meeting is that simply adding the tags to the struct message is not enough as that would break inheritance.
>
> Is this RFC the mentioned process to revisit the dataclass transformer?

yes, exactly. We've been experimenting with this idea and decided to make it official by following the RFC process.
> A while ago we discussed this issue which is also related to how dataclasses etc. are serialized. @eapolinario remarked:
>
> > We're in the process of revisiting the dataclass transformer and will include this in the process. A good point raised during today's contributor meeting is that simply adding the tags to the struct message is not enough as that would break inheritance.
>
> Is this RFC the mentioned process to revisit the dataclass transformer?

I can suggest a potential solution: we should put an identifier in the metadata for the different dataclasses, as sketched below.
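A minimal sketch of what that could look like, assuming we stash a discriminator key alongside the serialized fields (the key name `__dataclass_identifier__` is purely illustrative):

```python
from dataclasses import asdict, dataclass

@dataclass
class Base:
    a: int

@dataclass
class Derived(Base):
    b: str = "x"

def to_struct_dict(obj) -> dict:
    d = asdict(obj)
    # Record which concrete dataclass produced this payload, so deserialization
    # can pick the right subclass without tagging the type itself (which, as
    # noted above, would break inheritance).
    d["__dataclass_identifier__"] = f"{type(obj).__module__}.{type(obj).__qualname__}"
    return d

print(to_struct_dict(Derived(a=1)))
# -> {'a': 1, 'b': 'x', '__dataclass_identifier__': '__main__.Derived'}
```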
@fg91 @eapolinario @wild-endeavor Does it look good to you? Can we merge it?
Hi all, I recently conducted performance tests and discovered that mashumaro does not support pydantic BaseModel. This means we need to handle different serialization behaviors between dataclass/dict and pydantic BaseModel in both the Flyte backend and frontend.
Additionally, we need to handle cases like the ones in this PR.
If we don't use the approach of Dataclass -> JSON String -> Msgpack Bytes, we will need to design a solution to support serialization/deserialization of subclasses of a generic class.
Serialization behavior:
- Dataclass/Dict: Python value -> MessagePack bytes
- Pydantic BaseModel: Python value -> JSON string -> MessagePack bytes
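To make the asymmetry concrete, here is a minimal sketch of the two paths (assuming mashumaro's codec API and pydantic v2's `model_dump_json`; the class names are illustrative):

```python
import msgpack
from dataclasses import dataclass
from mashumaro.codecs.msgpack import MessagePackEncoder
from pydantic import BaseModel

@dataclass
class DCConfig:
    lr: float
    batch_size: int

class PDConfig(BaseModel):
    lr: float
    batch_size: int

# Dataclass/dict path: Python value -> MessagePack bytes
dc_bytes = MessagePackEncoder(DCConfig).encode(DCConfig(lr=1e-3, batch_size=32))

# Pydantic path: Python value -> JSON string -> MessagePack bytes
# (mashumaro cannot encode a BaseModel directly, so we go through JSON first)
pd_bytes = msgpack.dumps(PDConfig(lr=1e-3, batch_size=32).model_dump_json())
```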
performance
encode
| Method | Elapsed Time (seconds) |
|---|---|
| Dataclass with Msgpack Mixin | 0.009213 |
| Dynamically Create Msgpack Mixin | 0.021703 |
| Dataclass -> JSON String -> Msgpack Bytes | 0.021909 |
| Mashumaro Msgpack Encoder | 0.008809 |
decode
| Method | Elapsed Time (seconds) |
|---|---|
| Dataclass with Msgpack Mixin Decode | 0.018864 |
| Dynamically Create Msgpack Mixin Decode | 0.028957 |
| Dataclass -> JSON String -> Msgpack Decode | 0.023912 |
| Mashumaro Msgpack Decode | 0.016360 |
example
encode
```python
import msgpack
from dataclasses import dataclass, fields
from typing import List, Dict
from mashumaro.mixins.msgpack import DataClassMessagePackMixin
from mashumaro.codecs.json import JSONEncoder, JSONDecoder
from mashumaro.codecs.msgpack import MessagePackEncoder
import timeit


@dataclass
class InnerDataClass:
    inner_attr1: int
    inner_attr2: str


@dataclass
class MyDataClassWithMsgPackMixin(DataClassMessagePackMixin):
    attr1: int
    attr2: str
    attr3: float
    attr4: bool
    attr5: List[int]
    attr6: InnerDataClass
    attr7: str
    attr8: float
    attr9: Dict[str, int]
    attr10: bool


@dataclass
class MyDataClass:
    attr1: int
    attr2: str
    attr3: float
    attr4: bool
    attr5: List[int]
    attr6: InnerDataClass
    attr7: str
    attr8: float
    attr9: Dict[str, int]
    attr10: bool


# Initialize objects
dc_with_mixin = MyDataClassWithMsgPackMixin(
    attr1=1, attr2='2', attr3=3.0, attr4=True, attr5=[1, 2, 3],
    attr6=InnerDataClass(1, '2'), attr7='7', attr8=8.0,
    attr9={'9': 9}, attr10=True
)

dc = MyDataClass(
    attr1=1, attr2='2', attr3=3.0, attr4=True, attr5=[1, 2, 3],
    attr6=InnerDataClass(1, '2'), attr7='7', attr8=8.0,
    attr9={'9': 9}, attr10=True
)

new_dc = type(MyDataClass.__name__ + "WithMessagePackMixin", (MyDataClass, DataClassMessagePackMixin), {})
encoder = JSONEncoder(MyDataClass)


# Test function 1: Directly use msgpack mixin
def test_msgpack_mixin():
    return dc_with_mixin.to_msgpack()


# Test function 2: Dynamically create a class with msgpack mixin
def test_dynamic_msgpack_mixin():
    new_instance = new_dc(**{field.name: getattr(dc_with_mixin, field.name) for field in fields(dc_with_mixin)})
    return new_instance.to_msgpack()


def test_create_new_dc():
    return type(MyDataClass.__name__ + "WithMessagePackMixin", (MyDataClass, DataClassMessagePackMixin), {})


# Test function 3: Convert dataclass to JSON string and then encode to msgpack
def test_json_str_to_msgpack():
    return msgpack.dumps(encoder.encode(dc))


def test_create_encoder():
    return JSONEncoder(MyDataClass)


# Test function 4: Use MessagePackEncoder from mashumaro (without initialization)
def test_mashumaro_msgpack_encoder():
    return mashumaro_encoder.encode(dc)


# Test function for initializing the mashumaro MessagePackEncoder
def test_create_mashumaro_encoder():
    return MessagePackEncoder(MyDataClass)


# Initialize mashumaro encoder outside the test loop
mashumaro_encoder = test_create_mashumaro_encoder()

# Use timeit to test each method's performance, running 10000 times
iterations = 10000

time_mixin = timeit.timeit(test_msgpack_mixin, number=iterations)
time_dynamic = timeit.timeit(test_dynamic_msgpack_mixin, number=iterations)
time_dynamic_create_base_class = timeit.timeit(test_create_new_dc, number=1)
time_json = timeit.timeit(test_json_str_to_msgpack, number=iterations)
time_create_json_encoder = timeit.timeit(test_create_encoder, number=1)
time_mashumaro_msgpack = timeit.timeit(test_mashumaro_msgpack_encoder, number=iterations)
time_create_mashumaro_encoder = timeit.timeit(test_create_mashumaro_encoder, number=1)

# Output the results
print(f'{iterations} times dataclass with msgpack mixin elapsed time:', time_mixin)
print(f'{iterations} times dynamically create msgpack mixin elapsed time:', time_dynamic + time_dynamic_create_base_class)
print(f'{iterations} times dataclass -> json str -> msgpack bytes elapsed time:', time_json + time_create_json_encoder)
print(f'{iterations} times mashumaro msgpack encoder elapsed time:', time_mashumaro_msgpack + time_create_mashumaro_encoder)
# print(f'1 time creating mashumaro msgpack encoder elapsed time:', time_create_mashumaro_encoder)

import matplotlib.pyplot as plt

# Labels and timings for the four methods
methods = ['Msgpack Mixin', 'Dynamic Msgpack Mixin', 'JSON String to Msgpack', 'Mashumaro Msgpack Encoder']
times = [time_mixin, time_dynamic + time_dynamic_create_base_class, time_json + time_create_json_encoder, time_mashumaro_msgpack + time_create_mashumaro_encoder]

# Plotting the comparison
plt.figure(figsize=(30, 5))
plt.bar(methods, times, color=['blue', 'green', 'red', 'orange'])
plt.xlabel('Methods')
plt.ylabel('Elapsed Time (seconds)')
plt.title('Comparison of Serialization Methods')
plt.savefig('/Users/future-outlier/code/dev/flytekit/build/PR/JSON/eduardo/performance_comparison/encode.png')
```
decode
```python
import msgpack
from dataclasses import dataclass, fields
from typing import List, Dict
from mashumaro.mixins.msgpack import DataClassMessagePackMixin
from mashumaro.codecs.json import JSONEncoder, JSONDecoder
from mashumaro.codecs.msgpack import MessagePackEncoder, MessagePackDecoder
import timeit


@dataclass
class InnerDataClass(DataClassMessagePackMixin):
    inner_attr1: int
    inner_attr2: str


@dataclass
class MyDataClassWithMsgPackMixin(DataClassMessagePackMixin):
    attr1: int
    attr2: str
    attr3: float
    attr4: bool
    attr5: List[int]
    attr6: InnerDataClass
    attr7: str
    attr8: float
    attr9: Dict[str, int]
    attr10: bool


@dataclass
class MyDataClass:
    attr1: int
    attr2: str
    attr3: float
    attr4: bool
    attr5: List[int]
    attr6: InnerDataClass
    attr7: str
    attr8: float
    attr9: Dict[str, int]
    attr10: bool


# Initialize objects
dc_with_mixin = MyDataClassWithMsgPackMixin(
    attr1=1, attr2='2', attr3=3.0, attr4=True, attr5=[1, 2, 3],
    attr6=InnerDataClass(1, '2'), attr7='7', attr8=8.0,
    attr9={'9': 9}, attr10=True
)

dc = MyDataClass(
    attr1=1, attr2='2', attr3=3.0, attr4=True, attr5=[1, 2, 3],
    attr6=InnerDataClass(1, '2'), attr7='7', attr8=8.0,
    attr9={'9': 9}, attr10=True
)

new_dc = type(MyDataClass.__name__ + "WithMessagePackMixin", (MyDataClass, DataClassMessagePackMixin), {})
encoder = JSONEncoder(MyDataClass)
decoder = JSONDecoder(MyDataClass)

# Serialize the object first to test decoding
msgpack_bytes_mixin = dc_with_mixin.to_msgpack()
msgpack_bytes_dynamic = new_dc(**{field.name: getattr(dc_with_mixin, field.name) for field in fields(dc_with_mixin)}).to_msgpack()
msgpack_bytes_json = msgpack.dumps(encoder.encode(dc))


# Test function 1: Decode using msgpack mixin
def test_msgpack_mixin_decode():
    return MyDataClassWithMsgPackMixin.from_msgpack(msgpack_bytes_mixin)


# Test function 2: Dynamically create a class and decode using msgpack mixin
def test_dynamic_msgpack_mixin_decode():
    new_instance = new_dc(**{field.name: getattr(dc_with_mixin, field.name) for field in fields(dc_with_mixin)})
    return new_instance.from_msgpack(msgpack_bytes_dynamic)


def test_create_new_dc():
    return type(MyDataClass.__name__ + "WithMessagePackMixin", (MyDataClass, DataClassMessagePackMixin), {})


# Test function 3: Convert msgpack bytes to a JSON string and decode back to a dataclass
def test_json_str_to_msgpack_decode():
    return decoder.decode(msgpack.loads(msgpack_bytes_json))


def test_create_decoder():
    return JSONDecoder(MyDataClass)


# Test function for initializing the mashumaro MessagePackDecoder
def test_create_mashumaro_decoder():
    return MessagePackDecoder(MyDataClass)


# Initialize mashumaro decoder outside the test loop
mashumaro_decoder = test_create_mashumaro_decoder()


# Test function 4: Use MessagePackDecoder from mashumaro
def test_mashumaro_msgpack_decoder():
    return mashumaro_decoder.decode(msgpack_bytes_mixin)


# Use timeit to test each method's performance, running 10000 times
iterations = 10000

time_mixin_decode = timeit.timeit(test_msgpack_mixin_decode, number=iterations)
time_dynamic_decode = timeit.timeit(test_dynamic_msgpack_mixin_decode, number=iterations)
time_dynamic_create_base_class = timeit.timeit(test_create_new_dc, number=1)
time_json_decode = timeit.timeit(test_json_str_to_msgpack_decode, number=iterations)
time_create_json_decoder = timeit.timeit(test_create_decoder, number=1)
time_mashumaro_decode = timeit.timeit(test_mashumaro_msgpack_decoder, number=iterations)
time_create_mashumaro_decoder = timeit.timeit(test_create_mashumaro_decoder, number=1)

# Output the results
print(f'{iterations} times dataclass with msgpack mixin decode elapsed time:', time_mixin_decode)
print(f'{iterations} times dynamically create msgpack mixin decode elapsed time:', time_dynamic_decode + time_dynamic_create_base_class)
print(f'{iterations} times dataclass -> json str -> msgpack bytes decode elapsed time:', time_json_decode + time_create_json_decoder)
print(f'{iterations} times mashumaro msgpack decode elapsed time:', time_mashumaro_decode + time_create_mashumaro_decoder)

import matplotlib.pyplot as plt

# Labels and timings for the four methods
methods = ['Msgpack Mixin Decode', 'Dynamic Msgpack Mixin Decode', 'JSON String to Msgpack Decode', 'Mashumaro Msgpack Decode']
times = [time_mixin_decode, time_dynamic_decode + time_dynamic_create_base_class, time_json_decode + time_create_json_decoder, time_mashumaro_decode + time_create_mashumaro_decoder]

# Plotting the comparison
plt.figure(figsize=(30, 5))
plt.bar(methods, times, color=['blue', 'green', 'red', 'orange'])
plt.xlabel('Methods')
plt.ylabel('Elapsed Time (seconds)')
plt.title('Comparison of Decoding Methods')
plt.savefig('/Users/future-outlier/code/dev/flytekit/build/PR/JSON/eduardo/performance_comparison/decode.png')
```
pydantic failure example
```python
from pydantic import BaseModel
from mashumaro.mixins.msgpack import DataClassMessagePackMixin
from mashumaro.codecs.msgpack import MessagePackEncoder, MessagePackDecoder


# 1. Define a Pydantic model
class TrainConfig(BaseModel):
    lr: float = 1e-3      # learning rate
    batch_size: int = 32  # batch size for training


# 2. Use Mashumaro's MessagePackEncoder and MessagePackDecoder for serializing and deserializing Pydantic objects

# Serialization function
def serialize_train_config(train_config: TrainConfig) -> bytes:
    encoder = MessagePackEncoder(TrainConfig)  # Create an encoder for TrainConfig
    return encoder.encode(train_config)  # Serialize the TrainConfig object to MessagePack format


# Deserialization function
def deserialize_train_config(encoded_data: bytes) -> TrainConfig:
    decoder = MessagePackDecoder(TrainConfig)  # Create a decoder for TrainConfig
    return decoder.decode(encoded_data)  # Deserialize the MessagePack data back to a TrainConfig object


# Example usage:
train_config = TrainConfig(lr=0.001, batch_size=64)  # Create a TrainConfig object with custom values

# Serialize the TrainConfig object
serialized_data = serialize_train_config(train_config)
print(f"Serialized Data: {serialized_data}")

# Deserialize the data back to a TrainConfig object
deserialized_config = deserialize_train_config(serialized_data)
print(f"Deserialized Config: lr={deserialized_config.lr}, batch_size={deserialized_config.batch_size}")
```
pydantic failure example error message
```text
/Users/future-outlier/miniconda3/envs/dev/bin/python /Users/future-outlier/code/dev/flytekit/build/PR/JSON/eduardo/pydantic_example.py
Traceback (most recent call last):
  File "/Users/future-outlier/code/dev/flytekit/build/PR/JSON/eduardo/pydantic_example.py", line 26, in <module>
    serialized_data = serialize_train_config(train_config)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/future-outlier/code/dev/flytekit/build/PR/JSON/eduardo/pydantic_example.py", line 14, in serialize_train_config
    encoder = MessagePackEncoder(TrainConfig)  # Create an encoder for TrainConfig
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/future-outlier/miniconda3/envs/dev/lib/python3.12/site-packages/mashumaro/codecs/msgpack.py", line 107, in __init__
    code_builder.add_encode_method(shape_type, self, post_encoder_func)
  File "/Users/future-outlier/miniconda3/envs/dev/lib/python3.12/site-packages/mashumaro/codecs/_builder.py", line 78, in add_encode_method
    packed_value = PackerRegistry.get(
                   ^^^^^^^^^^^^^^^^^^^
  File "/Users/future-outlier/miniconda3/envs/dev/lib/python3.12/site-packages/mashumaro/core/meta/types/common.py", line 241, in get
    raise UnserializableField(
mashumaro.exceptions.UnserializableField: Field "" of type TrainConfig in __root__ is not serializable

Process finished with exit code 1
```
Building a JSON schema is not an issue for the Dataclass Transformer.
Whether you have a pure dataclass or one that incorporates `DataClassMessagePackMixin`, `DataClassJSONMixin`, or both, it will successfully generate a JSON schema.
You can verify this by running the following example:
```python
import os
import tempfile
from dataclasses import dataclass
from flytekit.core.type_engine import TypeEngine

import pandas as pd
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile
from flytekit.types.structured import StructuredDataset
from mashumaro.mixins.json import DataClassJSONMixin
from mashumaro.mixins.msgpack import DataClassMessagePackMixin


@dataclass
class Datum:
    x: int
    y: str
    z: dict[int, str]


@dataclass
class FlyteTypes:
    dataframe: StructuredDataset
    file: FlyteFile
    directory: FlyteDirectory


transformer = TypeEngine.get_transformer(FlyteTypes)
lt1_1 = transformer.get_literal_type(Datum)
lt1_2 = transformer.get_literal_type(FlyteTypes)


@dataclass
class Datum(DataClassMessagePackMixin, DataClassJSONMixin):
    x: int
    y: str
    z: dict[int, str]


@dataclass
class FlyteTypes(DataClassMessagePackMixin, DataClassJSONMixin):
    dataframe: StructuredDataset
    file: FlyteFile
    directory: FlyteDirectory


lt2_1 = transformer.get_literal_type(Datum)
lt2_2 = transformer.get_literal_type(FlyteTypes)


@dataclass
class Datum(DataClassJSONMixin):
    x: int
    y: str
    z: dict[int, str]


@dataclass
class FlyteTypes(DataClassJSONMixin):
    dataframe: StructuredDataset
    file: FlyteFile
    directory: FlyteDirectory


lt3_1 = transformer.get_literal_type(Datum)
lt3_2 = transformer.get_literal_type(FlyteTypes)


@dataclass
class Datum(DataClassMessagePackMixin):
    x: int
    y: str
    z: dict[int, str]


@dataclass
class FlyteTypes(DataClassMessagePackMixin):
    dataframe: StructuredDataset
    file: FlyteFile
    directory: FlyteDirectory


lt4_1 = transformer.get_literal_type(Datum)
lt4_2 = transformer.get_literal_type(FlyteTypes)

assert lt1_1 == lt2_1 and lt1_1 == lt3_1 and lt1_1 == lt4_1
assert lt1_2 == lt2_2 and lt1_2 == lt3_2 and lt1_2 == lt4_2
```
I also found that directly encoding with `msgpack.dumps` is the best way to do it.
terminal output
output 1
```text
MsgPack serialized data size: 15757449 bytes
MsgPack serialized JSON string size: 21777797 bytes
UTF-8 encoded JSON string size: 21777792 bytes
```
output 2
code example
output 1
```python
import msgpack
import json

# Step 1: Create a dictionary with 1,000,000 key-value pairs
d = {f'key_{i}': i for i in range(1, 1000001)}

# Step 2: Serialize dictionary using msgpack
msgpack_data = msgpack.dumps(d)
msgpack_data_size = len(msgpack_data)

# Step 3: Convert dictionary to JSON string and serialize using msgpack
json_str = json.dumps(d)
msgpack_json_data = msgpack.dumps(json_str)
msgpack_json_data_size = len(msgpack_json_data)

# Step 4: Encode the JSON string using UTF-8 and compare size
encoded_json_str = json_str.encode('utf-8')
encoded_json_str_size = len(encoded_json_str)

# Print size comparison
print(f"MsgPack serialized data size: {msgpack_data_size} bytes")
print(f"MsgPack serialized JSON string size: {msgpack_json_data_size} bytes")
print(f"UTF-8 encoded JSON string size: {encoded_json_str_size} bytes")
```
output 2
```python
import msgpack
import json
import matplotlib.pyplot as plt


# Function to measure sizes for a given number of attributes
def measure_sizes(n):
    # Create a dictionary with n key-value pairs
    d = {f'key_{i}': i for i in range(1, n + 1)}

    # Serialize dictionary using msgpack
    msgpack_data = msgpack.dumps(d)
    msgpack_data_size = len(msgpack_data)

    # Convert dictionary to JSON string and serialize using msgpack
    json_str = json.dumps(d)
    msgpack_json_data = msgpack.dumps(json_str)
    msgpack_json_data_size = len(msgpack_json_data)

    # Encode the JSON string using UTF-8
    encoded_json_str = json_str.encode('utf-8')
    encoded_json_str_size = len(encoded_json_str)

    return msgpack_data_size, msgpack_json_data_size, encoded_json_str_size


# Define test points for 100000, 200000, ..., 1000000 attributes
num_attributes = [100000 * i for i in range(1, 11)]
msgpack_sizes = []
msgpack_json_sizes = []
utf8_json_sizes = []

# Measure sizes for each point
for n in num_attributes:
    msgpack_size, msgpack_json_size, utf8_json_size = measure_sizes(n)
    msgpack_sizes.append(msgpack_size)
    msgpack_json_sizes.append(msgpack_json_size)
    utf8_json_sizes.append(utf8_json_size)

# Plot the results
plt.plot(num_attributes, msgpack_sizes, label='MsgPack serialized data size')
plt.plot(num_attributes, msgpack_json_sizes, label='MsgPack serialized JSON string size')
plt.plot(num_attributes, utf8_json_sizes, label='UTF-8 encoded JSON string size')
plt.xlabel('Number of attributes')
plt.ylabel('Size (bytes)')
plt.title('Comparison of Serialized Data Sizes')
plt.legend()
plt.grid(True)
plt.show()
```
The official msgpack documentation says:

> MessagePack is an efficient binary serialization format. It lets you exchange data among multiple languages like JSON. But it's faster and smaller. Small integers are encoded into a single byte, and typical short strings require only one extra byte in addition to the strings themselves.

I also found that if we use msgpack to serialize strings, we get no size benefit from it.
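A quick sanity check of the string case (a minimal sketch; msgpack prepends a small length header to str payloads, so the packed form is never smaller than the raw UTF-8 bytes):

```python
import msgpack

s = "hello world" * 100       # a 1100-character string
raw = s.encode("utf-8")       # plain UTF-8 bytes
packed = msgpack.dumps(s)     # msgpack str: length header + the same UTF-8 bytes

print(len(raw), len(packed))  # 1100 1103 -- msgpack adds 3 header bytes here
```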
JSON IDL Discussion
Prerequisite:
The idea is to store bytes in the literal, so that we can serialize and deserialize values reliably.
There are two dimensions to think about: the JSON types, and the way we do serialization and deserialization.
JSON types
Right now we only support Python's default JSON types (see here), but there are other JSON formats, each with some differences; users may want these in the future.
serialization and deserialization APIs
We decided to use msgpack for now, but there are other APIs for serialization and deserialization. For example, the zlib library can compress the JSON data to a smaller size, but it also requires more time, so there is a trade-off between size and speed.
Discussion
There are two ways to implement it: either add a new IDL type or reuse the Binary IDL.
Add a new IDL called JSON
```protobuf
message Json {
    bytes value = 1;
}
```
propeller compiler
Check whether the literal scalar's simple type is JSON.
propeller attribute access
We can use `TypeStructure` in the literal to provide metadata.
```protobuf
message TypeStructure {
    // Must exactly match for types to be castable
    string tag = 1;
    // dataclass_type only exists for dataclasses.
    // This is used to resolve the type of the fields of the dataclass.
    // The key is the field name, and the value is the literal type of the field.
    // e.g. for dataclass Foo with field a, where a is a string,
    // Foo.a will be resolved as a literal type of string from dataclass_type.
    map<string, LiteralType> dataclass_type = 2;
}
```
If `TypeStructure.tag == "json-msgpack"`, then we deserialize it with the msgpack API into a JSON-typed value.
flytectl
Check if the type is `scalar.JSON`.
flyteconsole
Check if the type is `scalar.JSON` and `TypeStructure.tag` has the suffix "msgpack".
Reuse BINARY IDL
```protobuf
message Binary {
    bytes value = 1;
    string tag = 2;
}
```
propeller compiler
Check whether the literal scalar's simple type is Binary and `TypeStructure.tag == "json-msgpack"`.
propeller attribute access
The same as for the new JSON IDL.
flytectl
Check if the type is `scalar.BINARY` and `TypeStructure.tag` has the prefix "json".
flyteconsole
Check if the type is `scalar.BINARY`, and `TypeStructure.tag` has the prefix "json" and the suffix "msgpack". A consumer-side sketch of this tag dispatch follows.
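A rough sketch (not actual flytekit/flyteconsole code) of how a consumer could dispatch on the `Binary` tag under the reuse option; the `"json-utf8"` branch is a hypothetical alternative encoding:

```python
import json
import msgpack

def binary_to_python(value: bytes, tag: str):
    """Turn a Binary literal's bytes into a JSON-compatible Python value based on its tag."""
    if tag == "json-msgpack":
        # bytes -> JSON-compatible Python value via msgpack
        return msgpack.loads(value)
    if tag == "json-utf8":  # hypothetical: plain UTF-8 encoded JSON
        return json.loads(value.decode("utf-8"))
    raise ValueError(f"unsupported Binary tag: {tag}")
```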
I have a preference for the "type alias" JSON (why not call it Messagepack?) over reusing Binary as we'd avoid comparing string tags.
> I have a preference for the "type alias" JSON (why not call it Messagepack?) over reusing Binary as we'd avoid comparing string tags.
I have one idea and am implementing it now; I will show it to all of you today. Thank you for your extra work over the weekend, I really appreciate it.
Writing this here so others can comment. @Future-Outlier and I had a chat today about the most recent draft. I think we've strayed a bit far from the intent of the original issue.
The current implementation now doesn't really consider the JSON component very much at all. I'm basing this off of the conversation we had, but it's also summarized by the first `to_literal` block in the summary:
`Python Val -> Bytes -> Protobuf JSON`
In the original issue, the JSON component is the primary motivator, and msgpack was only picked because it is a more efficient transport mechanism for bytes across the wire than say, utf-8, or some other normal encoding format. (it's also understandable by browsers, unlike a generic compression format.) Now the json component seems to be an afterthought with msgpack being the primary motivator for this work. As far as I know (please correct me if I'm wrong), this is not the intent. If this was switched and intentionally so, could someone please add the context here?
If I understand correctly, the genesis of this project was the realization that json
- is a format that gets around a lot of the struct issues
- is something natively supported by popular libraries like dataclass and pydantic
- is a general enough format that we can write a general transformer for it in the future, thus enabling anything that has a `to_json`/`from_json` function to seamlessly integrate with flyte
- is something that both back and front-end can understand
For these reasons it was thought (again, correct me if i'm wrong) that this would be a powerful abstraction. A couple things that might be enabled by a stronger json notion.
Compatibility
This was covered back in the issue with unions and dataclasses, but the existence of a json schema allows for more useful type/compatibility checking I think.
Attribute access of dataclass-like structures
The second part that I think is worth thinking about and improving upon we uncovered in our chat today. The data accessor feature added last year works well for native flyte idl container types (like lists and maps) but doesn't work with dataclasses. In this example, both downstream tasks fail:
```python
from dataclasses import dataclass

from mashumaro.mixins.json import DataClassJSONMixin

from flytekit import task, workflow, WorkflowFailurePolicy
from flytekit.types.file import FlyteFile


@dataclass
class MyDC(DataClassJSONMixin):
    data: FlyteFile
    number: int = 10


@task
def produce_data() -> MyDC:
    return MyDC(data=FlyteFile(path=__file__))


@task
def t1(input_file: FlyteFile):
    with open(input_file, "r") as f:
        print(f.read())


@task
def t2(nn: int):
    print(f"Number is {nn}")


@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def wf():
    my_dc = produce_data()
    t1(my_dc.data)
    t2(my_dc.number)
```
The errors look pretty similar for both tasks.

Error for `t1`:
```text
Cannot convert from Flyte Serialized object (Literal):
scalar:
  generic:
    fields: {'path': string_value: "s3://my-s3-bucket/data/o5/ak52wd7h59fhhtbckjgf-n0- [...] to <class 'flytekit.types.file.file.FlyteFile'>
```

Error for `t2`:
```text
Cannot convert literal Flyte Serialized object (Literal):
scalar:
  primitive:
    float_value: 10.0 to <class 'int'>
```
The second issue (the mismatch in float/int) will be addressed by the PR as it stands, but I don't think the first can be.
The reason is that propeller is just grabbing the struct object that's pointed to by the data attribute, which is not a FlyteFile object... it's a generic struct. I am very much against teaching flytekit type transformers how to handle proto generic structs.
However I think teaching type transformers how to handle to/from_json is entirely valid. We have the Python type when t1 kicks off, we know it's a FlyteFile, and so we can just parse the JSON that's pointed to by the attribute.
We can somehow instruct flytekit that the incoming data is of JSON form, rather than flyteidl proto binary form. And sure we can pick some encoding (utf-8, msgpack, doesn't matter).
> For these reasons it was thought (again, correct me if i'm wrong) that this would be a powerful abstraction. A couple things that might be enabled by a stronger json notion.

I agree that having a good abstraction layer in Flytekit would be perfect. The current implementation might be simple and useful, but it doesn't exactly align with what the original issue describes.

> We can somehow instruct flytekit that the incoming data is of JSON form, rather than flyteidl proto binary form. And sure we can pick some encoding (utf-8, msgpack, doesn't matter).

I don't entirely understand this case; if possible, could someone please explain it? Thank you.
@wild-endeavor @eapolinario @pingsutw
Serialization: Python value -> json_str -> bytes -> JSON IDL
I think that if we require everyone using the JSON IDL to include a layer called `json_str`, it will be closer to the original idea.
Additionally, I can create a base class called `JsonTransformer` and use it as the parent class for `DataclassTransformer`, `DictTransformer`, and `PydanticTransformer`.
(A `class JsonTransformer(TypeTransformer)` would be required to implement two methods, `to_json` and `from_json`; see the sketch below.)
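A minimal sketch of the proposed base class; the method names `to_json`/`from_json` come from the proposal above, while everything else (the generics and signatures) is illustrative:

```python
from abc import ABC, abstractmethod
from typing import Generic, Type, TypeVar

T = TypeVar("T")

class JsonTransformer(ABC, Generic[T]):  # in flytekit this would extend TypeTransformer[T]
    @abstractmethod
    def to_json(self, python_val: T) -> str:
        """Serialize a Python value to a JSON string."""

    @abstractmethod
    def from_json(self, json_str: str, expected_type: Type[T]) -> T:
        """Reconstruct a Python value of the expected type from a JSON string."""

# DataclassTransformer, DictTransformer, and PydanticTransformer would then
# inherit from JsonTransformer and implement these two methods.
```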
This way, if we want to add other transformers in the future, it will be easier. What do you folks think?
Thank you. If I misunderstood what Yee said, please correct me. Thanks so much, everyone.
To make the JSON IDL more useful, I think that requiring users to have a `json_str` layer will ensure the object is 100% a JSON object. If we use another serialization API to serialize our object directly, we can't guarantee that it's a JSON object, since MessagePack is a superset of JSON and also supports bytes, tuples, and sets.
A feasible approach is as follows (sketched after the list):
- Serialization: Python value -> JSON string -> bytes -> Flyte IDL JSON object
- Deserialization: Flyte IDL JSON object -> bytes -> JSON string -> Python value
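A minimal sketch of that round trip, assuming msgpack as the byte encoding and representing the Flyte IDL JSON object simply as raw bytes:

```python
import json
import msgpack
from dataclasses import asdict, dataclass

@dataclass
class Config:
    lr: float
    batch_size: int

# Serialization: Python value -> JSON string -> bytes
json_str = json.dumps(asdict(Config(lr=1e-3, batch_size=32)))
idl_bytes = msgpack.dumps(json_str)

# Deserialization: bytes -> JSON string -> Python value
restored = Config(**json.loads(msgpack.loads(idl_bytes)))
assert restored == Config(lr=1e-3, batch_size=32)
```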
However, if we convert the language value to a JSON string first, some serialization APIs may not gain a performance boost, because the string form adds extra bytes. As the official MsgPack documentation puts it:

> MessagePack is an efficient binary serialization format. It allows you to exchange data between multiple languages, like JSON, but it is faster and smaller. Small integers are encoded in a single byte, and typical short strings require only one extra byte beyond the string itself.
@Future-Outlier, @wild-endeavor and myself met to discuss the direction of this RFC. Here is what we landed on:
- We will use msgpack to serialize python types directly.
- The `LiteralType` will be just `SimpleType.STRUCT`, like it is for DataClasses today.
- We will fill in `TypeStructure` in `LiteralType` to describe the type using JSON Schema.
- The `Literal` (the serialization medium) will be `Literal.Scalar.Binary` with a `tag=msgpack` to allow runtime code in various languages to decode it. (A sketch of such a literal follows this list.)
- The UI will need to be updated to be able to display the inputs/outputs in Binary msgpack format.
- The launch form will behave correctly in that it will render the fields based on the `TypeStructure`. When it creates an execution it will, however, use a `Literal.scalar.generic` instead of `Literal.scalar.binary`, which should be OK (from a system design perspective it is just a different medium for passing the same information), and propeller/flytekit should be able to access it normally. It can, however, also be updated at some point in the future to serialize into msgpack to avoid loss of fidelity and gain compaction benefits.
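A sketch of what the agreed-on literal could look like using the flyteidl Python bindings (the exact construction in flytekit may differ):

```python
import msgpack
from flyteidl.core import literals_pb2

python_val = {"lr": 0.001, "batch_size": 32}

literal = literals_pb2.Literal(
    scalar=literals_pb2.Scalar(
        binary=literals_pb2.Binary(
            value=msgpack.dumps(python_val),  # the python value, serialized directly with msgpack
            tag="msgpack",                    # lets runtimes in other languages pick the right decoder
        )
    )
)
```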
The rationale:
- msgpack is more efficient at storing data than JSON. We have seen people generate really large dataclasses as outputs, and storing JSON as a string is probably not the right thing.
- The msgpack library in Golang is lossless when you unmarshal and marshal data. The JSON marshaller, unfortunately, is lossy (a `1.0` float converts to a `1` int).
- We do not need a new `LiteralType`, since the "type" the user is defining isn't new. We are just introducing a more efficient (and more correct) way of maintaining value fidelity.
- We do not need a new `Literal`, since the `value` the user is storing is already supported as `Binary`.
@wild-endeavor and @Future-Outlier will add more details about the implementation and flytekit changes needed...
cc @eapolinario
Move to https://github.com/flyteorg/flyte/pull/5742