From 21303e879d2c7a857306206d2cbdae83f61983e6 Mon Sep 17 00:00:00 2001 From: boojack Date: Sat, 9 May 2026 09:15:50 +0800 Subject: [PATCH] fix(sse): stream initial response and refresh tokens --- server/router/api/v1/sse_handler.go | 9 ++++++ server/router/api/v1/test/sse_handler_test.go | 22 +++++++++++++ web/src/connect.ts | 2 +- web/src/hooks/useLiveMemoRefresh.ts | 32 +++++++++++++------ 4 files changed, 55 insertions(+), 10 deletions(-) diff --git a/server/router/api/v1/sse_handler.go b/server/router/api/v1/sse_handler.go index a9f8f444f..dedfbec6b 100644 --- a/server/router/api/v1/sse_handler.go +++ b/server/router/api/v1/sse_handler.go @@ -68,6 +68,15 @@ func handleSSE(c *echo.Context, hub *SSEHub, authenticator *auth.Authenticator) slog.Debug("SSE client connected", "userID", userID) + // Send an initial comment so clients and dev proxies observe the stream + // immediately instead of waiting for the first heartbeat or data event. + if _, err := fmt.Fprint(w, ": connected\n\n"); err != nil { + return nil + } + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + for { select { case <-ctx.Done(): diff --git a/server/router/api/v1/test/sse_handler_test.go b/server/router/api/v1/test/sse_handler_test.go index 6c79d34a4..711abfc11 100644 --- a/server/router/api/v1/test/sse_handler_test.go +++ b/server/router/api/v1/test/sse_handler_test.go @@ -1,6 +1,7 @@ package test import ( + "bufio" "context" "io" "net/http" @@ -78,6 +79,27 @@ func TestSSEHandler_Authentication(t *testing.T) { require.Equal(t, http.StatusUnauthorized, rec.Code) }) + t.Run("valid token streams initial comment", func(t *testing.T) { + server := httptest.NewServer(e) + defer server.Close() + + reqCtx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, server.URL+"/api/v1/sse", nil) + require.NoError(t, err) + req.Header.Set("Authorization", "Bearer "+token) + + resp, err := server.Client().Do(req) + require.NoError(t, err) + defer resp.Body.Close() + require.Equal(t, http.StatusOK, resp.StatusCode) + require.Equal(t, "text/event-stream", resp.Header.Get("Content-Type")) + + line, err := bufio.NewReader(resp.Body).ReadString('\n') + require.NoError(t, err) + require.Equal(t, ": connected\n", line) + }) + t.Run("hub close disconnects stream", func(t *testing.T) { server := httptest.NewServer(e) defer server.Close() diff --git a/web/src/connect.ts b/web/src/connect.ts index 66c446f3a..c40ea4195 100644 --- a/web/src/connect.ts +++ b/web/src/connect.ts @@ -121,7 +121,7 @@ async function refreshAndGetAccessToken(): Promise { return token; } -async function getRequestToken(): Promise { +export async function getRequestToken(): Promise { let token = getAccessToken(); if (!token) { if (!hasStoredToken()) return null; diff --git a/web/src/hooks/useLiveMemoRefresh.ts b/web/src/hooks/useLiveMemoRefresh.ts index 40700dde1..c9df6a606 100644 --- a/web/src/hooks/useLiveMemoRefresh.ts +++ b/web/src/hooks/useLiveMemoRefresh.ts @@ -1,6 +1,6 @@ import { useQueryClient } from "@tanstack/react-query"; import { useCallback, useEffect, useRef, useSyncExternalStore } from "react"; -import { getAccessToken } from "@/auth-state"; +import { getRequestToken, refreshAccessToken } from "@/connect"; import { useAuth } from "@/contexts/AuthContext"; import { memoKeys } from "@/hooks/useMemoQueries"; import { userKeys } from "@/hooks/useUserQueries"; @@ -89,7 +89,7 @@ export function useLiveMemoRefresh() { return; } - const token = getAccessToken(); + let token = await getRequestToken(); if (!token) { setSSEStatus("disconnected"); // Not logged in; do not retry. Effect will re-run when currentUser is set. @@ -101,13 +101,16 @@ export function useLiveMemoRefresh() { abortControllerRef.current = abortController; try { - const response = await fetch("/api/v1/sse", { - headers: { - Authorization: `Bearer ${token}`, - }, - signal: abortController.signal, - credentials: "include", - }); + let response = await fetchSSEStream(token, abortController.signal); + + if (response.status === 401) { + await refreshAccessToken(); + token = await getRequestToken(); + if (!token) { + throw new Error("SSE connection failed: missing token after refresh"); + } + response = await fetchSSEStream(token, abortController.signal); + } if (!response.ok || !response.body) { throw new Error(`SSE connection failed: ${response.status}`); @@ -196,6 +199,17 @@ export function useLiveMemoRefresh() { // Event handling // --------------------------------------------------------------------------- +function fetchSSEStream(token: string, signal: AbortSignal): Promise { + return fetch("/api/v1/sse", { + headers: { + Accept: "text/event-stream", + Authorization: `Bearer ${token}`, + }, + signal, + credentials: "include", + }); +} + interface SSEChangeEvent { type: (typeof SSE_EVENT_TYPES)[keyof typeof SSE_EVENT_TYPES]; name: string;