Overview
Lua devices communicate with agents over MQTT (recommended) or Socket.IO. This page documents the complete wire protocol so you can build a client in any language — Go, Rust, C#, Swift, or anything with an MQTT library.
If you’re using Node.js, Python, or MicroPython, use the official SDKs instead.
This page is for building clients in languages without an official SDK.
Authentication
Devices authenticate using three credentials passed as MQTT connection parameters:
| Parameter | MQTT Field | Format | Example |
|---|
| Agent ID | username (before :) | {agentId} | baseAgent_agent_abc123 |
| Device Name | username (after :) | {deviceName} | warehouse-scanner |
| API Key | password | api_... | api_sk_live_abc123 |
MQTT username format: {agentId}:{deviceName}
Connection settings:
- Broker:
wss://mqtt.heylua.ai/mqtt (TLS required in production)
- Client ID:
lua-{agentId}-{deviceName}
- Clean session:
false (enables persistent session for QoS 1 message queueing)
- Keep-alive:
60 seconds
MQTT Topics
All topics use the prefix lua/devices/{agentId}/{deviceName}/.
Device Subscribes To (Server -> Device)
| Topic Suffix | QoS | Description | Payload |
|---|
command | 1 | Incoming command from agent | CommandMessage |
connected | 1 | Server confirms device connection | {"message": "..."} |
trigger_ack | 1 | Server acknowledges a trigger | TriggerAckMessage |
trigger_result | 1 | Result from trigger execution | {"triggerName": "...", "result": ...} |
error | 1 | Error from server | {"code": "...", "message": "..."} |
Device Publishes To (Device -> Server)
| Topic Suffix | QoS | Retain | Description | Payload |
|---|
status | 1 | Yes | Online/offline status (no secrets) | StatusMessage |
status | 1 | No | Auth + command manifest | AuthStatusMessage |
response | 1 | No | Command execution result | ResponseMessage |
trigger | 1 | No | Fire a trigger to the agent | TriggerMessage |
heartbeat | 0 | No | Keep-alive signal | Empty payload ("") |
Socket.IO Events
If you prefer WebSocket transport, connect to {serverUrl}/devices with Socket.IO.
| Event | Direction | Description | Payload |
|---|
connected | Server -> Device | Connection confirmed | {} |
command | Server -> Device | Incoming command | CommandMessage + ack callback |
trigger_ack | Server -> Device | Trigger acknowledged | TriggerAckMessage |
trigger_result | Server -> Device | Trigger execution result | {triggerName, result} |
error | Server -> Device | Error | {code, message} |
response | Device -> Server | Command result | ResponseMessage |
trigger | Device -> Server | Fire trigger | TriggerMessage |
heartbeat | Device -> Server | Keep-alive | {} |
Socket.IO auth is passed in the auth option at connection time:
{
"apiKey": "api_sk_live_abc123",
"agentId": "baseAgent_agent_abc123",
"deviceName": "warehouse-scanner",
"group": "warehouse-a",
"commands": [...]
}
Message Schemas
CommandMessage
Received on the command topic when the agent invokes a device command.
{
"commandId": "cmd_abc123",
"command": "scan_barcode",
"payload": { "format": "qr" },
"timeout": 30000
}
| Field | Type | Required | Description |
|---|
commandId | string | Yes | Unique ID for idempotency |
command | string | Yes | Command name matching a registered handler |
payload | any | No | Input parameters for the command |
timeout | number | No | Timeout in milliseconds (default: 30000) |
ResponseMessage
Published to the response topic after executing a command.
{
"commandId": "cmd_abc123",
"success": true,
"data": { "barcode": "ABC-12345", "format": "CODE128" }
}
{
"commandId": "cmd_abc123",
"success": false,
"error": "Scanner hardware not responding"
}
| Field | Type | Required | Description |
|---|
commandId | string | Yes | Must match the incoming command’s ID |
success | boolean | Yes | Whether the command succeeded |
data | any | No | Result data (on success) |
error | string | No | Error message (on failure) |
TriggerMessage
Published to the trigger topic to fire an event to the agent.
{
"triggerName": "barcode_scanned",
"payload": { "value": "ABC-12345", "location": "aisle-3" }
}
| Field | Type | Required | Description |
|---|
triggerName | string | Yes | Name of the trigger |
payload | any | No | Trigger data sent to the agent |
TriggerAckMessage
Received on the trigger_ack topic after the server processes a trigger.
{
"triggerId": "trg_abc123",
"received": true
}
| Field | Type | Required | Description |
|---|
triggerId | string | Yes | Server-assigned trigger ID |
received | boolean | Yes | Whether the trigger was accepted |
error | string | No | Error message if rejected |
StatusMessage (retained)
Published to the status topic with retain: true. This message must NOT contain secrets (the API key) because retained messages are stored by the broker and delivered to any future subscriber.
{
"status": "online",
"timestamp": "2026-04-17T10:30:00.000Z",
"group": "warehouse-a"
}
AuthStatusMessage (non-retained)
Published to the status topic with retain: false immediately after the retained status. Contains the API key and command manifest for server-side authentication.
{
"status": "online",
"apiKey": "api_sk_live_abc123",
"group": "warehouse-a",
"commands": [
{
"name": "scan_barcode",
"description": "Scan a barcode and return its value",
"inputSchema": {
"type": "object",
"properties": {
"format": { "type": "string", "enum": ["qr", "code128", "ean13"] }
}
},
"timeoutMs": 30000
}
]
}
Heartbeat
Published to the heartbeat topic every 30 seconds with an empty payload. QoS 0 (fire-and-forget).
Last Will and Testament (LWT)
Set the MQTT LWT to publish an offline status if the device disconnects unexpectedly:
- Topic:
lua/devices/{agentId}/{deviceName}/status
- Payload:
{"status": "offline", "timestamp": "..."}
- QoS: 1
- Retain: true
Self-Describing Commands
Commands are sent at connect time in the AuthStatusMessage. The server registers them as agent tools automatically — no compile/push cycle needed.
{
"name": "read_temperature",
"description": "Read current temperature in celsius from the DHT22 sensor",
"inputSchema": {
"type": "object",
"properties": {
"unit": { "type": "string", "enum": ["celsius", "fahrenheit"] }
}
},
"timeoutMs": 5000,
"retry": { "maxAttempts": 3, "backoffMs": 1000 }
}
| Field | Type | Required | Description |
|---|
name | string | Yes | Command name (used by agent to invoke) |
description | string | Yes | Shown to the AI agent as tool description |
inputSchema | object | No | JSON Schema for command parameters |
timeoutMs | number | No | Timeout in ms (default: 30000) |
retry | object | No | {maxAttempts, backoffMs} |
Command Lifecycle
Idempotency
Commands include a commandId that must be used for idempotency. Your client should:
- Maintain an LRU cache of recently seen
commandId values (recommended: 1000 entries, 5-minute TTL)
- On receiving a command, check if the
commandId has been seen before
- If seen, re-publish the cached response without re-executing the handler
- If new, execute the handler, cache the response, then publish it
This ensures that retried messages (common with QoS 1) do not cause duplicate side effects.
Without idempotency handling, MQTT QoS 1 redelivery can cause commands to execute multiple times.
Always implement the dedup cache.
Rate Limits and Constraints
| Constraint | Value |
|---|
| Max payload size (MQTT) | 256 KB |
| Max commands per device | 50 |
| Heartbeat interval | 30 seconds |
| Trigger ACK timeout | 10 seconds |
| Command default timeout | 30 seconds |
| Dedup cache TTL | 5 minutes |
| Dedup cache size | 1000 entries |
| Reconnect base delay | 1 second |
| Reconnect max delay | 30 seconds |
| MQTT keep-alive | 60 seconds |
Example Implementations
package main
import (
"encoding/json"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"os"
"os/signal"
"time"
)
func main() {
agentID := "baseAgent_agent_abc123"
deviceName := "go-sensor"
apiKey := "api_your_key"
prefix := fmt.Sprintf("lua/devices/%s/%s/", agentID, deviceName)
opts := mqtt.NewClientOptions().
AddBroker("wss://mqtt.heylua.ai/mqtt").
SetClientID(fmt.Sprintf("lua-%s-%s", agentID, deviceName)).
SetUsername(fmt.Sprintf("%s:%s", agentID, deviceName)).
SetPassword(apiKey).
SetKeepAlive(60 * time.Second).
SetCleanSession(false).
SetWill(prefix+"status",
`{"status":"offline","timestamp":"`+time.Now().UTC().Format(time.RFC3339)+`"}`,
1, true)
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
// Subscribe to commands
client.Subscribe(prefix+"command", 1, func(c mqtt.Client, msg mqtt.Message) {
var cmd map[string]interface{}
json.Unmarshal(msg.Payload(), &cmd)
response := map[string]interface{}{
"commandId": cmd["commandId"],
"success": true,
"data": map[string]interface{}{"temp": 22.5},
}
payload, _ := json.Marshal(response)
c.Publish(prefix+"response", 1, false, payload)
})
// Publish online status
online, _ := json.Marshal(map[string]string{"status": "online", "timestamp": time.Now().UTC().Format(time.RFC3339)})
client.Publish(prefix+"status", 1, true, online)
auth, _ := json.Marshal(map[string]interface{}{"status": "online", "apiKey": apiKey, "commands": []interface{}{}})
client.Publish(prefix+"status", 1, false, auth)
fmt.Println("Device connected. Press Ctrl+C to exit.")
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt)
<-sig
client.Disconnect(250)
}
use rumqttc::{MqttOptions, AsyncClient, QoS, Event, Packet, LastWill};
use serde_json::{json, Value};
use tokio;
#[tokio::main]
async fn main() {
let agent_id = "baseAgent_agent_abc123";
let device_name = "rust-sensor";
let api_key = "api_your_key";
let prefix = format!("lua/devices/{agent_id}/{device_name}/");
let mut opts = MqttOptions::new(
format!("lua-{agent_id}-{device_name}"),
"wss://mqtt.heylua.ai/mqtt", 443,
);
opts.set_credentials(format!("{agent_id}:{device_name}"), api_key);
opts.set_keep_alive(std::time::Duration::from_secs(60));
opts.set_clean_session(false);
opts.set_last_will(LastWill::new(
format!("{prefix}status"),
json!({"status": "offline"}).to_string(),
QoS::AtLeastOnce, true,
));
opts.set_transport(rumqttc::Transport::tls_with_default_config());
let (client, mut eventloop) = AsyncClient::new(opts, 10);
client.subscribe(format!("{prefix}command"), QoS::AtLeastOnce).await.unwrap();
// Publish online status
let online = json!({"status": "online"}).to_string();
client.publish(format!("{prefix}status"), QoS::AtLeastOnce, true, online).await.unwrap();
let auth = json!({"status": "online", "apiKey": api_key, "commands": []}).to_string();
client.publish(format!("{prefix}status"), QoS::AtLeastOnce, false, auth).await.unwrap();
println!("Device connected. Listening for commands...");
while let Ok(event) = eventloop.poll().await {
if let Event::Incoming(Packet::Publish(msg)) = event {
if msg.topic.ends_with("/command") {
let cmd: Value = serde_json::from_slice(&msg.payload).unwrap();
let response = json!({
"commandId": cmd["commandId"],
"success": true,
"data": {"temp": 22.5}
});
client.publish(
format!("{prefix}response"), QoS::AtLeastOnce, false,
response.to_string(),
).await.unwrap();
}
}
}
}
using MQTTnet;
using MQTTnet.Client;
using System.Text;
using System.Text.Json;
var agentId = "baseAgent_agent_abc123";
var deviceName = "csharp-sensor";
var apiKey = "api_your_key";
var prefix = $"lua/devices/{agentId}/{deviceName}/";
var factory = new MqttFactory();
var client = factory.CreateMqttClient();
var options = new MqttClientOptionsBuilder()
.WithTcpServer("wss://mqtt.heylua.ai/mqtt", 443)
.WithTlsOptions(o => o.UseTls())
.WithClientId($"lua-{agentId}-{deviceName}")
.WithCredentials($"{agentId}:{deviceName}", apiKey)
.WithKeepAlivePeriod(TimeSpan.FromSeconds(60))
.WithCleanSession(false)
.WithWillTopic($"{prefix}status")
.WithWillPayload(JsonSerializer.Serialize(new { status = "offline" }))
.WithWillQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
.WithWillRetain(true)
.Build();
client.ApplicationMessageReceivedAsync += async e =>
{
if (e.ApplicationMessage.Topic.EndsWith("/command"))
{
var cmd = JsonSerializer.Deserialize<JsonElement>(e.ApplicationMessage.PayloadSegment);
var response = JsonSerializer.Serialize(new
{
commandId = cmd.GetProperty("commandId").GetString(),
success = true,
data = new { temp = 22.5 }
});
await client.PublishStringAsync($"{prefix}response", response,
MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);
}
};
await client.ConnectAsync(options);
await client.SubscribeAsync($"{prefix}command",
MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);
// Publish online status
await client.PublishStringAsync($"{prefix}status",
JsonSerializer.Serialize(new { status = "online" }),
MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce, true);
await client.PublishStringAsync($"{prefix}status",
JsonSerializer.Serialize(new { status = "online", apiKey, commands = Array.Empty<object>() }),
MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce, false);
Console.WriteLine("Device connected. Press Ctrl+C to exit.");
await Task.Delay(Timeout.Infinite);
import MQTTNIO
import NIO
import Foundation
let agentId = "baseAgent_agent_abc123"
let deviceName = "swift-sensor"
let apiKey = "api_your_key"
let prefix = "lua/devices/\(agentId)/\(deviceName)/"
let client = MQTTClient(
configuration: .init(
target: .webSocket("wss://mqtt.heylua.ai/mqtt"),
tls: .forClient(certificateVerification: .fullVerification),
clientId: "lua-\(agentId)-\(deviceName)",
clean: false,
credentials: .init(
username: "\(agentId):\(deviceName)",
password: apiKey
),
willMessage: .init(
topic: "\(prefix)status",
payload: ByteBuffer(string: #"{"status":"offline"}"#),
qos: .atLeastOnce,
retain: true
),
keepAliveInterval: .seconds(60)
),
eventLoopGroupProvider: .createNew
)
try client.connect().wait()
try client.subscribe(to: [.init(topicFilter: "\(prefix)command", qos: .atLeastOnce)]).wait()
// Publish online status
let online = #"{"status":"online"}"#
try client.publish(.init(topic: "\(prefix)status",
payload: ByteBuffer(string: online), qos: .atLeastOnce, retain: true)).wait()
let auth = #"{"status":"online","apiKey":"\#(apiKey)","commands":[]}"#
try client.publish(.init(topic: "\(prefix)status",
payload: ByteBuffer(string: auth), qos: .atLeastOnce, retain: false)).wait()
client.addPublishListener(named: "commands") { result in
if case .success(let msg) = result, msg.topic.hasSuffix("/command") {
guard let data = msg.payload.getData(at: 0, length: msg.payload.readableBytes),
let cmd = try? JSONSerialization.jsonObject(with: data) as? [String: Any],
let commandId = cmd["commandId"] as? String else { return }
let response: [String: Any] = [
"commandId": commandId, "success": true,
"data": ["temp": 22.5]
]
let payload = try! JSONSerialization.data(withJSONObject: response)
_ = try? client.publish(.init(topic: "\(prefix)response",
payload: ByteBuffer(data: payload), qos: .atLeastOnce, retain: false)).wait()
}
}
print("Device connected. Listening for commands...")
dispatchMain()
These examples show the minimal connect-and-handle pattern. Production clients should add:
idempotency dedup, heartbeat loop, LWT, graceful shutdown, error handling, and auto-reconnect
with exponential backoff.