fix: chunk get together when random rechunk

Open yyy1000 opened this issue 1 year ago • 3 comments

This aims to fix #3195.

Consider the case: if (size < acc.size)

val (out, rem) = acc.splitAt(size - 1)
Pull.output(out) >> go(rem ++ hd, -1, tl)

The code appends 'hd' to 'rem' and passes the result to the next 'go' call, even though 'rem' may already be larger than 'size'. As a result, 'tl' is consumed entirely without the size of each 'Chunk' ever being restricted. :)
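
To make the effect concrete, here is a minimal, pure Scala model of that loop. It is only a sketch, not the fs2 implementation: it assumes chunks are plain Vector[Int] values, a fixed target 'size' instead of a random one, and a hypothetical function name rechunkOneSplitPerPull. Only the splitAt(size - 1) step mirrors the snippet above.

```scala
import scala.annotation.tailrec

// Simplified model (assumption): one split per pulled chunk, fixed target size.
def rechunkOneSplitPerPull(input: List[Vector[Int]], size: Int): List[Vector[Int]] = {
  @tailrec
  def go(acc: Vector[Int], rest: List[Vector[Int]], emitted: List[Vector[Int]]): List[Vector[Int]] =
    rest match {
      case hd :: tl =>
        if (acc.size < size) go(acc ++ hd, tl, emitted)          // keep accumulating
        else if (acc.size == size) go(hd, tl, acc :: emitted)    // emit exactly `size`
        else {
          // Mirrors the quoted branch: emit one small piece, carry the rest forward.
          // `rem` may still be larger than `size`, and `hd` is appended on top of it.
          val (out, rem) = acc.splitAt(size - 1)
          go(rem ++ hd, tl, out :: emitted)
        }
      case Nil =>
        // Input exhausted: everything that piled up is emitted as a single chunk.
        (acc :: emitted).reverse
    }
  go(Vector.empty, input, Nil)
}

// Four input chunks of 10 elements each, target size 3.
val sizes = rechunkOneSplitPerPull(List.fill(4)(Vector.range(0, 10)), 3).map(_.size)
// sizes == List(2, 2, 2, 34)
```

In this model the accumulator grows by more than it shrinks on every step, so the last emitted chunk holds most of the input. That is the "chunks get together" behaviour described above.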

yyy1000 avatar Apr 12 '23 13:04 yyy1000

Uh, it seems this change makes the tests run much longer and causes the pipeline to fail. :(

yyy1000 avatar Apr 12 '23 14:04 yyy1000

I can replicate the CI failures locally. Can you replicate them as well? I'm not sure what's wrong; could it be getting into some sort of loop?

sbt:root> coreJVM/testOnly fs2.JvmNativeCompressionSuite
fs2.JvmNativeCompressionSuite:
  + inflate please wrap 0.493s
  + inflate please nowrap 0.005s
  + deflate input 0.437s
  + inflate input 0.1s
  + inflate input (deflated larger than inflated) 0.005s
==> X fs2.JvmNativeCompressionSuite.deflate |> inflate ~= id  0.222s munit.FailException: Failing seed: 2jaVSELncu5xoNkpBuxj5VZNUOP7iDEt_rzLvxoOMCD=
You can reproduce this failure by adding the following override to your suite:

  override def scalaCheckInitialSeed = "2jaVSELncu5xoNkpBuxj5VZNUOP7iDEt_rzLvxoOMCD="

Exception raised on property evaluation.
> ARG_0: "\u0003"
> ARG_1: true
> ARG_2: NINE
> ARG_3: HUFFMAN_ONLY
> ARG_4: SYNC_FLUSH
> ARG_0_ORIGINAL: "砱弚豗"
> Exception: java.util.zip.DataFormatException: Insufficient data
    at munit.Assertions.fail(Assertions.scala:252)
    at munit.Assertions.fail$(Assertions.scala:246)
    at munit.FunSuite.fail(FunSuite.scala:11)
    at munit.ScalaCheckEffectSuite.munit$ScalaCheckEffectSuite$$parseTestResult(ScalaCheckEffectSuite.scala:80)
    at munit.ScalaCheckEffectSuite.$anonfun$checkPropF$2(ScalaCheckEffectSuite.scala:68)
    at munit.ScalaCheckEffectSuite.$anonfun$checkPropF$2$adapted(ScalaCheckEffectSuite.scala:68)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at modify @ fs2.internal.Scope.close(Scope.scala:262)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at rethrow$extension @ fs2.Compiler$Target.$anonfun$compile$1(Compiler.scala:157)
    at get @ fs2.internal.Scope.openScope(Scope.scala:275)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Pull$.$anonfun$compile$21(Pull.scala:1214)
    at update @ fs2.internal.Scope.releaseChildScope(Scope.scala:224)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
  + deflate.compresses input 0.005s
  + deflate and inflate are reusable 0.573s
  + gzip.compresses input 0.017s
  + gzip and gunzip are reusable 0.358s
  + maybeGunzip - not gzip 0.307s
  + maybeGunzip - gzip 0.326s
==> X fs2.JvmNativeCompressionSuite.gzip |> gunzip ~= id  120.013s java.util.concurrent.TimeoutException: test timed out after 2 minutes
    at munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
    at scala.Option.fold(Option.scala:263)
    at munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:75)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:338)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:334)
    at munit.ValueTransforms$ValueTransform.apply(ValueTransforms.scala:14)
    at munit.ValueTransforms.$anonfun$munitValueTransform$2(ValueTransforms.scala:28)
    at scala.collection.Iterator$$anon$9.next(Iterator.scala:584)
    at scala.collection.IterableOnceOps.collectFirst(IterableOnce.scala:1116)
    at scala.collection.IterableOnceOps.collectFirst$(IterableOnce.scala:1108)
    at scala.collection.AbstractIterator.collectFirst(Iterator.scala:1300)
    at munit.ValueTransforms.$anonfun$munitValueTransform$1(ValueTransforms.scala:29)
    at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
    at unsafeToFuture @ munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:82)
    at unsafeToFuture @ munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:82)
    at timeoutTo @ munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
    at timeoutTo @ munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
==> X fs2.JvmNativeCompressionSuite.gzip |> gunzip ~= id (mutually prime chunk sizes, compression larger)  120.005s java.util.concurrent.TimeoutException: test timed out after 2 minutes
    at munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
    at scala.Option.fold(Option.scala:263)
    at munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:75)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:338)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:334)
    at munit.ValueTransforms$ValueTransform.apply(ValueTransforms.scala:14)
    at munit.ValueTransforms.$anonfun$munitValueTransform$2(ValueTransforms.scala:28)
    at scala.collection.Iterator$$anon$9.next(Iterator.scala:584)
    at scala.collection.IterableOnceOps.collectFirst(IterableOnce.scala:1116)
    at scala.collection.IterableOnceOps.collectFirst$(IterableOnce.scala:1108)
    at scala.collection.AbstractIterator.collectFirst(Iterator.scala:1300)
    at munit.ValueTransforms.$anonfun$munitValueTransform$1(ValueTransforms.scala:29)
    at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
    at unsafeToFuture @ munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:82)
    at unsafeToFuture @ munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:82)
    at timeoutTo @ munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
    at timeoutTo @ munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
==> X fs2.JvmNativeCompressionSuite.gzip |> gunzip ~= id (mutually prime chunk sizes, decompression larger)  120.012s java.util.concurrent.TimeoutException: test timed out after 2 minutes
    at munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
    at scala.Option.fold(Option.scala:263)
    at munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:75)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:338)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:334)
    at munit.ValueTransforms$ValueTransform.apply(ValueTransforms.scala:14)
    at munit.ValueTransforms.$anonfun$munitValueTransform$2(ValueTransforms.scala:28)
    at scala.collection.Iterator$$anon$9.next(Iterator.scala:584)
    at scala.collection.IterableOnceOps.collectFirst(IterableOnce.scala:1116)
    at scala.collection.IterableOnceOps.collectFirst$(IterableOnce.scala:1108)
    at scala.collection.AbstractIterator.collectFirst(Iterator.scala:1300)
    at munit.ValueTransforms.$anonfun$munitValueTransform$1(ValueTransforms.scala:29)
    at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
    at unsafeToFuture @ munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:82)
    at unsafeToFuture @ munit.CatsEffectSuite$$anonfun$1.applyOrElse(CatsEffectSuite.scala:82)
    at timeoutTo @ munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
    at timeoutTo @ munit.CatsEffectSuite$$anonfun$1.$anonfun$applyOrElse$2(CatsEffectSuite.scala:78)
  + gzip |> GZIPInputStream ~= id 0.048s
  + gzip.compresses input, with FLG.FHCRC set 0.013s
  + gunzip limit fileName and comment length 4.22s
  + unix.gzip |> gunzip 0.007s
[error] Failed: Total 19, Failed 4, Errors 0, Passed 15
[error] Failed tests:
[error]         fs2.JvmNativeCompressionSuite
[error] (coreJVM / Test / testOnly) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 375 s (06:15), completed Apr 14, 2023, 5:57:37 PM

armanbilge avatar Apr 14 '23 18:04 armanbilge

Thanks! I replicated it locally but haven't found the cause yet. I'll look into it carefully. :)

yyy1000 avatar Apr 15 '23 01:04 yyy1000