Event Subscription Pattern
A comprehensive guide to designing client-facing Kafka event subscriptions in Mindbricks. Covers WebSocket and SSE transports, DataObject-based authorization, topic aliases, payload transformation, mixed-access splitting, custom auth scripts, and lazy Kafka consumers.
Overview
An EventSubscription is a client-facing channel that bridges Kafka events to frontend clients in real time. It lives inside a business service and lets clients subscribe to a curated set of topics with built-in authorization derived from the linked DataObject's access configuration.
| Capability | RealtimeHub | EventSubscription |
|---|---|---|
| Scope | Room-based (users in a room) | User-based (individual clients) |
| Direction | Bidirectional (client ↔ server ↔ client) | Server → client only |
| Transport | Socket.IO only | Socket.IO or SSE (designer's choice) |
| Message model | Typed messages (text, image, video, ...) | Raw Kafka event payloads |
| Authorization | Room membership/ownership auth | DataObject access level (public/protected/private) |
| Kafka integration | Auto-bridges DataObject CRUD events to rooms | Client subscribes to any configured topic |
| Consumers | Always-on per hub | Lazy — start on first subscriber, stop on last |
| Persistence | Always (auto-generated Message DataObject) | None (events are transient) |
| Use case | Chat, collaboration, multiplayer | Dashboards, live feeds, notifications, tracking |
Use EventSubscription when: Clients need to receive backend events without joining a room — order status updates on a dashboard, live inventory changes on a product page, payment notifications, admin monitoring panels.
Use RealtimeHub when: Users need to talk to each other in real time — chat rooms, multiplayer games, collaborative editing.
Architecture
Each EventSubscription generates either a Socket.IO namespace or an SSE endpoint on the service's HTTP server:
- WebSocket — `io.of("/events/{subscriptionName}")` for long-lived connections where clients dynamically subscribe and unsubscribe to topics.
- SSE — `GET /events/{subscriptionName}/stream?topics=...` for short-lived, page-scoped connections that open on page load and close on navigation.
The runtime architecture:
1. Authentication — Token-based auth middleware (the same used by RealtimeHub and REST APIs) validates every connection via the session layer.
2. Topic authorization — Each topic is linked to a DataObject. The DataObject's access level (`accessPublic`, `accessProtected`, `accessPrivate`) and tenant scoping determine who can subscribe and what per-event filtering is applied.
3. Lazy Kafka consumers — One shared Kafka consumer per topic, started when the first client subscribes and stopped when the last client disconnects. No idle consumers run.
4. Per-subscriber filtering — Messages from Kafka are distributed to subscribers after applying ownership and tenancy filters. Each subscriber only sees events they are authorized for.
5. Payload transformation — Designers can shape the delivered payload with MScript mappings or strip sensitive fields before delivery.
Pattern Structure
An EventSubscription is a Chapter pattern with three sections:
EventSubscription
├── EventSubBasics
│ ├── name (String) — subscription identifier
│ ├── description (Text) — human-readable description
│ └── transport (EventSubTransport) — "websocket" or "sse"
│
├── EventSubTopics: [EventSubTopic]
│ ├── name (String) — client-facing event name
│ ├── dataObjectName (DataObjectName) — the DataObject this topic belongs to
│ ├── topic (String) — Kafka topic (supports aliases)
│ ├── description (String) — human-readable description
│ ├── forceAccessLevel (DataObjectAccess?) — override DataObject's access level
│ ├── payloadMapping (MScript?) — transform payload before delivery
│ └── excludeFields ([String]?) — fields to strip from payload
│
└── EventSubAuth
├── absoluteRoles ([String]) — roles that bypass ALL checks
├── checkRoles ([String]) — roles required to subscribe
├── authScript (MScript?) — custom gate at subscribe time
└── filterScript (MScript?) — per-event filter
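The three sections map naturally onto a plain configuration object. A hypothetical example follows (field names are taken from the tree above; the validation helper is purely illustrative and not part of Mindbricks):

```javascript
// Illustrative config object mirroring the EventSubscription chapter structure.
const subscription = {
  basics: {
    name: "order-feed",
    description: "Live order events",
    transport: "websocket", // "websocket" or "sse"
  },
  topics: [
    {
      name: "orderCreated",            // client-facing event name
      dataObjectName: "Order",         // linked DataObject
      topic: "order.created",          // Kafka topic alias
      description: "New order placed",
      forceAccessLevel: null,          // optional access-level override
      payloadMapping: null,            // optional MScript transform
      excludeFields: ["internalCost"], // optional field stripping
    },
  ],
  auth: {
    absoluteRoles: ["superAdmin"],
    checkRoles: [],
    authScript: null,
    filterScript: null,
  },
};

// Hypothetical sanity check; Mindbricks performs its own validation in Studio.
function validateSubscription(sub) {
  if (!["websocket", "sse"].includes(sub.basics.transport)) return false;
  return sub.topics.every((t) => t.name && t.dataObjectName && t.topic);
}
```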
Designing an Event Subscription
Step 1: Choose the Transport
| Transport | "websocket" | "sse" |
|---|---|---|
| Best for | Long-lived: dashboards, tickers, admin panels | Short-lived: product pages, order tracking |
| Connection | Persistent WebSocket | Long-lived streaming HTTP response |
| Subscribe/Unsubscribe | Dynamic via events | Fixed at connection time via query param |
| Multiple subscriptions | One connection, many topics | One connection per endpoint |
| When to close | Explicit disconnect or app close | Page navigation |
Step 2: Select DataObjects and Topics
In the Mindbricks Studio UI, the designer:
- Selects a DataObject — The dropdown shows all DataObjects in the service.
- Picks a topic — The available topics are derived from the DataObject's API events and DB events. Topic aliases are supported (e.g., `order.created`, `review_updated`, `dbEvent:inventory:updated`).
- Names the event — The `name` field is the event name emitted to clients (e.g., `orderCreated`).
Each topic inherits its authorization from the DataObject's access configuration. The designer can override this with forceAccessLevel.
Step 3: Configure Authorization
Default Authorization (Automatic)
Every topic's authorization is automatically derived from its DataObject:
| DataObject Access | Who Can Subscribe | Per-Event Filter |
|---|---|---|
| `accessPublic` | Anyone (even unauthenticated) | None — all events pass |
| `accessProtected` | Authenticated users | Tenant filter in multi-tenant mode |
| `accessPrivate` | Owner or admin | `_owner == userId` (+ tenant filter) |
| `accessPrivate` (no `_owner` field) | Admin roles only | None (admins see all) |
Admin roles: superAdmin, admin, saasAdmin, tenantOwner, tenantAdmin.
Mixed Access Level Handling
When a subscription includes topics from DataObjects with different access levels, the system automatically splits the authorization:
Subscription: "dashboard"
├── order.created (Order, accessPrivate) → user sees only their own orders
├── review.created (Review, accessProtected) → user sees all reviews in their tenant
└── announcement.created (Announcement, accessPublic) → everyone sees all announcements
For a regular user subscribing to all 3:
- `order.created` — Allowed, filtered by `_owner == userId`
- `review.created` — Allowed, filtered by `_tenantId`
- `announcement.created` — Allowed, no filter
For an unauthenticated user:
- `order.created` — Rejected
- `review.created` — Rejected
- `announcement.created` — Allowed
The server responds with { topics: [...allowed], rejected: [...denied] }, and the client can adjust its UI accordingly.
Custom Authorization
| Field | When Evaluated | Context Variables | Purpose |
|---|---|---|---|
| `absoluteRoles` | Subscribe time | — | Roles that skip ALL checks, receive all events unfiltered |
| `checkRoles` | Subscribe time | — | Roles required to use this subscription at all |
| `authScript` | Subscribe time, per topic | `session`, `topicName`, `dataObjectName` | Custom gate — return `false` to reject a topic |
| `filterScript` | Per event | `session`, `topicName`, `dataObjectName`, `data` | Custom filter — return `false` to skip an event for this user |
Authorization evaluation order:
1. Is session valid? ────────────────────────── No → Reject all
2. Is roleId in absoluteRoles? ─────────────── Yes → Allow all (no filtering)
3. Is checkRoles defined and roleId not in it? ─ Yes → Reject all
4. For each topic: default auth (DataObject access level)
5. For each allowed topic: authScript (if defined)
6. Result: allowed subset + rejected list
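The six steps above can be sketched as a single function. This is a simplified model, not the generated controller: the default-auth and authScript steps are passed in as plain functions, whereas the real runtime evaluates DataObject access levels and MScript.

```javascript
// Sketch of the authorization evaluation order (all helper signatures illustrative).
function authorizeSubscription(session, requestedTopics, auth, defaultAuth, authScript) {
  // 1. Invalid session: reject everything
  if (!session) {
    return { topics: [], rejected: requestedTopics.map((t) => ({ topic: t, reason: "No session" })) };
  }
  // 2. Absolute roles bypass every check and receive unfiltered events
  if (auth.absoluteRoles.includes(session.roleId)) {
    return { topics: [...requestedTopics], rejected: [], unfiltered: true };
  }
  // 3. checkRoles gates the whole subscription
  if (auth.checkRoles.length > 0 && !auth.checkRoles.includes(session.roleId)) {
    return { topics: [], rejected: requestedTopics.map((t) => ({ topic: t, reason: "Role not allowed" })) };
  }
  const topics = [];
  const rejected = [];
  for (const t of requestedTopics) {
    if (!defaultAuth(session, t)) {
      rejected.push({ topic: t, reason: "Access level" });      // 4. DataObject default auth
    } else if (authScript && !authScript(session, t)) {
      rejected.push({ topic: t, reason: "authScript" });        // 5. custom gate
    } else {
      topics.push(t);
    }
  }
  return { topics, rejected };                                  // 6. allowed + rejected
}
```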
Payload Transformation
payloadMapping
Build a completely new payload from the Kafka message. The data variable holds the parsed message:
payloadMapping: "{ orderId: data.id, status: data.status, total: data.totalAmount }"
Original Kafka message:
{ "id": "ord-123", "status": "shipped", "totalAmount": 99.99, "internalCost": 45.00, "adminNotes": "..." }
Delivered to client:
{ "orderId": "ord-123", "status": "shipped", "total": 99.99 }
excludeFields
Strip specific fields from the payload without building a new one:
excludeFields: ["internalCost", "adminNotes", "auditLog"]
Order of operations: If both payloadMapping and excludeFields are defined, payloadMapping runs first, then excludeFields strips from the result.
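This order of operations can be sketched in a few lines. Note the MScript mapping is modeled here as a plain function, which is an illustrative simplification:

```javascript
// payloadMapping builds the new payload first; excludeFields then strips from the result.
function transformPayload(data, payloadMapping, excludeFields) {
  let out = payloadMapping ? payloadMapping(data) : { ...data };
  for (const field of excludeFields ?? []) delete out[field];
  return out;
}
```

Because stripping runs second, an `excludeFields` entry can remove a field that `payloadMapping` just produced, so the two should name disjoint fields in practice.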
Topic Aliases
The topic field supports the full Mindbricks topic alias system. Instead of writing literal Kafka topic names, designers use readable aliases:
| Alias | Resolves To | Type |
|---|---|---|
| `order.created` | `{serviceCodename}-order-created` | API event |
| `review_updated` | `{serviceCodename}-review-updated` | API event |
| `createUser:done` | `{serviceCodename}-user-created` | API event (completion keyword) |
| `dbEvent:inventory:updated` | `{serviceCodename}-dbevent-inventory-updated` | DB event |
| `external-payments-topic` | `external-payments-topic` | Literal (no alias match) |
Topic aliases are resolved at build time and embedded in the generated code. The runtime uses the resolved topic names directly.
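A rough model of the resolution rules in the table is sketched below. This is an approximation for illustration only: the real resolver runs at build time inside Mindbricks, and completion-keyword aliases like `createUser:done` need the designer's CRUD metadata, so they are omitted here.

```javascript
// Approximate alias resolution for dot/underscore and dbEvent aliases.
function resolveTopicAlias(alias, serviceCodename) {
  let m;
  // dbEvent:DataObject:action -> {service}-dbevent-{dataobject}-{action}
  if ((m = alias.match(/^dbEvent:(\w+):(\w+)$/))) {
    return `${serviceCodename}-dbevent-${m[1].toLowerCase()}-${m[2].toLowerCase()}`;
  }
  // name.action or name_action -> {service}-{name}-{action}
  if ((m = alias.match(/^(\w+)[._](\w+)$/))) {
    return `${serviceCodename}-${m[1]}-${m[2]}`;
  }
  return alias; // no alias match: treat as a literal Kafka topic name
}
```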
Client Protocol
WebSocket (Socket.IO)
Connection
import { io } from "socket.io-client";
const socket = io(`${baseUrl}/events/dashboard`, {
auth: {
token: `Bearer ${accessToken}`,
tenantCodename: "acme", // only in multi-tenant projects
},
transports: ["websocket"],
});
Subscribe
socket.emit("subscribe", {
topics: ["orderCreated", "paymentReceived"], // subset of available topics
});
socket.on("subscribed", ({ topics, rejected }) => {
// topics: ["orderCreated"] — successfully subscribed
// rejected: [{ topic: "paymentReceived", reason: "Admin role required" }]
});
Receive Events
socket.on("event", ({ topic, name, data }) => {
// topic: "orderCreated" — matches the name you subscribed to
// name: "orderCreated" — same as topic (client-facing name)
// data: { id: "ord-123", ... } — transformed payload
});
Unsubscribe
socket.emit("unsubscribe", { topics: ["orderCreated"] });
socket.on("unsubscribed", ({ topics }) => {
// topics: ["orderCreated"]
});
Error Handling
socket.on("error", ({ message }) => {
console.error("Subscription error:", message);
});
socket.on("connect_error", (err) => {
console.error("Connection failed:", err.message);
});
SSE (Server-Sent Events)
Connection
const topics = ["reviewAdded", "inventoryChanged"].join(",");
const url = `${baseUrl}/events/product-updates/stream?topics=${topics}`;
const eventSource = new EventSource(url);
Note: The native `EventSource` API does not support custom headers. For token-based auth, use `@microsoft/fetch-event-source` or pass the token via a query parameter if the backend supports it.
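A sketch of the query-parameter fallback follows. The `token` parameter name is an assumption, so check what the generated backend actually accepts:

```javascript
// Hypothetical helper: embed the auth token in the SSE stream URL.
function buildAuthedStreamUrl(baseUrl, subscriptionName, topics, token) {
  const qs = new URLSearchParams({ topics: topics.join(","), token });
  return `${baseUrl}/events/${subscriptionName}/stream?${qs}`;
}
```

Usage: `const es = new EventSource(buildAuthedStreamUrl(baseUrl, "product-updates", ["reviewAdded"], accessToken));`. For the header-based alternative, `@microsoft/fetch-event-source` exposes `fetchEventSource(url, { headers, onmessage })`, which accepts an `Authorization` header.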
Events
// Subscription confirmation
eventSource.addEventListener("subscribed", (e) => {
const { topics, rejected } = JSON.parse(e.data);
});
// Incoming events
eventSource.addEventListener("event", (e) => {
const { topic, name, data } = JSON.parse(e.data);
});
// Close on page leave
window.addEventListener("beforeunload", () => eventSource.close());
Runtime Internals
Lazy Kafka Consumers
Each topic gets a shared Kafka consumer (not per-client). Consumers are managed lazily:
| Event | Action |
|---|---|
| First subscriber for a topic | createConsumer(topic) → connect → subscribe → run |
| Additional subscribers | Just add to the subscription registry |
| Subscriber disconnects | Decrement reference count |
| Last subscriber disconnects | consumer.disconnect() → remove from registry |
Consumer group: evtsub-{subscriptionName}-{topicName}
This means:
- No idle consumers when no one is listening
- Multiple service instances each run their own consumer group (Kafka partitions distribute messages)
- Consumer creation is fast (typically less than 1s)
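The lifecycle table above is essentially reference counting. A minimal sketch, where `createConsumer` and `stopConsumer` stand in for the real connect/subscribe/run and disconnect calls:

```javascript
// Reference-counted registry of shared per-topic consumers.
function makeConsumerRegistry(createConsumer, stopConsumer) {
  const entries = new Map(); // topic -> { consumer, refs }
  return {
    async acquire(topic) {
      let entry = entries.get(topic);
      if (!entry) {
        // First subscriber for this topic: start the shared consumer
        entry = { consumer: await createConsumer(topic), refs: 0 };
        entries.set(topic, entry);
      }
      entry.refs += 1;
      return entry.consumer;
    },
    async release(topic) {
      const entry = entries.get(topic);
      if (!entry) return;
      entry.refs -= 1;
      if (entry.refs === 0) {
        // Last subscriber gone: stop the consumer and drop it from the registry
        entries.delete(topic);
        await stopConsumer(entry.consumer);
      }
    },
    activeTopics: () => entries.size,
  };
}
```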
Message Distribution Flow
Kafka message arrives
│
▼
Parse JSON payload
│
▼
For each subscriber on this topic:
│
├── 1. Default filter (ownership, tenancy)
│ ├── absoluteRoles? → skip filters
│ ├── requiresTenantFilter? → data._tenantId == subscriber.tenantId
│ └── requiresOwnership? → data._owner == subscriber.userId
│
├── 2. Custom filterScript (if defined)
│ └── evaluate with { session, topicName, dataObjectName, data }
│
├── 3. Payload transformation
│ ├── payloadMapping? → build new payload
│ └── excludeFields? → strip fields
│
└── 4. Emit to subscriber
├── WebSocket: nsp.to(socketId).emit("event", { topic, name, data })
    └── SSE: res.write("event: event\ndata: {...}\n\n")
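Steps 1 through 3 of the flow can be sketched for a single subscriber. All field names (`isAbsoluteRole`, `requiresTenantFilter`, and so on) are illustrative stand-ins for the runtime's internal state:

```javascript
// Returns the payload to emit, or null if this subscriber must not receive the event.
function deliverable(subscriber, topicCfg, data) {
  if (!subscriber.isAbsoluteRole) {
    // 1. Default filters derived from the DataObject's access level
    if (topicCfg.requiresTenantFilter && data._tenantId !== subscriber.tenantId) return null;
    if (topicCfg.requiresOwnership && data._owner !== subscriber.userId) return null;
  }
  // 2. Custom per-event filterScript (modeled as a plain function)
  if (topicCfg.filterScript && !topicCfg.filterScript({ session: subscriber.session, data })) return null;
  // 3. Payload transformation: mapping first, then field stripping
  let payload = topicCfg.payloadMapping ? topicCfg.payloadMapping(data) : { ...data };
  for (const field of topicCfg.excludeFields ?? []) delete payload[field];
  return payload; // non-null result means step 4: emit to this subscriber
}
```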
Examples
Example 1: E-Commerce Dashboard (WebSocket)
An admin dashboard showing live order activity:
Pattern configuration:
{
"basics": {
"name": "admin-dashboard",
"description": "Live order and payment events for the admin dashboard",
"transport": "websocket"
},
"topics": [
{
"name": "orderCreated",
"dataObjectName": "Order",
"topic": "order.created",
"description": "New order placed"
},
{
"name": "orderStatusChanged",
"dataObjectName": "Order",
"topic": "order.updated",
"description": "Order status updated",
"payloadMapping": "{ orderId: data.id, status: data.status, customer: data.customerName }"
},
{
"name": "paymentReceived",
"dataObjectName": "Payment",
"topic": "payment.created",
"description": "Payment processed",
"excludeFields": ["stripeToken", "internalRef"]
}
],
"auth": {
"absoluteRoles": ["superAdmin"],
"checkRoles": ["admin", "tenantAdmin", "salesManager"]
}
}
Frontend integration:
const socket = io(`${baseUrl}/events/admin-dashboard`, {
auth: { token: `Bearer ${token}`, tenantCodename: "acme" },
transports: ["websocket"],
});
socket.on("connect", () => {
socket.emit("subscribe", {
topics: ["orderCreated", "orderStatusChanged", "paymentReceived"],
});
});
socket.on("subscribed", ({ topics }) => {
console.log(`Dashboard listening to: ${topics.join(", ")}`);
});
socket.on("event", ({ name, data }) => {
switch (name) {
case "orderCreated":
addToOrderFeed(data);
incrementOrderCounter();
break;
case "orderStatusChanged":
updateOrderStatus(data.orderId, data.status);
break;
case "paymentReceived":
addToPaymentFeed(data);
updateRevenue(data.amount);
break;
}
});
Example 2: Product Page Live Updates (SSE)
A product page that shows new reviews and stock changes in real time:
Pattern configuration:
{
"basics": {
"name": "product-updates",
"description": "Live product page updates for reviews and inventory",
"transport": "sse"
},
"topics": [
{
"name": "reviewAdded",
"dataObjectName": "Review",
"topic": "review.created",
"description": "New product review submitted",
"forceAccessLevel": "accessPublic",
"excludeFields": ["reviewerEmail", "ipAddress"]
},
{
"name": "stockChanged",
"dataObjectName": "Inventory",
"topic": "dbEvent:inventory:updated",
"description": "Product stock level changed",
"forceAccessLevel": "accessPublic",
"payloadMapping": "{ productId: data.productId, inStock: data.quantity > 0, quantity: data.quantity }"
}
],
"auth": {
"absoluteRoles": [],
"checkRoles": []
}
}
Frontend integration:
// Open SSE when the product page loads
const productId = "prod-456";
const topics = "reviewAdded,stockChanged";
const es = new EventSource(
`${baseUrl}/events/product-updates/stream?topics=${topics}`
);
es.addEventListener("event", (e) => {
const { name, data } = JSON.parse(e.data);
if (name === "reviewAdded" && data.productId === productId) {
appendReview(data);
updateRatingAverage();
}
if (name === "stockChanged" && data.productId === productId) {
updateStockBadge(data.inStock, data.quantity);
}
});
// Clean up on page leave
window.addEventListener("beforeunload", () => es.close());
Example 3: Order Tracking with Mixed Access (WebSocket)
A customer-facing order tracker where customers see only their own orders, but support agents see all:
Pattern configuration:
{
"basics": {
"name": "order-tracking",
"description": "Real-time order tracking for customers and support agents",
"transport": "websocket"
},
"topics": [
{
"name": "orderUpdate",
"dataObjectName": "Order",
"topic": "order.updated",
"description": "Order status, location, and ETA changes",
"payloadMapping": "{ orderId: data.id, status: data.status, eta: data.estimatedDelivery, location: data.currentLocation }"
},
{
"name": "deliveryAssigned",
"dataObjectName": "Delivery",
"topic": "delivery.created",
"description": "Delivery driver assigned to order"
}
],
"auth": {
"absoluteRoles": ["superAdmin", "supportAgent"],
"checkRoles": [],
"filterScript": null
}
}
How authorization works:
- Customer (role: `user`): Subscribes to `orderUpdate` and `deliveryAssigned`. Both DataObjects are `accessPrivate`, so only events where `_owner == userId` are delivered.
- Support agent (role: `supportAgent`): Listed in `absoluteRoles` — receives ALL events without filtering. Sees every order and delivery.
- Unauthenticated user: Rejected entirely (private DataObjects require auth).
Example 4: Custom Auth with Finance Data
A subscription where premium users get real-time market data but free users are restricted:
Pattern configuration:
{
"basics": {
"name": "market-feed",
"description": "Real-time market price updates",
"transport": "websocket"
},
"topics": [
{
"name": "priceUpdate",
"dataObjectName": "MarketPrice",
"topic": "market-prices",
"description": "Live price ticker",
"forceAccessLevel": "accessProtected"
},
{
"name": "tradeExecuted",
"dataObjectName": "Trade",
"topic": "trade.created",
"description": "Trade execution notifications"
}
],
"auth": {
"absoluteRoles": ["superAdmin"],
"checkRoles": [],
"authScript": "session.subscriptionTier === 'premium' || session.subscriptionTier === 'enterprise'",
"filterScript": "data.exchange === session.preferredExchange || !session.preferredExchange"
}
}
- authScript: Only premium/enterprise users can subscribe (evaluated at subscribe time).
- filterScript: Each price update is filtered so users only see their preferred exchange (evaluated per event).
Comparison with EventToken / Realtime Service
The older pattern uses a separate Realtime Service with event tokens:
| Aspect | Event Token + Realtime Service | EventSubscription |
|---|---|---|
| Architecture | Separate microservice | Inside the business service |
| Auth model | JWT with topic patterns + filter logic | DataObject access levels + custom scripts |
| Topic selection | Token defines all allowed topics | Client selects subset at runtime |
| Filter logic | Encoded in token (eq, &&) | Runtime filter per subscriber |
| Consumer lifecycle | Always running | Lazy (start/stop on demand) |
| Payload control | None (full payload) | payloadMapping + excludeFields |
| Transport | Socket.IO only | Socket.IO or SSE |
| Configuration | Code-generated from DataObject access levels | Explicit pattern with designer control |
EventSubscription is the modern replacement that gives designers explicit control over which events are exposed, how they're authorized, and what payload shape clients receive.
Generated Files
When a service has EventSubscriptions, the following files are generated:
| File | Purpose |
|---|---|
| `socket-layer/event-subs/{name}-runtime.js` | Per-subscription runtime (topic configs, lazy consumers, controller setup) |
| `socket-layer/EventSubscriptionController.js` | Base controller (auth, subscribe, distribute, SSE handling) |
| `socket-layer/socket-app.js` | Socket.IO server (generated when hubs or WS subscriptions exist) |
| `socket-layer/index.js` | Initializes all hubs and event subscriptions |
The EventSubscriptionController is a static base class (not generated per-subscription) that handles:
- WebSocket auth middleware and connection management
- SSE request handling with proper headers
- Topic authorization with default + custom auth
- Lazy Kafka consumer lifecycle
- Per-subscriber message distribution with filtering
- Payload transformation
Best Practices
- Choose transport wisely — Use WebSocket for dashboards and monitoring panels where users stay connected for minutes or hours. Use SSE for page-specific updates where the connection lives only as long as the page.
- Keep subscriptions focused — Group related topics into one subscription (e.g., all order-related events in `order-tracking`). Don't create one giant subscription with every topic in the service.
- Use `forceAccessLevel` sparingly — Default authorization from the DataObject is usually correct. Only override when a specific event should be more or less restricted than the DataObject itself.
- Transform payloads — Use `payloadMapping` to send only the fields the frontend needs. This reduces bandwidth and prevents accidental exposure of sensitive data.
- Use `excludeFields` for sensitive data — Strip internal fields like audit logs, cost breakdowns, and admin notes before delivery.
- Prefer topic aliases — Use `order.created` instead of `myservice-order-created`. Aliases are readable and portable.
- Handle rejected topics gracefully — When the server responds with rejected topics, adjust the UI. Don't retry rejected topics — they'll keep failing with the same authorization result.
- Clean up SSE connections — Always close `EventSource` on page navigation. Unclosed connections waste server resources.