Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
1475fe2
added buid in SSE support in GoFr
IronwallxR5 Feb 28, 2026
2bab9f9
fix: resolve golangci-lint violations in SSE implementation
IronwallxR5 Feb 28, 2026
8b26c84
fixed code quality
IronwallxR5 Feb 28, 2026
b0dfd86
switch SSE to sequential Responder callback
IronwallxR5 Mar 14, 2026
1762962
fix golangci-lint errors in sse tests
IronwallxR5 Mar 14, 2026
860acff
Merge branch 'development' into build-in-sse-suport
Umang01-hash Mar 17, 2026
d964993
address review feedback on sse implementation
IronwallxR5 Mar 18, 2026
9089301
fixed typo in responder.go
IronwallxR5 Mar 18, 2026
76b46bf
Merge branch 'development' into build-in-sse-suport
IronwallxR5 Mar 20, 2026
597ad89
Merge branch 'development' into build-in-sse-suport
IronwallxR5 Mar 23, 2026
7ffa346
address maintainer re-review feedback(typed callback, missing tests, …
IronwallxR5 Mar 24, 2026
845abb0
Merge branch 'development' into build-in-sse-suport
IronwallxR5 Mar 24, 2026
3d83e79
resolve err113 lint failure
IronwallxR5 Mar 24, 2026
da0cc7a
Merge branch 'development' into build-in-sse-suport
IronwallxR5 Mar 30, 2026
5e3e312
Merge branch 'development' into build-in-sse-suport
IronwallxR5 Apr 3, 2026
dd3d780
Merge branch 'development' into build-in-sse-suport
IronwallxR5 Apr 15, 2026
f415e0d
Merge branch 'development' into build-in-sse-suport
IronwallxR5 Apr 20, 2026
ec86cce
Merge branch 'development' into build-in-sse-suport
IronwallxR5 Apr 21, 2026
3961f89
simplify user API to return a struct instead of calling SSEResponse()
IronwallxR5 Apr 21, 2026
d203d62
Merge branch 'development' into build-in-sse-suport
IronwallxR5 Apr 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions examples/using-sse/configs/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
APP_NAME=using-sse
HTTP_PORT=9000
64 changes: 64 additions & 0 deletions examples/using-sse/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package main

import (
"fmt"
"time"

"gofr.dev/pkg/gofr"
)

func main() {
app := gofr.New()

// Stream the current time every second.
// c.Context.Done() fires on both client disconnect and server shutdown.
app.GET("/events", func(c *gofr.Context) (any, error) {
return gofr.SSEResponse(func(stream *gofr.SSEStream) error {
Comment thread
IronwallxR5 marked this conversation as resolved.
Outdated
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

i := 0

for {
select {
case <-c.Context.Done():
// Graceful cleanup: release resources, close DB cursors, etc.
return nil
case t := <-ticker.C:
if err := stream.Send(gofr.SSEEvent{
ID: fmt.Sprintf("%d", i),
Name: "time",
Data: map[string]string{"time": t.Format(time.RFC3339)},
}); err != nil {
return err
}

i++
}
}
}), nil
})

// A countdown that sends 11 events and closes.
app.GET("/countdown", func(c *gofr.Context) (any, error) {
return gofr.SSEResponse(func(stream *gofr.SSEStream) error {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()

for i := 10; i >= 0; i-- {
select {
case <-c.Context.Done():
return nil
case <-ticker.C:
if err := stream.SendEvent("countdown", map[string]int{"remaining": i}); err != nil {
return err
}
}
}

return stream.SendEvent("done", "Countdown complete!")
}), nil
})

app.Run()
}
8 changes: 6 additions & 2 deletions pkg/gofr/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (el *ErrorLogEntry) PrettyPrint(writer io.Writer) {
}

func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
c := newContext(gofrHTTP.NewResponder(w, r.Method), gofrHTTP.NewRequest(r), h.container)
c := newContext(gofrHTTP.NewResponder(w, r.Method, gofrHTTP.WithLogger(h.container.Logger)), gofrHTTP.NewRequest(r), h.container)

traceID := trace.SpanFromContext(r.Context()).SpanContext().TraceID().String()

Expand Down Expand Up @@ -108,7 +108,11 @@ func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
resp.SetCustomHeaders(w)
}

// Handler function completed
// SSE streams are long-lived; bypass request timeout like WebSocket.
if _, ok := result.(response.SSE); ok {
c.Context = r.Context()
}

c.responder.Respond(result, err)
}

Expand Down
12 changes: 12 additions & 0 deletions pkg/gofr/http/middleware/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ func (w *StatusResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return nil, nil, fmt.Errorf("%w: cannot hijack connection", errHijackNotSupported)
}

// Flush delegates to the underlying http.Flusher if supported.
func (w *StatusResponseWriter) Flush() {
if flusher, ok := w.ResponseWriter.(http.Flusher); ok {
flusher.Flush()
}
}

// Unwrap returns the underlying ResponseWriter for http.ResponseController.
func (w *StatusResponseWriter) Unwrap() http.ResponseWriter {
return w.ResponseWriter
}

