
refactor: Implement msgspec encoding

Open · edgarrmondragon opened this issue 1 year ago • 8 comments

  • [ ] Docs
  • [ ] UX. Relying on MRO magic is not great: e.g., why SQLiteTap(MsgSpecWriter, SQLTap) and not SQLiteTap(SQLTap, MsgSpecWriter)? A better approach might be to make the IO implementation an attribute of the Singer class.
  • [ ] Fix tests
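The MRO concern in the UX item can be illustrated with a minimal, self-contained sketch. All class names below are hypothetical stand-ins, not the SDK's real classes, and the sketch assumes the tap base already inherits a default writer. Python resolves `serialize_message` along the method resolution order, so whichever writer comes first in the MRO wins, and swapping the base order silently changes the serializer.

```python
class SimpleWriter:
    """Hypothetical default writer mixin (stand-in for the SDK's JSON writer)."""

    def serialize_message(self, message: dict) -> str:
        return "simple"


class MsgSpecWriter:
    """Hypothetical msgspec-backed writer mixin."""

    def serialize_message(self, message: dict) -> str:
        return "msgspec"


class SQLTap(SimpleWriter):
    """Hypothetical tap base that already inherits the default writer."""


# MsgSpecWriter comes before SQLTap (and therefore SimpleWriter) in the MRO.
class SQLiteTapA(MsgSpecWriter, SQLTap):
    pass


# SQLTap comes first, so SimpleWriter is found before MsgSpecWriter.
class SQLiteTapB(SQLTap, MsgSpecWriter):
    pass


print(SQLiteTapA().serialize_message({}))  # msgspec
print(SQLiteTapB().serialize_message({}))  # simple
print([cls.__name__ for cls in SQLiteTapB.__mro__])
# ['SQLiteTapB', 'SQLTap', 'SimpleWriter', 'MsgSpecWriter', 'object']
```

Both class statements are legal and neither raises an error, which is what makes the ordering easy to get wrong. Holding the writer in a single class attribute would make the choice explicit and independent of base order.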

📚 Documentation preview 📚: https://meltano-sdk--2541.org.readthedocs.build/en/2541/

edgarrmondragon · Jul 17 '24

CodSpeed Performance Report

Merging #2541 will improve performance by ×12

Comparing edgarrmondragon/refactor/msgspec-impl-naive (ae168a0) with main (6f32572)

Summary

⚡ 2 improvements
✅ 5 untouched benchmarks

Benchmarks breakdown

| | Benchmark | BASE | HEAD | Change |
|---|---|---|---|---|
| ⚡ | test_bench_deserialize_json | 23.3 ms | 5.5 ms | ×4.2 |
| ⚡ | test_bench_format_message | 52.2 ms | 4.2 ms | ×12 |

codspeed-hq[bot] · Jul 17 '24

Codecov Report

All modified and coverable lines are covered by tests :white_check_mark:

Project coverage is 91.42%. Comparing base (6f32572) to head (ae168a0). Report is 195 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2541      +/-   ##
==========================================
+ Coverage   91.34%   91.42%   +0.08%     
==========================================
  Files          63       63              
  Lines        5231     5280      +49     
  Branches      677      673       -4     
==========================================
+ Hits         4778     4827      +49     
  Misses        320      320              
  Partials      133      133              


codecov[bot] · Jul 17 '24

I found that it helped to add a default_output attribute to the SingerWriter. This makes write_message a little more generic.

default_output = sys.stdout.buffer

def write_message(self, message: Message) -> None:
    """Write a message to stdout.

    Args:
        message: The message to write.
    """
    self.default_output.write(self.format_message(message))
    self.default_output.flush()

BuzzCutNorman · Aug 16 '24

In json.py, to follow the msgspec performance suggestions and fit into the framework you put in place, I created a separate function for generating JSONL, so that serialize_json keeps returning strings. This way serialize_json can still be used in the connector engine creation and in process_batch_files.

https://jcristharif.com/msgspec/perf-tips.html#line-delimited-json

json.py:

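import typing as t

import msgspec

encoder = msgspec.json.Encoder()


def serialize_json(obj: object, **kwargs: t.Any) -> str:
    """Serialize a Python object into a JSON string.

    Args:
        obj: A Python object, usually a dict.
        **kwargs: Optional keyword arguments (unused here).

    Returns:
        A string of serialized JSON.
    """
    return encoder.encode(obj).decode()


# Reused across calls, per the msgspec line-delimited JSON tip, to avoid
# allocating a new buffer for every message.
msg_buffer = bytearray(64)


def serialize_jsonl(obj: object, **kwargs: t.Any) -> bytearray:
    """Serialize a Python object into a single line of JSONL.

    Args:
        obj: A Python object, usually a dict.
        **kwargs: Optional keyword arguments (unused here).

    Returns:
        The shared buffer holding the JSON followed by a newline. It is
        overwritten by the next call, so write it out (or copy it) right away.
    """
    encoder.encode_into(obj, msg_buffer)
    msg_buffer.extend(b"\n")
    return msg_buffer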
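Because serialize_jsonl returns the module-level msg_buffer itself, each call overwrites the bytes handed out by the previous one. The sketch below shows that aliasing with a stdlib stand-in: the hypothetical helper `fake_encode_into` copies `json.dumps` output into the reused bytearray, roughly mimicking how msgspec's `Encoder.encode_into` overwrites the buffer from offset 0. It is not msgspec and has none of its speed.

```python
import json

msg_buffer = bytearray(64)


def fake_encode_into(obj: object, buffer: bytearray) -> None:
    """Hypothetical stand-in for Encoder.encode_into: overwrite buffer with obj's JSON."""
    buffer[:] = json.dumps(obj, separators=(",", ":")).encode()


def serialize_jsonl(obj: object) -> bytearray:
    fake_encode_into(obj, msg_buffer)
    msg_buffer.extend(b"\n")
    return msg_buffer


first = serialize_jsonl({"type": "STATE", "value": {}})
second = serialize_jsonl({"type": "RECORD"})

# Both names refer to the same buffer, so `first` now holds the second line.
print(first is second)  # True
print(bytes(first))     # b'{"type":"RECORD"}\n'

# Copying with bytes() (or writing immediately) keeps a stable result.
kept = bytes(serialize_jsonl({"a": 1}))
serialize_jsonl({"b": 2})
print(kept)             # b'{"a":1}\n'
```

This is harmless when write_message writes the buffer straight to stdout, but any caller that stores the return value needs its own copy.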

SingerWriter:

    def serialize_message(self, message: Message) -> bytes | bytearray:
        """Serialize a Singer message into a line of JSON.

        Args:
            message: A Singer message object.

        Returns:
            The serialized message as newline-terminated JSON bytes.
        """
        return serialize_jsonl(message.to_dict())

BuzzCutNorman · Aug 16 '24

> I found that it helped to add a default_output attribute to the SingerWriter. This makes write_message a little more generic.

Do you mean in

https://github.com/meltano/sdk/blob/08b58bf4601eefc88418e0f696bad253bb7b91c9/singer_sdk/_singerlib/encoding/_msgspec.py#L73-L94

?

edgarrmondragon · Sep 06 '24

Yes, that is exactly what I meant. I definitely could have stated it more clearly 😅.

import sys


class MsgSpecWriter(GenericSingerWriter[bytes, Message]):
    """Interface for all plugins writing Singer messages to stdout."""

    default_output = sys.stdout.buffer

    def serialize_message(self, message: Message) -> bytes:  # noqa: PLR6301
        """Serialize a Singer message into a line of JSON.

        Args:
            message: A Singer message object.

        Returns:
            The serialized message as newline-terminated JSON bytes.
        """
        return serialize_jsonl(message.to_dict())

    def write_message(self, message: Message) -> None:
        """Write a message to stdout.

        Args:
            message: The message to write.
        """
        self.default_output.write(self.format_message(message))
        self.default_output.flush()

BuzzCutNorman · Sep 06 '24

Naive of me to think I could get this done in half a day of work 😅. I'll come back to it later; there's plenty of time until the planned release date.

edgarrmondragon · Sep 06 '24

I like the pun 😊. Great dad-joke material. It's kind of an inside joke now, since you dropped "naive" from the PR title.

BuzzCutNorman · Sep 06 '24

Might have to punt this if https://github.com/jcrist/msgspec/pull/711 doesn't get merged before we're ready to officially declare Python 3.13 support

edgarrmondragon · Oct 02 '24

> I found that it helped to add a default_output attribute to the SingerWriter. This makes write_message a little more generic.

Well, it turns out that making this generic with default_output was a bad move; it is best to stick with sys.stdout or sys.stdout.buffer. In a tap using msgspec, I couldn't get the built-in SDK tap tests to pass until I switched back to sys.stdout.buffer. I also borrowed your use of io.TextIOWrapper(buffer=io.BytesIO()) from the test_msgspec.py file in this PR and added it to singer_sdk.testing.runners.TapTestRunner._execute_sync, so that writes under redirect_stdout no longer fail: the previous stdout_buf = io.StringIO() has no .buffer attribute to write bytes to.

    def _execute_sync(self) -> tuple[str, str]:
        """Invoke a Tap object and return its STDOUT and STDERR output.

        Returns:
            A 2-item tuple of strings read back from the Tap's output buffers: (stdout, stderr)
        """
        """
        stdout_buf = io.TextIOWrapper(buffer=io.BytesIO())
        stderr_buf = io.TextIOWrapper(buffer=io.BytesIO())
        with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf):
            self.run_sync_dry_run()
        stdout_buf.seek(0)
        stderr_buf.seek(0)
        return stdout_buf.read(), stderr_buf.read()
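The `.buffer` problem can be reproduced with the standard library alone. A writer that emits bytes through `sys.stdout.buffer` needs the redirected stream to expose a binary `.buffer`: `io.StringIO` has none, while `io.TextIOWrapper` over `io.BytesIO` does, and its contents can be read back as text. The function `emit_line` is a hypothetical stand-in for a msgspec-based write_message, and passing `encoding="utf-8"` is our addition to avoid depending on the locale default.

```python
import io
import sys
from contextlib import redirect_stdout


def emit_line(line: bytes) -> None:
    """Hypothetical bytes writer that looks up sys.stdout at call time."""
    sys.stdout.buffer.write(line)
    sys.stdout.flush()


line = b'{"type": "STATE", "value": {}}\n'

# io.StringIO is text-only: it has no .buffer, so writing bytes fails.
string_io_error = ""
try:
    with redirect_stdout(io.StringIO()):
        emit_line(line)
except AttributeError as exc:
    string_io_error = str(exc)

# io.TextIOWrapper over io.BytesIO exposes .buffer and can be read back as text.
stdout_buf = io.TextIOWrapper(buffer=io.BytesIO(), encoding="utf-8")
with redirect_stdout(stdout_buf):
    emit_line(line)
stdout_buf.seek(0)
captured = stdout_buf.read()

print(string_io_error)
print(captured, end="")
```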

In summary: I changed the tap's override of write_message to use sys.stdout.buffer.write() and sys.stdout.flush(), then changed singer_sdk.testing.runners.TapTestRunner._execute_sync to use io.TextIOWrapper(buffer=io.BytesIO()) instead of io.StringIO() for stdout_buf and stderr_buf. The Singer SDK built-in tap tests ran correctly after I made the change to the tap and installed, in the virtual environment I ran the tap tests from, a local development branch of singer-sdk containing the change to TapTestRunner._execute_sync.
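The thread does not say why `default_output = sys.stdout.buffer` broke the tests. One plausible explanation, which is our assumption rather than something stated above, is that a class attribute is evaluated once, when the class body runs at import time, so it keeps pointing at the original stdout even after `redirect_stdout` swaps `sys.stdout`. `BoundWriter` and `LookupWriter` are hypothetical; the class body is run under a redirect only to simulate "whatever stdout was at import time".

```python
import io
import sys
from contextlib import redirect_stdout

# Stand-in for the process's real stdout when the module is first imported.
import_time_stdout = io.TextIOWrapper(io.BytesIO(), encoding="utf-8")

with redirect_stdout(import_time_stdout):

    class BoundWriter:
        """Hypothetical: output stream captured once, when the class body runs."""

        default_output = sys.stdout.buffer

        def write_message(self, line: bytes) -> None:
            self.default_output.write(line)
            self.default_output.flush()


class LookupWriter:
    """Hypothetical: output stream looked up on every call."""

    def write_message(self, line: bytes) -> None:
        sys.stdout.buffer.write(line)
        sys.stdout.flush()


# What a test runner does: redirect stdout, run the plugin, read it back.
test_stdout = io.TextIOWrapper(io.BytesIO(), encoding="utf-8")
with redirect_stdout(test_stdout):
    BoundWriter().write_message(b"bound\n")
    LookupWriter().write_message(b"lookup\n")

test_stdout.seek(0)
captured = test_stdout.read()
print(repr(captured))  # 'lookup\n': the bound writer bypassed the redirect
print(import_time_stdout.buffer.getvalue())  # b'bound\n'
```

Under this reading, looking up sys.stdout.buffer inside write_message (as the final fix does) is what lets redirect_stdout capture the output.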

BuzzCutNorman · Dec 27 '24

Ok, the tests are passing.

Now I want to think about how to make it easy and straightforward for a developer to use msgspec as the SerDe layer, while keeping the door open for the user to decide which serialization layer to use.

edgarrmondragon · Jan 02 '25

@sourcery-ai review

edgarrmondragon · Jan 22 '25

Reviewer's Guide by Sourcery

This pull request introduces msgspec encoding for improved performance, refactors the reader/writer implementations, and adds new test cases. It also introduces BaseSingerReader and BaseSingerWriter classes to standardize reader/writer implementations.

Updated class diagram for BaseSingerReader and BaseSingerWriter

classDiagram
    class PluginBase {
        +config: dict | PurePath | str | list[PurePath | str] | None
        +parse_env_config: bool
        +validate_config: bool
    }
    class BaseSingerReader {
        +message_reader_class: type[GenericSingerReader]
        +message_reader: GenericSingerReader | None
        +listen(file_input: t.IO[str] | None) : None
        +process_lines(file_input: t.IO[str] | None) : t.Counter[str]
        +process_endofpipe() : None
        +_assert_line_requires(message_dict: dict, requires: set[str]) : None
        <<abstract>>
        +_process_schema_message(message_dict: dict) : None
        <<abstract>>
        +_process_record_message(message_dict: dict) : None
        <<abstract>>
        +_process_state_message(message_dict: dict) : None
        <<abstract>>
        +_process_activate_version_message(message_dict: dict) : None
        <<abstract>>
        +_process_batch_message(message_dict: dict) : None
    }
    class BaseSingerWriter {
        +message_writer_class: type[GenericSingerWriter]
        +message_writer: GenericSingerWriter | None
        +write_message(message: t.Any) : None
    }
    PluginBase <|-- BaseSingerReader
    PluginBase <|-- BaseSingerWriter

Class diagram for MsgSpecReader and MsgSpecWriter

classDiagram
    class GenericSingerReader {
        <<interface>>
        +deserialize_json(line: str) : dict
    }
    class GenericSingerWriter {
        <<interface>>
        +serialize_message(message: Message) : bytes
    }
    class MsgSpecReader {
        +default_input: t.IO
        +deserialize_json(line: str) : dict
    }
    class MsgSpecWriter {
        +serialize_message(message: Message) : bytes
        +write_message(message: Message) : None
    }

    GenericSingerReader <|.. MsgSpecReader : implements
    GenericSingerWriter <|.. MsgSpecWriter : implements
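The attribute-based design in the BaseSingerWriter diagram (`message_writer_class` plus a lazily created `message_writer`) can be sketched without the SDK. This is a hypothetical, stdlib-only illustration: `GenericSingerWriter`, `SimpleWriter` and `CompactWriter` here are simplified stand-ins (the compact JSON writer plays the role of MsgSpecWriter), and passing the output stream into `write_message` is our assumption, not the SDK's signature. The serializer is picked by one class attribute, so base-class order no longer matters.

```python
from __future__ import annotations

import io
import json
import typing as t


class GenericSingerWriter:
    """Simplified stand-in for the writer interface."""

    def serialize_message(self, message: dict[str, t.Any]) -> bytes:
        raise NotImplementedError

    def write_message(self, message: dict[str, t.Any], output: t.BinaryIO) -> None:
        output.write(self.serialize_message(message))


class SimpleWriter(GenericSingerWriter):
    """Hypothetical default implementation using stdlib json."""

    def serialize_message(self, message: dict[str, t.Any]) -> bytes:
        return json.dumps(message).encode() + b"\n"


class CompactWriter(GenericSingerWriter):
    """Hypothetical alternative implementation (stand-in for MsgSpecWriter)."""

    def serialize_message(self, message: dict[str, t.Any]) -> bytes:
        return json.dumps(message, separators=(",", ":")).encode() + b"\n"


class BaseSingerWriter:
    """Plugin side: the IO implementation is an attribute, not a base class."""

    message_writer_class: type[GenericSingerWriter] = SimpleWriter

    def __init__(self, output: t.BinaryIO) -> None:
        self.output = output
        self.message_writer: GenericSingerWriter | None = None

    def write_message(self, message: dict[str, t.Any]) -> None:
        if self.message_writer is None:  # created lazily on first use
            self.message_writer = self.message_writer_class()
        self.message_writer.write_message(message, self.output)


class SQLiteTap(BaseSingerWriter):
    # Opting into another serializer is a one-line override.
    message_writer_class = CompactWriter


out = io.BytesIO()
SQLiteTap(out).write_message({"type": "STATE", "value": {}})
print(out.getvalue())  # b'{"type":"STATE","value":{}}\n'
```

Because the choice is a plain attribute, a user-facing setting could in principle swap `message_writer_class` too, which fits the goal of letting users pick the serialization layer.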

File-Level Changes

Introduces BaseSingerReader and BaseSingerWriter classes to standardize reader/writer implementations.
  • Adds BaseSingerReader and BaseSingerWriter as base classes for readers and writers.
  • Moves listen and write_message methods to the new base classes.
  • Updates Target and Tap to inherit from the new base classes.
  • Removes SingerReader and SingerWriter inheritance from Target and Tap.
  • Adds message_reader_class and message_writer_class attributes to BaseSingerReader and BaseSingerWriter respectively.
  • Adds process_endofpipe to BaseSingerReader.
Files: singer_sdk/plugin_base.py, singer_sdk/target_base.py, singer_sdk/tap_base.py
Implements msgspec encoding for improved performance.
  • Adds msgspec as a dependency.
  • Creates MsgSpecReader and MsgSpecWriter classes in singer_sdk/contrib/msgspec.py.
  • Implements serialize_jsonl, enc_hook, and dec_hook functions for msgspec encoding.
  • Updates deserialize_json method in MsgSpecReader to use msgspec.
  • Updates serialize_message method in MsgSpecWriter to use msgspec.
  • Adds message_writer_class = MsgSpecWriter to SQLiteTap and SampleTapCountries to use msgspec encoding by default.
  • Adds benchmark tests for msgspec reader and writer.
Files: requirements/requirements.txt, singer_sdk/contrib/msgspec.py, tests/core/test_io.py, samples/sample_tap_sqlite/__init__.py, samples/sample_tap_countries/countries_tap.py, noxfile.py, tests/singerlib/encoding/test_msgspec.py
Refactors listen and process_lines methods to improve code structure and reusability.
  • Removes listen method from GenericSingerReader.
  • Moves the logic from GenericSingerReader.listen to GenericSingerReader.process_lines.
  • Adds a callbacks argument to GenericSingerReader.process_lines to handle different message types.
  • Removes _process_lines and _process_endofpipe methods from Target.
  • Updates target_sync_test and tap_sync_test to use io.TextIOWrapper instead of io.StringIO.
Files: singer_sdk/plugin_base.py, singer_sdk/target_base.py, singer_sdk/singerlib/encoding/base.py, singer_sdk/testing/legacy.py, singer_sdk/testing/runners.py, tests/singerlib/encoding/test_simple.py
Adds test files for msgspec and simple encoding.
  • Adds tests/singerlib/encoding/test_msgspec.py to test msgspec encoding.
  • Adds tests/singerlib/encoding/test_simple.py to test simple encoding.
  • Adds tests/singerlib/encoding/conftest.py to configure tests for encoding.
Files: tests/singerlib/encoding/test_msgspec.py, tests/singerlib/encoding/test_simple.py, tests/singerlib/encoding/conftest.py


sourcery-ai[bot] · Jan 22 '25