Real-Time Updates Architecture: SSE RTK Query Cache Invalidation

Complete guide to configuring and using SSE RTK Query Cache Invalidation in Dopamine Starter Kit

Overview

This document describes the real-time update system built with Server-Sent Events (SSE) and RTK Query's cache invalidation mechanism. This architecture enables real-time UI updates for credit balances, transactions, and other workspace data without polling and without the added complexity of WebSockets.

Architecture Layers

The system consists of four main layers:

  1. NestJS Backend: Event emission via SSE
  2. Next.js Proxy: Authenticated SSE streaming
  3. React Client: SSE connection management
  4. RTK Query: Tag-based cache invalidation

Flow Diagram

sequenceDiagram
    participant Client as React Client
    participant Cache as RTK Query Cache
    participant BE as Next + Nest Backend
    participant Stripe
 
    BE->>Cache: Get and Cache<br/>Wallet data<br/>(30 credits)
    Cache->>Client: Display the Wallet
 
 
    Client->>BE: Consume credits: POST /credits/consume
    BE->>BE: Deduct<br/>(20 credits)
    Note over BE: Emit credit.consumed event
 
    par Response returned
        BE->>Cache: Update Wallet data
        Cache->>Client: Display the Wallet<br/>(10 credits)
    and Check threshold
        Note over BE: Listen credit.consumed event
        BE->>BE: Check balance (10 credits) < threshold (20 credits)
        BE->>Stripe: Create payment intent
        BE->>Cache: Update Transaction list
        Cache->>Client: Display<br/>Pending Transaction
    and Async Stripe Payment
        Note over BE: Listen Stripe webhook event
        Stripe-->>BE: Payment success webhook
        BE->>BE: Add 100 credits to wallet
        BE->>Cache: SSE<br/>tags_invalidated(['Credit'])
        Cache->>BE: Auto-refetch<br/>GET /credits/wallet
        BE->>Cache: Fresh wallet data
        Cache->>Client: Display new Wallet<br/>(110 credits)
    end

Layer 1: NestJS Backend - Event Emission

EventsService

Location: apps/api/src/sse/events.service.ts

The EventsService manages Server-Sent Events streams for each workspace using RxJS Subjects.

@Injectable()
export class EventsService implements OnModuleDestroy {
  private readonly eventSubjects = new Map<string, Subject<any>>();
  private readonly heartbeatIntervals = new Map<string, NodeJS.Timeout>();
 
  // Get or create an SSE subject for a specific workspace
  getWorkspaceEventStream(workspaceId: string): Subject<any> {
    if (!this.eventSubjects.has(workspaceId)) {
      const subject = new Subject<any>();
      this.eventSubjects.set(workspaceId, subject);
 
      // Send immediate heartbeat to establish connection
      this.sendHeartbeat(workspaceId);
 
      // Start heartbeat to keep connection alive (every 30s)
      this.startHeartbeat(workspaceId);
 
      // Clean up when the subject completes (e.g. via closeAllStreams)
      subject.subscribe({
        complete: () => {
          this.stopHeartbeat(workspaceId);
          this.eventSubjects.delete(workspaceId);
        },
      });
    }
 
    return this.eventSubjects.get(workspaceId)!;
  }
 
  // Emit a tags invalidation event to trigger RTK Query cache invalidation
  emitTagsInvalidated(workspaceId: string, tags: string[]) {
    const subject = this.eventSubjects.get(workspaceId);
    if (subject && !subject.closed) {
      subject.next({
        type: 'tags_invalidated',
        data: {
          tags,
          timestamp: new Date().toISOString(),
        },
      });
    }
  }
 
  // Send heartbeat to keep connection alive
  private sendHeartbeat(workspaceId: string) {
    const subject = this.eventSubjects.get(workspaceId);
    if (subject && !subject.closed) {
      subject.next({
        type: 'heartbeat',
        data: { timestamp: new Date().toISOString() },
      });
    }
  }
 
  private startHeartbeat(workspaceId: string) {
    const interval = setInterval(() => {
      this.sendHeartbeat(workspaceId);
    }, 30 * 1000); // 30 seconds
 
    this.heartbeatIntervals.set(workspaceId, interval);
  }
 
  private stopHeartbeat(workspaceId: string) {
    const interval = this.heartbeatIntervals.get(workspaceId);
    if (interval) {
      clearInterval(interval);
      this.heartbeatIntervals.delete(workspaceId);
    }
  }
 
  // Close all event streams (cleanup on shutdown)
  closeAllStreams() {
    this.heartbeatIntervals.forEach((interval) => clearInterval(interval));
    this.heartbeatIntervals.clear();
 
    this.eventSubjects.forEach((subject) => subject.complete());
    this.eventSubjects.clear();
  }
 
  async onModuleDestroy() {
    this.closeAllStreams();
  }
}

Key Features:

  • Workspace isolation: Each workspace has its own RxJS Subject
  • Heartbeat mechanism: Prevents proxy timeouts (30s interval)
  • Automatic cleanup: Heartbeat timers and map entries are freed when a workspace's subject completes
  • Type safety: Events have a type field for discrimination

EventsController

Location: apps/api/src/sse/events.controller.ts

@Controller()
@UseGuards(ServerAuthGuard)
export class EventsController {
  constructor(private readonly eventsService: EventsService) {}
 
  @Sse('events')
  events(@CurrentWorkspace() workspace: Workspace): Observable<MessageEvent> {
    this.eventsService.logConnection(workspace.id);
 
    return this.eventsService.getWorkspaceEventStream(workspace.id).pipe(
      // Send immediate connection event to flush headers
      startWith({
        type: 'connection',
        data: { timestamp: new Date().toISOString() },
      }),
      map(
        (data) =>
          ({
            data: JSON.stringify(data),
            type: data.type,
          }) as MessageEvent
      )
    );
  }
}

Key Points:

  • Uses ServerAuthGuard for authentication
  • @CurrentWorkspace() decorator extracts workspace from headers
  • startWith() sends initial event to flush headers immediately (important for proxies)
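
For orientation, the MessageEvent objects returned by the controller reach the client as text frames in the standard text/event-stream format. The sketch below is a hypothetical helper (formatSseFrame is not part of the codebase) showing roughly what one frame looks like, assuming only the event: and data: fields are written; the framework may add others, such as id:.

```typescript
// Hypothetical helper, for illustration only: serializes one event into
// the text/event-stream wire format (field lines, then a blank line).
interface SseFrameInput {
  type: string;
  data: unknown;
}

function formatSseFrame({ type, data }: SseFrameInput): string {
  // JSON.stringify escapes newlines inside strings, so one data: line is enough.
  return `event: ${type}\ndata: ${JSON.stringify(data)}\n\n`;
}

// The controller serializes the whole event object into the data field,
// which is why the client later reads the tags from data.data.tags.
const frame = formatSseFrame({
  type: 'tags_invalidated',
  data: { type: 'tags_invalidated', data: { tags: ['Credit'] } },
});
console.log(frame);
```

The event: field is what lets the client register a listener with addEventListener('tags_invalidated', ...) instead of handling every message in onmessage.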

Event Emission from Business Logic

Location: apps/api/src/credit/listeners/credit-topup.listener.ts

@Injectable()
export class CreditTopupListener {
  constructor(
    private readonly creditService: CreditService,
    private readonly stripeService: StripeService,
    private readonly eventsService: EventsService
  ) {}
 
  @OnEvent('credit.consumed')
  async handleCreditConsumed(event: { transactionId: string; walletId: string }) {
    const wallet = await this.creditService.getCreditWallet(event.walletId);
 
    // Check if automatic topup should be triggered
    if (wallet?.autoReloadEnabled && wallet.balance <= wallet.threshold) {
      await this.stripeService.executeTopupPayment(wallet);
 
      // Get updated wallet and emit SSE event
      const updatedWallet = await this.creditService.getCreditWallet(event.walletId);
      if (updatedWallet) {
        // This triggers RTK Query cache invalidation on the client
        this.eventsService.emitTagsInvalidated(updatedWallet.workspaceId, ['Credit']);
      }
    }
  }
}

Pattern:

// Emit SSE event to invalidate cache tags
this.eventsService.emitTagsInvalidated(workspaceId, ['Credit']);

This tells every client connected to that workspace's event stream to invalidate RTK Query cache entries tagged with 'Credit'.

Layer 2: Next.js Proxy - Authenticated SSE Streaming

Location: apps/web/src/app/api/events/route.ts

Why a Proxy?

  1. Authentication: Next.js manages user sessions via Auth.js (NextAuth)
  2. Security: NestJS API only accepts requests from Next.js proxy
  3. Proxy buffering: Can send special headers to disable buffering

Implementation

export async function GET(request: NextRequest) {
  // 1. Authentication via NextAuth session
  const session = await auth();
  if (!session?.user?.jwt) {
    return new Response('Unauthorized', { status: 401 });
  }
 
  // 2. Validate workspace ID
  const workspaceId = new URL(request.url).searchParams.get('workspaceId');
  if (!workspaceId) {
    return new Response('Workspace ID required', { status: 400 });
  }
 
  // 3. Generate backend token
  const { sub, email } = session.user.jwt;
  const backendToken = await generateApiToken({ sub, email: email as string });
 
  // 4. Connect to backend SSE
  const backendUrl = `${process.env.NEST_API_URL}/events`;
  const backendResponse = await fetch(backendUrl, {
    headers: {
      Authorization: `Bearer ${backendToken}`,
      'app-workspace-id': workspaceId,
      Accept: 'text/event-stream',
    },
  });
 
  if (!backendResponse.ok) {
    return new Response(`Backend error: ${backendResponse.statusText}`, {
      status: backendResponse.status,
    });
  }
 
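  // 5. Stream proxy
  const reader = backendResponse.body!.getReader();
  const encoder = new TextEncoder();
  const stream = new ReadableStream({
    async start(controller) {
      // Send padding to flush proxy buffers
      controller.enqueue(encoder.encode(':connected\n\n'));
      controller.enqueue(encoder.encode(':' + ' '.repeat(2048) + '\n\n'));

      // Stream events from backend to client
      try {
        while (true) {
          const { done, value } = await reader.read();
          if (done) break;
          controller.enqueue(value);
        }
        // End the client stream when the backend stream ends
        controller.close();
      } catch (error) {
        // Surface backend read failures to the client (no-op if already closed)
        controller.error(error);
      }
    },
    // Release the backend connection when the client disconnects
    cancel() {
      return reader.cancel();
    },
  });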
 
  // 6. SSE response headers
  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream; charset=utf-8',
      'Cache-Control': 'no-cache, no-transform',
      Connection: 'keep-alive',
      'X-Accel-Buffering': 'no', // Disable nginx buffering
    },
  });
}

Critical Details:

  • Padding comments: :connected\n\n and 2KB of spaces force proxies to flush buffers
  • X-Accel-Buffering: no: Disables buffering in nginx
  • Cache-Control: no-cache, no-transform: Prevents intermediaries from caching or transforming (e.g. compressing) the stream
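
Lines that begin with a colon are comments in the event-stream format, which is why the padding can be sent without EventSource dispatching any event to listeners. A small sketch of how that prelude could be built; buildSsePrelude and its paddingBytes parameter are illustrative names, and 2048 is simply the value used above.

```typescript
// Illustrative helper (not in the codebase): builds the comment-only prelude
// that the proxy writes before relaying backend events.
function buildSsePrelude(paddingBytes = 2048): string {
  const connected = ':connected\n\n';
  // A comment line made of spaces; EventSource ignores lines starting with ':'.
  const padding = ':' + ' '.repeat(paddingBytes) + '\n\n';
  return connected + padding;
}

const prelude = buildSsePrelude();
// Every non-empty line is a comment, so no event reaches the client's listeners.
const allComments = prelude
  .split('\n')
  .filter((line) => line.length > 0)
  .every((line) => line.charAt(0) === ':');
console.log(prelude.length, allComments);
```

In the route handler the resulting string would still be passed through TextEncoder before being enqueued.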

Layer 3: React Client - SSE Connection Management

Location: apps/web/src/hooks/use-sse.ts

useSSE Hook

export function useSSE() {
  const dispatch = useAppDispatch();
  const currentWorkspace = useAppSelector(getCurrentWorkspace);
  const eventSourceRef = useRef<EventSource | null>(null);
  const reconnectTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
  const reconnectAttempts = useRef(0);
 
  const maxReconnectAttempts = 5;
  const baseReconnectDelay = 1000; // 1 second
 
  const connect = useCallback(() => {
    if (eventSourceRef.current?.readyState === EventSource.OPEN) {
      return;
    }
 
    if (!currentWorkspace) {
      return;
    }
 
    const url = `/api/events?workspaceId=${encodeURIComponent(currentWorkspace)}`;
    const eventSource = new EventSource(url, { withCredentials: true });
    eventSourceRef.current = eventSource;
 
    eventSource.onopen = () => {
      reconnectAttempts.current = 0;
    };
 
    // Listen for tags_invalidated events
    eventSource.addEventListener('tags_invalidated', (event) => {
      const data = JSON.parse(event.data);
 
      if (data.data?.tags && Array.isArray(data.data.tags)) {
        // THIS IS THE MAGIC: Invalidate RTK Query cache tags
        dispatch(api.util.invalidateTags(data.data.tags));
      }
    });
 
    // Keep connection alive
    eventSource.addEventListener('heartbeat', () => {
      // Just acknowledge to keep connection alive
    });
 
    // Reconnect with exponential backoff
    eventSource.onerror = () => {
      eventSource.close();
 
      if (reconnectAttempts.current >= maxReconnectAttempts) {
        return;
      }
 
      const delay = baseReconnectDelay * Math.pow(2, reconnectAttempts.current);
      reconnectAttempts.current++;
 
      reconnectTimeoutRef.current = setTimeout(() => {
        connect();
      }, delay);
    };
  }, [currentWorkspace, dispatch]);
 
  const disconnect = () => {
    if (reconnectTimeoutRef.current) {
      clearTimeout(reconnectTimeoutRef.current);
      reconnectTimeoutRef.current = null;
    }
 
    if (eventSourceRef.current) {
      eventSourceRef.current.close();
      eventSourceRef.current = null;
    }
  };
 
  // Connect when component mounts or workspace changes
  useEffect(() => {
    connect();
    return () => disconnect();
  }, [connect]);
 
  // Reconnect when tab becomes visible again
  useEffect(() => {
    const handleVisibilityChange = () => {
      if (document.visibilityState === 'visible') {
        if (!eventSourceRef.current || eventSourceRef.current.readyState === EventSource.CLOSED) {
          connect();
        }
      }
    };
 
    document.addEventListener('visibilitychange', handleVisibilityChange);
    return () => document.removeEventListener('visibilitychange', handleVisibilityChange);
  }, [connect]);
 
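  return {
    // Note: readyState is read from a ref during render, so this value is a snapshot;
    // changes to the connection state do not trigger a re-render by themselves
    isConnected: eventSourceRef.current?.readyState === EventSource.OPEN,
    reconnect: connect,
    disconnect,
  };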
}

Key Features:

  • Automatic reconnection: Exponential backoff (1s, 2s, 4s, 8s, 16s)
  • Tab visibility handling: Reconnects when user switches back to tab
  • Workspace isolation: Reconnects when switching workspaces
  • Runtime validation: Checks that the payload contains a tags array before invalidating the cache
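
The validation step could be made stricter than the Array.isArray check in the hook. The following is a minimal sketch, assuming the tag names listed in apiTagTypes; extractTags and isKnownTag are hypothetical helpers, not part of the hook. Unlike the listener above, it also tolerates malformed JSON, on which JSON.parse would otherwise throw.

```typescript
// Tag names known to the client; assumed to mirror apiTagTypes.
const KNOWN_TAGS = ['Workspace', 'User', 'Invitation', 'Billing', 'Credit'] as const;
type KnownTag = (typeof KNOWN_TAGS)[number];

function isKnownTag(value: unknown): value is KnownTag {
  return typeof value === 'string' && (KNOWN_TAGS as readonly string[]).indexOf(value) !== -1;
}

// Hypothetical helper: returns the valid tags from a raw SSE payload,
// or an empty array when the payload is malformed.
function extractTags(raw: string): KnownTag[] {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch (error) {
    return [];
  }
  const tags = (parsed as { data?: { tags?: unknown } } | null)?.data?.tags;
  return Array.isArray(tags) ? tags.filter(isKnownTag) : [];
}

console.log(extractTags('{"type":"tags_invalidated","data":{"tags":["Credit","Bogus"]}}'));
```

Inside the listener, the result could be dispatched only when it is non-empty, for example `if (tags.length > 0) dispatch(api.util.invalidateTags(tags))`.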

Usage in Components

export function CreditBlock() {
  // Establish SSE connection
  const { isConnected } = useSSE();
 
  // RTK Query hooks (will auto-refetch on SSE invalidation)
  const { data: wallet } = useGetCreditWalletQuery();
  const { data: transactions } = useGetTransactionsQuery();
 
  return (
    <div>
      {/* SSE connection indicator */}
      <div className={isConnected ? 'bg-green-500' : 'bg-gray-400'} />
 
      {/* Wallet balance - auto-updates via SSE */}
      <div>Balance: {wallet?.balance ?? 0} credits</div>
 
      {/* Transaction history - auto-updates via SSE */}
      {transactions?.map(tx => (
        <div key={tx.id}>{tx.type}: {tx.delta}</div>
      ))}
    </div>
  );
}

Layer 4: RTK Query - Cache Invalidation

Location: apps/web/src/lib/redux/services/credit.api.ts

API Definition with Tags

export const creditApi = api.injectEndpoints({
  endpoints: (builder) => ({
    getCreditWallet: builder.query<CreditWallet, void>({
      query: () => ({ url: '/credits/wallet', method: 'GET' }),
      providesTags: ['Credit'], // ← This query provides the 'Credit' tag
    }),
    getTransactions: builder.query<CreditTransaction[], void>({
      query: () => ({ url: '/credits/transactions', method: 'GET' }),
      providesTags: ['Credit'], // ← This query also provides the 'Credit' tag
    }),
    consumeCredits: builder.mutation<CreditTransaction, ConsumeCreditsRequest>({
      query: (body) => ({ url: '/credits/consume', method: 'POST', body }),
      invalidatesTags: ['Credit'], // ← This mutation invalidates 'Credit' tag
    }),
  }),
});

How Cache Invalidation Works

When the SSE event handler dispatches api.util.invalidateTags(['Credit']), RTK Query:

  1. Marks all cache entries that provide the 'Credit' tag as invalid
  2. Automatically refetches the queries that are currently subscribed (used by mounted components)
  3. Re-renders every component using those queries with the fresh data

Result: Zero manual refetching code in components!
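
To make the mechanism concrete, here is a deliberately simplified, dependency-free model of tag-based invalidation. It is not RTK Query's implementation; the CacheEntry shape and the invalidateTags function are illustrative, and the sketch assumes that entries with no subscribers are simply dropped rather than refetched.

```typescript
// Simplified model of tag-based invalidation; not RTK Query's actual internals.
interface CacheEntry {
  providesTags: string[];
  subscribers: number; // mounted components currently using this query
}

// Returns the keys of the queries that should be refetched.
function invalidateTags(cache: Map<string, CacheEntry>, tags: string[]): string[] {
  const toRefetch: string[] = [];
  cache.forEach((entry, queryKey) => {
    const affected = entry.providesTags.some((tag) => tags.indexOf(tag) !== -1);
    if (!affected) return;
    if (entry.subscribers > 0) {
      // Active queries are refetched so mounted components get fresh data.
      toRefetch.push(queryKey);
    } else {
      // Assumption: unused entries are removed and fetched again on next use.
      cache.delete(queryKey);
    }
  });
  return toRefetch;
}

const cache = new Map<string, CacheEntry>([
  ['getCreditWallet', { providesTags: ['Credit'], subscribers: 1 }],
  ['getTransactions', { providesTags: ['Credit'], subscribers: 0 }],
  ['getWorkspace', { providesTags: ['Workspace'], subscribers: 1 }],
]);
console.log(invalidateTags(cache, ['Credit'])); // only getCreditWallet is refetched
```

The point of the model is that the server only names a tag; which requests are actually re-issued depends on what the client currently has mounted.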

Tag Types Configuration

Location: apps/web/src/lib/redux/services/api.ts

export const apiTagTypes = ['Workspace', 'User', 'Invitation', 'Billing', 'Credit'] as const;
 
const api = createApi({
  reducerPath: 'api',
  baseQuery: baseQuery(),
  tagTypes: apiTagTypes,
  endpoints: () => ({}),
});

Complete User Flow Example

Initial State

  • User has 30 credits
  • Threshold is set to 20 credits

User Action: Consume 20 Credits

Branch 1: Immediate Response (~200ms)

  • NestJS deducts credits (30 → 10)
  • Returns 200 OK
  • The consumeCredits mutation invalidates the 'Credit' tag, so RTK Query refetches the wallet
  • User sees: Balance: 10 credits ✓

Branch 2: Threshold Check (~500ms)

  • Event listener catches credit.consumed event
  • Checks balance (10) < threshold (20) ✓
  • Creates Stripe Payment Intent
  • Creates PENDING transaction
  • RTK Query cache updated
  • User sees: Pending top-up transaction ⏳

Branch 3: Webhook Processing (~2-5s)

  • Stripe processes payment asynchronously
  • Webhook calls NestJS: payment_intent.succeeded
  • NestJS adds 100 credits (10 → 110)
  • Updates transaction: PENDING → COMPLETED
  • Emits SSE: tags_invalidated(['Credit'])
  • RTK Query auto-refetches wallet and transactions
  • User sees: Balance: 110 credits ✓
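
The balance arithmetic of the three branches can be checked with a few lines. This is a worked example only: the Wallet shape, the topupAmount field and simulateConsume are hypothetical names chosen for the illustration.

```typescript
interface Wallet {
  balance: number;
  threshold: number;
  topupAmount: number;
}

// Returns the balance after each step of the flow described above.
function simulateConsume(wallet: Wallet, amount: number): number[] {
  const afterConsume = wallet.balance - amount;
  const steps = [afterConsume];
  // Strict comparison, as in the flow above; the listener shown earlier uses <=.
  if (afterConsume < wallet.threshold) {
    steps.push(afterConsume + wallet.topupAmount);
  }
  return steps;
}

// 30 - 20 = 10, which is below the threshold of 20, so 100 credits are added.
const balances = simulateConsume({ balance: 30, threshold: 20, topupAmount: 100 }, 20);
console.log(balances); // balances after each step: 10, then 110
```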

Timeline

t=0ms     User clicks button
t=200ms   UI shows 10 credits (immediate feedback)
t=500ms   UI shows pending transaction
t=2-5s    UI shows 110 credits (webhook completed)

Production Considerations

1. Heartbeat Strategy

Many proxies and load balancers close idle connections after roughly 60 seconds (nginx's default proxy_read_timeout, for example, is 60s). The 30-second heartbeat keeps the connection from ever being idle that long:

private startHeartbeat(workspaceId: string) {
  const interval = setInterval(() => {
    this.sendHeartbeat(workspaceId);
  }, 30 * 1000); // 30 seconds
 
  this.heartbeatIntervals.set(workspaceId, interval);
}
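
The heartbeat can also be used on the client to detect a connection that is silently dead. The hook's heartbeat listener is currently a no-op, so the following is only a sketch of one possible extension: HeartbeatWatchdog is a hypothetical class, and the tolerance of two missed intervals is an assumption.

```typescript
// Hypothetical client-side watchdog; names and the 2x tolerance are assumptions.
class HeartbeatWatchdog {
  private readonly intervalMs: number;
  private readonly now: () => number;
  private lastSeen: number;

  constructor(intervalMs: number, now: () => number = Date.now) {
    this.intervalMs = intervalMs;
    this.now = now;
    this.lastSeen = now();
  }

  // Call from the 'heartbeat' listener (or from any other event listener).
  recordEvent(): void {
    this.lastSeen = this.now();
  }

  // True when no event has arrived for more than two heartbeat intervals.
  isStale(): boolean {
    return this.now() - this.lastSeen > 2 * this.intervalMs;
  }
}

const watchdog = new HeartbeatWatchdog(30 * 1000);
console.log(watchdog.isStale()); // false right after creation
```

A periodic timer could call isStale() and trigger reconnect() from the hook when it returns true. The injectable clock exists only to make the class easy to test.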

2. Exponential Backoff

Prevents overwhelming the server during reconnection:

const delay = baseReconnectDelay * Math.pow(2, reconnectAttempts.current);
// 1s, 2s, 4s, 8s, 16s
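
The schedule can be expressed as a pure function, which makes the attempt cap explicit. reconnectDelay is an illustrative name; its defaults mirror baseReconnectDelay and maxReconnectAttempts from the hook.

```typescript
// Delay before the given reconnection attempt (0-based), or null once the
// attempt budget is exhausted. Defaults mirror the values used in useSSE.
function reconnectDelay(attempt: number, baseMs = 1000, maxAttempts = 5): number | null {
  if (attempt >= maxAttempts) return null;
  return baseMs * Math.pow(2, attempt);
}

const delays = [0, 1, 2, 3, 4, 5].map((attempt) => reconnectDelay(attempt));
console.log(delays); // 1000, 2000, 4000, 8000, 16000, then null
```

A common refinement is to add random jitter to each delay so that many clients dropped at the same moment do not all reconnect in lockstep.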

3. Tab Visibility Handling

Browsers, especially on mobile, may close connections when tabs are backgrounded, so the hook checks the connection when the tab becomes visible again:

const handleVisibilityChange = () => {
  if (document.visibilityState === 'visible') {
    if (!eventSourceRef.current || eventSourceRef.current.readyState === EventSource.CLOSED) {
      connect();
    }
  }
};

4. Memory Management

Clean up resources when a workspace's event stream completes:

subject.subscribe({
  complete: () => {
    this.stopHeartbeat(workspaceId);
    this.eventSubjects.delete(workspaceId);
    this.connectionCounts.delete(workspaceId);
  },
});
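
The callback above runs when the subject completes. To release resources as soon as the last client of a workspace disconnects, connections can be counted per workspace. The sketch below shows one way to do that; ConnectionRegistry is a hypothetical class, not the connectionCounts implementation referenced above.

```typescript
// Hypothetical registry: tracks open connections per workspace and runs a
// cleanup callback when the last one closes.
class ConnectionRegistry {
  private readonly counts = new Map<string, number>();
  private readonly onLastDisconnect: (workspaceId: string) => void;

  constructor(onLastDisconnect: (workspaceId: string) => void) {
    this.onLastDisconnect = onLastDisconnect;
  }

  // Registers a connection and returns the new count for the workspace.
  connect(workspaceId: string): number {
    const next = (this.counts.get(workspaceId) ?? 0) + 1;
    this.counts.set(workspaceId, next);
    return next;
  }

  // Unregisters a connection and returns the remaining count.
  disconnect(workspaceId: string): number {
    const current = this.counts.get(workspaceId) ?? 0;
    if (current <= 1) {
      // Only fire cleanup if a connection was actually being tracked.
      if (current === 1) {
        this.counts.delete(workspaceId);
        this.onLastDisconnect(workspaceId);
      }
      return 0;
    }
    this.counts.set(workspaceId, current - 1);
    return current - 1;
  }
}

const registry = new ConnectionRegistry((id) => console.log(`cleanup for ${id}`));
registry.connect('ws_1');
registry.disconnect('ws_1'); // logs the cleanup message for ws_1
```

In the service, connect() could be called when a stream is handed out and disconnect() from an RxJS finalize operator on that stream, with the cleanup callback stopping the heartbeat and completing the subject.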

5. Horizontal Scaling

For multiple NestJS instances, use Redis Pub/Sub:

// Publisher (in NestJS instance that emits event)
await this.redis.publish(`workspace:${workspaceId}:events`, JSON.stringify({ type: 'tags_invalidated', data: { tags } }));
 
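// Subscriber (in each NestJS instance, on a dedicated connection: a Redis
// connection in subscriber mode cannot also publish)
await this.redis.subscribe(`workspace:${workspaceId}:events`);
this.redis.on('message', (channel, message) => {
  // Derive the workspace from the channel name: workspace:<id>:events
  const [, channelWorkspaceId] = channel.split(':');
  const event = JSON.parse(message);
  this.eventsService.emitToLocalClients(channelWorkspaceId, event);
});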
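
Because every instance receives messages for many workspaces through a single message handler, the workspace has to be recovered from the channel name. A small sketch of helpers for building and parsing the channel names used above; channelFor and workspaceIdFromChannel are hypothetical names.

```typescript
const CHANNEL_PREFIX = 'workspace:';
const CHANNEL_SUFFIX = ':events';

function channelFor(workspaceId: string): string {
  return `${CHANNEL_PREFIX}${workspaceId}${CHANNEL_SUFFIX}`;
}

// Returns the workspace ID encoded in a channel name, or null if the
// channel does not follow the workspace:<id>:events pattern.
function workspaceIdFromChannel(channel: string): string | null {
  const hasPrefix = channel.indexOf(CHANNEL_PREFIX) === 0;
  const hasSuffix = channel.slice(-CHANNEL_SUFFIX.length) === CHANNEL_SUFFIX;
  if (!hasPrefix || !hasSuffix) return null;
  const id = channel.slice(CHANNEL_PREFIX.length, channel.length - CHANNEL_SUFFIX.length);
  return id.length > 0 ? id : null;
}

console.log(workspaceIdFromChannel(channelFor('ws_123'))); // ws_123
```

Slicing by prefix and suffix, rather than splitting on ':', keeps the helper correct even if a workspace ID itself contains a colon.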

Event Types

Connection Event

{
  "type": "connection",
  "data": {
    "timestamp": "2025-10-15T10:00:00.000Z"
  }
}

Heartbeat Event

{
  "type": "heartbeat",
  "data": {
    "timestamp": "2025-10-15T10:00:30.000Z"
  }
}

Tags Invalidated Event

{
  "type": "tags_invalidated",
  "data": {
    "tags": ["Credit"],
    "timestamp": "2025-10-15T10:01:00.000Z"
  }
}
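
The three payloads above lend themselves to a discriminated union on the type field. The type names below are assumptions made for this sketch (the service shown earlier uses Subject<any>); the never check in the default branch makes the compiler flag any event type that is added later but not handled.

```typescript
interface ConnectionEvent {
  type: 'connection';
  data: { timestamp: string };
}
interface HeartbeatEvent {
  type: 'heartbeat';
  data: { timestamp: string };
}
interface TagsInvalidatedEvent {
  type: 'tags_invalidated';
  data: { tags: string[]; timestamp: string };
}
type WorkspaceEvent = ConnectionEvent | HeartbeatEvent | TagsInvalidatedEvent;

function describeEvent(event: WorkspaceEvent): string {
  switch (event.type) {
    case 'connection':
      return `connected at ${event.data.timestamp}`;
    case 'heartbeat':
      return `heartbeat at ${event.data.timestamp}`;
    case 'tags_invalidated':
      // Narrowed to TagsInvalidatedEvent, so data.tags is available here.
      return `invalidate ${event.data.tags.join(', ')}`;
    default: {
      // Compile-time exhaustiveness check.
      const unreachable: never = event;
      return unreachable;
    }
  }
}

console.log(
  describeEvent({
    type: 'tags_invalidated',
    data: { tags: ['Credit'], timestamp: '2025-10-15T10:01:00.000Z' },
  }),
);
```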

Benefits

  1. Simple client code: Components just use RTK Query hooks
  2. Automatic updates: No manual refetching logic
  3. Type-safe: TypeScript end-to-end
  4. Scalable: Works with horizontal scaling via Redis
  5. Resilient: Automatic reconnection with backoff
  6. Efficient: Only refetch what's needed (tag-based invalidation)
  7. Secure: Authentication handled by Next.js proxy
  8. Observable: Easy to debug with event logging

Comparison with Alternatives

vs WebSockets

  • SSE is simpler for uni-directional server→client updates
  • Built on HTTP (easier to deploy, better proxy support)
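  • Automatic reconnection is built into EventSource (the useSSE hook replaces it with its own capped backoff)
  • No custom ping/pong protocol on the client; a periodic server-sent heartbeat event is enough to keep intermediaries from closing the connection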

vs Polling

  • SSE eliminates wasted bandwidth
  • No delayed updates
  • Much more scalable (persistent connections vs constant requests)

vs GraphQL Subscriptions

  • Works with existing REST endpoints
  • No GraphQL schema changes needed
  • Simpler mental model
  • No WebSocket complexity

Debugging

Client-side Debugging

// Check connection status
const { isConnected } = useSSE();
console.log('SSE Connected:', isConnected);
 
// Monitor events in browser DevTools
// Network tab → Filter by "events" → View EventStream

Server-side Debugging

// NestJS logging
this.logger.log(`✅ [SSE] New connection - workspaceId: ${workspaceId}`);
this.logger.log(`📨 [SSE] Event emitted - type: ${eventType}`);
this.logger.log(`🔌 [SSE] Stream closed - workspaceId: ${workspaceId}`);

Testing

Manual Testing

  1. Open browser DevTools → Network tab
  2. Filter by "events"
  3. Click "Consume Credits"
  4. Watch for:
    • Immediate cache update
    • Pending transaction appears
    • SSE event received
    • Final balance update

Troubleshooting

Issue: SSE Connection Drops Frequently

Cause: Proxy buffering or timeout

Solution: Ensure proxy headers are set correctly:

  • X-Accel-Buffering: no
  • Heartbeat interval < proxy timeout

Issue: Events Not Received

Cause: Tag mismatch or workspace isolation

Solution: Verify:

  • Tags passed to emitTagsInvalidated match the providesTags of the affected queries
  • Workspace ID is correct
  • EventSource URL includes workspace parameter

Issue: Memory Leaks

Cause: Streams not cleaned up

Solution: Ensure:

  • onModuleDestroy is called on shutdown
  • Subjects are completed when connections close
  • Heartbeat intervals are cleared

Security Considerations

  1. Authentication: All SSE connections require valid session
  2. Workspace Isolation: Users can only access their workspace events
  3. Rate Limiting: Consider implementing rate limits on SSE connections
  4. Token Expiration: Handle JWT expiration gracefully

Performance Metrics

  • Connection Time: ~200ms (initial handshake)
  • Event Latency: ~50ms (server emit → client receive)
  • Memory per Connection: ~5KB (RxJS Subject + metadata)
  • CPU per Connection: Negligible (idle connections)

References