
The sequence of messages from the Observable pipe in the sse method of router-response-controller.ts is disordered.

Open HUAHUAI23 opened this issue 1 year ago • 0 comments

Is there an existing issue for this?

  • [X] I have searched the existing issues

Current behavior

The SSE handler is shown below. In the console, console.log(messageEvent) shows the ids incrementing in order, but a console.log(message) inside router-response-controller.ts shows the MessageEvent objects arriving out of order. The sequence sent over the network to the client is out of order as well.

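// Method of a @Controller() class (decorators cannot be applied to a standalone function)
@Sse('xxx')
async streamLogs(): Promise<Observable<MessageEvent>> {
  // The Kubernetes client writes the pod's log output into this stream
  const podLogStream = new PassThrough()
  const k8sResponse: http.IncomingMessage = await logs.log(
    namespaceOfApp,
    podName,
    containerName,
    podLogStream,
    {
      follow: true,
      previous: false,
      pretty: false,
      timestamps: false,
      tailLines: 1000,
    },
  )

  return new Observable<MessageEvent>((subscriber) => {
    let idCounter = 1
    // Wrap each log chunk in a MessageEvent with an incrementing id
    podLogStream.on('data', (chunk) => {
      const dataString = chunk.toString()
      const messageEvent: MessageEvent = {
        id: idCounter.toString(),
        data: dataString,
        type: 'log',
      }
      idCounter++
      // Logged here, the ids appear in ascending order
      console.log(messageEvent)
      subscriber.next(messageEvent)
    })
  })
}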
[image]

https://github.com/nestjs/nest/blob/aa7538ffbe8608c41ece2a035b872e5032f57763/packages/core/router/router-response-controller.ts#L102

[image] [image] [image]
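To pin down the stage at which order is lost, the ids logged at each point (in the handler and in router-response-controller.ts) can be collected and compared. The following is a minimal sketch of such a check. `findOrderViolations` and `OrderViolation` are hypothetical names introduced here, not part of NestJS, and the sample arrays are illustrative rather than taken from the actual logs.

```typescript
// Hypothetical diagnostic helper; not part of NestJS or the reporter's code.
interface OrderViolation {
  index: number; // position in the observed sequence
  previousId: number;
  currentId: number;
}

// Returns every position where an id is not strictly greater than the one before it.
function findOrderViolations(ids: readonly string[]): OrderViolation[] {
  const violations: OrderViolation[] = [];
  for (let i = 1; i < ids.length; i++) {
    const previousId = Number(ids[i - 1]);
    const currentId = Number(ids[i]);
    // A non-numeric id becomes NaN and is reported as a violation too.
    if (!(currentId > previousId)) {
      violations.push({ index: i, previousId, currentId });
    }
  }
  return violations;
}

// Illustrative data only: ids as emitted by the handler vs. ids seen at a later stage.
const emitted = ['1', '2', '3', '4'];
const written = ['1', '3', '2', '4'];
console.log(findOrderViolations(emitted).length); // 0
console.log(findOrderViolations(written)); // one violation, at index 2
```

Running the same check on the ids captured at each stage shows the first stage where a violation appears.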

Minimum reproduction code

https://codesandbox.io/p/devbox/damp-cache-n2krf4?file=%2Fsrc%2Fapp.controller.ts
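When running the reproduction, the order can also be checked on the client side from the raw response body. The sketch below pulls the `id` field out of each event in a captured `text/event-stream` body, following the field syntax of the server-sent events format (events separated by a blank line, one optional space after the colon). `extractEventIds` is a hypothetical helper and the sample text is illustrative; it is simplified and ignores comment lines and `id` fields written without a colon.

```typescript
// Hypothetical helper: returns the id of each event found in raw SSE text.
function extractEventIds(rawStream: string): string[] {
  const ids: string[] = [];
  // Normalize CRLF and lone CR line endings, then split on the blank line between events.
  const events = rawStream.replace(/\r\n?/g, '\n').split('\n\n');
  for (const event of events) {
    let id: string | undefined;
    for (const line of event.split('\n')) {
      if (line.startsWith('id:')) {
        // Drop the field name and a single optional leading space; the last id line wins.
        id = line.slice(3).replace(/^ /, '');
      }
    }
    if (id !== undefined) {
      ids.push(id);
    }
  }
  return ids;
}

// Illustrative capture of two events of type "log".
const sample =
  'event: log\nid: 1\ndata: first line\n\n' +
  'event: log\nid: 2\ndata: second line\n\n';
console.log(extractEventIds(sample)); // the ids "1" and "2", in stream order
```

The resulting list reflects the order in which events were written to the connection, so it can be compared with the ids logged by the handler.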

Steps to reproduce

No response

Expected behavior

Messages should be written to the SSE response, and received by the client, in the same order in which subscriber.next(messageEvent) emits them, that is, with ids in ascending order. The handler code and screenshots are the same as under "Current behavior" above.

Package

  • [ ] I don't know. Or some 3rd-party package
  • [ ] @nestjs/common
  • [X] @nestjs/core
  • [ ] @nestjs/microservices
  • [ ] @nestjs/platform-express
  • [ ] @nestjs/platform-fastify
  • [ ] @nestjs/platform-socket.io
  • [ ] @nestjs/platform-ws
  • [ ] @nestjs/testing
  • [ ] @nestjs/websockets
  • [ ] Other (see below)

Other package

No response

NestJS version

"@nestjs/core": "^9.2.1",

Packages versions

    "@nestjs/axios": "^1.0.0",
    "@nestjs/common": "^9.0.0",
    "@nestjs/config": "^2.2.0",
    "@nestjs/core": "^9.2.1",
    "@nestjs/event-emitter": "^2.0.3",
    "@nestjs/jwt": "^10.0.1",
    "@nestjs/mapped-types": "*",
    "@nestjs/passport": "^9.0.0",
    "@nestjs/platform-express": "^9.0.0",
    "@nestjs/schedule": "^2.1.0",
    "@nestjs/swagger": "^6.1.3",
    "@nestjs/throttler": "^3.1.0",

Node.js version

v22.2.0

In which operating systems have you tested?

  • [X] macOS
  • [ ] Windows
  • [ ] Linux

Other

No response

HUAHUAI23 · May 29 '24 07:05