Mastering Mindbricks: Event Subscriptions

Event Subscription Pattern

A comprehensive guide to designing client-facing Kafka event subscriptions in Mindbricks. Covers WebSocket and SSE transports, DataObject-based authorization, topic aliases, payload transformation, mixed-access splitting, custom auth scripts, and lazy Kafka consumers.

Event Subscription Pattern

Overview

An EventSubscription is a client-facing channel that bridges Kafka events to frontend clients in real time. It lives inside a business service and lets clients subscribe to a curated set of topics with built-in authorization derived from the linked DataObject's access configuration.

Capability        | RealtimeHub                                  | EventSubscription
Scope             | Room-based (users in a room)                 | User-based (individual clients)
Direction         | Bidirectional (client ↔ server ↔ client)     | Server → client only
Transport         | Socket.IO only                               | Socket.IO or SSE (designer's choice)
Message model     | Typed messages (text, image, video, ...)     | Raw Kafka event payloads
Authorization     | Room membership/ownership auth               | DataObject access level (public/protected/private)
Kafka integration | Auto-bridges DataObject CRUD events to rooms | Client subscribes to any configured topic
Consumers         | Always-on per hub                            | Lazy — start on first subscriber, stop on last
Persistence       | Always (auto-generated Message DataObject)   | None (events are transient)
Use case          | Chat, collaboration, multiplayer             | Dashboards, live feeds, notifications, tracking

Use EventSubscription when: Clients need to receive backend events without joining a room — order status updates on a dashboard, live inventory changes on a product page, payment notifications, admin monitoring panels.

Use RealtimeHub when: Users need to talk to each other in real time — chat rooms, multiplayer games, collaborative editing.


Architecture

Each EventSubscription generates either a Socket.IO namespace or an SSE endpoint on the service's HTTP server:

  • WebSocket — io.of("/events/{subscriptionName}") for long-lived connections where clients dynamically subscribe and unsubscribe to topics.
  • SSE — GET /events/{subscriptionName}/stream?topics=... for short-lived, page-scoped connections that open on page load and close on navigation.

The runtime architecture:

  1. Authentication — Token-based auth middleware (same as RealtimeHub and REST APIs) validates every connection via the session layer.

  2. Topic authorization — Each topic is linked to a DataObject. The DataObject's access level (accessPublic, accessProtected, accessPrivate) and tenant scoping determine who can subscribe and what per-event filtering is applied.

  3. Lazy Kafka consumers — One shared Kafka consumer per topic, started when the first client subscribes and stopped when the last client disconnects. No idle consumers run.

  4. Per-subscriber filtering — Messages from Kafka are distributed to subscribers after applying ownership and tenancy filters. Each subscriber only sees events they are authorized for.

  5. Payload transformation — Designers can shape the delivered payload with MScript mappings or strip sensitive fields before delivery.


Pattern Structure

An EventSubscription is a Chapter pattern with three sections:

EventSubscription
├── EventSubBasics
│   ├── name          (String)           — subscription identifier
│   ├── description   (Text)             — human-readable description
│   └── transport     (EventSubTransport) — "websocket" or "sse"

├── EventSubTopics: [EventSubTopic]
│   ├── name             (String)         — client-facing event name
│   ├── dataObjectName   (DataObjectName) — the DataObject this topic belongs to
│   ├── topic            (String)         — Kafka topic (supports aliases)
│   ├── description      (String)         — human-readable description
│   ├── forceAccessLevel (DataObjectAccess?) — override DataObject's access level
│   ├── payloadMapping   (MScript?)       — transform payload before delivery
│   └── excludeFields    ([String]?)       — fields to strip from payload

└── EventSubAuth
    ├── absoluteRoles   ([String])   — roles that bypass ALL checks
    ├── checkRoles      ([String])   — roles required to subscribe
    ├── authScript      (MScript?)   — custom gate at subscribe time
    └── filterScript    (MScript?)   — per-event filter

Designing an Event Subscription

Step 1: Choose the Transport

Transport              | "websocket"                                   | "sse"
Best for               | Long-lived: dashboards, tickers, admin panels | Short-lived: product pages, order tracking
Connection             | Persistent WebSocket                          | Long-lived HTTP stream
Subscribe/Unsubscribe  | Dynamic via events                            | Fixed at connection time via query param
Multiple subscriptions | One connection, many topics                   | One connection per endpoint
When to close          | Explicit disconnect or app close              | Page navigation

Step 2: Select DataObjects and Topics

In the Mindbricks Studio UI, the designer:

  1. Selects a DataObject — The dropdown shows all DataObjects in the service.
  2. Picks a topic — The available topics are derived from the DataObject's API events and DB events. Topic aliases are supported (e.g., order.created, review_updated, dbEvent:inventory:updated).
  3. Names the event — The name field is the event name emitted to clients (e.g., orderCreated).

Each topic inherits its authorization from the DataObject's access configuration. The designer can override this with forceAccessLevel.

Step 3: Configure Authorization

Default Authorization (Automatic)

Every topic's authorization is automatically derived from its DataObject:

DataObject Access               | Who Can Subscribe             | Per-Event Filter
accessPublic                    | Anyone (even unauthenticated) | None — all events pass
accessProtected                 | Authenticated users           | Tenant filter in multi-tenant mode
accessPrivate                   | Owner or admin                | _owner == userId (+ tenant filter)
accessPrivate (no _owner field) | Admin roles only              | None (admins see all)

Admin roles: superAdmin, admin, saasAdmin, tenantOwner, tenantAdmin.
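
The default authorization table above can be sketched as a small derivation function. This is an illustration of the rules, not the generated Mindbricks code; `deriveTopicAuth`, its option names, and the returned `canSubscribe`/`passes` shape are all hypothetical.

```javascript
// Sketch of the default-authorization table; names are illustrative assumptions.
const ADMIN_ROLES = ["superAdmin", "admin", "saasAdmin", "tenantOwner", "tenantAdmin"];

function deriveTopicAuth(accessLevel, { hasOwnerField = true, multiTenant = false } = {}) {
  switch (accessLevel) {
    case "accessPublic":
      // Anyone may subscribe; every event passes.
      return { canSubscribe: () => true, passes: () => true };
    case "accessProtected":
      // Any authenticated user; tenant filter only in multi-tenant mode.
      return {
        canSubscribe: (session) => !!session,
        passes: (data, session) => !multiTenant || data._tenantId === session.tenantId,
      };
    case "accessPrivate":
      if (!hasOwnerField) {
        // No _owner field: only admin roles may subscribe, and they see all events.
        return {
          canSubscribe: (session) => !!session && ADMIN_ROLES.includes(session.roleId),
          passes: () => true,
        };
      }
      // Owner-or-admin: admins see everything, owners see only their own records.
      return {
        canSubscribe: (session) => !!session,
        passes: (data, session) =>
          ADMIN_ROLES.includes(session.roleId) ||
          ((!multiTenant || data._tenantId === session.tenantId) &&
            data._owner === session.userId),
      };
    default:
      throw new Error(`Unknown access level: ${accessLevel}`);
  }
}
```

For example, an `accessPrivate` topic in a multi-tenant project admits any authenticated user at subscribe time but delivers only events whose `_owner` matches the subscriber.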

Mixed Access Level Handling

When a subscription includes topics from DataObjects with different access levels, the system automatically splits the authorization:

Subscription: "dashboard"
├── order.created      (Order, accessPrivate)    → user sees only their own orders
├── review.created     (Review, accessProtected) → user sees all reviews in their tenant
└── announcement.created (Announcement, accessPublic) → everyone sees all announcements

For a regular user subscribing to all 3:

  • order.created — Allowed, filtered by _owner == userId
  • review.created — Allowed, filtered by _tenantId
  • announcement.created — Allowed, no filter

For an unauthenticated user:

  • order.created — Rejected
  • review.created — Rejected
  • announcement.created — Allowed

The server responds with { topics: [...allowed], rejected: [...denied] }, and the client can adjust its UI accordingly.

Custom Authorization

Field         | When Evaluated            | Context Variables                        | Purpose
absoluteRoles | Subscribe time            | —                                        | Roles that skip ALL checks, receive all events unfiltered
checkRoles    | Subscribe time            | —                                        | Roles required to use this subscription at all
authScript    | Subscribe time, per topic | session, topicName, dataObjectName       | Custom gate — return false to reject a topic
filterScript  | Per event                 | session, topicName, dataObjectName, data | Custom filter — return false to skip an event for this user

Authorization evaluation order:

1. Is session valid? ────────────────────────── No → Reject all
2. Is roleId in absoluteRoles? ─────────────── Yes → Allow all (no filtering)
3. Is checkRoles defined and roleId not in it? ─ Yes → Reject all
4. For each topic: default auth (DataObject access level)
5. For each allowed topic: authScript (if defined)
6. Result: allowed subset + rejected list
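
The ordered checks above can be sketched as one function. This is a hypothetical illustration: step 1 (session validation) is assumed to happen in the auth middleware before this point, and `authorizeTopic` stands in for steps 4-5 (default DataObject auth, then `authScript`), returning a rejection reason or `null`.

```javascript
// Illustrative sketch of the authorization evaluation order; not generated code.
function authorizeSubscription(session, requestedTopics, auth, authorizeTopic) {
  // Step 2: absolute roles bypass every later check and all per-event filters.
  if (session && auth.absoluteRoles.includes(session.roleId)) {
    return { topics: [...requestedTopics], rejected: [] };
  }
  // Step 3: checkRoles gates the subscription as a whole.
  if (auth.checkRoles.length > 0 && (!session || !auth.checkRoles.includes(session.roleId))) {
    return {
      topics: [],
      rejected: requestedTopics.map((topic) => ({ topic, reason: "Role required" })),
    };
  }
  // Steps 4-5: evaluate each requested topic independently.
  const topics = [];
  const rejected = [];
  for (const topic of requestedTopics) {
    const reason = authorizeTopic(session, topic); // null means allowed
    if (reason) rejected.push({ topic, reason });
    else topics.push(topic);
  }
  return { topics, rejected }; // Step 6: allowed subset + rejected list
}
```

Note that an absolute role short-circuits before the per-topic loop, which is why such subscribers also skip per-event filtering later.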

Payload Transformation

payloadMapping

Build a completely new payload from the Kafka message. The data variable holds the parsed message:

payloadMapping: "{ orderId: data.id, status: data.status, total: data.totalAmount }"

Original Kafka message:

{ "id": "ord-123", "status": "shipped", "totalAmount": 99.99, "internalCost": 45.00, "adminNotes": "..." }

Delivered to client:

{ "orderId": "ord-123", "status": "shipped", "total": 99.99 }

excludeFields

Strip specific fields from the payload without building a new one:

excludeFields: ["internalCost", "adminNotes", "auditLog"]

Order of operations: If both payloadMapping and excludeFields are defined, payloadMapping runs first, then excludeFields strips from the result.
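
The order of operations can be sketched in a few lines. In Mindbricks, payloadMapping is an MScript expression; this sketch models it as a plain function for illustration.

```javascript
// Sketch of the delivery-time transform order: payloadMapping first, then excludeFields.
function transformPayload(data, { payloadMapping, excludeFields } = {}) {
  // 1. payloadMapping builds a brand-new payload from the Kafka message.
  let out = payloadMapping ? payloadMapping(data) : { ...data };
  // 2. excludeFields then strips fields from that result.
  if (excludeFields) {
    for (const field of excludeFields) delete out[field];
  }
  return out;
}
```

Because excludeFields runs on the mapped result, listing a field that the mapping never emits is a harmless no-op.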


Topic Aliases

The topic field supports the full Mindbricks topic alias system. Instead of writing literal Kafka topic names, designers use readable aliases:

Alias                     | Resolves To                                 | Type
order.created             | {serviceCodename}-order-created             | API event
review_updated            | {serviceCodename}-review-updated            | API event
createUser:done           | {serviceCodename}-user-created              | API event (completion keyword)
dbEvent:inventory:updated | {serviceCodename}-dbevent-inventory-updated | DB event
external-payments-topic   | external-payments-topic                     | Literal (no alias match)

Topic aliases are resolved at build time and embedded in the generated code. The runtime uses the resolved topic names directly.
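
A simplified resolver for the alias forms in the table might look like the following. This is an assumption-laden sketch, not the build-time resolver: the completion-keyword form (createUser:done) is omitted because it needs CRUD metadata this sketch does not have.

```javascript
// Simplified topic-alias resolution sketch covering the dot, underscore,
// dbEvent, and literal forms from the table above.
function resolveTopicAlias(alias, serviceCodename) {
  // dbEvent:{dataObject}:{event} → {svc}-dbevent-{dataObject}-{event}
  const dbEvent = alias.match(/^dbEvent:([^:]+):([^:]+)$/);
  if (dbEvent) {
    return `${serviceCodename}-dbevent-${dbEvent[1].toLowerCase()}-${dbEvent[2].toLowerCase()}`;
  }
  // {name}.{event} or {name}_{event} → {svc}-{name}-{event}
  const apiEvent = alias.match(/^([A-Za-z]+)[._]([A-Za-z]+)$/);
  if (apiEvent) {
    return `${serviceCodename}-${apiEvent[1].toLowerCase()}-${apiEvent[2].toLowerCase()}`;
  }
  return alias; // no alias match: treat as a literal Kafka topic name
}
```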


Client Protocol

WebSocket (Socket.IO)

Connection

import { io } from "socket.io-client";

const socket = io(`${baseUrl}/events/dashboard`, {
  auth: {
    token: `Bearer ${accessToken}`,
    tenantCodename: "acme",  // only in multi-tenant projects
  },
  transports: ["websocket"],
});

Subscribe

socket.emit("subscribe", {
  topics: ["orderCreated", "paymentReceived"],  // subset of available topics
});

socket.on("subscribed", ({ topics, rejected }) => {
  // topics: ["orderCreated"] — successfully subscribed
  // rejected: [{ topic: "paymentReceived", reason: "Admin role required" }]
});

Receive Events

socket.on("event", ({ topic, name, data }) => {
  // topic: "orderCreated"          — matches the name you subscribed to
  // name:  "orderCreated"          — same as topic (client-facing name)
  // data:  { id: "ord-123", ... }  — transformed payload
});

Unsubscribe

socket.emit("unsubscribe", { topics: ["orderCreated"] });

socket.on("unsubscribed", ({ topics }) => {
  // topics: ["orderCreated"]
});

Error Handling

socket.on("error", ({ message }) => {
  console.error("Subscription error:", message);
});

socket.on("connect_error", (err) => {
  console.error("Connection failed:", err.message);
});

SSE (Server-Sent Events)

Connection

const topics = ["reviewAdded", "inventoryChanged"].join(",");
const url = `${baseUrl}/events/product-updates/stream?topics=${topics}`;

const eventSource = new EventSource(url);

Note: The native EventSource API does not support custom headers. For token-based auth, use @microsoft/fetch-event-source or pass the token via a query parameter if the backend supports it.
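
When building the stream URL, encode the topic list rather than concatenating it by hand. The `token` query parameter below is an assumption — verify that your generated backend accepts it before relying on it:

```javascript
// Helper sketch for building the SSE stream URL with a properly encoded topic list.
// The "token" query parameter is hypothetical; confirm backend support first.
function buildStreamUrl(baseUrl, subscriptionName, topics, token) {
  const params = new URLSearchParams({ topics: topics.join(",") });
  if (token) params.set("token", token);
  return `${baseUrl}/events/${subscriptionName}/stream?${params}`;
}
```

Note that URLSearchParams percent-encodes the commas in the topic list, which the server decodes transparently.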

Events

// Subscription confirmation
eventSource.addEventListener("subscribed", (e) => {
  const { topics, rejected } = JSON.parse(e.data);
});

// Incoming events
eventSource.addEventListener("event", (e) => {
  const { topic, name, data } = JSON.parse(e.data);
});

// Close on page leave
window.addEventListener("beforeunload", () => eventSource.close());

Runtime Internals

Lazy Kafka Consumers

Each topic gets a shared Kafka consumer (not per-client). Consumers are managed lazily:

Event                        | Action
First subscriber for a topic | createConsumer(topic) → connect → subscribe → run
Additional subscribers       | Just add to the subscription registry
Subscriber disconnects       | Decrement reference count
Last subscriber disconnects  | consumer.disconnect() → remove from registry

Consumer group: evtsub-{subscriptionName}-{topicName}

This means:

  • No idle consumers when no one is listening
  • Multiple service instances each run their own consumer group (Kafka partitions distribute messages)
  • Consumer creation is fast (typically less than 1s)
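
The lifecycle above amounts to a reference-counted registry. A minimal sketch, assuming a `createConsumer` factory and a `consumer.disconnect()` method (illustrative shapes, not the actual generated kafkajs calls):

```javascript
// Reference-counted lazy consumer registry sketch; createConsumer and
// consumer.disconnect() are assumed interfaces for illustration.
class LazyConsumerRegistry {
  constructor(createConsumer) {
    this.createConsumer = createConsumer; // async (topic) => consumer
    this.entries = new Map(); // topic -> { consumer, refCount }
  }

  async addSubscriber(topic) {
    const entry = this.entries.get(topic);
    if (entry) {
      entry.refCount += 1; // existing consumer: just bump the count
      return entry.consumer;
    }
    const consumer = await this.createConsumer(topic); // first subscriber starts it
    this.entries.set(topic, { consumer, refCount: 1 });
    return consumer;
  }

  async removeSubscriber(topic) {
    const entry = this.entries.get(topic);
    if (!entry) return;
    entry.refCount -= 1;
    if (entry.refCount === 0) {
      await entry.consumer.disconnect(); // last subscriber gone: stop the consumer
      this.entries.delete(topic);
    }
  }
}
```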

Message Distribution Flow

Kafka message arrives


Parse JSON payload


For each subscriber on this topic:

    ├── 1. Default filter (ownership, tenancy)
    │   ├── absoluteRoles? → skip filters
    │   ├── requiresTenantFilter? → data._tenantId == subscriber.tenantId
    │   └── requiresOwnership? → data._owner == subscriber.userId

    ├── 2. Custom filterScript (if defined)
    │   └── evaluate with { session, topicName, dataObjectName, data }

    ├── 3. Payload transformation
    │   ├── payloadMapping? → build new payload
    │   └── excludeFields? → strip fields

    └── 4. Emit to subscriber
        ├── WebSocket: nsp.to(socketId).emit("event", { topic, name, data })
        └── SSE: res.write(`event: event\ndata: {...}\n\n`)
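
The whole flow can be sketched end to end. The filter and transform hooks are modeled as plain functions here (in Mindbricks they are MScript), and `emit` stands in for the Socket.IO or SSE write:

```javascript
// Illustrative sketch of the per-topic message distribution flow; the cfg
// field names and the emit callback are assumptions for this example.
function distribute(rawMessage, topic, subscribers, cfg, emit) {
  const data = JSON.parse(rawMessage); // 1. parse the Kafka payload once
  for (const sub of subscribers) {
    // 2. default ownership/tenancy filters (absolute roles skip them entirely)
    if (!sub.absoluteRole) {
      if (cfg.tenantFilter && data._tenantId !== sub.tenantId) continue;
      if (cfg.ownerFilter && data._owner !== sub.userId) continue;
    }
    // 3. custom per-event filterScript, if defined
    if (cfg.filterScript && !cfg.filterScript({ session: sub.session, data })) continue;
    // 4. payload transformation (mapping first, then field stripping), then emit
    let payload = cfg.payloadMapping ? cfg.payloadMapping(data) : { ...data };
    for (const field of cfg.excludeFields || []) delete payload[field];
    emit(sub, { topic, name: topic, data: payload });
  }
}
```

The key property: the transform runs per delivery, so a single Kafka message can reach different subscribers in differently filtered shapes.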

Examples

Example 1: E-Commerce Dashboard (WebSocket)

An admin dashboard showing live order activity:

Pattern configuration:

{
  "basics": {
    "name": "admin-dashboard",
    "description": "Live order and payment events for the admin dashboard",
    "transport": "websocket"
  },
  "topics": [
    {
      "name": "orderCreated",
      "dataObjectName": "Order",
      "topic": "order.created",
      "description": "New order placed"
    },
    {
      "name": "orderStatusChanged",
      "dataObjectName": "Order",
      "topic": "order.updated",
      "description": "Order status updated",
      "payloadMapping": "{ orderId: data.id, status: data.status, customer: data.customerName }"
    },
    {
      "name": "paymentReceived",
      "dataObjectName": "Payment",
      "topic": "payment.created",
      "description": "Payment processed",
      "excludeFields": ["stripeToken", "internalRef"]
    }
  ],
  "auth": {
    "absoluteRoles": ["superAdmin"],
    "checkRoles": ["admin", "tenantAdmin", "salesManager"]
  }
}

Frontend integration:

const socket = io(`${baseUrl}/events/admin-dashboard`, {
  auth: { token: `Bearer ${token}`, tenantCodename: "acme" },
  transports: ["websocket"],
});

socket.on("connect", () => {
  socket.emit("subscribe", {
    topics: ["orderCreated", "orderStatusChanged", "paymentReceived"],
  });
});

socket.on("subscribed", ({ topics }) => {
  console.log(`Dashboard listening to: ${topics.join(", ")}`);
});

socket.on("event", ({ name, data }) => {
  switch (name) {
    case "orderCreated":
      addToOrderFeed(data);
      incrementOrderCounter();
      break;
    case "orderStatusChanged":
      updateOrderStatus(data.orderId, data.status);
      break;
    case "paymentReceived":
      addToPaymentFeed(data);
      updateRevenue(data.amount);
      break;
  }
});

Example 2: Product Page Live Updates (SSE)

A product page that shows new reviews and stock changes in real time:

Pattern configuration:

{
  "basics": {
    "name": "product-updates",
    "description": "Live product page updates for reviews and inventory",
    "transport": "sse"
  },
  "topics": [
    {
      "name": "reviewAdded",
      "dataObjectName": "Review",
      "topic": "review.created",
      "description": "New product review submitted",
      "forceAccessLevel": "accessPublic",
      "excludeFields": ["reviewerEmail", "ipAddress"]
    },
    {
      "name": "stockChanged",
      "dataObjectName": "Inventory",
      "topic": "dbEvent:inventory:updated",
      "description": "Product stock level changed",
      "forceAccessLevel": "accessPublic",
      "payloadMapping": "{ productId: data.productId, inStock: data.quantity > 0, quantity: data.quantity }"
    }
  ],
  "auth": {
    "absoluteRoles": [],
    "checkRoles": []
  }
}

Frontend integration:

// Open SSE when the product page loads
const productId = "prod-456";
const topics = "reviewAdded,stockChanged";
const es = new EventSource(
  `${baseUrl}/events/product-updates/stream?topics=${topics}`
);

es.addEventListener("event", (e) => {
  const { name, data } = JSON.parse(e.data);

  if (name === "reviewAdded" && data.productId === productId) {
    appendReview(data);
    updateRatingAverage();
  }

  if (name === "stockChanged" && data.productId === productId) {
    updateStockBadge(data.inStock, data.quantity);
  }
});

// Clean up on page leave
window.addEventListener("beforeunload", () => es.close());

Example 3: Order Tracking with Mixed Access (WebSocket)

A customer-facing order tracker where customers see only their own orders, but support agents see all:

Pattern configuration:

{
  "basics": {
    "name": "order-tracking",
    "description": "Real-time order tracking for customers and support agents",
    "transport": "websocket"
  },
  "topics": [
    {
      "name": "orderUpdate",
      "dataObjectName": "Order",
      "topic": "order.updated",
      "description": "Order status, location, and ETA changes",
      "payloadMapping": "{ orderId: data.id, status: data.status, eta: data.estimatedDelivery, location: data.currentLocation }"
    },
    {
      "name": "deliveryAssigned",
      "dataObjectName": "Delivery",
      "topic": "delivery.created",
      "description": "Delivery driver assigned to order"
    }
  ],
  "auth": {
    "absoluteRoles": ["superAdmin", "supportAgent"],
    "checkRoles": [],
    "filterScript": null
  }
}

How authorization works:

  • Customer (role: user): Subscribes to orderUpdate and deliveryAssigned. Both DataObjects are accessPrivate, so only events where _owner == userId are delivered.
  • Support agent (role: supportAgent): Listed in absoluteRoles — receives ALL events without filtering. Sees every order and delivery.
  • Unauthenticated user: Rejected entirely (private DataObjects require auth).

Example 4: Custom Auth with Finance Data

A subscription where premium users get real-time market data but free users are restricted:

Pattern configuration:

{
  "basics": {
    "name": "market-feed",
    "description": "Real-time market price updates",
    "transport": "websocket"
  },
  "topics": [
    {
      "name": "priceUpdate",
      "dataObjectName": "MarketPrice",
      "topic": "market-prices",
      "description": "Live price ticker",
      "forceAccessLevel": "accessProtected"
    },
    {
      "name": "tradeExecuted",
      "dataObjectName": "Trade",
      "topic": "trade.created",
      "description": "Trade execution notifications"
    }
  ],
  "auth": {
    "absoluteRoles": ["superAdmin"],
    "checkRoles": [],
    "authScript": "session.subscriptionTier === 'premium' || session.subscriptionTier === 'enterprise'",
    "filterScript": "data.exchange === session.preferredExchange || !session.preferredExchange"
  }
}

  • authScript: Only premium/enterprise users can subscribe (evaluated at subscribe time).
  • filterScript: Each price update is filtered so users only see their preferred exchange (evaluated per event).

Comparison with EventToken / Realtime Service

The older pattern uses a separate Realtime Service with event tokens:

Aspect             | Event Token + Realtime Service               | EventSubscription
Architecture       | Separate microservice                        | Inside the business service
Auth model         | JWT with topic patterns + filter logic       | DataObject access levels + custom scripts
Topic selection    | Token defines all allowed topics             | Client selects subset at runtime
Filter logic       | Encoded in token (eq, &&)                    | Runtime filter per subscriber
Consumer lifecycle | Always running                               | Lazy (start/stop on demand)
Payload control    | None (full payload)                          | payloadMapping + excludeFields
Transport          | Socket.IO only                               | Socket.IO or SSE
Configuration      | Code-generated from DataObject access levels | Explicit pattern with designer control

EventSubscription is the modern replacement that gives designers explicit control over which events are exposed, how they're authorized, and what payload shape clients receive.


Generated Files

When a service has EventSubscriptions, the following files are generated:

File                                        | Purpose
socket-layer/event-subs/{name}-runtime.js   | Per-subscription runtime (topic configs, lazy consumers, controller setup)
socket-layer/EventSubscriptionController.js | Base controller (auth, subscribe, distribute, SSE handling)
socket-layer/socket-app.js                  | Socket.IO server (generated when hubs or WS subscriptions exist)
socket-layer/index.js                       | Initializes all hubs and event subscriptions

The EventSubscriptionController is a static base class (not generated per-subscription) that handles:

  • WebSocket auth middleware and connection management
  • SSE request handling with proper headers
  • Topic authorization with default + custom auth
  • Lazy Kafka consumer lifecycle
  • Per-subscriber message distribution with filtering
  • Payload transformation

Best Practices

  1. Choose transport wisely — Use WebSocket for dashboards and monitoring panels where users stay connected for minutes/hours. Use SSE for page-specific updates where the connection lives only as long as the page.

  2. Keep subscriptions focused — Group related topics into one subscription (e.g., all order-related events in order-tracking). Don't create one giant subscription with every topic in the service.

  3. Use forceAccessLevel sparingly — Default authorization from the DataObject is usually correct. Only override when a specific event should be more or less restricted than the DataObject itself.

  4. Transform payloads — Use payloadMapping to send only the fields the frontend needs. This reduces bandwidth and prevents accidental exposure of sensitive data.

  5. Use excludeFields for sensitive data — Strip internal fields like audit logs, cost breakdowns, and admin notes before delivery.

  6. Prefer topic aliases — Use order.created instead of myservice-order-created. Aliases are readable and portable.

  7. Handle rejected topics gracefully — When the server responds with rejected topics, adjust the UI. Don't retry rejected topics — they'll keep failing with the same authorization result.

  8. Clean up SSE connections — Always close EventSource on page navigation. Unclosed connections waste server resources.