# Kafka Controllers
Event-driven controllers that trigger Business APIs or AI Agents from Kafka messages
Kafka controllers enable event-driven architectures in Mindbricks. They listen to Kafka topics and automatically trigger actions when messages arrive — either executing a Business API or invoking an AI Agent.
There are two types of Kafka controllers in Mindbricks:
- Business API Kafka Controllers — Trigger a Business API when a Kafka event arrives (e.g., process an order when a payment-completed event is received)
- AI Agent Kafka Controllers — Trigger an AI Agent execution when a Kafka event arrives (e.g., classify a ticket when it's created)
## Business API Kafka Controllers
Business APIs can listen to Kafka topics and execute their workflow when a message arrives. This behavior is configured in the `kafkaSettings` section of a Business API.
### Configuration
```json
{
  "apiOptions": {
    "name": "processPaymentCompleted",
    "crudType": "custom"
  },
  "kafkaSettings": {
    "hasKafkaController": true,
    "configuration": {
      "kafkaTopicName": "myapp-payment-service-payment-completed",
      "kafkaGroupId": null
    }
  }
}
```
### Fields
| Field | Type | Description |
|---|---|---|
| `hasKafkaController` | Boolean | Activates the Kafka controller for this API |
| `kafkaTopicName` | String | The Kafka topic to subscribe to |
| `kafkaGroupId` | String | Optional consumer group ID. Auto-generated if `null`. |
### How It Works
1. When the service starts, a Kafka consumer subscribes to the configured topic.
2. When a message arrives, the message body is used as the request body for the Business API.
3. The API's full workflow executes (validation, actions, main operation, post-actions).
4. If the API raises events, those events are published as usual.
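The flow above can be sketched in plain JavaScript. This is an illustrative sketch, not generated Mindbricks code: `kafkaMessageToApiRequest` and `executeBusinessApi` are hypothetical names standing in for the generated consumer glue and the API's workflow entry point.

```javascript
// Illustrative sketch: a Kafka message body becomes the Business API request body.
function kafkaMessageToApiRequest(rawValue) {
  // The message payload is parsed JSON and passed through as the request body.
  return { body: JSON.parse(rawValue.toString()) };
}

// `executeBusinessApi` is a hypothetical stand-in for the generated API workflow,
// which runs validation, actions, the main operation, and post-actions.
async function handlePaymentCompleted(rawValue, executeBusinessApi) {
  const request = kafkaMessageToApiRequest(rawValue);
  return executeBusinessApi("processPaymentCompleted", request);
}
```

Because the message body becomes the request body unchanged, producers must publish payloads that match the API's expected input shape.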
### Topic Naming Convention
Mindbricks automatically generates Kafka topic names for API and database events:
```
{projectCodename}-{serviceName}-service-{resourceName}-{actionInPassiveForm}
```
Examples:
- `myapp-catalog-service-vehicle-created`
- `myapp-order-service-order-payment-completed`
- `myapp-auth-service-user-registered`
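The convention can be expressed as a small helper. The function name is illustrative, not part of Mindbricks:

```javascript
// Builds an event topic name following the documented convention:
// {projectCodename}-{serviceName}-service-{resourceName}-{actionInPassiveForm}
function eventTopicName(projectCodename, serviceName, resourceName, actionInPassiveForm) {
  return `${projectCodename}-${serviceName}-service-${resourceName}-${actionInPassiveForm}`;
}

// eventTopicName("myapp", "catalog", "vehicle", "created")
//   -> "myapp-catalog-service-vehicle-created"
```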
### Use Cases
- Cross-service workflows: When a user registers in the auth service, automatically create a default profile in the profile service
- Async processing: When a file is uploaded, trigger virus scanning in a background service
- Event aggregation: Collect events from multiple services into a reporting or analytics pipeline
- Notifications: When an order status changes, trigger notification delivery
## AI Agent Kafka Controllers
AI Agents can have their own Kafka controllers that trigger agent execution from events. This is configured in the `kafkaControllers` array inside an AI Agent definition.
### Configuration
```json
{
  "agentBasics": {
    "name": "ticketClassifier",
    "executionMode": "task",
    "modality": "text"
  },
  "kafkaControllers": [
    {
      "name": "onTicketCreated",
      "topic": "myapp-support-service-ticket-created",
      "groupId": null,
      "dataMapping": "{ prompt: `Classify this ticket: ${message.title} - ${message.description}` }",
      "enabled": true
    }
  ]
}
```
### Fields
| Field | Type | Required | Description |
|---|---|---|---|
| `name` | String | Yes | Unique controller identifier within the agent (codeName format) |
| `topic` | String | Yes | Kafka topic to subscribe to |
| `groupId` | String | No | Consumer group ID. Auto-generated as `{serviceCodename}-agent-{agentName}-{controllerName}` if `null`. |
| `dataMapping` | MScript | Yes | Expression that transforms the Kafka message into the agent's expected input. |
| `enabled` | Boolean | No | Default: `true`. Set to `false` to disable the controller without removing its configuration. |
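The auto-generated group ID pattern can be sketched as a one-line helper (the function name is illustrative):

```javascript
// Default consumer group ID when `groupId` is null, per the pattern above:
// {serviceCodename}-agent-{agentName}-{controllerName}
function defaultAgentGroupId(serviceCodename, agentName, controllerName) {
  return `${serviceCodename}-agent-${agentName}-${controllerName}`;
}
```

Because the default includes the controller name, each controller gets its own consumer group and therefore receives every message on its topic independently.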
### Data Mapping Context
The `dataMapping` expression has access to these variables:
| Variable | Description |
|---|---|
| `message` | The parsed JSON payload of the Kafka message |
| `session` | Extracted session object (if present in the message) |
| `topic` | The topic name the message arrived on |
| `headers` | Kafka message headers |
The expression must return an object matching the agent's expected input shape:
```javascript
// Simple mapping
{ prompt: message.text }

// Rich mapping with metadata
{
  prompt: `Analyze: ${message.title} - ${message.body}`,
  metadata: { id: message.id, source: topic }
}
```
### Result Events
When an AI Agent completes execution (from any trigger — REST, SSE, or Kafka), a result event is published to:
```
{serviceCodename}-agent-{agentName}-result
```
Result event payload:
```json
{
  "_eventType": "agent-result",
  "_agent": "ticketClassifier",
  "_source": "kafka",
  "_sourceController": "onTicketCreated",
  "_timestamp": "2026-03-27T10:30:00.000Z",
  "session": {},
  "result": { "classification": "billing", "confidence": 0.95 }
}
```
### Multiple Controllers
An agent can listen to multiple topics, each with different data mappings:
"kafkaControllers": [
{
"name": "fromEmail",
"topic": "myapp-notification-service-email-received",
"dataMapping": "{ prompt: `Classify: ${message.subject} - ${message.body}` }",
"enabled": true
},
{
"name": "fromChat",
"topic": "myapp-chat-service-message-sent",
"dataMapping": "{ prompt: `Classify: ${message.text}`, metadata: { chatId: message.chatId } }",
"enabled": true
}
]
## Comparison
| Feature | Business API Kafka Controller | AI Agent Kafka Controller |
|---|---|---|
| Trigger | Kafka message → API workflow | Kafka message → Agent execution |
| Configuration | `kafkaSettings` on a Business API | `kafkaControllers[]` on an AI Agent |
| Data Input | Message body becomes API request body | `dataMapping` expression transforms the message |
| Output | Standard API response + events | Agent result + result event on Kafka |
| Multiple Topics | One topic per API | Multiple controllers per agent |
| Use Case | CRUD operations, data processing | AI analysis, classification, generation |
## Best Practices
- **Use descriptive controller names.** Names like `onTicketCreated` or `fromEmailReceived` make the event flow self-documenting.
- **Set explicit group IDs for shared consumers.** If multiple service instances should share the load (each message processed once), use the same group ID. If each instance should process every message, use unique group IDs.
- **Keep data mappings simple.** The `dataMapping` expression should just reshape the message — don't put business logic in it. Let the agent or API handle the logic.
- **Use `enabled: false` for development.** You can define controllers ahead of time and enable them when the corresponding producer service is ready.
- **Monitor consumer lag.** Use the `engineKafkaConsumerStatus` tool to check whether consumers are falling behind. High lag indicates the agent or API is too slow for the message rate.
- **Handle errors gracefully.** If an agent or API fails while processing a message, the error is logged but the consumer continues. Design your agents with appropriate error handling in the system prompt or guardrails.
Last updated Mar 27, 2026