Skip to main content
Use Upstash Realtime on the server to subscribe to events, retrieve message history, and stream updates to clients.

Subscribe to Events

route.ts
import { realtime } from "@/lib/realtime"

// Subscribe to "notification.alert" events on the "notifications" channel
// and log every payload that arrives.
const channel = realtime.channel("notifications")

const logNotification = (payload: unknown): void => {
  console.log("New notification:", payload)
}

await channel.on("notification.alert", logNotification)
Subscribe to multiple events:
route.ts
import { realtime } from "@/lib/realtime"

const channel = realtime.channel("room-123")

// Both subscriptions are started before awaiting, so they are set up
// concurrently rather than one after the other.
const messageSubscription = channel.on("chat.message", (message) => {
  console.log("New message:", message)
})
const joinSubscription = channel.on("user.joined", (member) => {
  console.log("User joined:", member)
})

await Promise.all([messageSubscription, joinSubscription])

Retrieve History

Fetch past messages from a channel:
route.ts
import { realtime } from "@/lib/realtime"

/**
 * Responds with up to 50 history entries of the "room-123" channel,
 * serialized as JSON.
 */
export const GET = async () => {
  const channel = realtime.channel("room-123")
  const messages = await channel.history({ length: 50 })

  // A string body defaults to `text/plain`; label the payload explicitly so
  // clients can rely on the content type when parsing it.
  return new Response(JSON.stringify(messages), {
    headers: { "Content-Type": "application/json" },
  })
}

History Options

length
number
Number of messages to retrieve (starting from most recent)
since
number
Only fetch messages sent at or after this Unix timestamp (inclusive, in milliseconds)
route.ts
// Up to 100 messages, none older than 24 hours (timestamps are in milliseconds).
const since = Date.now() - 24 * 60 * 60 * 1000
const messages = await channel.history({ length: 100, since })

Subscribe with History

Replay past messages and continue subscribing to new ones:
route.ts
import { realtime } from "@/lib/realtime"

const channel = realtime.channel("room-123")

// Build the history query first, then attach the "chat.message" handler to it.
const recentHistory = channel.history({ length: 50 })

await recentHistory.on("chat.message", (message) => {
  console.log("Message:", message)
})
This pattern:
  1. Fetches the last 50 messages
  2. Replays them in chronological order
  3. Continues to listen for new messages

Stream to Clients

Stream historical and new messages to clients using Server-Sent Events:
app/api/stream/route.ts
import { realtime } from "@/lib/realtime"

/**
 * Streams "notification.alert" events from a channel to the client as
 * Server-Sent Events.
 *
 * The channel name is read from the `channel` query parameter; the handler
 * responds with 400 when it is missing.
 */
export const GET = async (req: Request) => {
  const { searchParams } = new URL(req.url)
  const channelId = searchParams.get("channel")

  if (!channelId) {
    return new Response("Channel ID required", { status: 400 })
  }

  const channel = realtime.channel(channelId)

  // Response body streams must yield bytes; string chunks are rejected by
  // spec-compliant runtimes, so every SSE frame is encoded to a Uint8Array.
  const encoder = new TextEncoder()

  // Set once the consumer cancels the stream. Enqueuing into a cancelled
  // stream throws, so later events are dropped instead.
  let cancelled = false

  const stream = new ReadableStream<Uint8Array>({
    async start(controller) {
      await channel.history().on("notification.alert", (data) => {
        if (cancelled) return
        controller.enqueue(encoder.encode(`data: ${JSON.stringify(data)}\n\n`))
      })
    },
    cancel() {
      cancelled = true
    },
  })

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      "Connection": "keep-alive",
    },
  })
}

Handle Disconnections

Clean up subscriptions when a client disconnects:
app/api/stream/route.ts
import { realtime } from "@/lib/realtime"

/**
 * Streams "task.update" events from a channel as Server-Sent Events and
 * closes the stream when the task reaches a terminal status or the client
 * disconnects.
 *
 * The channel name is read from the `channel` query parameter; the handler
 * responds with 400 when it is missing.
 */
export const GET = async (req: Request) => {
  const { searchParams } = new URL(req.url)
  const channelId = searchParams.get("channel")

  if (!channelId) {
    return new Response("Channel ID required", { status: 400 })
  }

  const channel = realtime.channel(channelId)

  // Response body streams must yield bytes, so SSE frames are encoded first.
  const encoder = new TextEncoder()

  // True once the stream has been closed or cancelled. Both `close()` and
  // `enqueue()` throw on a stream that is no longer readable.
  let closed = false

  const stream = new ReadableStream<Uint8Array>({
    async start(controller) {
      // Idempotent close: a terminal status and a client abort can both
      // occur for the same request.
      // NOTE(review): this only stops delivery to the client; nothing here
      // unsubscribes from the channel. If the SDK exposes an unsubscribe
      // handle, call it here - confirm against the SDK documentation.
      const closeStream = () => {
        if (closed) return
        closed = true
        req.signal.removeEventListener("abort", closeStream)
        controller.close()
      }

      if (req.signal.aborted) {
        closeStream()
        return
      }

      // Registered before awaiting the subscription so a disconnect that
      // happens while the subscription is being set up is not missed.
      req.signal.addEventListener("abort", closeStream, { once: true })

      await channel.history().on("task.update", (data) => {
        if (closed) return

        controller.enqueue(encoder.encode(`data: ${JSON.stringify(data)}\n\n`))

        if (data.status === "completed" || data.status === "failed") {
          closeStream()
        }
      })
    },
    cancel() {
      closed = true
    },
  })

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      "Connection": "keep-alive",
    },
  })
}

