
feat(streams): new LimitDelimiterStream()

Open BlackAsLight opened this pull request 3 weeks ago • 5 comments

This pull request adds a new TransformStream to the mix. While we already have DelimiterStream, it isn't suitable when the delimiter is unlikely to appear for long stretches of bytes, which means you can end up with huge chunks at a time. LimitDelimiterStream, as the name implies, enforces a maximum length that any emitted chunk can be.

Example

import { assertEquals } from "@std/assert";
import {
  LimitDelimiterStream,
} from "@std/streams/unstable-limit-delimiter-stream";

const encoder = new TextEncoder();

const readable = ReadableStream.from(["foo;beeps;;bar;;"])
  .pipeThrough(new TextEncoderStream())
  .pipeThrough(
    new LimitDelimiterStream({
      delimiter: encoder.encode(";"),
      limit: 4,
    }),
  );

assertEquals(
  await Array.fromAsync(readable),
  [
    { match: true, value: encoder.encode("foo") },
    { match: false, value: encoder.encode("beep") },
    { match: true, value: encoder.encode("s") },
    { match: true, value: encoder.encode("") },
    { match: true, value: encoder.encode("bar") },
    { match: true, value: encoder.encode("") },
  ],
);

BlackAsLight avatar Nov 29 '25 09:11 BlackAsLight

Codecov Report

:white_check_mark: All modified and coverable lines are covered by tests. :white_check_mark: Project coverage is 94.12%. Comparing base (206ed42) to head (a1cc7ae). :warning: Report is 3 commits behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #6890   +/-   ##
=======================================
  Coverage   94.11%   94.12%           
=======================================
  Files         581      582    +1     
  Lines       42707    42763   +56     
  Branches     6796     6804    +8     
=======================================
+ Hits        40194    40250   +56     
  Misses       2463     2463           
  Partials       50       50           

:umbrella: View full report in Codecov by Sentry.

codecov[bot] avatar Nov 29 '25 10:11 codecov[bot]

Is there a particular use case for this stream? I think it is kinda random to split with a delimiter as well as a max chunk size. To me, some kind of SplitStream would make more sense and possibly could be made more flexible. The name is also very confusing because one might think it would limit the chunk count instead of the individual chunk size, as the Limited*Stream classes in @std/streams do.

timreichen avatar Nov 29 '25 13:11 timreichen

Is there a particular use case for this stream?

Yes. I'd like to offer a streaming FormData encoder and decoder, and in it I found something like this useful. The normal DelimiterStream isn't suitable because it would likely produce huge, possibly too-big chunks when processing the files, which defeats the purpose of streaming.
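
To sketch the situation roughly (the boundary string, the 64 KiB limit, and the decodeParts wrapper below are all made up for illustration):

import { LimitDelimiterStream } from "@std/streams/unstable-limit-delimiter-stream";

// `body` would be a multipart request body. With a plain DelimiterStream, a
// multi-megabyte file sitting between two boundaries gets buffered and emitted
// as a single huge chunk; with a limit, it arrives in pieces of at most
// `limit` bytes.
async function decodeParts(body: ReadableStream<Uint8Array>) {
  const parts = body.pipeThrough(
    new LimitDelimiterStream({
      delimiter: new TextEncoder().encode("\r\n--boundary"),
      limit: 64 * 1024,
    }),
  );
  for await (const { match, value } of parts) {
    // `value` is never larger than `limit`, and `match` marks the end of a
    // delimited section, so file bytes can be processed incrementally.
    console.log(match, value.length);
  }
}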

I think it is kinda random to split with a delimiter as well as a max chunk size.

To me, some kind of SplitStream would make more sense and possibly could be made more flexible.

I'd need you to be more specific on what this SplitStream would do.

The name is also very confusing because one might think it would limit the chunk count instead of the individual chunk size, as the Limited*Stream classes in @std/streams do.

I'm not attached to the name and open to suggestions if you think there is something better to communicate its intended behaviour.

BlackAsLight avatar Nov 29 '25 17:11 BlackAsLight

Yes. I'd like to offer a streaming FormData encoder and decoder, and in it I found something like this useful. The normal DelimiterStream isn't suitable because it would likely produce huge, possibly too-big chunks when processing the files, which defeats the purpose of streaming.

Sounds interesting to me. Can you add a note about such a situation in the API doc? (An example illustrating that situation would also be very helpful.)

kt3k avatar Dec 01 '25 07:12 kt3k

I added a note explaining more, but I can't really think of an example that's simple enough to demonstrate the point.

BlackAsLight avatar Dec 01 '25 09:12 BlackAsLight

@BlackAsLight Thanks for adding the notes. Now I see the utility of this transform; however, as @timreichen pointed out, the name of the class sounds a bit confusing to me too. The meaning of limit here is different from its usage in LimitedByteStream. Can you somehow come up with some other name candidates? How about ChunkedDelimiterStream, for example?

kt3k avatar Dec 08 '25 07:12 kt3k

I think there are a few things to contemplate: if this stream is limited by a max chunk size and a delimiter, it might as well just be an option on DelimiterStream instead of a separate stream, e.g. something like

const delimStream = new DelimiterStream(new TextEncoder().encode("|"), { maxChunkByteSize: 5 })

I would argue this is fairly limited in usage, so maybe a better option would be some kind of preserveChunks option on DelimiterStream that does not combine chunks, plus an additional MaxChunkByteSizeStream in std/streams. This would keep the stream modularity.

stream
  .pipeThrough(new DelimiterStream(new Uint8Array(2), { preserveChunks: true }))
  .pipeThrough(new MaxChunkByteSizeStream(5))
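
MaxChunkByteSizeStream doesn't exist yet; a minimal sketch of what I mean, assuming maxByteSize > 0:

class MaxChunkByteSizeStream extends TransformStream<Uint8Array, Uint8Array> {
  constructor(maxByteSize: number) {
    super({
      transform(chunk, controller) {
        // Re-emit the incoming chunk in pieces of at most maxByteSize bytes,
        // without buffering anything across chunks.
        for (let i = 0; i < chunk.length; i += maxByteSize) {
          controller.enqueue(chunk.subarray(i, i + maxByteSize));
        }
      },
    });
  }
}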

timreichen avatar Dec 08 '25 10:12 timreichen

What about:

  • PartitionedDelimiterStream
  • TruncatedDelimiterStream
  • BoundedDelimiterStream
  • CappedDelimiterStream

I think CappedDelimiterStream could be good.

BlackAsLight avatar Dec 08 '25 12:12 BlackAsLight

I would argue this is fairly limited in usage, so maybe a better option would be some kind of preserveChunks option on DelimiterStream that does not combine chunks, plus an additional MaxChunkByteSizeStream in std/streams. This would keep the stream modularity.

stream
  .pipeThrough(new DelimiterStream(new Uint8Array(2), { preserveChunks: true }))
  .pipeThrough(new MaxChunkByteSizeStream(5))

It would still queue them all up before serving them, which defeats the purpose of streams.

BlackAsLight avatar Dec 08 '25 12:12 BlackAsLight

I would argue this is fairly limited in usage, so maybe a better option would be some kind of preserveChunks option on DelimiterStream that does not combine chunks, plus an additional MaxChunkByteSizeStream in std/streams. This would keep the stream modularity.

stream
  .pipeThrough(new DelimiterStream(new Uint8Array(2), { preserveChunks: true }))
  .pipeThrough(new MaxChunkByteSizeStream(5))

It would still queue them all up before serving them, which defeats the purpose of streams.

Maybe I lack some understanding of stream functionality, but how would enqueueing all chunks defeat the purpose of streams? All chunks go through each transform stream serially without blocking, no?

timreichen avatar Dec 09 '25 10:12 timreichen

I would argue this is fairly limited in usage, so maybe a better option would be some kind of preserveChunks option on DelimiterStream that does not combine chunks, plus an additional MaxChunkByteSizeStream in std/streams. This would keep the stream modularity.

stream
  .pipeThrough(new DelimiterStream(new Uint8Array(2), { preserveChunks: true }))
  .pipeThrough(new MaxChunkByteSizeStream(5))

It would still queue them all up before serving them, which defeats the purpose of streams.

Maybe I lack some understanding of stream functionality, but how would enqueueing all chunks defeat the purpose of streams? All chunks go through each transform stream serially without blocking, no?

I think there might have been some misunderstanding in communication. DelimiterStream at the moment is a Uint8Array stream. Are you suggesting this preserveChunks option would change it to an object stream of { match: boolean, value: Uint8Array }, or to a Uint8Array[] stream? If you meant the former, then I don't understand what MaxChunkByteSizeStream would be doing, and if you meant the latter, then DelimiterStream would be pulling in a lot of chunks before serving the next Uint8Array[], which defeats the purpose of streams, namely handling only a small portion of the data at once, and could risk running out of memory given a huge stream and a rare delimiter.
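
To spell the two readings out as (hypothetical) types:

// Former reading: still a piece-by-piece byte stream, with each piece tagged by
// whether it ended at a delimiter.
type PreserveChunksAsObjects = TransformStream<
  Uint8Array,
  { match: boolean; value: Uint8Array }
>;

// Latter reading: one array per delimited section, so every piece of a section
// has to be held in memory until the delimiter finally appears.
type PreserveChunksAsArrays = TransformStream<Uint8Array, Uint8Array[]>;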

BlackAsLight avatar Dec 09 '25 20:12 BlackAsLight

I think there might have been some misunderstanding in communication. DelimiterStream at the moment is a Uint8Array stream. Are you suggesting this preserveChunks option would change it to an object stream of { match: boolean, value: Uint8Array }, or to a Uint8Array[] stream? If you meant the former, then I don't understand what MaxChunkByteSizeStream would be doing, and if you meant the latter, then DelimiterStream would be pulling in a lot of chunks before serving the next Uint8Array[], which defeats the purpose of streams, namely handling only a small portion of the data at once, and could risk running out of memory given a huge stream and a rare delimiter.

Oh, I see. I guess having a separate stream class makes sense then; however, maybe we could generalize that stream class so it works not only with delimiters but is also customizable enough to parse, for example, a JSON stream?

timreichen avatar Dec 15 '25 11:12 timreichen

@BlackAsLight CappedDelimiterStream sounds good to me. Can you update the PR?

@timreichen

Oh, I see. I guess having a separate stream class makes sense then; however, maybe we could generalize that stream class so it works not only with delimiters but is also customizable enough to parse, for example, a JSON stream?

That sounds a bit overly general to me, but please feel free to explore such an API if you feel strongly. I think CappedDelimiterStream is fine as is, as it has a relatively concrete example use case.

kt3k avatar Dec 16 '25 06:12 kt3k