refactor: Implement msgspec encoding
- [ ] Docs
- [ ] UX. Relying on MRO magic is not great, e.g. why `SQLiteTap(MsgSpecWriter, SQLTap)` and not `SQLiteTap(SQLTap, MsgSpecWriter)`? A better approach might be to make the IO implementation an attribute of the Singer class.
- [ ] Fix tests
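To make the MRO concern concrete, here is a minimal sketch of why the base-class order matters. Every class below is a hypothetical stand-in, not the SDK's real hierarchy; it assumes only that the tap base class already inherits a default writer and that the msgspec writer is a sibling of that default.

```python
class GenericWriter:
    """Hypothetical common base of all writers."""

    def serialize_message(self, message: dict) -> object:
        raise NotImplementedError


class DefaultWriter(GenericWriter):
    """Stand-in for the default (str-producing) writer."""

    def serialize_message(self, message: dict) -> str:
        return "default"


class FastWriter(GenericWriter):
    """Stand-in for a bytes-producing writer such as a msgspec-backed one."""

    def serialize_message(self, message: dict) -> bytes:
        return b"fast"


class Tap(DefaultWriter):
    """Stand-in tap base that already mixes in the default writer."""


class SQLTap(Tap):
    """Stand-in SQL tap base."""


class WriterFirst(FastWriter, SQLTap):
    """FastWriter precedes DefaultWriter in the MRO, so it wins."""


class TapFirst(SQLTap, FastWriter):
    """DefaultWriter (via SQLTap) precedes FastWriter, so the mixin is ignored."""


writer_first_result = WriterFirst().serialize_message({})
tap_first_result = TapFirst().serialize_message({})
mro_names = [cls.__name__ for cls in TapFirst.__mro__]
```

With these stand-ins, swapping the two bases silently changes which `serialize_message` runs, and no error is raised in either case. That is the kind of surprise the checklist item is worried about.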
Documentation preview: https://meltano-sdk--2541.org.readthedocs.build/en/2541/
CodSpeed Performance Report
Merging #2541 will improve performance by ×12
Comparing edgarrmondragon/refactor/msgspec-impl-naive (ae168a0) with main (6f32572)
Summary
⚡ 2 improvements
✅ 5 untouched benchmarks
Benchmarks breakdown
| | Benchmark | BASE | HEAD | Change |
|---|---|---|---|---|
| ⚡ | test_bench_deserialize_json | 23.3 ms | 5.5 ms | ×4.2 |
| ⚡ | test_bench_format_message | 52.2 ms | 4.2 ms | ×12 |
Codecov Report
All modified and coverable lines are covered by tests :white_check_mark:
Project coverage is 91.42%. Comparing base (6f32572) to head (ae168a0). Report is 195 commits behind head on main.
Additional details and impacted files
    @@            Coverage Diff             @@
    ##             main    #2541      +/-   ##
    ==========================================
    + Coverage   91.34%   91.42%   +0.08%
    ==========================================
      Files          63       63
      Lines        5231     5280      +49
      Branches      677      673       -4
    ==========================================
    + Hits         4778     4827      +49
      Misses        320      320
      Partials      133      133
I found that it helped to add a `default_output` attribute for the SingerWriter to use. This lets you make `write_message` a little more generic.

    default_output = sys.stdout.buffer

    def write_message(self, message: Message) -> None:
        """Write a message to stdout.

        Args:
            message: The message to write.
        """
        self.default_output.write(self.format_message(message))
        self.default_output.flush()

In json.py, to follow the msgspec performance suggestions and still fit into the framework you put in place, I created a separate function for generating JSONL so that `serialize_json` can keep returning strings. This way `serialize_json` can still be used for connector engine creation and in `process_batch_files`.
https://jcristharif.com/msgspec/perf-tips.html#line-delimited-json
json.py:

    def serialize_json(obj: object, **kwargs: t.Any) -> str:
        """Serialize a dictionary into a line of json.

        Args:
            obj: A Python object usually a dict.
            **kwargs: Optional key word arguments.

        Returns:
            A string of serialized json.
        """
        return encoder.encode(obj).decode()


    msg_buffer = bytearray(64)


    def serialize_jsonl(obj: object, **kwargs: t.Any) -> bytes:
        """Serialize a dictionary into a line of jsonl.

        Args:
            obj: A Python object usually a dict.
            **kwargs: Optional key word arguments.

        Returns:
            A bytes of serialized json.
        """
        encoder.encode_into(obj, msg_buffer)
        msg_buffer.extend(b"\n")
        return msg_buffer

SingerWriter:

    def serialize_message(self, message: Message) -> str | bytes:
        """Serialize a dictionary into a line of json.

        Args:
            message: A Singer message object.

        Returns:
            A string of serialized json.
        """
        return serialize_jsonl(message.to_dict())

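One thing to watch for with a module-level buffer like `msg_buffer`: every call returns the same `bytearray` object, so a caller that holds on to the result will see it overwritten by the next call. The sketch below illustrates this with the standard library only. `encode_into` here is a hypothetical stand-in that mimics an encoder overwriting the buffer in place; it is not msgspec's implementation.

```python
import json

# Shared, reusable buffer, as in the snippet above.
_buffer = bytearray(64)


def encode_into(obj: object, buffer: bytearray) -> None:
    """Hypothetical stand-in: overwrite `buffer` with the encoded object."""
    buffer[:] = json.dumps(obj, separators=(",", ":")).encode()


def serialize_jsonl_shared(obj: object) -> bytearray:
    """Return the shared buffer itself (no copy)."""
    encode_into(obj, _buffer)
    _buffer.extend(b"\n")
    return _buffer


def serialize_jsonl_copy(obj: object) -> bytes:
    """Return an immutable snapshot of the shared buffer."""
    encode_into(obj, _buffer)
    _buffer.extend(b"\n")
    return bytes(_buffer)


# Holding two results from the shared variant: both names point at one object.
first = serialize_jsonl_shared({"id": 1})
second = serialize_jsonl_shared({"id": 2})

# The copying variant keeps each line intact at the cost of one allocation.
line_a = serialize_jsonl_copy({"id": 1})
line_b = serialize_jsonl_copy({"id": 2})
```

If each line is written to the output stream immediately, as `write_message` does, the shared variant is fine and avoids the copy. The snapshot only matters when results are collected before being written.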
> I found that it helped to add a `default_output` attribute for the SingerWriter to use.

Do you mean in https://github.com/meltano/sdk/blob/08b58bf4601eefc88418e0f696bad253bb7b91c9/singer_sdk/_singerlib/encoding/_msgspec.py#L73-L94?
Yes, that is exactly what I meant. I could definitely have stated it more clearly.

    class MsgSpecWriter(GenericSingerWriter[bytes, Message]):
        """Interface for all plugins writing Singer messages to stdout."""

        default_output = sys.stdout.buffer

        def serialize_message(self, message: Message) -> bytes:  # noqa: PLR6301
            """Serialize a Singer message into a line of JSON.

            Args:
                message: A Singer message object.

            Returns:
                A line of serialized JSON, as bytes.
            """
            return serialize_jsonl(message.to_dict())

        def write_message(self, message: Message) -> None:
            """Write a message to stdout.

            Args:
                message: The message to write.
            """
            self.default_output.write(self.format_message(message))
            self.default_output.flush()

Naive of me to think I could get this across in half a day of work. I'll come back to this later; there's plenty of time until the planned release date.
Like the pun. Great dad joke material. Kind of an inside joke now, since you dropped (naive) from the title of the PR.
Might have to punt this if https://github.com/jcrist/msgspec/pull/711 doesn't get merged before we're ready to officially declare Python 3.13 support
> I found that it helped to add a `default_output` attribute for the SingerWriter to use. This lets you make `write_message` a little more generic.

Well, I found that making this generic via `default_output` was a bad move. It is best to stick with `sys.stdout` or `sys.stdout.buffer`. I couldn't get the built-in SDK Tap tests to work until I changed back to using `sys.stdout.buffer` in a tap utilizing msgspec. I also borrowed your use of `io.TextIOWrapper(buffer=io.BytesIO())` from the test_msgspec.py file in this PR and added it to `singer_sdk.testing.runners.TapTestRunner._execute_sync`, so that `redirect_stdout` doesn't error: the previous `stdout_buf = io.StringIO()` has no `.buffer` to write to.

    def _execute_sync(self) -> tuple[str, str]:
        """Invoke a Tap object and return STDOUT and STDERR results in StringIO buffers.

        Returns:
            A 2-item tuple with StringIO buffers from the Tap's output: (stdout, stderr)
        """
        stdout_buf = io.TextIOWrapper(buffer=io.BytesIO())
        stderr_buf = io.TextIOWrapper(buffer=io.BytesIO())
        with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf):
            self.run_sync_dry_run()
        stdout_buf.seek(0)
        stderr_buf.seek(0)
        return stdout_buf.read(), stderr_buf.read()

In summary, I had to change the tap's override of `write_message` to use `sys.stdout.buffer.write()` and `sys.stdout.flush()`. I then changed `singer_sdk.testing.runners.TapTestRunner._execute_sync` to use `io.TextIOWrapper(buffer=io.BytesIO())` instead of `io.StringIO()` for `stdout_buf` and `stderr_buf`. With the tap updated, and a local development branch of the singer-sdk containing the `_execute_sync` change installed in the virtual environment I ran the tap tests from, the SDK's built-in tap tests ran correctly.
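The thread doesn't spell out why `default_output` broke the tests. One plausible explanation is that a class attribute is evaluated once, when the class body runs, so it keeps pointing at whatever `sys.stdout` was at import time and never sees a later `redirect_stdout`. The sketch below demonstrates that behavior with the standard library only. The writer classes are hypothetical stand-ins, and the "import time" stdout is simulated with its own buffer.

```python
import io
import sys
from contextlib import redirect_stdout

PAYLOAD = b'{"type": "STATE"}\n'

# Stand-in for whatever sys.stdout happens to be when the module is imported.
import_time_stdout = io.TextIOWrapper(io.BytesIO(), encoding="utf-8")

with redirect_stdout(import_time_stdout):

    class EarlyBoundWriter:
        # Evaluated once, while the class body runs.
        default_output = sys.stdout.buffer

        def write_message(self, payload: bytes) -> None:
            self.default_output.write(payload)
            self.default_output.flush()


class LateBoundWriter:
    def write_message(self, payload: bytes) -> None:
        # Looked up on every call, so it follows redirect_stdout.
        sys.stdout.buffer.write(payload)
        sys.stdout.flush()


def capture(writer) -> str:
    """Run the writer under redirect_stdout, like a test runner would."""
    stdout_buf = io.TextIOWrapper(io.BytesIO(), encoding="utf-8")
    with redirect_stdout(stdout_buf):
        writer.write_message(PAYLOAD)
    stdout_buf.seek(0)
    return stdout_buf.read()


early_captured = capture(EarlyBoundWriter())  # output went to the old stream
late_captured = capture(LateBoundWriter())    # output lands in the capture buffer

# io.StringIO is text-only, which is why the runner needed TextIOWrapper.
string_io_has_buffer = hasattr(io.StringIO(), "buffer")
```

In this sketch the early-bound writer's output never reaches the capture buffer, while the late-bound one does. Note also that bytes written straight to `.buffer` bypass the text layer, so mixing them with `print()` calls can reorder output unless the text stream is flushed first.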
Ok, the tests are passing.
Now I want to think of how to make it easy and straightforward for a developer to use msgspec as the SerDe layer, and also keep the door open to the user being the one deciding which serialization layer to use.
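As one possible shape for this, here is a minimal sketch in which the serialization layer is an attribute of the plugin rather than a mixin. The developer sets a class-level default and the user can override it per instance. All names are hypothetical (the attribute name `message_writer_class` is borrowed from the reviewer's guide further down, but the signatures here are assumptions), and both writers use the standard library so the example runs without msgspec.

```python
from __future__ import annotations

import io
import json
import typing as t


class TextWriter:
    """Hypothetical default writer backed by the standard-library json module."""

    def serialize_message(self, message: dict[str, t.Any]) -> bytes:
        return json.dumps(message).encode() + b"\n"


class CompactWriter(TextWriter):
    """Hypothetical alternative; a msgspec-backed writer could slot in here."""

    def serialize_message(self, message: dict[str, t.Any]) -> bytes:
        return json.dumps(message, separators=(",", ":")).encode() + b"\n"


class Plugin:
    """Hypothetical plugin base that owns a writer instead of inheriting one."""

    # Developer-facing default; subclasses may override it.
    message_writer_class: type[TextWriter] = TextWriter

    def __init__(
        self,
        output: t.BinaryIO,
        writer_class: type[TextWriter] | None = None,
    ) -> None:
        self.output = output
        # User-facing override wins over the class-level default.
        self.message_writer = (writer_class or self.message_writer_class)()

    def write_message(self, message: dict[str, t.Any]) -> None:
        self.output.write(self.message_writer.serialize_message(message))
        self.output.flush()


class FastTap(Plugin):
    """The developer opts in to the alternative writer with one attribute."""

    message_writer_class = CompactWriter


developer_out = io.BytesIO()
FastTap(developer_out).write_message({"type": "STATE"})

user_out = io.BytesIO()
FastTap(user_out, writer_class=TextWriter).write_message({"type": "STATE"})
```

Because the writer is looked up through an attribute, the base-class order no longer matters, and the per-instance argument leaves room for a setting or CLI flag to pick the serializer.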
@sourcery-ai review
Reviewer's Guide by Sourcery
This pull request introduces msgspec encoding for improved performance, refactors the reader/writer implementations, and adds new test cases. It also introduces BaseSingerReader and BaseSingerWriter classes to standardize reader/writer implementations.
Updated class diagram for BaseSingerReader and BaseSingerWriter

    classDiagram
        class PluginBase {
            +config: dict | PurePath | str | list[PurePath | str] | None
            +parse_env_config: bool
            +validate_config: bool
        }
        class BaseSingerReader {
            +message_reader_class: type[GenericSingerReader]
            +message_reader: GenericSingerReader | None
            +listen(file_input: t.IO[str] | None) : None
            +process_lines(file_input: t.IO[str] | None) : t.Counter[str]
            +process_endofpipe() : None
            +_assert_line_requires(message_dict: dict, requires: set[str]) : None
            <<abstract>>
            +_process_schema_message(message_dict: dict) : None
            <<abstract>>
            +_process_record_message(message_dict: dict) : None
            <<abstract>>
            +_process_state_message(message_dict: dict) : None
            <<abstract>>
            +_process_activate_version_message(message_dict: dict) : None
            <<abstract>>
            +_process_batch_message(message_dict: dict) : None
        }
        class BaseSingerWriter {
            +message_writer_class: type[GenericSingerWriter]
            +message_writer: GenericSingerWriter | None
            +write_message(message: t.Any) : None
        }
        PluginBase <|-- BaseSingerReader
        PluginBase <|-- BaseSingerWriter

Class diagram for MsgSpecReader and MsgSpecWriter

    classDiagram
        class GenericSingerReader {
            <<interface>>
            +deserialize_json(line: str) : dict
        }
        class GenericSingerWriter {
            <<interface>>
            +serialize_message(message: Message) : bytes
        }
        class MsgSpecReader {
            +default_input: t.IO
            +deserialize_json(line: str) : dict
        }
        class MsgSpecWriter {
            +serialize_message(message: Message) : bytes
            +write_message(message: Message) : None
        }
        GenericSingerReader <|.. MsgSpecReader : implements
        GenericSingerWriter <|.. MsgSpecWriter : implements

File-Level Changes
| Change | Details | Files |
|---|---|---|
| Introduces BaseSingerReader and BaseSingerWriter classes to standardize reader/writer implementations. | | singer_sdk/plugin_base.py, singer_sdk/target_base.py, singer_sdk/tap_base.py |
| Implements msgspec encoding for improved performance. | | requirements/requirements.txt, singer_sdk/contrib/msgspec.py, tests/core/test_io.py, samples/sample_tap_sqlite/__init__.py, samples/sample_tap_countries/countries_tap.py, noxfile.py, tests/singerlib/encoding/test_msgspec.py |
| Refactors listen and process_lines methods to improve code structure and reusability. | | singer_sdk/plugin_base.py, singer_sdk/target_base.py, singer_sdk/singerlib/encoding/base.py, singer_sdk/testing/legacy.py, singer_sdk/testing/runners.py, tests/singerlib/encoding/test_simple.py |
| Adds test files for msgspec and simple encoding. | | tests/singerlib/encoding/test_msgspec.py, tests/singerlib/encoding/test_simple.py, tests/singerlib/encoding/conftest.py |