Overview
This document describes the real-time update system built with Server-Sent Events (SSE) and RTK Query's cache invalidation mechanism. This architecture enables real-time UI updates for credit balances, transactions, and other workspace data without polling or WebSockets complexity.
Architecture Layers
The system consists of four main layers:
- NestJS Backend: Event emission via SSE
- Next.js Proxy: Authenticated SSE streaming
- React Client: SSE connection management
- RTK Query: Tag-based cache invalidation
Flow Diagram
sequenceDiagram
participant Client as React Client
participant Cache as RTK Query Cache
participant BE as Next + Nest Backend
participant Stripe
BE->>Cache: Get and Cache<br/>Wallet data<br/>(30 credits)
Cache->>Client: Display the Wallet
Client->>BE: Consume credits: POST /credits/consume
BE->>BE: Deduct<br/>(20 credits)
Note over BE: Emit credit.consumed event
par Response returned
BE->>Cache: Update Wallet data
Cache->>Client: Display the Wallet<br/>(10 credits)
and Check threshold
Note over BE: Listen credit.consumed event
BE->>BE: Check balance (10 credits) < threshold (20 credits)
BE->>Stripe: Create payment intent
BE->>Cache: Update Transaction list
Cache->>Client: Display<br/>Pending Transaction
and Async Stripe Payment
Note over BE: Listen Stripe webhook event
Stripe-->>BE: Payment success webhook
BE->>BE: Add 100 credits to wallet
BE->>Cache: SSE<br/>tags_invalidated(['Credit'])
Cache->>BE: Auto-refetch<br/>GET /credits/wallet
BE->>Cache: Fresh wallet data
Cache->>Client: Display new Wallet<br/>(110 credits)
endLayer 1: NestJS Backend - Event Emission
EventsService
Location: apps/api/src/sse/events.service.ts
The EventsService manages Server-Sent Events streams for each workspace using RxJS Subjects.
@Injectable()
export class EventsService implements OnModuleDestroy {
private readonly eventSubjects = new Map<string, Subject<any>>();
private readonly heartbeatIntervals = new Map<string, NodeJS.Timeout>();
// Get or create an SSE subject for a specific workspace
getWorkspaceEventStream(workspaceId: string): Subject<any> {
if (!this.eventSubjects.has(workspaceId)) {
const subject = new Subject<any>();
this.eventSubjects.set(workspaceId, subject);
// Send immediate heartbeat to establish connection
this.sendHeartbeat(workspaceId);
// Start heartbeat to keep connection alive (every 30s)
this.startHeartbeat(workspaceId);
// Clean up when no more subscribers
subject.subscribe({
complete: () => {
this.stopHeartbeat(workspaceId);
this.eventSubjects.delete(workspaceId);
},
});
}
return this.eventSubjects.get(workspaceId)!;
}
// Emit a tags invalidation event to trigger RTK Query cache invalidation
emitTagsInvalidated(workspaceId: string, tags: string[]) {
const subject = this.eventSubjects.get(workspaceId);
if (subject && !subject.closed) {
subject.next({
type: 'tags_invalidated',
data: {
tags,
timestamp: new Date().toISOString(),
},
});
}
}
// Send heartbeat to keep connection alive
private sendHeartbeat(workspaceId: string) {
const subject = this.eventSubjects.get(workspaceId);
if (subject && !subject.closed) {
subject.next({
type: 'heartbeat',
data: { timestamp: new Date().toISOString() },
});
}
}
private startHeartbeat(workspaceId: string) {
const interval = setInterval(() => {
this.sendHeartbeat(workspaceId);
}, 30 * 1000); // 30 seconds
this.heartbeatIntervals.set(workspaceId, interval);
}
private stopHeartbeat(workspaceId: string) {
const interval = this.heartbeatIntervals.get(workspaceId);
if (interval) {
clearInterval(interval);
this.heartbeatIntervals.delete(workspaceId);
}
}
// Close all event streams (cleanup on shutdown)
closeAllStreams() {
this.heartbeatIntervals.forEach((interval) => clearInterval(interval));
this.heartbeatIntervals.clear();
this.eventSubjects.forEach((subject) => subject.complete());
this.eventSubjects.clear();
}
async onModuleDestroy() {
this.closeAllStreams();
}
}Key Features:
- Workspace isolation: Each workspace has its own RxJS Subject
- Heartbeat mechanism: Prevents proxy timeouts (30s interval)
- Automatic cleanup: Resources are freed when connections close
- Type safety: Events have a
typefield for discrimination
EventsController
Location: apps/api/src/sse/events.controller.ts
@Controller()
@UseGuards(ServerAuthGuard)
export class EventsController {
constructor(private readonly eventsService: EventsService) {}
@Sse('events')
events(@CurrentWorkspace() workspace: Workspace): Observable<MessageEvent> {
this.eventsService.logConnection(workspace.id);
return this.eventsService.getWorkspaceEventStream(workspace.id).pipe(
// Send immediate connection event to flush headers
startWith({
type: 'connection',
data: { timestamp: new Date().toISOString() },
}),
map(
(data) =>
({
data: JSON.stringify(data),
type: data.type,
}) as MessageEvent
)
);
}
}Key Points:
- Uses
ServerAuthGuardfor authentication @CurrentWorkspace()decorator extracts workspace from headersstartWith()sends initial event to flush headers immediately (important for proxies)
Event Emission from Business Logic
Location: apps/api/src/credit/listeners/credit-topup.listener.ts
@Injectable()
export class CreditTopupListener {
constructor(
private readonly creditService: CreditService,
private readonly stripeService: StripeService,
private readonly eventsService: EventsService
) {}
@OnEvent('credit.consumed')
async handleCreditConsumed(event: { transactionId: string; walletId: string }) {
const wallet = await this.creditService.getCreditWallet(event.walletId);
// Check if automatic topup should be triggered
if (wallet.autoReloadEnabled && wallet.balance <= wallet.threshold) {
await this.stripeService.executeTopupPayment(wallet);
// Get updated wallet and emit SSE event
const updatedWallet = await this.creditService.getCreditWallet(event.walletId);
if (updatedWallet) {
// This triggers RTK Query cache invalidation on the client
this.eventsService.emitTagsInvalidated(updatedWallet.workspaceId, ['Credit']);
}
}
}
}Pattern:
// Emit SSE event to invalidate cache tags
this.eventsService.emitTagsInvalidated(workspaceId, ['Credit']);This tells all connected clients to invalidate RTK Query cache entries tagged with 'Credit'.
Layer 2: Next.js Proxy - Authenticated SSE Streaming
Location: apps/web/src/app/api/events/route.ts
Why a Proxy?
- Authentication: Next.js manages user sessions via Auth.js (NextAuth)
- Security: NestJS API only accepts requests from Next.js proxy
- Proxy buffering: Can send special headers to disable buffering
Implementation
export async function GET(request: NextRequest) {
// 1. Authentication via NextAuth session
const session = await auth();
if (!session?.user?.jwt) {
return new Response('Unauthorized', { status: 401 });
}
// 2. Validate workspace ID
const workspaceId = new URL(request.url).searchParams.get('workspaceId');
if (!workspaceId) {
return new Response('Workspace ID required', { status: 400 });
}
// 3. Generate backend token
const { sub, email } = session.user.jwt;
const backendToken = await generateApiToken({ sub, email: email as string });
// 4. Connect to backend SSE
const backendUrl = `${process.env.NEST_API_URL}/events`;
const backendResponse = await fetch(backendUrl, {
headers: {
Authorization: `Bearer ${backendToken}`,
'app-workspace-id': workspaceId,
Accept: 'text/event-stream',
},
});
if (!backendResponse.ok) {
return new Response(`Backend error: ${backendResponse.statusText}`, {
status: backendResponse.status,
});
}
// 5. Stream proxy
const stream = new ReadableStream({
async start(controller) {
const reader = backendResponse.body!.getReader();
const encoder = new TextEncoder();
// Send padding to flush proxy buffers
controller.enqueue(encoder.encode(':connected\n\n'));
controller.enqueue(encoder.encode(':' + ' '.repeat(2048) + '\n\n'));
// Stream events from backend to client
while (true) {
const { done, value } = await reader.read();
if (done) break;
controller.enqueue(value);
}
},
});
// 6. SSE response headers
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream; charset=utf-8',
'Cache-Control': 'no-cache, no-transform',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no', // Disable nginx buffering
},
});
}Critical Details:
- Padding comments:
:connected\n\nand 2KB of spaces force proxies to flush buffers X-Accel-Buffering: no: Disables buffering in nginxCache-Control: no-cache: Prevents caching layers from interfering
Layer 3: React Client - SSE Connection Management
Location: apps/web/src/hooks/use-sse.ts
useSSE Hook
export function useSSE() {
const dispatch = useAppDispatch();
const currentWorkspace = useAppSelector(getCurrentWorkspace);
const eventSourceRef = useRef<EventSource | null>(null);
const reconnectTimeoutRef = useRef<NodeJS.Timeout | null>(null);
const reconnectAttempts = useRef(0);
const maxReconnectAttempts = 5;
const baseReconnectDelay = 1000; // 1 second
const connect = useCallback(() => {
if (eventSourceRef.current?.readyState === EventSource.OPEN) {
return;
}
if (!currentWorkspace) {
return;
}
const url = `/api/events?workspaceId=${encodeURIComponent(currentWorkspace)}`;
const eventSource = new EventSource(url, { withCredentials: true });
eventSourceRef.current = eventSource;
eventSource.onopen = () => {
reconnectAttempts.current = 0;
};
// Listen for tags_invalidated events
eventSource.addEventListener('tags_invalidated', (event) => {
const data = JSON.parse(event.data);
if (data.data?.tags && Array.isArray(data.data.tags)) {
// THIS IS THE MAGIC: Invalidate RTK Query cache tags
dispatch(api.util.invalidateTags(data.data.tags));
}
});
// Keep connection alive
eventSource.addEventListener('heartbeat', () => {
// Just acknowledge to keep connection alive
});
// Reconnect with exponential backoff
eventSource.onerror = () => {
eventSource.close();
if (reconnectAttempts.current >= maxReconnectAttempts) {
return;
}
const delay = baseReconnectDelay * Math.pow(2, reconnectAttempts.current);
reconnectAttempts.current++;
reconnectTimeoutRef.current = setTimeout(() => {
connect();
}, delay);
};
}, [currentWorkspace, dispatch]);
const disconnect = () => {
if (reconnectTimeoutRef.current) {
clearTimeout(reconnectTimeoutRef.current);
reconnectTimeoutRef.current = null;
}
if (eventSourceRef.current) {
eventSourceRef.current.close();
eventSourceRef.current = null;
}
};
// Connect when component mounts or workspace changes
useEffect(() => {
connect();
return () => disconnect();
}, [connect]);
// Reconnect when tab becomes visible again
useEffect(() => {
const handleVisibilityChange = () => {
if (document.visibilityState === 'visible') {
if (!eventSourceRef.current || eventSourceRef.current.readyState === EventSource.CLOSED) {
connect();
}
}
};
document.addEventListener('visibilitychange', handleVisibilityChange);
return () => document.removeEventListener('visibilitychange', handleVisibilityChange);
}, [connect]);
return {
isConnected: eventSourceRef.current?.readyState === EventSource.OPEN,
reconnect: connect,
disconnect,
};
}Key Features:
- Automatic reconnection: Exponential backoff (1s, 2s, 4s, 8s, 16s)
- Tab visibility handling: Reconnects when user switches back to tab
- Workspace isolation: Reconnects when switching workspaces
- Type-safe event handling: Validates data before cache invalidation
Usage in Components
export function CreditBlock() {
// Establish SSE connection
const { isConnected } = useSSE();
// RTK Query hooks (will auto-refetch on SSE invalidation)
const { data: wallet } = useGetCreditWalletQuery();
const { data: transactions } = useGetTransactionsQuery();
return (
<div>
{/* SSE connection indicator */}
<div className={isConnected ? 'bg-green-500' : 'bg-gray-400'} />
{/* Wallet balance - auto-updates via SSE */}
<div>Balance: {wallet?.balance ?? 0} credits</div>
{/* Transaction history - auto-updates via SSE */}
{transactions?.map(tx => (
<div key={tx.id}>{tx.type}: {tx.delta}</div>
))}
</div>
);
}Layer 4: RTK Query - Cache Invalidation
Location: apps/web/src/lib/redux/services/credit.api.ts
API Definition with Tags
export const creditApi = api.injectEndpoints({
endpoints: (builder) => ({
getCreditWallet: builder.query<CreditWallet, void>({
query: () => ({ url: '/credits/wallet', method: 'GET' }),
providesTags: ['Credit'], // ← This query provides the 'Credit' tag
}),
getTransactions: builder.query<CreditTransaction[], void>({
query: () => ({ url: '/credits/transactions', method: 'GET' }),
providesTags: ['Credit'], // ← This query also provides the 'Credit' tag
}),
consumeCredits: builder.mutation<CreditTransaction, ConsumeCreditsRequest>({
query: (body) => ({ url: '/credits/consume', method: 'POST', body }),
invalidatesTags: ['Credit'], // ← This mutation invalidates 'Credit' tag
}),
}),
});How Cache Invalidation Works
When SSE event calls dispatch(api.util.invalidateTags(['Credit'])):
- RTK Query marks all cache entries with the
'Credit'tag as invalid - Automatically refetches any currently subscribed queries (active components)
- Updates all components using those queries
Result: Zero manual refetching code in components!
Tag Types Configuration
Location: apps/web/src/lib/redux/services/api.ts
export const apiTagTypes = ['Workspace', 'User', 'Invitation', 'Billing', 'Credit'];
const api = createApi({
reducerPath: 'api',
baseQuery: baseQuery(),
tagTypes: apiTagTypes,
endpoints: () => ({}),
});Complete User Flow Example
Initial State
- User has 30 credits
- Threshold is set to 20 credits
User Action: Consume 20 Credits
Branch 1: Immediate Response (~200ms)
- NestJS deducts credits (30 → 10)
- Returns 200 OK
- RTK Query updates cache optimistically
- User sees: Balance: 10 credits ✓
Branch 2: Threshold Check (~500ms)
- Event listener catches
credit.consumedevent - Checks balance (10) < threshold (20) ✓
- Creates Stripe Payment Intent
- Creates PENDING transaction
- RTK Query cache updated
- User sees: Pending top-up transaction ⏳
Branch 3: Webhook Processing (~2-5s)
- Stripe processes payment asynchronously
- Webhook calls NestJS:
payment_intent.succeeded - NestJS adds 100 credits (10 → 110)
- Updates transaction: PENDING → COMPLETED
- Emits SSE:
tags_invalidated(['Credit']) - RTK Query auto-refetches wallet and transactions
- User sees: Balance: 110 credits ✓
Timeline
t=0ms User clicks button
t=200ms ✓ UI shows 10 credits (immediate feedback)
t=500ms ⏳ UI shows pending transaction
t=2-5s ✅ UI shows 110 credits (webhook completed)Production Considerations
1. Heartbeat Strategy
Proxies and load balancers close idle connections after ~60 seconds. The 30-second heartbeat ensures connections stay alive:
private startHeartbeat(workspaceId: string) {
const interval = setInterval(() => {
this.sendHeartbeat(workspaceId);
}, 30 * 1000); // 30 seconds
this.heartbeatIntervals.set(workspaceId, interval);
}2. Exponential Backoff
Prevents overwhelming the server during reconnection:
const delay = baseReconnectDelay * Math.pow(2, reconnectAttempts.current);
// 1s, 2s, 4s, 8s, 16s3. Tab Visibility Handling
Mobile browsers close connections when tabs are backgrounded:
const handleVisibilityChange = () => {
if (document.visibilityState === 'visible') {
if (eventSourceRef.current?.readyState === EventSource.CLOSED) {
connect();
}
}
};4. Memory Management
Clean up resources when workspaces disconnect:
subject.subscribe({
complete: () => {
this.stopHeartbeat(workspaceId);
this.eventSubjects.delete(workspaceId);
this.connectionCounts.delete(workspaceId);
},
});5. Horizontal Scaling
For multiple NestJS instances, use Redis Pub/Sub:
// Publisher (in NestJS instance that emits event)
await this.redis.publish(`workspace:${workspaceId}:events`, JSON.stringify({ type: 'tags_invalidated', data: { tags } }));
// Subscriber (in each NestJS instance)
await this.redis.subscribe(`workspace:${workspaceId}:events`);
this.redis.on('message', (channel, message) => {
const event = JSON.parse(message);
this.eventsService.emitToLocalClients(workspaceId, event);
});Event Types
Connection Event
{
"type": "connection",
"data": {
"timestamp": "2025-10-15T10:00:00.000Z"
}
}Heartbeat Event
{
"type": "heartbeat",
"data": {
"timestamp": "2025-10-15T10:00:30.000Z"
}
}Tags Invalidated Event
{
"type": "tags_invalidated",
"data": {
"tags": ["Credit"],
"timestamp": "2025-10-15T10:01:00.000Z"
}
}Benefits
- Simple client code: Components just use RTK Query hooks
- Automatic updates: No manual refetching logic
- Type-safe: TypeScript end-to-end
- Scalable: Works with horizontal scaling via Redis
- Resilient: Automatic reconnection with backoff
- Efficient: Only refetch what's needed (tag-based invalidation)
- Secure: Authentication handled by Next.js proxy
- Observable: Easy to debug with event logging
Comparison with Alternatives
vs WebSockets
- SSE is simpler for uni-directional server→client updates
- Built on HTTP (easier to deploy, better proxy support)
- Automatic reconnection built-in
- No need for custom heartbeat protocol (browser handles it)
vs Polling
- SSE eliminates wasted bandwidth
- No delayed updates
- Much more scalable (persistent connections vs constant requests)
vs GraphQL Subscriptions
- Works with existing REST endpoints
- No GraphQL schema changes needed
- Simpler mental model
- No WebSocket complexity
Debugging
Client-side Debugging
// Check connection status
const { isConnected } = useSSE();
console.log('SSE Connected:', isConnected);
// Monitor events in browser DevTools
// Network tab → Filter by "events" → View EventStreamServer-side Debugging
// NestJS logging
this.logger.log(`✅ [SSE] New connection - workspaceId: ${workspaceId}`);
this.logger.log(`📨 [SSE] Event emitted - type: ${eventType}`);
this.logger.log(`🔌 [SSE] Stream closed - workspaceId: ${workspaceId}`);Testing
Manual Testing
- Open browser DevTools → Network tab
- Filter by "events"
- Click "Consume Credits"
- Watch for:
- Immediate cache update
- Pending transaction appears
- SSE event received
- Final balance update
Troubleshooting
Issue: SSE Connection Drops Frequently
Cause: Proxy buffering or timeout Solution: Ensure proxy headers are set correctly:
X-Accel-Buffering: no- Heartbeat interval < proxy timeout
Issue: Events Not Received
Cause: Tag mismatch or workspace isolation Solution: Verify:
providesTagsmatchesinvalidateTags- Workspace ID is correct
- EventSource URL includes workspace parameter
Issue: Memory Leaks
Cause: Streams not cleaned up Solution: Ensure:
onModuleDestroyis called on shutdown- Subjects are completed when connections close
- Heartbeat intervals are cleared
Security Considerations
- Authentication: All SSE connections require valid session
- Workspace Isolation: Users can only access their workspace events
- Rate Limiting: Consider implementing rate limits on SSE connections
- Token Expiration: Handle JWT expiration gracefully
Performance Metrics
- Connection Time: ~200ms (initial handshake)
- Event Latency: ~50ms (server emit → client receive)
- Memory per Connection: ~5KB (RxJS Subject + metadata)
- CPU per Connection: Negligible (idle connections)