go-coap icon indicating copy to clipboard operation
go-coap copied to clipboard

Block-wise transfer silently drops part of the payload after timeout

Open palsivertsen opened this issue 9 months ago • 2 comments

I'm observing an issue with the server's handling of incoming block-wise transfers when a request takes longer than the block-wise timeout. The earlier blocks are silently dropped, and the request is passed to the handler with an incomplete payload and no error.

Here's a test demonstrating the issue:

package coapblockwisetimeout_test

import (
	"bytes"
	"io"
	"log"
	"testing"
	"time"

	"github.com/plgd-dev/go-coap/v3/message"
	"github.com/plgd-dev/go-coap/v3/message/codes"
	"github.com/plgd-dev/go-coap/v3/mux"
	"github.com/plgd-dev/go-coap/v3/net"
	"github.com/plgd-dev/go-coap/v3/net/blockwise"
	"github.com/plgd-dev/go-coap/v3/options"
	"github.com/plgd-dev/go-coap/v3/udp"
	"github.com/plgd-dev/go-coap/v3/udp/server"
)

// TestBlockWiseTimeout sends a PUT request split into two Block1 messages,
// pausing between them, and checks that the handler receives the complete
// payload. It demonstrates that part of the payload is lost when the duration
// of a block transfer with more than one block exceeds the blockwise timeout.
func TestBlockWiseTimeout(t *testing.T) {
	tests := []struct {
		lag              time.Duration // pause between the first and the second block
		blockwiseTimeout time.Duration // blockwise timeout configured on the server
	}{
		// lag < timeout
		{
			lag:              0,
			blockwiseTimeout: time.Second,
		},
		{
			lag:              900 * time.Millisecond,
			blockwiseTimeout: time.Second,
		},
		// lag >= timeout: these are the cases that lose the first block
		{
			lag:              time.Second,
			blockwiseTimeout: time.Second,
		},
		{
			lag:              100 * time.Millisecond,
			blockwiseTimeout: 100 * time.Millisecond,
		},
	}
	for _, tt := range tests {
		t.Run(tt.lag.String()+"_"+tt.blockwiseTimeout.String(), func(t *testing.T) {
			t.Parallel()

			var actualBody bytes.Buffer // handler writes payload here

			addr := setupServer(t,
				options.WithMux(recBodyHandler(&actualBody)),
				options.WithBlockwise(true, blockwise.SZX16, tt.blockwiseTimeout),
			)

			conn, err := udp.Dial(addr)
			if err != nil {
				t.Fatal("Error dialing: ", err)
			}
			// Close the client connection when the subtest ends so the socket
			// is not leaked. A close error is only logged: it must not mask
			// the result of the assertion below.
			t.Cleanup(func() {
				if errC := conn.Close(); errC != nil {
					t.Log("close client:", errC)
				}
			})
			t.Log("client:", conn.LocalAddr().String())

			// Manage messages manually to avoid running into:
			// https://github.com/plgd-dev/go-coap/issues/572
			// Send a put request with payload split into two requests that
			// share the same token.
			blocks := []message.Message{
				{
					Token: message.Token{123},
					Options: message.Options{{
						ID:    message.Block1,
						Value: []byte{0b0000_1_000}, // num 0, more true, szx 16
					}},
					Code:      codes.PUT,
					Payload:   []byte("first part      "), // 16 bytes
					MessageID: 0,
					Type:      message.Confirmable,
				},
				{
					Token: message.Token{123},
					Options: message.Options{{
						ID:    message.Block1,
						Value: []byte{0b0001_0_000}, // num 1, more false, szx 16
					}},
					Code:      codes.PUT,
					Payload:   []byte("second part"),
					MessageID: 1,
					Type:      message.Confirmable,
				},
			}
			for i, block := range blocks {
				if i > 0 {
					// add a delay between blocks
					time.Sleep(tt.lag)
				}

				req := conn.AcquireMessage(t.Context())
				req.SetMessage(block)
				if err := conn.WriteMessage(req); err != nil {
					t.Fatal("put: ", err)
				}
				conn.ReleaseMessage(req)
			}

			// NOTE(review): the body is read as soon as the last write returns;
			// this presumably relies on WriteMessage blocking until the
			// confirmable request has been handled by the server - confirm.
			actualBodyStr := actualBody.String()
			expectedBody := "first part      second part"
			if actualBodyStr != expectedBody {
				t.Fatalf("unexpected body: got %q, want %q", actualBodyStr, expectedBody)
			}
		})
	}
}

// helpers

// setupServer starts a UDP CoAP server configured with opts on a free
// localhost port and returns the address it listens on.
//
// The listener and the server are torn down through tb.Cleanup. Cleanup
// functions run in last-added-first-called order, so the server is stopped
// before the listener is closed.
func setupServer(tb testing.TB, opts ...server.Option) string {
	tb.Helper()

	l, err := net.NewListenUDP("udp4", "localhost:0")
	if err != nil {
		tb.Fatalf("create listener: %s", err)
	}
	tb.Cleanup(func() {
		// Closing is best effort: report a failure in the test log instead of
		// storing it in a variable nobody reads, but do not fail the test.
		if errC := l.Close(); errC != nil {
			tb.Log("close listener:", errC)
		}
	})

	s := udp.NewServer(opts...)
	tb.Cleanup(s.Stop)

	tb.Log("starting server:", l.LocalAddr().String())
	// Serve runs in the background; its error is written with the standard
	// logger rather than tb because the goroutine may report it after the test
	// has already finished.
	go func() {
		if err := s.Serve(l); err != nil {
			log.Println("serve:", err)
		}
	}()

	return l.LocalAddr().String()
}

// recBodyHandler builds a handler that copies the body of every request it
// receives into rec and then answers with codes.Valid and no payload. It
// panics when either the copy or setting the response fails.
func recBodyHandler(rec io.Writer) mux.HandlerFunc {
	return mux.HandlerFunc(func(w mux.ResponseWriter, r *mux.Message) {
		// append the payload to rec
		_, copyErr := io.Copy(rec, r.Body())
		if copyErr != nil {
			panic(copyErr)
		}

		respErr := w.SetResponse(codes.Valid, message.AppOctets, nil)
		if respErr != nil {
			panic(respErr)
		}
	})
}

Test output:

--- FAIL: TestBlockWiseTimeout (0.00s)
    --- FAIL: TestBlockWiseTimeout/100ms_100ms (0.10s)
        bug_test.go:51: starting server: 127.0.0.1:36497
        bug_test.go:60: client: 127.0.0.1:54059
        bug_test.go:107: unexpected body: "second part", got: "first part      second part"
    --- FAIL: TestBlockWiseTimeout/1s_1s (1.00s)
        bug_test.go:51: starting server: 127.0.0.1:56725
        bug_test.go:60: client: 127.0.0.1:36002
        bug_test.go:107: unexpected body: "second part", got: "first part      second part"
FAIL
FAIL	tmp	1.007s
?   	tmp/client	[no test files]
?   	tmp/server	[no test files]
FAIL

palsivertsen avatar Mar 31 '25 16:03 palsivertsen

The issue is that:

  1. The first block arrives (num = 0) -> the server accepts and stores it
  2. The timeout occurs before the next block arrives -> the stored blocks are dropped
  3. The next block (num = 1) arrives after the timeout -> BUG: the handler is called instead of a response with 4.08 Request Entity Incomplete being sent

jkralik avatar Mar 31 '25 19:03 jkralik

Would it be safe to increase the blockwise timeout to, let's say, EXCHANGE_LIFETIME (247 seconds) to mitigate this?

palsivertsen avatar Apr 01 '25 08:04 palsivertsen