
Enable Serde for Pydantic BaseModel and Subclasses

Open sjyangkevin opened this issue 7 months ago • 16 comments

Motivation

The original purpose of this PR is to resolve #50867. In the Cohere provider, since 1.4.2, the return type of CohereEmbeddingOperator changed from list[list[float]] to EmbedByTypeResponseEmbeddings. EmbedByTypeResponseEmbeddings is a class that inherits from pydantic.BaseModel, with multiple intermediate classes in between. To allow embeddings to be passed through XComs, we need the capability to serialize/deserialize EmbedByTypeResponseEmbeddings. Since the base class is pydantic.BaseModel, we decided to implement this in core serde so future use cases can also benefit from it.

Close #50867.

High-level Design Solution

First, I think the serializer should be able to identify a Pydantic model, i.e., a class that inherits from pydantic.BaseModel directly or through intermediate classes. A Pydantic model can be identified simply using isinstance(obj, pydantic.BaseModel). However, isinstance can be slow since it needs to traverse the inheritance tree, so an alternative is to use attributes that are specific to a Pydantic model to identify it; the attributes used here are __pydantic_fields__ and __pydantic_validator__. Then, for any Pydantic model, serialization can be implemented by calling the model_dump() method on the instance.

However, to restore the Pydantic model, the deserializer needs to know the actual class rather than the generic pydantic.BaseModel. Therefore, we need to keep track of the actual Pydantic class, e.g., cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings. When re-creating the model, this class is used and its model_validate() method is invoked. A short sketch of the idea follows.
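A minimal sketch of the approach (the helper name _is_pydantic_model and the Embedding class are illustrative, not the actual serde code; the two dunder attributes are the ones referenced above):

from pydantic import BaseModel

def _is_pydantic_model(obj) -> bool:
    # Duck-typed check: Pydantic v2 sets these attributes on every model class
    # (and they are reachable from instances), so this avoids walking the
    # inheritance tree the way isinstance(obj, BaseModel) would.
    return hasattr(obj, "__pydantic_fields__") and hasattr(obj, "__pydantic_validator__")

class Embedding(BaseModel):  # stand-in for e.g. EmbedByTypeResponseEmbeddings
    values: list[float]

original = Embedding(values=[0.1, 0.2])
assert _is_pydantic_model(original)

# Serialize: plain Python data plus the fully qualified class name for later lookup.
data = original.model_dump()
classname = f"{type(original).__module__}.{type(original).__qualname__}"

# Deserialize: the concrete class, resolved from `classname`, rebuilds the model.
restored = Embedding.model_validate(data)
assert restored == original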

The current implementation uses dynamic import to handle this. However, it has some limitations where the module cannot be resolved, for example when the Pydantic model is defined inside a Python function or inside a task-decorated Python function.

from importlib import import_module

def _resolve_pydantic_class(qn: str):
    # Split the fully qualified name into module path and class name,
    # import the module, and look the class up on it.
    module_name, class_name = qn.rsplit(".", 1)
    module = import_module(module_name)
    return getattr(module, class_name)

Test Result

The Serde can successfully serialize EmbedByTypeResponseEmbeddings (screenshot: 2025-05-25 23-08-46).

The Serde can successfully deserialize EmbedByTypeResponseEmbeddings (screenshot: 2025-05-25 23-09-24).

Test DAG code

from airflow.decorators import dag, task
from airflow.models.baseoperator import chain
from airflow.providers.cohere.hooks.cohere import CohereHook
from airflow.providers.cohere.operators.embedding import CohereEmbeddingOperator

from pendulum import datetime

COHERE_CONN_ID = "cohere_default"

@dag(
    start_date=datetime(2025, 5, 23),
    schedule=None,
    catchup=False,
)
def pydantic_serde():
    @task
    def push_pydantic():
        from pydantic import BaseModel, Field
        from typing import Optional

        class BarModel(BaseModel):
            whatever: int
        # this Pydantic model is created within the function, in deserialization, the module will be resolved as
        # unusual_prefix_afec8360888f39af6ea3ccaccf36a7f590a25638_pydantic_serde.pydantic_serde
        # This CANNOT be handled by the deserializer
        class FooBarModel(BaseModel):
            banana: Optional[float] = 1.1
            foo: str = Field(serialization_alias='foo_alias')
            bar: BarModel

        m = FooBarModel(banana=3.14, foo='hello', bar={'whatever': 123})
        return m
    
    @task
    def get_pydantic(m):
        # it cannot handle pydantic model created within the upstream task.
        print(m.model_dump())

    @task
    def get_embeddings():
        import pydantic
        
        cohere_hook = CohereHook()
        embeddings = cohere_hook.create_embeddings(["gruyere"])

        print("type of embeddings:", type(embeddings))
        print("is embedding type pydantic:", isinstance(embeddings, pydantic.BaseModel))

        return embeddings

    @task
    def print_embeddings(embeddings):
        embeddings = [x[0] for x in embeddings]
        print(embeddings)

    print_embeddings(get_embeddings())
    get_pydantic(push_pydantic())

pydantic_serde()

Limitations of the implementation

During testing, I found that if a Pydantic model is not defined in the global scope, e.g., it is defined within a (test) function or an Airflow task (i.e., a task-decorated Python function), the Serde will not work due to the use of dynamic import.
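A small illustration of why the dynamic import fails for such classes (make_model and LocalModel are made-up names, used only for demonstration): the qualified name of a locally defined class contains "<locals>", which the resolver shown earlier cannot import.

from importlib import import_module

def make_model():
    from pydantic import BaseModel

    class LocalModel(BaseModel):  # defined inside a function, like inside a @task
        x: int

    return LocalModel

cls = make_model()
qn = f"{cls.__module__}.{cls.__qualname__}"
print(qn)  # e.g. "__main__.make_model.<locals>.LocalModel"

# The resolver splits on the last dot, but "__main__.make_model.<locals>" is not an
# importable module, so import_module raises ModuleNotFoundError and the class
# cannot be restored.
module_name, class_name = qn.rsplit(".", 1)
import_module(module_name)  # raises ModuleNotFoundError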



sjyangkevin avatar May 26 '25 03:05 sjyangkevin

Hi all, thank you very much again for all the constructive feedback. I've pushed some changes based on it.

Design Principle

  1. We want to prioritize the serialize() and deserialize() methods defined on the object (custom), if present, over the registered default serializer/deserializer.
  2. For Pydantic models, and any of their subclasses, we want to be able to identify them. In the serializer/deserializer, we can register the generic pydantic.main.BaseModel, so all Pydantic models coming in the future can benefit.
  3. During deserialization, we need the capability to reconstruct the Pydantic object. For that we need the actual Pydantic class (e.g., cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings). It is encoded as classname during serialization and propagated to the deserializer during deserialization, so we don't need the magic keyword __class__ to handle it, eliminating security concerns. serde.py needs to be modified accordingly: since pydantic.main.BaseModel is registered but the actual classname (e.g., cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings) is used to look up the default deserializer, the lookup will miss. Therefore, we need a fallback mechanism for Pydantic models: check whether the class is a subclass of pydantic.main.BaseModel, and if so, use the hard-coded key (pydantic.main.BaseModel) to invoke the registered deserializer and pass the serialized object into it (see the sketch after this list).
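A rough sketch of the fallback idea described in point 3 (this is not the actual serde.py code; the registry and the way the resolved class is obtained are simplified for illustration):

from pydantic import BaseModel

# Registered deserializers are keyed by the generic qualname, while the payload
# carries the concrete classname, so a direct lookup with the concrete name misses.
_DESERIALIZERS = {
    "pydantic.main.BaseModel": lambda cls, version, data: cls.model_validate(data),
}

def _deserialize(classname: str, version: int, data, resolved_cls: type):
    # resolved_cls is assumed to have been imported by serde.py itself, gated by
    # allowed_deserialization_classes.
    if classname in _DESERIALIZERS:
        return _DESERIALIZERS[classname](resolved_cls, version, data)
    # Fallback: any subclass of BaseModel is routed to the registered pydantic handler.
    if issubclass(resolved_cls, BaseModel):
        return _DESERIALIZERS["pydantic.main.BaseModel"](resolved_cls, version, data)
    raise TypeError(f"No deserializer registered for {classname}")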

I still want to keep the whitelisting mechanism, to make it even safer for arbitrary subclasses that do not directly inherit from Pydantic BaseModel. You will see the following error if the class is not added to allowed_deserialization_classes.

ImportError: unusual_prefix_afec8360888f39af6ea3ccaccf36a7f590a25638_pydantic_serde.pydantic_serde.<locals>.push_pydantic.<locals>.FooBarModel was not found in allow list for deserialization imports. To allow it, add it to allowed_deserialization_classes in the configuration

For the Cohere operator, I can have it added to allowed_deserialization_classes by setting the environment variable:

AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES=cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings

Then, I can successfully serialize/deserialize (screenshot: 2025-05-29 00-43-23).

Please let me know if you have further feedback. I am happy to discuss and make changes accordingly.

Change Summary

In serde.py

  1. I renamed _is_pydantic_basemodel() to _is_pydantic_model() and added a docstring describing why isinstance is not used.
  2. I modified the order in which serializers/deserializers are used. The docstring states that the serde provided by the object is prioritized, but the implementation checked for the registered serde first. I altered the order for both serialize and deserialize, so a user-provided serde is used first if defined, then the registered one, then dataclass/attr.
  3. I moved _is_pydantic_model and _is_namedtuple to the top of the file.
  4. In the condition check for Pydantic models, I updated qn to "pydantic.main.BaseModel" instead of qualname(BaseModel), and changed classname to qualname(o), so no magic keyword (i.e., __class__) is used in pydantic's serialize/deserialize.
  5. In deserialize, I added a fallback check: since the actual classname is encoded during serialization, the lookup for a registered pydantic deserializer misses (the registered classname is the generic pydantic.main.BaseModel), so the fallback checks for any Pydantic model subclass and directs it to that deserializer.

In pydantic.py

  1. I removed the use of __class__ key to eliminate security concerns.
  2. I use import_string instead of reinventing the wheel.
  3. serialization is simply model_dump() and deserialization is simply model_validate() (a short sketch follows this list).
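Roughly what the pydantic serializer module boils down to at this stage of the PR; the tuple shape and helper names are illustrative rather than the exact file contents:

from airflow.utils.module_loading import import_string

def serialize(o):
    # model_dump() yields plain Python data; the concrete qualified name travels
    # with it so the deserializer knows which class to rebuild.
    classname = f"{type(o).__module__}.{type(o).__qualname__}"
    return o.model_dump(), classname, 1, True

def deserialize(classname: str, version: int, data):
    # import_string resolves the concrete model class; this is the part that is
    # later removed in favour of serde.py passing the class in directly.
    cls = import_string(classname)
    return cls.model_validate(data)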

sjyangkevin avatar May 29 '25 04:05 sjyangkevin

Attaching unit test results from pytest --log-cli-level=DEBUG airflow-core/tests/unit/serialization/ (screenshot: 2025-05-29 01-06-56).

sjyangkevin avatar May 29 '25 05:05 sjyangkevin

I'm replying from a phone so expect some mistakes ;-).

I like the direction where this is going! However, I do see some challenges remaining. The biggest one: I would like to get rid of "import_string" entirely. When we caught the earlier security issue I was actually thinking about adding a ruff rule that prevents its addition. It's a security issue bound to happen. In your current implementation there is no guard against loading a malicious class except for the one in serde itself, so if I'm somehow able to trick the deserialization differently, it still goes through. That shouldn't be the case. In addition, the module is now loaded twice, probably without caching due to custom loading.

So in this case I see two options

  1. move the logic for pydantic to serde.py and remove the serializer. Drawback is that a future change will always require a core release

  2. allow passing the loaded class to the deserializer. This requires refactoring of the other deserializers to accept a class.

1 is simpler; 2 is IMHO more future-proof.

Furthermore, I prefer to fix issues in one PR especially here in this context. So please do not move the code as you did here based on the comment / doc. It is unrelated and might have subtle issues. I'd rather have that separate, because it makes sense to align the code with the comment (or vice versa!). Just not here, right now.

bolkedebruin avatar May 30 '25 10:05 bolkedebruin

Thanks a lot for the thoughtful feedback.

You're absolutely right about the risk of using import_string() in the pydantic serializer, or of potentially distributing this import logic across multiple places. I will remove the import logic from the serializer module, have a deeper look into your suggestion, and see how we can get rid of it while keeping the code clean and safe.

Regarding the serializer call order in serde.py: I now see that this change, while conceptually correct in my view, shouldn't be mixed into this PR. I will revert that part; we can discuss it further and potentially open a separate PR where I can more clearly explain the motivation. Currently, I think registered serializers take precedence over instance-provided serialize() or deserialize() methods, which unintentionally prevents custom serialization logic from being respected in some cases. Feel free to correct me if I am wrong.

As a side note, just to confirm: my understanding is that the use of import_string inside serde.py itself is acceptable since it's gated behind allowed_deserialization_classes, correct? And potentially we can add a ruff rule to guard it further.

sjyangkevin avatar May 30 '25 14:05 sjyangkevin

allow passing the loaded class to the deserializer. This requires refactoring of the other deserializers to accept a class.

@bolkedebruin , I feel like this can be the way to go. I think it's a good idea to keep Pydantic modularized, as well as the other serializers. I wasn't able to find an alternative that is better than this solution. In serde.py, we have import_string to resolve the actual class; this can then be passed into the serializers and used directly. We use serde.py as a gate to validate and load the class, and the serializers only consume it. To resolve the Pydantic issue (i.e., the Cohere case), users can add the class to allowed_deserialization_classes. So, in the serializers, we can entirely get rid of import_string. Let me know if you think this is feasible. I can make the changes accordingly, and I would appreciate some guidance on how to properly test it after making the changes. Thanks!

sjyangkevin avatar Jun 01 '25 08:06 sjyangkevin

Okay! I like the more modularized approach. Let's go that way! We might need to think of a mechanism that allows serializers to register "allowed" classes, but that's probably out of scope for now (let's not include it now).

bolkedebruin avatar Jun 02 '25 08:06 bolkedebruin

I like the direction where this is going too. 2) is the way to go, with the more modularised approach as mentioned above. No objections.

@sjyangkevin shout to us when you need reviews :D

amoghrajesh avatar Jun 02 '25 10:06 amoghrajesh

Hi, just a heads up: sorry, I wasn't able to make progress these past few days, I was very busy. I will try to push changes by early next week. Thank you for your patience, and I appreciate your time reviewing. Feel free to share any feedback; I will take it into the next update.

sjyangkevin avatar Jun 04 '25 20:06 sjyangkevin

@sjyangkevin take your time. There is no urgency on this :)

We understand that everyone works on open source contributions during their free time, so no pressure at all!

amoghrajesh avatar Jun 05 '25 06:06 amoghrajesh

Hi @amoghrajesh , @bolkedebruin , I would like to follow up with some updates

  1. I reverted the changes that I made to serde.py to alter the order of serializers/deserializers.
  2. I updated the function signature of deserialize in all serializer modules by adding an optional parameter cls: Any. I found that existing serializers mostly use classname; replacing classname with cls would require refactoring that code, so to reduce the chance of introducing subtle issues, the existing serializers keep functioning as they are, and the pydantic one accepts cls from serde.py (see the sketch after this list).
  3. I modified pydantic's serialize method as well: instead of returning pydantic.main.BaseModel as the serialized classname, it returns qualname(o). In this way, any arbitrary Pydantic model can be checked during deserialization. It means cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings must be added to allowed_deserialization_classes so it can be deserialized.
  4. import_string is removed from the pydantic serializer.
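An illustrative sketch of the new hand-off (not the exact code): serde.py alone resolves the class, gated by allowed_deserialization_classes, and the pydantic deserializer only consumes it through the new optional cls parameter.

from pydantic import BaseModel

def deserialize(classname: str, version: int, data, cls=None):
    # cls is the optional parameter added to each serializer's deserialize();
    # existing serializers keep using classname, while the pydantic one uses cls,
    # which serde.py resolved after the allow-list check.
    if cls is None or not (isinstance(cls, type) and issubclass(cls, BaseModel)):
        raise TypeError(f"{classname} is not a Pydantic model class")
    return cls.model_validate(data)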

It has passed all the unit tests, and I've updated my test DAG code as shown below.

Arbitrary Pydantic models must be added to allowed_deserialization_classes

Before adding to allowed_deserialization_classes (screenshot: 2025-06-08 01-16-41); after adding to allowed_deserialization_classes (screenshot: 2025-06-08 01-16-53).

Test DAG code

from airflow.decorators import dag, task
from airflow.models.baseoperator import chain
from airflow.providers.cohere.hooks.cohere import CohereHook
from airflow.providers.cohere.operators.embedding import CohereEmbeddingOperator

from pendulum import datetime

COHERE_CONN_ID = "cohere_default"

@dag(
    start_date=datetime(2025, 5, 23),
    schedule=None,
    catchup=False,
)
def pydantic_serde():

    @task
    def get_pandas():
        import pandas as pd
        import numpy as np

        return pd.DataFrame(np.random.randn(3, 2), columns=list('AB'))
    
    @task
    def print_pandas(df):
        print(df)

    @task
    def get_numpy():
        import numpy as np

        n = np.random.rand(3, 2)[0][0]
        print(type(n))
        # return the numpy scalar so the downstream print_numpy task receives it via XCom
        return n
    
    @task
    def print_numpy(n):
        print(n)

    @task
    def get_embeddings():
        import pydantic
        
        cohere_hook = CohereHook()
        embeddings = cohere_hook.create_embeddings(["gruyere"])

        print("type of embeddings:", type(embeddings))
        print("is embedding type pydantic:", isinstance(embeddings, pydantic.BaseModel))

        return embeddings

    @task
    def print_embeddings(embeddings):
        print(embeddings)

    print_embeddings(get_embeddings())
    print_numpy(get_numpy())
    print_pandas(get_pandas())

pydantic_serde()

sjyangkevin avatar Jun 08 '25 05:06 sjyangkevin

We are slowly getting there. I do prefer a refactor, so that classname isn't there anymore. Otherwise we just have redundant code and tech debt.

Thanks for the feedback! I was a little hesitant since it would be a huge refactor. I totally agree with your point, and will gradually refactor those. I would start with serde.py, and once we are good on the overall structure, update the submodules to follow the general structure.

sjyangkevin avatar Jun 16 '25 12:06 sjyangkevin

We are slowly getting there. I do prefer a refactor, so that classname isn't there anymore. Otherwise we just have redundant code and tech debt.

Thanks for the feedback! I was a little hesitant since it would be a huge refactor. I totally agree with your point, and will gradually refactor those. I would start with serde.py, and once we are good on the overall structure, update the submodules to follow the general structure.

Also - if we are thinking about refactoring stuff: I think (and I know @bolkedebruin had a different opinion on that) there was a discussion on whether it's good that we depend on other dependencies (say pandas, deltalake, iceberg, kubernetes) as part of "core" airflow - while we also have "providers" for many of those that provide operators / hooks and other "core extensions" related to the respective "external entity".

In my view, serializers for kubernetes, should come from kubernetes provider. Deltalake -> should come from databricks (or deltalake provider if we decide to have one), iceberg should come from iceberg provider.

Again -> I know @bolkedebruin had different view on that, but my goal is to have core as small as possible, and add anything to it as "extensions" - for which we already have "providers" as a mechanism to do so.

For me the Pydantic thing comes as a very similar thing - it should be an "extension" that should IMHO be implemented outside of core. So maybe it's the right time to do this kind of refactoring -> implement a "discovery" mechanism in the providers manager to discover which serializers are installed (similarly to all other extensions) - and similarly a specific "pydantic" model could be provided as an "extension" - by a provider or manually.

I'd love to hear thoughts about it.

potiuk avatar Jun 16 '25 13:06 potiuk

Hi @potiuk , very appreciate the insights and I would like to share some thoughts. Feel free to correct me if I am wrong on anything below.

We had a discussion in #50867 , and the issue with serializing a Pydantic model arose in the Cohere provider. Considering that Pydantic classes may potentially be used by other providers, we thought it would be good to implement this in the core module so it can be generic and reusable. In the current serialization module, I feel pandas, numpy, and datetime are similar cases: common objects that may be used by multiple providers, or by tasks passing data through XComs. This approach may help avoid implementing similar things in different providers.

Having serialization come from providers can also provide multiple benefits: 1.) we do not need a core release when updates are needed to serialization/deserialization for data created by a specific provider (iceberg should come from the iceberg provider, etc.); 2.) core can be minimal and just discover and register serde as extensions.

I am also very interested in looking into the option of moving it out of core and letting the providers manager reuse common objects and register those as needed, and into how we could keep it DRY, resolve security concerns, and still be able to extend it easily.

sjyangkevin avatar Jun 16 '25 15:06 sjyangkevin

there was a discussion on whether it's good that we are depending on other dependencies (say pandas, deltalake, iceberg, kubernetes) which are part of the "core" airflow - but also we have "providers" for many of those that provide operators / hooks and other "core extensions" related to the respective "external entity".

In my view, serializers for kubernetes, should come from kubernetes provider. Deltalake -> should come from databricks (or deltalake provider if we decide to have one), iceberg should come from iceberg provider.

Good pointers, I absolutely agree that kubernetes, pandas, deltalake and iceberg should not belong in core and should be safely moved to providers; I would love to hear what @bolkedebruin thinks on this. Is it the versioning that is stopping us from doing that?

For me the Pydantic thing comes as very similar thing - it should be "extension" that should be IMHO implemented outside of core. So maybe it's the right time to do this kind of refactoring -> Implement "discovery" mechanism in providers manager to discover which serializers are installed (similarly as all other extensions) - and similarly speciic "pydantic" model could be provided as "extension" - by a provider or manually.

However, I think pydantic is more of a core thing; it does not essentially belong to a provider, and it can be consumed and used in core without additional dependencies. So there's nothing stopping anybody from returning a pydantic model object as an XCom.

amoghrajesh avatar Jun 17 '25 05:06 amoghrajesh

I feel pandas, numpy, and datetime are similar cases: common objects that may be used by multiple providers, or by tasks passing data through XComs. This approach may help avoid implementing similar things in different providers.

I am not quite sure. I'd say it might also go into a "common.dataframe" provider (alongside polars and likely other things - see a discussion we had some time ago, https://lists.apache.org/thread/qx3yh6h0l6jb0kh3fz9q95b3x5b4001l ; nothing materialized from it, but it could) - then any provider that needs it could simply depend on it.

There is conceptually no difference between "common.sql" and "common.dataframe" - and somehow we seem to want to have "dataframe"-related code in Airflow core, while we keep "sql"-related code in a provider.

However, I think pydantic is more of a core thing; it does not essentially belong to a provider, and it can be consumed and used in core without additional dependencies. So there's nothing stopping anybody from returning a pydantic model object as an XCom.

I think Pydantic is two different layers:

  • serialization code that handles various Pydantic models that can be registered with it - this can indeed be in "airflow-core", especially since we already use Pydantic for other things (namely FastAPI)

  • but serialization/deserialization of concrete Pydantic classes should be "injected" by the respective provider. If Cohere defines its own Pydantic model, and another provider wants to use that Pydantic model "deserialized", it is perfectly fine for the other provider to depend on Cohere to be able to instantiate the concrete Cohere model. Basically you need to have the Cohere provider installed in order to use its model, and installing the Cohere provider should "register" the Cohere model in the common Pydantic serializer.

This is "classic" dependency injection that we need to implement (and we already do a lot of it common.sql for example) - but also common.messaging. You implement "common" code in comon place, and then specific code that implements "external entity variant of it" in a provider that registers the "specific code" when it gets installed.

See https://github.com/apache/airflow/pull/50057 which "fixed" the way this is done for "common.messaging" -> the providers manager loads common.messaging and the "common" code that you can then use to listen for incoming messages. common.messaging provides that "common implementation". Then the "amazon" provider provides an SQS implementation of it and registers it (the providers manager takes care of discovering "queue classes" and injects them into common.messaging, so that the common.messaging class can "know" about the "sqs" class).

No core class needs to know about or import "amazon" classes in this case. It does not know about them at all. It's the amazon provider that "knows" about common.messaging and exposes its specific "amazon" messaging implementation, allowing the providers manager to register it when the provider is installed (via entrypoints).

This is IMHO how our whole serialization should be implemented. Airflow core should not care about pandas, polars, iceberg, etc. It should be "pure, no dependencies" code. Providers - when installed - should provide all the code that is specific to the "external entity" (amazon, iceberg, whatever) and make it usable by the core. But importing "iceberg" anywhere in "airflow-core" is just wrong.
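A purely hypothetical sketch of the discovery idea being discussed here; the "serializers" metadata key and the register_provider_serializers function are invented for illustration and are not an existing providers-manager feature:

from importlib import import_module

REGISTERED_SERIALIZERS: dict[str, object] = {}

def register_provider_serializers(provider_info: dict) -> None:
    # A provider (e.g. cohere) would list serializer modules in its metadata; the
    # providers manager would import and register them when the provider is
    # installed, so core never imports provider-specific classes itself.
    for module_path in provider_info.get("serializers", []):
        module = import_module(module_path)
        for qualname in getattr(module, "serializers", []):
            REGISTERED_SERIALIZERS[qualname] = module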

potiuk avatar Jun 17 '25 10:06 potiuk

@potiuk I concur with the view that we should ideally move these to providers as much as possible (I'm not sure if I was really against that, maybe I was ;-) ), except maybe for builtin Python types (like int, str, etc.). However, for the sake of simplicity we earlier (in the issue, I think) already decided not to ask for a plugin implementation. I think that warrants a separate PR.

So yes, you are right, but given that pydantic is used elsewhere in airflow-core, I don't think we are adding extra dependencies to core right now, and we can move it as an improvement in a follow-up.

This PR actually improves the API that the plugins should follow (cls instead of classname). The PR implementing the plugin separation can use that.

bolkedebruin avatar Jun 17 '25 14:06 bolkedebruin

Hi @bolkedebruin , I've pushed the refactor of all the interfaces to use cls instead of classname. Below are the change highlights.

  1. Update the interface of deserialize to deserialize(cls: type, version: int, data: object)
  2. Group _is_namedtuple and _is_pydantic_model with other private methods
  3. Use a constant PYDANTIC_MODEL_QUALNAME for "pydantic.main.BaseModel"
  4. Remove import private method from serde.py in serializers
  5. Add more unit test cases (bignum, builtin, pydantic)

It would be great if I could get your guidance on how to better implement the following. Thank you for your time and patience in reviewing it.

  1. In deserialize, I use cls: type as the type hint; do you have any suggestion on this?

  2. Using attributes to identify a Pydantic model: I now have another private method in the pydantic serializer, which duplicates the one in serde.py. Is there somewhere we can move it to a common place and safely import it from both, e.g., airflow.utils? Or should we change the way the check is done?

  3. I wasn't sure how to properly test iceberg and deltalake, and a few pendulum (e.g., v2) tests are skipped. Is there any service or local setup I can use to run those tests?

  4. The test case test_timezone_deserialize_zoneinfo tries to deserialize "backports.zoneinfo.ZoneInfo". However, this module does not seem to be available in the Breeze environment, so the test cannot pass with backports.zoneinfo.ZoneInfo.

I've read the PR link and discussion shared by @potiuk , I also think that is a good way to go, and I am interested in contributing to that part as the next step.

Please feel free to let me know if anything needs to be changed; I would really appreciate any feedback and am eager to make it better. I am having some issues running pre-commit locally (it's extremely slow). If the push doesn't pass the checks, I will make sure all checks pass before the next push.

sjyangkevin avatar Jun 19 '25 04:06 sjyangkevin

Sorry, there are still some issues with the static checks. When I run them locally, they seem to be fixed; I'll try to sort it out and push again later.

sjyangkevin avatar Jun 19 '25 21:06 sjyangkevin

I would like to attach my test DAG code, which checks most of the serializers and deserializers except for iceberg and deltalake. I hope this is helpful for review. Thanks.

(Screenshot of the successful DAG run: 2025-06-24 23-51-31)

from airflow.decorators import dag, task
from airflow.models.baseoperator import chain
from airflow.providers.cohere.hooks.cohere import CohereHook
from airflow.providers.cohere.operators.embedding import CohereEmbeddingOperator

from pendulum import datetime

COHERE_CONN_ID = "cohere_default"

@dag(
    start_date=datetime(2025, 5, 23),
    schedule=None,
    catchup=False,
)
def pydantic_serde():

    @task
    def get_pandas():
        import pandas as pd
        import numpy as np

        return pd.DataFrame(np.random.randn(3, 2), columns=list('AB'))
    
    @task
    def print_pandas(df):
        print("Pandas DataFrame")
        print(df)

    @task
    def get_bignum():
        import decimal
        return decimal.Decimal(1234567891011)
    
    @task
    def print_bignum(n):
        print("bignum:", n)

    @task
    def get_list():
        return [1, 2, 3, 4]
    
    @task
    def print_list(l):
        print(l)

    @task
    def get_set():
        return set([1, 2, 3, 4])
    
    @task
    def print_set(s):
        print(s)

    @task
    def get_tuple():
        return (1, 2, 3, 4)
    
    @task
    def print_tuple(t):
        print(t)

    @task
    def get_frozenset():
        return frozenset([1,2,3,4])
    
    @task
    def print_frozenset(fs):
        print(fs)

    @task
    def get_numpy():
        import numpy as np

        n = np.random.rand(3,2)[0][0]
        print(type(n))
        return n
    
    @task
    def get_datetime():
        import datetime
        return datetime.datetime.now()
    
    @task
    def print_datetime(dt):
        print(dt)

    @task
    def get_timezone():
        from zoneinfo import ZoneInfo
        from datetime import datetime

        return datetime(2020, 10, 31, 12, tzinfo=ZoneInfo("America/Toronto"))
    
    @task
    def get_pendulum_tz():
        import pendulum
        return pendulum.timezone("Europe/Paris")

    @task
    def print_pendulum_tz(tz):
        print(tz)

    @task
    def print_timezone(tz):
        print(tz)

    @task
    def get_pendulum_datetime():
        import pendulum
        return pendulum.now()
    
    @task
    def print_pendulum_datetime(dt):
        print(dt)

    @task
    def print_numpy(n):
        print("NumPy Array")
        print(n)

    @task
    def get_embeddings():
        # this uses the older provider version when embedding is returned as a pydantic model
        import pydantic
        
        cohere_hook = CohereHook()
        embeddings = cohere_hook.create_embeddings(["gruyere"])

        print("type of embeddings:", type(embeddings))
        print("is embedding type pydantic:", isinstance(embeddings, pydantic.BaseModel))

        return embeddings

    @task
    def print_embeddings(embeddings):
        print("Pydantic Model")
        print(embeddings)

    print_embeddings(get_embeddings())
    print_numpy(get_numpy())
    print_pandas(get_pandas())
    print_list(get_list())
    print_set(get_set())
    print_tuple(get_tuple())
    print_bignum(get_bignum())
    print_datetime(get_datetime())
    print_pendulum_datetime(get_pendulum_datetime())
    print_frozenset(get_frozenset())
    print_timezone(get_timezone())
    print_pendulum_tz(get_pendulum_tz())

pydantic_serde()

sjyangkevin avatar Jun 25 '25 03:06 sjyangkevin

Awesome! I think we are there

Nice! Thank you!

sjyangkevin avatar Jun 26 '25 12:06 sjyangkevin