Use Cases

Stream progress updates from long-running tasks:
app/api/job/route.ts
import { realtime } from "@/lib/realtime"

/**
 * Processes a job chunk by chunk and publishes its progress on the channel
 * named after the `jobId` field of the JSON request body.
 */
export const POST = async (req: Request) => {
  const body = await req.json()
  const jobChannel = realtime.channel(body.jobId)

  await jobChannel.emit("job.started", { progress: 0 })

  // One chunk per 10-point step from 0 through 100 inclusive; each step's
  // progress is published after its chunk has been processed.
  let progress = 0
  while (progress <= 100) {
    await processChunk()
    await jobChannel.emit("job.progress", { progress })
    progress += 10
  }

  await jobChannel.emit("job.completed", { progress: 100 })

  return new Response("OK")
}
Stream updates to the client:
app/api/job/stream/route.ts
import { realtime } from "@/lib/realtime"

/**
 * Streams "job.progress" events for a job as Server-Sent Events and closes
 * the stream once progress reaches 100.
 *
 * The job ID is read from the `id` query parameter; the handler responds
 * with 400 when it is missing.
 */
export const GET = async (req: Request) => {
  const { searchParams } = new URL(req.url)
  const jobId = searchParams.get("id")

  // `searchParams.get` returns null for an absent parameter; reject it
  // instead of passing null on as a channel name.
  if (!jobId) {
    return new Response("Job ID required", { status: 400 })
  }

  const channel = realtime.channel(jobId)

  // Response body streams must yield bytes, so SSE frames are encoded first.
  const encoder = new TextEncoder()

  // True once the stream has been closed or cancelled. Both `close()` and
  // `enqueue()` throw on a stream that is no longer readable.
  let closed = false

  const stream = new ReadableStream<Uint8Array>({
    async start(controller) {
      await channel.history().on("job.progress", (data) => {
        if (closed) return

        controller.enqueue(encoder.encode(`data: ${JSON.stringify(data)}\n\n`))

        if (data.progress === 100) {
          closed = true
          controller.close()
        }
      })
    },
    cancel() {
      closed = true
    },
  })

  return new Response(stream, {
    headers: { "Content-Type": "text/event-stream" },
  })
}
Process events with server-side logic:
route.ts
import { realtime } from "@/lib/realtime"
import { sendEmail } from "@/lib/email"

const channel = realtime.channel("notifications")

// For every "notification.alert" event with priority "high", send an email
// addressed to the event's `userId`; all other alerts are ignored.
await channel.on("notification.alert", async (alert) => {
  if (alert.priority !== "high") {
    return
  }

  const { userId, message } = alert

  await sendEmail({
    to: userId,
    subject: "Urgent Notification",
    body: message,
  })
})
Emit events to multiple channels:
route.ts
import { realtime } from "@/lib/realtime"

/**
 * Emits the request's `message` as an "announcement" event on the
 * `team-<id>` channel of every ID listed in `teamIds`.
 */
export const POST = async (req: Request) => {
  const { teamIds, message } = await req.json()

  // Publishes the announcement to a single team's channel.
  const announce = (teamId: string) =>
    realtime.channel(`team-${teamId}`).emit("announcement", message)

  // All emits are started up front and awaited together.
  const pending = teamIds.map(announce)
  await Promise.all(pending)

  return new Response("Broadcast sent")
}
Forward webhook events to realtime channels:
app/api/webhook/route.ts
import { realtime } from "@/lib/realtime"

/**
 * Forwards a webhook payload to the `user-<userId>` channel as a
 * "webhook.received" event.
 *
 * Responds with 400 when the payload carries no usable `userId`, so that
 * events are never published to a channel such as "user-undefined".
 */
export const POST = async (req: Request) => {
  const payload = await req.json()

  // The body comes from an external caller; check the one field that
  // decides which channel receives the event before using it.
  const userId: unknown = payload?.userId
  const hasUserId =
    (typeof userId === "string" && userId !== "") || typeof userId === "number"

  if (!hasUserId) {
    return new Response("userId required", { status: 400 })
  }

  const channel = realtime.channel(`user-${userId}`)
  await channel.emit("webhook.received", payload)

  return new Response("OK")
}

Emit Events

Emit events from any server context:
route.ts
import { realtime } from "@/lib/realtime"

/**
 * Emits a "notification.alert" event directly on the `realtime` instance,
 * without selecting a channel first.
 */
export const POST = async () => {
  const event = "notification.alert"
  const payload = "hello world!"

  await realtime.emit(event, payload)

  return new Response("OK")
}
Emit to specific channels:
route.ts
import { realtime } from "@/lib/realtime"

/** Emits a "notification.alert" event on the "user-123" channel. */
export const POST = async () => {
  await realtime
    .channel("user-123")
    .emit("notification.alert", "hello world!")

  return new Response("OK")
}

Next Steps

I