
protocol stream `.end()` not working as expected

Open gmaclennan opened this issue 2 years ago • 9 comments

The 'end' event on the protocol stream returned by core.replicate() does not seem to propagate as expected.

Use case:

I want to gracefully end a replication stream without destroying it.

Expected behaviour:

const s = core.replicate(true)
s.on('end', () => console.log('end'))
s.end()

I would expect the above code to output end. It doesn't. There is a scenario where it does work: if the core that is replicating is a read-only peer, and you call s.end() after first calling core.update(). There does not seem to be a way to end the replication stream for the writer core.

Minimal reproduction:

import Hypercore from 'hypercore'
import RAM from 'random-access-memory'
import test from 'tape'

test('can end replication stream from writer', async t => {
  t.plan(2)
  ;(async () => {
    const core1 = new Hypercore(RAM)
    await core1.ready()
    const core2 = new Hypercore(RAM, core1.key)

    const s1 = core1.replicate(true)
    const s2 = core2.replicate(false)

    s1.on('end', () => t.pass('s1 end'))
    s2.on('end', () => t.pass('s2 end'))

    s1.pipe(s2).pipe(s1)

    await core1.update()
    s1.end()
  })()
})

test('can end replication stream from reader', t => {
  t.plan(2)
  ;(async () => {
    const core1 = new Hypercore(RAM)
    await core1.ready()
    const core2 = new Hypercore(RAM, core1.key)

    const s1 = core1.replicate(true)
    const s2 = core2.replicate(false)

    s1.on('end', () => t.pass('s1 end'))
    s2.on('end', () => t.pass('s2 end'))

    s1.pipe(s2).pipe(s1)

    await core2.update()
    s2.end()
  })()
})

gmaclennan • Apr 25 '23 12:04

Calling stream.end() sends the signal to the other side, so the other side gets the 'end' event but you don't, unless they also call stream.end() back. For example:

const s = core.replicate(true)
s.on('end', () => s.end())
// ...
s.end()

I think this is automatically handled so you should only need to do s.end() and wait for 'close' event (unsure about timeouts here).

Except if you listen to the event like s.on('end', cb) then it's your job to do as above and manually call s.end(). In your Expected behaviour example you were registering the event and just logging, so you're not sending back s.end().
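
For illustration, the pattern described above (call end(), answer the remote 'end' with your own end(), then wait for 'close') could be wrapped in a helper along these lines. This is only a sketch: endGracefully and timeoutMs are hypothetical names, not part of hypercore, and it assumes the stream has Node-style .on() and .end() methods. Whether the stream returned by core.replicate() actually behaves this way is the open question in this thread.

```javascript
// Sketch: end a duplex stream gracefully and wait for it to close.
// `stream` is anything with Node-style .on()/.end(); `timeoutMs` is a
// hypothetical safety net, not a hypercore option.
function endGracefully (stream, timeoutMs = 5000) {
  return new Promise((resolve, reject) => {
    const timer = setTimeout(() => {
      reject(new Error('stream did not close within ' + timeoutMs + 'ms'))
    }, timeoutMs)

    // The remote side finished writing: finish our side too.
    stream.on('end', () => stream.end())

    // Both sides are done once 'close' fires.
    stream.on('close', () => {
      clearTimeout(timer)
      resolve()
    })

    stream.on('error', (err) => {
      clearTimeout(timer)
      reject(err)
    })

    // Signal that we are done writing.
    stream.end()
  })
}
```

Usage would look like `await endGracefully(core.replicate(true))`, with the timeout acting as the fallback for the case where 'close' never arrives.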

LuKks • Apr 25 '23 13:04

Thanks @LuKks that makes sense, however this is not the behaviour I am seeing: s.end() does not cause any end event on the other side. You can see from running the tests in my comment that the only event that fires is s2.on('end') when you call s2.end(), in the second test. No other end events fire.

gmaclennan • Apr 25 '23 13:04

Ok did some digging and got the following results. It seems like s.end() results in inconsistent behaviour if called in the same tick, or the next tick, but is consistent (but not quite in the way that you suggest) if I call it after setTimeout(0):

import Hypercore from 'hypercore'
import RAM from 'random-access-memory'

const core1 = new Hypercore(RAM)
await core1.ready()
const core2 = new Hypercore(RAM, core1.key)

const s1 = core1.replicate(true)
const s2 = core2.replicate(false)

s1.on('end', () => console.log('s1 end'))
s1.on('close', () => console.log('s1 close'))
s2.on('end', () => console.log('s2 end'))
s2.on('close', () => console.log('s2 close'))

s1.pipe(s2).pipe(s1)

s2.end()

This code logs nothing, either when calling s1.end() or s2.end().

If I await process.nextTick I get different results depending on which core I await and which stream I end:

// Variant 1: end s1 after one tick
await new Promise(res => process.nextTick(res))
s1.end()
// s1 end
// s1 close
// s2 close

// Variant 2: end s2 after one tick
await new Promise(res => process.nextTick(res))
s2.end()
// s2 end
// s1 end
// s1 close

If I await setTimeout(0) then I get consistent results, but not quite what I would expect based on your comment.

// Variant 1: end s1 after setTimeout(0)
await new Promise(res => setTimeout(res, 0))
s1.end()
// s1 end
// s1 close
// s2 close

// Variant 2: end s2 after setTimeout(0)
await new Promise(res => setTimeout(res, 0))
s2.end()
// s2 end
// s2 close
// s1 close
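
To make runs like the ones above easier to compare, the event order can be collected into an array instead of being read off console output. The following is a minimal sketch; recordEvents and timings are hypothetical helper names, and the only assumption about the streams is a Node-style .on() method. Note that attaching an 'end' listener may itself change behaviour, as discussed earlier in this thread.

```javascript
// Record the order in which named events fire on labelled streams.
// `streams` maps a label to any object with a Node-style .on() method.
function recordEvents (streams, eventNames = ['end', 'close']) {
  const log = []
  for (const [label, stream] of Object.entries(streams)) {
    for (const name of eventNames) {
      stream.on(name, () => log.push(label + ' ' + name))
    }
  }
  return log
}

// The two deferred timings used above, as functions returning promises.
// The same-tick case needs no helper: call end() without awaiting anything.
const timings = {
  nextTick: () => new Promise(resolve => process.nextTick(resolve)),
  timeout: () => new Promise(resolve => setTimeout(resolve, 0))
}

// Example use with the streams from the snippet above:
//   const log = recordEvents({ s1, s2 })
//   await timings.timeout()
//   s1.end()
//   // later, once the streams have settled: console.log(log)
```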

gmaclennan • Apr 25 '23 14:04

Try commenting out the lines where you listen to 'end' events, and try again without waiting for the next tick.

LuKks • Apr 25 '23 14:04

Done: nothing is logged (i.e. no 'close' events are logged either).

gmaclennan • Apr 25 '23 14:04

Possibly related? https://github.com/mafintosh/streamx/issues/72

gmaclennan • Apr 25 '23 14:04

I was about to say that it sounds like a bug in secret-stream, because it registers the 'end' event a bit late; that's why it works if you wait a tick. You should open a PR on secret-stream that adds a single, simple test case reproducing the bug.
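
The general failure mode suspected here (a wrapper attaches its listener late, so an event fired in the same tick is lost) can be shown without any of the libraries involved. The sketch below is not secret-stream's code: wrapLate and demo are hypothetical names, and the global EventTarget stands in for a stream so the example needs no imports.

```javascript
// Hypothetical wrapper: forwards 'end' from an inner source, but only
// attaches its listener on a later microtask (the "registers late" part).
function wrapLate (inner) {
  const outer = new EventTarget()
  queueMicrotask(() => {
    inner.addEventListener('end', () => outer.dispatchEvent(new Event('end')))
  })
  return outer
}

// Resolves to true if the outer 'end' event was observed.
async function demo (waitFirst) {
  const inner = new EventTarget()
  const outer = wrapLate(inner)
  let seen = false
  outer.addEventListener('end', () => { seen = true })

  // Optionally give the wrapper time to attach its listener.
  if (waitFirst) await new Promise(resolve => setTimeout(resolve, 0))

  inner.dispatchEvent(new Event('end'))
  return seen
}
```

With these definitions, `demo(false)` resolves to false because the inner event fires before the wrapper is listening, while `demo(true)` resolves to true. That matches the shape of the symptom reported above, although it does not prove this is the actual cause.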

LuKks • Apr 25 '23 15:04

Meanwhile, instead of commenting it out do this:

s1.on('end', () => s1.end())
s2.on('end', () => s2.end())

Then try again without waiting a tick.

LuKks • Apr 25 '23 15:04

Unsure if you can end these streams atm. We tend to use the replicate(stream) api everywhere with a swarm stream, and the stream returned works a bit differently, but we'll look into it.

mafintosh • Jun 03 '23 20:06