// RequestLog represents a log entry for HTTP requests.
type RequestLog struct {
TraceID string `json:"trace_id,omitempty"`
Expand Down
40 changes: 40 additions & 0 deletions pkg/gofr/http/middleware/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,3 +320,43 @@ type mockAddr struct{}

func (*mockAddr) Network() string { return "tcp" }
func (*mockAddr) String() string { return "127.0.0.1:8080" }

func Test_StatusResponseWriter_Flush_Supported(t *testing.T) {
rr := httptest.NewRecorder()
srw := &StatusResponseWriter{ResponseWriter: rr}

// httptest.ResponseRecorder implements http.Flusher.
assert.NotPanics(t, func() {
srw.Flush()
})

assert.True(t, rr.Flushed, "expected recorder to be flushed")
}

func Test_StatusResponseWriter_Flush_NotSupported(t *testing.T) {
writer := &nonFlushableWriter{header: http.Header{}}
srw := &StatusResponseWriter{ResponseWriter: writer}

// Should not panic even if the underlying writer doesn't support Flusher.
assert.NotPanics(t, func() {
srw.Flush()
})
}

func Test_StatusResponseWriter_Unwrap(t *testing.T) {
rr := httptest.NewRecorder()
srw := &StatusResponseWriter{ResponseWriter: rr}

unwrapped := srw.Unwrap()

assert.Equal(t, rr, unwrapped, "expected Unwrap to return the underlying ResponseWriter")
}

// nonFlushableWriter is a ResponseWriter that does NOT implement http.Flusher.
type nonFlushableWriter struct {
header http.Header
}

func (n *nonFlushableWriter) Header() http.Header { return n.header }
func (*nonFlushableWriter) Write([]byte) (int, error) { return 0, nil }
func (*nonFlushableWriter) WriteHeader(int) {}
61 changes: 58 additions & 3 deletions pkg/gofr/http/responder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package http
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"reflect"

Expand All @@ -13,15 +14,35 @@ var (
errEmptyResponse = errors.New("internal server error")
)

// NewResponder creates a new Responder instance from the given http.ResponseWriter..
func NewResponder(w http.ResponseWriter, method string) *Responder {
return &Responder{w: w, method: method}
// sseLogger is a minimal logging interface used only for SSE error reporting.
type sseLogger interface {
Debugf(format string, args ...any)
}

// ResponderOption configures optional Responder behavior.
type ResponderOption func(*Responder)

// WithLogger attaches a logger to the Responder for debug-level SSE error logging.
func WithLogger(l sseLogger) ResponderOption {
return func(r *Responder) { r.logger = l }
}

// NewResponder creates a new Responder instance from the given http.ResponseWriter.
func NewResponder(w http.ResponseWriter, method string, opts ...ResponderOption) *Responder {
r := &Responder{w: w, method: method}

for _, o := range opts {
o(r)
}

return r
}

// Responder encapsulates an http.ResponseWriter and is responsible for crafting structured responses.
type Responder struct {
w http.ResponseWriter
method string
logger sseLogger
}

// Respond sends a response with the given data and handles potential errors, setting appropriate
Expand Down Expand Up @@ -75,6 +96,10 @@ func (r Responder) handleSpecialResponseTypes(data any, err error) bool {
statusCode := r.getStatusCodeForSpecialResponse(data, err)

switch v := data.(type) {
case resTypes.SSE:
r.handleSSEResponse(v)
return true

case resTypes.File:
r.w.Header().Set("Content-Type", v.ContentType)
r.w.WriteHeader(statusCode)
Expand Down Expand Up @@ -276,3 +301,33 @@ func isNil(i any) bool {

return v.Kind() == reflect.Ptr && v.IsNil()
}

// handleSSEResponse handles Server-Sent Events responses.
//
// TODO: SSE connections block for the full connection lifetime, causing the logging middleware
// and response histogram to record the entire duration. Consider labeling SSE in the histogram.
func (r Responder) handleSSEResponse(sse resTypes.SSE) {
callback, ok := sse.Callback.(func(http.ResponseWriter, *http.ResponseController) error)
if !ok || callback == nil {
if r.logger != nil {
r.logger.Debugf("SSE response has nil or invalid callback")
}

return
}

r.w.Header().Set("Content-Type", "text/event-stream")
r.w.Header().Set("Cache-Control", "no-cache")
r.w.Header().Set("Connection", "keep-alive")
r.w.Header().Set("X-Accel-Buffering", "no")
r.w.WriteHeader(http.StatusOK)

rc := http.NewResponseController(r.w)
_ = rc.Flush()

if err := callback(r.w, rc); err != nil {
if r.logger != nil {
r.logger.Debugf(fmt.Sprintf("SSE stream error: %v", err))
}
}
}
9 changes: 9 additions & 0 deletions pkg/gofr/http/response/sse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package response

// SSE represents a Server-Sent Events response.
// Return this from a handler to stream events to the client.
type SSE struct {
// Callback holds the user's SSE streaming function.
// Typed as any to avoid circular imports; the Responder type-asserts it at call-site.
Callback any
}
Loading
Loading