Kafka Controllers

Event-driven controllers that trigger Business APIs or AI Agents from Kafka messages

Kafka controllers enable event-driven architectures in Mindbricks. They listen to Kafka topics and automatically trigger actions when messages arrive — either executing a Business API or invoking an AI Agent.

There are two types of Kafka controllers in Mindbricks:

  1. Business API Kafka Controllers — Trigger a Business API when a Kafka event arrives (e.g., process an order when a payment-completed event is received)
  2. AI Agent Kafka Controllers — Trigger an AI Agent execution when a Kafka event arrives (e.g., classify a ticket when it's created)

Business API Kafka Controllers

Business APIs can be configured to listen to Kafka topics and execute their workflow when a message arrives. This is configured in the kafkaSettings section of a Business API.

Configuration

{
  "apiOptions": {
    "name": "processPaymentCompleted",
    "crudType": "custom"
  },
  "kafkaSettings": {
    "hasKafkaController": true,
    "configuration": {
      "kafkaTopicName": "myapp-payment-service-payment-completed",
      "kafkaGroupId": null
    }
  }
}

Fields

Field                Type     Description
hasKafkaController   Boolean  Activates the Kafka controller for this API
kafkaTopicName       String   The Kafka topic to subscribe to
kafkaGroupId         String   Optional consumer group ID. Auto-generated if null.

How It Works

  1. When the service starts, a Kafka consumer subscribes to the configured topic.
  2. When a message arrives, the message body is used as the request body for the Business API.
  3. The API's full workflow executes (validation, actions, main operation, post-actions).
  4. If the API raises events, those events are published as usual.
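
The flow above can be pictured in a short TypeScript sketch. This is an illustrative simulation of steps 2 and 3, not the actual generated Mindbricks code; the type and function names are hypothetical.

```typescript
// Illustrative sketch of the dispatch flow: the Kafka message body becomes
// the request body of the Business API. All names here are hypothetical.
type BusinessApi = (requestBody: Record<string, unknown>) => Promise<unknown>;

async function handleKafkaMessage(rawValue: string, api: BusinessApi): Promise<unknown> {
  // Step 2: the message payload is parsed as JSON and used as the request body.
  const requestBody = JSON.parse(rawValue) as Record<string, unknown>;
  // Step 3: the API's full workflow runs (validation, actions, main
  // operation, post-actions) just as it would for a REST call.
  return api(requestBody);
}

// Usage: simulate a payment-completed event triggering the API.
const processPaymentCompleted: BusinessApi = async (body) => ({
  status: "processed",
  orderId: body.orderId,
});
```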

Topic Naming Convention

Mindbricks automatically generates Kafka topic names for API and database events:

{projectCodename}-{serviceName}-service-{resourceName}-{actionInPassiveForm}

Examples:

  • myapp-catalog-service-vehicle-created
  • myapp-order-service-order-payment-completed
  • myapp-auth-service-user-registered
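
The convention can be captured in a small helper. The function itself is hypothetical, shown only to spell out the pattern:

```typescript
// Hypothetical helper spelling out the naming convention:
// {projectCodename}-{serviceName}-service-{resourceName}-{actionInPassiveForm}
function buildEventTopic(
  projectCodename: string,
  serviceName: string,
  resourceName: string,
  actionInPassiveForm: string,
): string {
  return `${projectCodename}-${serviceName}-service-${resourceName}-${actionInPassiveForm}`;
}

// buildEventTopic("myapp", "catalog", "vehicle", "created")
//   -> "myapp-catalog-service-vehicle-created"
```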

Use Cases

  • Cross-service workflows: When a user registers in the auth service, automatically create a default profile in the profile service
  • Async processing: When a file is uploaded, trigger virus scanning in a background service
  • Event aggregation: Collect events from multiple services into a reporting or analytics pipeline
  • Notifications: When an order status changes, trigger notification delivery

AI Agent Kafka Controllers

AI Agents can have their own Kafka controllers that trigger agent execution from events. This is configured in the kafkaControllers array inside an AI Agent definition.

Configuration

{
  "agentBasics": {
    "name": "ticketClassifier",
    "executionMode": "task",
    "modality": "text"
  },
  "kafkaControllers": [
    {
      "name": "onTicketCreated",
      "topic": "myapp-support-service-ticket-created",
      "groupId": null,
      "dataMapping": "{ prompt: `Classify this ticket: ${message.title} - ${message.description}` }",
      "enabled": true
    }
  ]
}

Fields

Field        Type     Required  Description
name         String   Yes       Unique controller identifier within the agent (codeName format)
topic        String   Yes       Kafka topic to subscribe to
groupId      String   No        Consumer group ID. Auto-generated as {serviceCodename}-agent-{agentName}-{controllerName} if null.
dataMapping  MScript  Yes       Expression that transforms the Kafka message into the agent's expected input.
enabled      Boolean  No        Default: true. Set to false to disable without removing the configuration.
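
The groupId fallback can be pictured with a small sketch; this helper is hypothetical, but the pattern matches the one documented above:

```typescript
// Hypothetical sketch of the auto-generated consumer group ID:
// {serviceCodename}-agent-{agentName}-{controllerName}
function resolveGroupId(
  serviceCodename: string,
  agentName: string,
  controllerName: string,
  explicitGroupId: string | null,
): string {
  // An explicit groupId wins; otherwise the default pattern is derived.
  return explicitGroupId ?? `${serviceCodename}-agent-${agentName}-${controllerName}`;
}

// resolveGroupId("support", "ticketClassifier", "onTicketCreated", null)
//   -> "support-agent-ticketClassifier-onTicketCreated"
```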

Data Mapping Context

The dataMapping expression has access to these variables:

Variable  Description
message   The parsed JSON payload of the Kafka message
session   Extracted session object (if present in the message)
topic     The topic name the message arrived on
headers   Kafka message headers

The expression must return an object matching the agent's expected input shape:

// Simple mapping
{ prompt: message.text }

// Rich mapping with metadata
{ prompt: `Analyze: ${message.title} - ${message.body}`, metadata: { id: message.id, source: topic } }

Result Events

When an AI Agent completes execution (from any trigger — REST, SSE, or Kafka), a result event is published to:

{serviceCodename}-agent-{agentName}-result

Result event payload:

{
  "_eventType": "agent-result",
  "_agent": "ticketClassifier",
  "_source": "kafka",
  "_sourceController": "onTicketCreated",
  "_timestamp": "2026-03-27T10:30:00.000Z",
  "session": {},
  "result": { "classification": "billing", "confidence": 0.95 }
}
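
The envelope can be assembled as follows. The field names and topic pattern follow the documentation above, but the builder itself is a hypothetical sketch:

```typescript
// Hypothetical builder for the agent result event and its topic.
interface AgentResultEvent {
  _eventType: "agent-result";
  _agent: string;
  _source: "rest" | "sse" | "kafka";
  _sourceController?: string;
  _timestamp: string;
  session: Record<string, unknown>;
  result: unknown;
}

function buildResultEvent(
  serviceCodename: string,
  agentName: string,
  source: "rest" | "sse" | "kafka",
  result: unknown,
  sourceController?: string,
): { topic: string; event: AgentResultEvent } {
  return {
    // Result topic pattern: {serviceCodename}-agent-{agentName}-result
    topic: `${serviceCodename}-agent-${agentName}-result`,
    event: {
      _eventType: "agent-result",
      _agent: agentName,
      _source: source,
      _sourceController: sourceController,
      _timestamp: new Date().toISOString(),
      session: {},
      result,
    },
  };
}
```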

Multiple Controllers

An agent can listen to multiple topics, each with different data mappings:

"kafkaControllers": [
  {
    "name": "fromEmail",
    "topic": "myapp-notification-service-email-received",
    "dataMapping": "{ prompt: `Classify: ${message.subject} - ${message.body}` }",
    "enabled": true
  },
  {
    "name": "fromChat",
    "topic": "myapp-chat-service-message-sent",
    "dataMapping": "{ prompt: `Classify: ${message.text}`, metadata: { chatId: message.chatId } }",
    "enabled": true
  }
]

Comparison

Feature          Business API Kafka Controller              AI Agent Kafka Controller
Trigger          Kafka message → API workflow               Kafka message → Agent execution
Configuration    kafkaSettings on a Business API            kafkaControllers[] on an AI Agent
Data Input       Message body becomes the API request body  dataMapping expression transforms the message
Output           Standard API response + events             Agent result + result event on Kafka
Multiple Topics  One topic per API                          Multiple controllers per agent
Use Case         CRUD operations, data processing           AI analysis, classification, generation

Best Practices

  1. Use descriptive controller names. Names like onTicketCreated or fromEmailReceived make the event flow self-documenting.

  2. Set explicit group IDs for shared consumers. If multiple service instances should share the load (each message processed once), use the same group ID. If each instance should process every message, use unique group IDs.

  3. Keep data mappings simple. The dataMapping expression should just reshape the message — don't put business logic in it. Let the agent or API handle the logic.

  4. Use enabled: false for development. You can define controllers ahead of time and enable them when the corresponding producer service is ready.

  5. Monitor consumer lag. Use the engineKafkaConsumerStatus tool to check if consumers are falling behind. High lag indicates the agent or API is too slow for the message rate.

  6. Handle errors gracefully. If an agent or API fails processing a message, the error is logged but the consumer continues. Design your agents with appropriate error handling in the system prompt or guardrails.
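
Practice #2 can be made concrete with a small sketch (names are illustrative): with a shared group ID Kafka partitions the load so each message is handled by one instance, while per-instance group IDs make every instance consume every message.

```typescript
// Illustrative helper for choosing a consumer group ID per practice #2.
function consumerGroupFor(baseGroupId: string, instanceId: string, mode: "shared" | "broadcast"): string {
  // "shared": all instances join one group, so Kafka assigns each message
  // to exactly one instance (load is split across the group).
  // "broadcast": each instance uses a unique group, so every instance
  // consumes every message independently.
  return mode === "shared" ? baseGroupId : `${baseGroupId}-${instanceId}`;
}
```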