🔥 Feature (v3): Add buffered streaming support
Description
This feature adds buffered streaming support to Fiber v3 through the new Ctx method SendStreamWriter:
func (c Ctx) SendStreamWriter(streamWriter func(w *bufio.Writer)) error
c.SendStreamWriter() is essentially a wrapper around fasthttp's SetBodyStreamWriter method, in the same way that c.SendStream() wraps fasthttp's SetBodyStream().
With this feature, Fiber users can send a response in small segments over a persistent HTTP connection, flushing each segment as soon as it is ready. This is important for several kinds of web applications, such as:
- Server-Sent Events (SSE)
- Large File Downloads
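To make "buffered" concrete, here is a minimal standard-library sketch (it is not Fiber's or fasthttp's implementation). It shows that bytes written to a bufio.Writer stay in its buffer until Flush is called, which is why a stream writer callback has to flush after each segment it wants delivered. The helper name bytesDelivered is made up for this illustration, and a bytes.Buffer stands in for the client connection.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
)

// bytesDelivered is a hypothetical helper for this sketch. It writes msg to a
// bufio.Writer and reports how many bytes had reached the destination before
// and after the explicit Flush call.
func bytesDelivered(msg string) (before, after int, err error) {
	var dst bytes.Buffer // stands in for the client connection
	w := bufio.NewWriter(&dst)

	if _, err = w.WriteString(msg); err != nil {
		return 0, 0, err
	}
	before = dst.Len() // still 0 for short messages: the data sits in the buffer

	if err = w.Flush(); err != nil {
		return before, 0, err
	}
	after = dst.Len() // the whole message has now been handed over
	return before, after, nil
}

func main() {
	before, after, err := bytesDelivered("chunk 1\n")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("before flush: %d bytes, after flush: %d bytes\n", before, after)
}
```

Messages larger than the writer's internal buffer are passed through before an explicit Flush, so the "0 bytes before flush" observation only holds for short segments like the one used here.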
For example, a simple self-contained SSE example using this new feature can be set up as follows:
homeHtml := `<!DOCTYPE html>
<html>
<body>
<h1>SSE Messages</h1>
<div id="result"></div>
<script>
if (typeof(EventSource) !== "undefined") {
var source = new EventSource("http://127.0.0.1:3000/sse");
source.addEventListener("my-event", function(event) {
document.getElementById("result").innerHTML += event.data + "<br>";
});
} else {
document.getElementById("result").innerHTML = "Sorry, your browser does not support server-sent events...";
}
</script>
</body>
</html>
`
app := fiber.New()

app.Get("/", func(c fiber.Ctx) error {
	c.Set("Content-Type", fiber.MIMETextHTMLCharsetUTF8)
	return c.SendString(homeHtml)
})

app.Get("/sse", func(c fiber.Ctx) error {
	c.Set("Content-Type", "text/event-stream")
	c.Set("Cache-Control", "no-cache")
	c.Set("Connection", "keep-alive")
	c.Set("Transfer-Encoding", "chunked")

	return c.Status(fiber.StatusOK).SendStreamWriter(func(w *bufio.Writer) {
		for {
			fmt.Fprintf(w, "event: my-event\ndata: %s\n\n", time.Now().Local())
			if err := w.Flush(); err != nil {
				return
			}
			time.Sleep(time.Second)
		}
	})
})
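The handler above builds each SSE frame by hand with a format string. As a small sketch of the wire format it relies on, the helper below (formatSSE, a name invented for this illustration and not part of Fiber) assembles one event: an optional "event:" line, one "data:" line per line of payload, and a blank line that terminates the event.

```go
package main

import (
	"fmt"
	"strings"
)

// formatSSE is a hypothetical helper that renders a single Server-Sent Events
// frame. A payload containing newlines is split into several "data:" lines,
// because a bare newline inside a frame would otherwise end the field early.
func formatSSE(event, data string) string {
	var b strings.Builder
	if event != "" {
		b.WriteString("event: " + event + "\n")
	}
	for _, line := range strings.Split(data, "\n") {
		b.WriteString("data: " + line + "\n")
	}
	b.WriteString("\n") // blank line: end of this event
	return b.String()
}

func main() {
	fmt.Printf("%q\n", formatSSE("my-event", "hello"))
	fmt.Printf("%q\n", formatSSE("", "first line\nsecond line"))
}
```

Inside a stream writer callback, the returned string would be written to the *bufio.Writer and followed by a Flush, exactly as the fmt.Fprintf call in the example does.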
Fixes #3127
Type of change
Please delete options that are not relevant.
- [X] New feature (non-breaking change which adds functionality)
- [X] Documentation update (changes to documentation)
CURRENT STATUS
- [X] Features
  - [X] Add Ctx.SendStreamWriter() to ctx.go
- [X] Unit Tests
  - [X] Add Test_Ctx_SendStreamWriter to ctx_test.go
  - [X] Add Test_Ctx_SendStreamWriter_Interrupted to ctx_test.go
    ⚠️ WARNING: This test gives a race condition warning when running with -race, but is stable and consistent via mutex/channels.
- [X] Documentation
  - [X] Add Ctx.SendStreamWriter() docs to docs/api/ctx.md
- [ ] Benchmarks
Checklist
Before you submit your pull request, please make sure you meet these requirements:
- [X] Followed the inspiration of the Express.js framework for new functionalities, making them similar in usage.
- [X] Conducted a self-review of the code and provided comments for complex or critical parts.
- [X] Updated the documentation in the /docs/ directory for Fiber's documentation.
- [X] Added or updated unit tests to validate the effectiveness of the changes or new features.
- [X] Ensured that new and existing unit tests pass locally with the changes.
- [X] Verified that any new dependencies are essential and have been agreed upon by the maintainers/community.
- [X] Aimed for optimal performance with minimal allocations in the new code.
- [ ] Provided benchmarks for the new code to analyze and improve upon.
Walkthrough
The pull request introduces a new method, SendStreamWriter, to the DefaultCtx struct and the Ctx interface within the fiber package. This method allows users to set a response body stream writer using a function that accepts a pointer to a bufio.Writer. Additionally, two new test functions are added to validate the functionality of SendStreamWriter. Documentation is updated to include usage examples for this new method, enhancing the framework's ability to handle streaming data in HTTP responses.
Changes
| Files | Change Summary |
|---|---|
| ctx.go, ctx_interface_gen.go | Added SendStreamWriter(func(*bufio.Writer)) error method to DefaultCtx and Ctx interface. |
| ctx_test.go | Introduced tests Test_Ctx_SendStreamWriter and Test_Ctx_SendStreamWriter_Interrupted for the new method. |
| docs/api/ctx.md | Updated documentation to include the SendStreamWriter method with usage examples. |
Assessment against linked issues
| Objective | Addressed | Explanation |
|---|---|---|
| Add buffered streaming support (Proposal #3127) | ✅ | |
| Ensure API stability without breaking changes (Proposal #3127) | ✅ | |
| Provide examples for Server-Sent Events (SSE) in Fiber (Proposal #3127) | ✅ | |
Possibly related PRs
- #3161: This PR enhances the testing capabilities of the Fiber framework, which is relevant as it relates to the testing of the SendStreamWriter method introduced in the main PR.
- #3200: This PR adds context support to the RequestID middleware, which is relevant as it enhances context management, similar to the improvements made in the main PR.
- #3206: This documentation update improves clarity regarding context usage, which is relevant to the context handling in the SendStreamWriter method.
Suggested reviewers
- gaby
- sixcolors
- ReneWerner87
- efectn
@grivera64 Try this:
func Test_Ctx_SendStreamWriter_Interrupted(t *testing.T) {
	t.Parallel()
	app := New()
	c := app.AcquireCtx(&fasthttp.RequestCtx{})

	var mutex sync.Mutex
	var wg sync.WaitGroup // WaitGroup to synchronize goroutines
	startChan := make(chan bool)

	interruptStreamWriter := func() {
		defer wg.Done() // Notify WaitGroup when done
		<-startChan
		time.Sleep(5 * time.Millisecond)
		mutex.Lock()
		c.Response().CloseBodyStream() //nolint:errcheck // It is fine to ignore the error
		mutex.Unlock()
	}

	wg.Add(1) // Increment WaitGroup counter before starting goroutine
	err := c.SendStreamWriter(func(w *bufio.Writer) {
		go interruptStreamWriter()
		startChan <- true

		for lineNum := 1; lineNum <= 5; lineNum++ {
			mutex.Lock()
			fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error
			mutex.Unlock()

			if err := w.Flush(); err != nil {
				if lineNum < 3 {
					t.Errorf("unexpected error: %s", err)
				}
				return
			}
			time.Sleep(1500 * time.Microsecond)
		}
	})
	require.NoError(t, err)

	wg.Wait() // Wait for the interruptStreamWriter to finish

	// Protect access to the response body with the mutex
	mutex.Lock()
	defer mutex.Unlock()
	require.Equal(t, "Line 1\nLine 2\nLine 3\n", string(c.Response().Body()))
}
@gaby Thanks for the recommendation! Adding the WaitGroup does remove the race error, but now I am getting an empty response body. I think this may be due to one of the following:
- Using c.Response().CloseBodyStream() may be freeing the response body internally, making c.Context().Body() no longer valid after waiting long enough. This seems strange, but is the most probable, as it is a race condition.
  - The fix for this would be to find a different method to initiate a client disconnect.
- The WaitGroup isn't waiting long enough to allow Flush() to work. This is least probable, since I also tried increasing the counter to ensure that both interruptStreamWriter and SendStreamWriter finished executing, and the body returned is still empty:
// go test -run Test_Ctx_SendStreamWriter_Interrupted
func Test_Ctx_SendStreamWriter_Interrupted(t *testing.T) {
	t.Parallel()
	app := New()
	c := app.AcquireCtx(&fasthttp.RequestCtx{})

	var mutex sync.Mutex
	var wg sync.WaitGroup
	startChan := make(chan bool)

	interruptStreamWriter := func() {
		wg.Add(1)
		defer wg.Done()
		<-startChan
		time.Sleep(5 * time.Millisecond)
		mutex.Lock()
		c.Response().CloseBodyStream() //nolint:errcheck // It is fine to ignore the error
		mutex.Unlock()
	}

	wg.Add(1)
	err := c.SendStreamWriter(func(w *bufio.Writer) {
		go interruptStreamWriter()
		defer wg.Done()
		startChan <- true

		for lineNum := 1; lineNum <= 5; lineNum++ {
			mutex.Lock()
			fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error
			mutex.Unlock()

			if err := w.Flush(); err != nil {
				if lineNum < 3 {
					t.Errorf("unexpected error: %s", err)
				}
				return
			}
			time.Sleep(1500 * time.Microsecond)
		}
	})
	require.NoError(t, err)

	// Wait for StreamWriter and the goroutine to finish
	wg.Wait()
	mutex.Lock()
	require.Equal(t, "Line 1\nLine 2\nLine 3\n", string(c.Response().Body()))
	mutex.Unlock()
}
I will file an issue on valyala/fasthttp to ask how to mock client disconnections for this test. If it's not possible, we could remove this test case, as most of the other tests do not test for client disconnection issues.
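A side note on the second snippet: it calls wg.Add(1) from inside the spawned goroutine. The sync package documentation recommends calling Add before the goroutine starts, so that Wait cannot observe the counter before the goroutine has registered itself. Whether this has anything to do with the empty body reported here is not established; the standard-library sketch below only shows the recommended ordering in isolation, with runWorkers being a name invented for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers is a hypothetical helper that starts n goroutines and returns how
// many of them ran to completion before Wait returned.
func runWorkers(n int) int {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		done int
	)

	for i := 0; i < n; i++ {
		wg.Add(1) // register the goroutine before it is started
		go func() {
			defer wg.Done()
			mu.Lock()
			done++
			mu.Unlock()
		}()
	}

	wg.Wait() // returns only after every registered goroutine called Done
	return done
}

func main() {
	fmt.Println("workers finished:", runWorkers(5))
}
```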
Hey all, after reading a bit more on Fiber and Fasthttp documentation, I believe that the race condition is due to fasthttp.(*Response) not being concurrent-safe.
I tried using a modified version of app's Test() method, that closes an underlying net.Conn connection (I modified testConn too to make Close() block further writes but allow reads). This seemed to do the same as c.Response().CloseBodyStream(), but still gave the race condition issue.
This is the new race condition warning:
WARNING: DATA RACE
Read at 0x00c00016c130 by goroutine 9:
github.com/gofiber/fiber/v3.(*testConn).Write()
/fiber/helpers.go:628 +0x6b
bufio.(*Writer).Flush()
/usr/local/go/src/bufio/bufio.go:639 +0xee
github.com/valyala/fasthttp.writeChunk()
/go/pkg/mod/github.com/valyala/[email protected]/http.go:2250 +0x10b
github.com/valyala/fasthttp.writeBodyChunked()
/go/pkg/mod/github.com/valyala/[email protected]/http.go:2170 +0xce
github.com/valyala/fasthttp.(*Response).writeBodyStream()
/go/pkg/mod/github.com/valyala/[email protected]/http.go:2066 +0x338
github.com/valyala/fasthttp.(*Response).Write()
/go/pkg/mod/github.com/valyala/[email protected]/http.go:1967 +0x2c4
github.com/valyala/fasthttp.writeResponse()
/go/pkg/mod/github.com/valyala/[email protected]/server.go:2589 +0xb8
github.com/valyala/fasthttp.(*Server).serveConn()
/go/pkg/mod/github.com/valyala/[email protected]/server.go:2432 +0x1ead
github.com/valyala/fasthttp.(*Server).ServeConn()
/go/pkg/mod/github.com/valyala/[email protected]/server.go:2042 +0x154
github.com/gofiber/fiber/v3.(*App).TestWithInterrupt.func1()
/fiber/app.go:975 +0xde
...
This was the old warning when directly using c.SendStreamWriter() without a server:
github.com/valyala/fasthttp.(*Response).SetBodyStream()
/go/pkg/mod/github.com/valyala/[email protected]/http.go:249 +0x4f
github.com/valyala/fasthttp.(*Response).SetBodyStreamWriter()
/go/pkg/mod/github.com/valyala/[email protected]/http.go:292 +0x64
...
I believe fixing this race warning in the cleanest way possible would require an upstream PR to fasthttp (most likely adding a mutex for fasthttp.(*Response) related methods).
Based on Fiber's current codebase, there don't seem to be any other interrupt tests (the other tests simply ignore the output when the response times out). If this isn't something we should be testing for, we could remove the interrupt test and keep the remaining tests.
What are your thoughts on this? Please let me know if you want to see the modified app.Test() code.
Edit: I will still try to work with the modified app.Test() code to see if I can get the race error to go away, since it still does look promising despite the above.
After editing app.Test() a bit, I was able to get a working test without the race warning. I had to create an app method called TestWithInterrupt():
// TestWithInterrupt is used for internal debugging by passing a *http.Request with an interruptAfter duration.
func (app *App) TestWithInterrupt(req *http.Request, interruptAfter time.Duration) (*http.Response, error)
With this change, I was able to use that in the test case as written below as a fix:
func Test_Ctx_SendStreamWriter_Interrupted_New(t *testing.T) {
	t.Parallel()
	app := New(Config{StreamRequestBody: true})
	app.Get("/", func(c Ctx) error {
		return c.SendStreamWriter(func(w *bufio.Writer) {
			for lineNum := 1; lineNum <= 5; lineNum++ {
				time.Sleep(time.Duration(lineNum) * time.Millisecond)

				fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error
				if err := w.Flush(); err != nil {
					if lineNum <= 3 {
						t.Errorf("unexpected error: %s", err)
					}
					return
				}
			}
		})
	})

	resp, err := app.TestWithInterrupt(httptest.NewRequest(MethodGet, "/", nil), 8*time.Millisecond)
	require.NoError(t, err, "app.TestWithInterrupt(req)")

	body, err := io.ReadAll(resp.Body)
	require.NotNil(t, err)
	require.Equal(t, "Line 1\nLine 2\nLine 3\n", string(body))
}
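The behaviour this test relies on, a writer whose Flush starts failing once the peer goes away, can be reproduced with the standard library alone. The sketch below is not the TestWithInterrupt implementation proposed above: it uses net.Pipe as a stand-in connection and, as a simplifying assumption, has the "client" disconnect after reading a fixed number of lines rather than after a duration. The names streamLines and simulateDisconnect are invented for the illustration.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"strings"
)

// streamLines writes up to maxLines lines to conn, flushing after each one.
// It returns how many lines were flushed successfully and the first error.
func streamLines(conn net.Conn, maxLines int) (int, error) {
	w := bufio.NewWriter(conn)
	for lineNum := 1; lineNum <= maxLines; lineNum++ {
		fmt.Fprintf(w, "Line %d\n", lineNum) // buffered; failures surface on Flush
		if err := w.Flush(); err != nil {
			return lineNum - 1, err
		}
	}
	return maxLines, nil
}

// simulateDisconnect streams maxLines lines to a peer that closes its end of
// the connection after reading readLines lines.
func simulateDisconnect(readLines, maxLines int) (sent int, received string, err error) {
	server, client := net.Pipe()
	done := make(chan string, 1)

	go func() {
		var got strings.Builder
		r := bufio.NewReader(client)
		for i := 0; i < readLines; i++ {
			line, readErr := r.ReadString('\n')
			if readErr != nil {
				break
			}
			got.WriteString(line)
		}
		client.Close() // the "client" goes away mid-stream
		done <- got.String()
	}()

	sent, err = streamLines(server, maxLines)
	server.Close() // unblocks the reader if the stream ended first
	received = <-done
	return sent, received, err
}

func main() {
	sent, received, err := simulateDisconnect(3, 5)
	fmt.Printf("sent=%d received=%q failed=%v\n", sent, received, err != nil)
}
```

Because net.Pipe is synchronous, each Flush returns only after the peer has read the data, which keeps the outcome deterministic without any sleeps.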
Would you all like me to write an issue/PR for adding app.TestWithInterrupt() then use the function here after the merge?
Hello all, I hope you all have been doing well. I created an issue for the TestWithInterrupt() I proposed above. Please check it out here:
#3149
@grivera64 can you update testcases using new app.Test config properties?
For sure @efectn, I am working on using the new app.Test() for the unit test Test_Ctx_SendStreamWriter_Interrupted.
Should I also update the previous Test_Ctx_SendStreamWriter unit test to be consistent with the interrupted version, even though it still works as-is?
Codecov Report
All modified and coverable lines are covered by tests :white_check_mark:
Project coverage is 82.79%. Comparing base (ff55cfd) to head (76dee57). Report is 1 commit behind head on main.
Additional details and impacted files
@@ Coverage Diff @@
## main #3131 +/- ##
=======================================
Coverage 82.79% 82.79%
=======================================
Files 114 114
Lines 11193 11197 +4
=======================================
+ Hits 9267 9271 +4
Misses 1526 1526
Partials 400 400
| Flag | Coverage Δ | |
|---|---|---|
| unittests | 82.79% <100.00%> (+<0.01%) | :arrow_up: |
I think it's good as-is since app.Test is not used for that test
@grivera64 thanks for this feature can you check our ai hints this could help to get the best out of it
No problem @ReneWerner87 !
Regarding the CodeRabbit hints, I've added a simple SSE example, adapted from the one the AI hints gave, to the whats_new.md file.
The hints also mention checking the return value of fmt.Fprintf(w, ...). It appears that fmt.Fprintf (under normal circumstances, if I'm not mistaken) should not return any errors, even if the connection has been closed, whereas w.Flush() does return an error on a disconnection. Should we still include error checking for fmt.Fprintf calls in the examples?
Thanks again for the suggestions and comments!
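The question about fmt.Fprintf versus w.Flush() can be probed with the standard library. The sketch below is an illustration under stated assumptions, not a statement about fasthttp's internals: failingWriter is a made-up writer that always fails, standing in for a closed connection, and probeErrors is a hypothetical helper. It records the error from a buffered Fprintf, from the following Flush, and from a second Fprintf issued after the failed Flush.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
)

// failingWriter is a hypothetical stand-in for a connection that has already
// been closed by the client: every Write fails.
type failingWriter struct{}

func (failingWriter) Write(p []byte) (int, error) {
	return 0, errors.New("connection closed")
}

// probeErrors reports the errors seen at three points of a buffered write.
func probeErrors() (firstWrite, flush, secondWrite error) {
	w := bufio.NewWriter(failingWriter{})

	// The message fits in the buffer, so nothing touches the broken writer yet.
	_, firstWrite = fmt.Fprintf(w, "data: %s\n\n", "hello")

	// Flush hands the buffer to the broken writer and reports its error.
	flush = w.Flush()

	// bufio.Writer remembers the failure; later writes return it as well.
	_, secondWrite = fmt.Fprintf(w, "data: %s\n\n", "again")
	return firstWrite, flush, secondWrite
}

func main() {
	first, flush, second := probeErrors()
	fmt.Println("first Fprintf error: ", first)
	fmt.Println("Flush error:         ", flush)
	fmt.Println("second Fprintf error:", second)
}
```

In this setup the first Fprintf reports no error, while Flush and every write after it do. So checking Flush is enough to stop a streaming loop promptly, and checking Fprintf mainly matters for writes larger than the buffer or for writes issued after an earlier failure.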
@grivera64 I think you have to run make generate since the ctx.go changed
Running make generate doesn't provide any new diffs to commit:
fiber (feature/add-buffered-streaming-support) $ make generate
go install github.com/tinylib/msgp@latest
go install github.com/vburenin/ifacemaker@975a95966976eeb2d4365a7fb236e274c54da64c
go generate ./...
fiber (feature/add-buffered-streaming-support) $ git status
On branch feature/add-buffered-streaming-support
Your branch is up to date with 'origin/feature/add-buffered-streaming-support'.
nothing to commit, working tree clean
Is this fine?
👍 Just one comment. LGTM
@gaby For sure thanks for the review!
With regards to the recipes/example comment, should I make an issue on gofiber/recipes to update the original SSE example to use c.SendStreamWriter()?
Yes, you can
Yes, that's fine 💪