Use Upstash Realtime on the server to subscribe to events, retrieve message history, and stream updates to clients.
Subscribe to Events
import { realtime } from "@/lib/realtime"

const channel = realtime.channel("notifications")

await channel.on("notification.alert", (data) => {
  console.log("New notification:", data)
})
Subscribe to multiple events:
import { realtime } from "@/lib/realtime"

const channel = realtime.channel("room-123")

await Promise.all([
  channel.on("chat.message", (data) => {
    console.log("New message:", data)
  }),
  channel.on("user.joined", (data) => {
    console.log("User joined:", data)
  }),
])
Retrieve History
Fetch past messages from a channel:
import { realtime } from "@/lib/realtime"

export const GET = async () => {
  const channel = realtime.channel("room-123")
  const messages = await channel.history({ length: 50 })

  return new Response(JSON.stringify(messages))
}
History Options
length: Number of messages to retrieve, starting from the most recent.
since: Return only messages sent at or after this Unix timestamp (inclusive, in milliseconds).
const messages = await channel.history({
  length: 100,
  since: Date.now() - 86400000, // 24 hours ago, in milliseconds
})
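The `since` value in the example above is "now minus 24 hours" expressed in milliseconds. A small helper can make that arithmetic explicit. This is a minimal sketch: `HistoryOptions` and `historyWindow` are hypothetical names introduced for illustration, and the option names `length` and `since` are taken from the example above rather than from a verified type definition.

```typescript
// Hypothetical local type mirroring the options used in the example above.
type HistoryOptions = { length: number; since: number }

const MS_PER_HOUR = 60 * 60 * 1000

// Build history options covering the last `hours` hours.
// `now` is injectable so the function stays deterministic in tests.
function historyWindow(
  hours: number,
  length: number,
  now: number = Date.now()
): HistoryOptions {
  return { length, since: now - hours * MS_PER_HOUR }
}

// 24 hours back from a fixed timestamp: 24 * 3_600_000 = 86_400_000 ms
const windowOptions = historyWindow(24, 100, 1_700_000_000_000)
console.log(JSON.stringify(windowOptions)) // {"length":100,"since":1699913600000}
```

Under these assumptions, `channel.history(historyWindow(24, 100))` would request the same window as the literal call above.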
Subscribe with History
Replay past messages and continue subscribing to new ones:
import { realtime } from "@/lib/realtime"

const channel = realtime.channel("room-123")

await channel
  .history({ length: 50 })
  .on("chat.message", (data) => {
    console.log("Message:", data)
  })
This pattern:
- Fetches the last 50 messages
- Replays them in chronological order
- Continues to listen for new messages
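To make the ordering concrete, here is a toy in-memory model of replay-then-subscribe. It is not how Upstash Realtime is implemented. `MemoryChannel` and its methods are hypothetical names used only to illustrate the three steps listed above.

```typescript
type Handler<T> = (data: T) => void

// Toy in-memory stand-in for a channel (illustration only).
class MemoryChannel<T> {
  private log: T[] = [] // stored oldest first
  private handlers: Handler<T>[] = []

  emit(data: T): void {
    this.log.push(data)
    for (const handler of this.handlers) handler(data)
  }

  // Replay the last `length` messages in chronological order,
  // then keep delivering new ones to the same handler.
  subscribeWithHistory(length: number, handler: Handler<T>): void {
    const start = Math.max(0, this.log.length - length)
    for (const past of this.log.slice(start)) handler(past)
    this.handlers.push(handler)
  }
}

const room = new MemoryChannel<string>()
room.emit("first")
room.emit("second")
room.emit("third")

const received: string[] = []
room.subscribeWithHistory(2, (msg) => received.push(msg))
room.emit("fourth")

console.log(received.join(", ")) // second, third, fourth
```

The handler sees the two most recent past messages first, oldest to newest, and only then the live one.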
Stream to Clients
Stream historical and new messages to clients using Server-Sent Events:
import { realtime } from "@/lib/realtime"

export const GET = async (req: Request) => {
  const { searchParams } = new URL(req.url)
  const channelId = searchParams.get("channel")

  if (!channelId) {
    return new Response("Channel ID required", { status: 400 })
  }

  const channel = realtime.channel(channelId)

  const stream = new ReadableStream({
    async start(controller) {
      await channel.history().on("notification.alert", (data) => {
        controller.enqueue(`data: ${JSON.stringify(data)}\n\n`)
      })
    },
  })

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      "Connection": "keep-alive",
    },
  })
}
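The handler above enqueues strings. Some runtimes expect the body stream of a `Response` to carry bytes (`Uint8Array`) rather than strings, so encoding each frame with `TextEncoder` is a conservative choice. The sketch below shows one way to build Server-Sent Events frames. `formatSseFrame` and `encodeSseFrame` are hypothetical helper names, not part of Upstash Realtime.

```typescript
const sseEncoder = new TextEncoder()

// Build one Server-Sent Events frame. For JSON-serializable values,
// JSON.stringify emits no raw newlines, so the payload fits on a
// single `data:` line.
function formatSseFrame(data: unknown, eventName?: string): string {
  const eventLine = eventName ? `event: ${eventName}\n` : ""
  return `${eventLine}data: ${JSON.stringify(data)}\n\n`
}

// Same frame, encoded as UTF-8 bytes for controller.enqueue().
function encodeSseFrame(data: unknown, eventName?: string): Uint8Array {
  return sseEncoder.encode(formatSseFrame(data, eventName))
}

// JSON.stringify here only makes the newlines visible in the log.
console.log(JSON.stringify(formatSseFrame({ id: 1 }))) // "data: {\"id\":1}\n\n"
console.log(encodeSseFrame({ id: 1 }) instanceof Uint8Array) // true
```

Inside `start`, `controller.enqueue(encodeSseFrame(data))` would then take the place of the template string.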
Handle Disconnections
Clean up subscriptions when a client disconnects:
import { realtime } from "@/lib/realtime"

export const GET = async (req: Request) => {
  const { searchParams } = new URL(req.url)
  const channelId = searchParams.get("channel")

  if (!channelId) {
    return new Response("Channel ID required", { status: 400 })
  }

  const channel = realtime.channel(channelId)

  const stream = new ReadableStream({
    async start(controller) {
      // controller.close() throws if the stream is already closed, and both
      // a finished task and a client disconnect can trigger it, so guard it.
      let closed = false
      const closeOnce = () => {
        if (closed) return
        closed = true
        controller.close()
      }

      // Close the stream when the client disconnects
      req.signal.addEventListener("abort", closeOnce)

      await channel.history().on("task.update", (data) => {
        if (closed) return
        controller.enqueue(`data: ${JSON.stringify(data)}\n\n`)

        // Stop streaming once the task reaches a terminal state
        if (data.status === "completed" || data.status === "failed") {
          closeOnce()
        }
      })
    },
  })

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      "Connection": "keep-alive",
    },
  })
}
Use Cases
Stream progress updates from long-running tasks:
import { realtime } from "@/lib/realtime"

export const POST = async (req: Request) => {
  const { jobId } = await req.json()
  const channel = realtime.channel(jobId)

  await channel.emit("job.started", { progress: 0 })

  for (let i = 0; i <= 100; i += 10) {
    // processChunk() stands in for your own unit of work
    await processChunk()
    await channel.emit("job.progress", { progress: i })
  }

  await channel.emit("job.completed", { progress: 100 })

  return new Response("OK")
}
Stream updates to the client:
// app/api/job/stream/route.ts
import { realtime } from "@/lib/realtime"

export const GET = async (req: Request) => {
  const { searchParams } = new URL(req.url)
  const jobId = searchParams.get("id")

  // searchParams.get() returns null when the parameter is missing
  if (!jobId) {
    return new Response("Job ID required", { status: 400 })
  }

  const channel = realtime.channel(jobId)

  const stream = new ReadableStream({
    async start(controller) {
      await channel.history().on("job.progress", (data) => {
        controller.enqueue(`data: ${JSON.stringify(data)}\n\n`)

        // The final progress event ends the stream
        if (data.progress === 100) {
          controller.close()
        }
      })
    },
  })

  return new Response(stream, {
    headers: { "Content-Type": "text/event-stream" },
  })
}
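On the receiving side, a browser would typically use `EventSource`, which parses frames automatically. A client that reads the response with `fetch` instead has to split the text into frames itself. The following is a minimal sketch of that parsing step, assuming every frame is a single `data:` line followed by a blank line, as produced by the route above. `parseSseFrames` is a hypothetical helper.

```typescript
type ParseResult<T> = { events: T[]; rest: string }

// Split buffered text into complete frames and JSON-decode their payloads.
// Incomplete trailing text is returned in `rest` for the next chunk.
function parseSseFrames<T>(buffer: string): ParseResult<T> {
  const parts = buffer.split("\n\n")
  const rest = parts.pop() ?? "" // text after the last blank line
  const events: T[] = []

  for (const frame of parts) {
    for (const line of frame.split("\n")) {
      if (line.startsWith("data: ")) {
        events.push(JSON.parse(line.slice("data: ".length)) as T)
      }
    }
  }
  return { events, rest }
}

// Two complete frames followed by the start of a third.
const chunk = 'data: {"progress":90}\n\ndata: {"progress":100}\n\ndata: {"prog'
const parsed = parseSseFrames<{ progress: number }>(chunk)

console.log(JSON.stringify(parsed.events)) // [{"progress":90},{"progress":100}]
console.log(parsed.rest) // data: {"prog
```

The caller would prepend `rest` to the next chunk it reads, so a frame split across two network reads is still decoded once it is complete.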
Process events with server-side logic:
import { realtime } from "@/lib/realtime"
import { sendEmail } from "@/lib/email"

const channel = realtime.channel("notifications")

await channel.on("notification.alert", async (data) => {
  if (data.priority === "high") {
    await sendEmail({
      to: data.userId,
      subject: "Urgent Notification",
      body: data.message,
    })
  }
})
Multi-Channel Broadcasting
Emit events to multiple channels:
import { realtime } from "@/lib/realtime"

export const POST = async (req: Request) => {
  const { teamIds, message } = await req.json()

  await Promise.all(
    teamIds.map((teamId: string) =>
      realtime.channel(`team-${teamId}`).emit("announcement", message)
    )
  )

  return new Response("Broadcast sent")
}
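`Promise.all` rejects as soon as one `emit` fails, so the caller cannot tell which teams were reached. If partial delivery needs to be reported, `Promise.allSettled` is one option. The sketch below is self-contained: `EmitFn`, `broadcast`, and `fakeEmit` are hypothetical names, and the `emit` callback stands in for `realtime.channel(...).emit(...)`.

```typescript
type EmitFn = (channelName: string, message: string) => Promise<void>

type BroadcastResult = { delivered: string[]; failed: string[] }

// Emit to every team channel and report which team IDs succeeded.
async function broadcast(
  teamIds: string[],
  message: string,
  emit: EmitFn
): Promise<BroadcastResult> {
  const settled = await Promise.allSettled(
    teamIds.map((teamId) => emit(`team-${teamId}`, message))
  )

  const result: BroadcastResult = { delivered: [], failed: [] }
  settled.forEach((outcome, index) => {
    const bucket =
      outcome.status === "fulfilled" ? result.delivered : result.failed
    bucket.push(teamIds[index])
  })
  return result
}

// Fake emitter that fails for one channel, to show the result shape.
const fakeEmit: EmitFn = async (channelName) => {
  if (channelName === "team-b") throw new Error("emit failed")
}

broadcast(["a", "b", "c"], "hello", fakeEmit).then((result) => {
  console.log(JSON.stringify(result)) // {"delivered":["a","c"],"failed":["b"]}
})
```

A route handler could return the `failed` list in its response so the caller can retry only those teams.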
Forward webhook events to realtime channels:
import { realtime } from "@/lib/realtime"

export const POST = async (req: Request) => {
  const payload = await req.json()
  const channel = realtime.channel(`user-${payload.userId}`)

  await channel.emit("webhook.received", payload)

  return new Response("OK")
}
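Webhook bodies come from outside the application, so it is worth checking that `userId` is present before building a channel name from it. Otherwise a malformed request would emit to a channel named `user-undefined`. A minimal sketch of such a check, using a hypothetical `hasUserId` type guard and `channelNameFor` helper:

```typescript
type WebhookPayload = { userId: string } & Record<string, unknown>

// Narrow an unknown JSON body to a payload with a non-empty string userId.
function hasUserId(payload: unknown): payload is WebhookPayload {
  if (typeof payload !== "object" || payload === null) return false
  const userId = (payload as Record<string, unknown>).userId
  return typeof userId === "string" && userId.length > 0
}

// Derive the channel name, or null when the payload is unusable.
function channelNameFor(payload: unknown): string | null {
  return hasUserId(payload) ? `user-${payload.userId}` : null
}

console.log(channelNameFor({ userId: "42", type: "invoice.paid" })) // user-42
console.log(channelNameFor({ type: "invoice.paid" })) // null
```

In the route handler, a `null` result would map naturally to a 400 response instead of an emit.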
Emit Events
Emit events from any server context:
import { realtime } from "@/lib/realtime"

export const POST = async () => {
  await realtime.emit("notification.alert", "hello world!")

  return new Response("OK")
}
Emit to specific channels:
import { realtime } from "@/lib/realtime"

export const POST = async () => {
  const channel = realtime.channel("user-123")
  await channel.emit("notification.alert", "hello world!")

  return new Response("OK")
}