Overview

The MicroPython client (lua_device.py) runs on microcontrollers with as little as 264 KB of RAM. It speaks MQTT natively and supports the full device protocol: commands, triggers, heartbeats, reconnection, and idempotency-based deduplication of commands.

Hardware Requirements

| Component | Minimum | Recommended |
| --- | --- | --- |
| Board | Raspberry Pi Pico W | Any MicroPython board with WiFi |
| RAM | 264 KB | 512 KB+ |
| Flash | 2 MB | 2 MB |
| MicroPython | v1.20+ | Latest stable |
| Network | WiFi (2.4 GHz) | WiFi with stable connection |

The client depends on umqtt.robust (preferred) or umqtt.simple, both included in standard MicroPython firmware for the Pico W.
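
The preference order described above can be expressed as an import fallback. This is a minimal sketch and not the code inside lua_device.py; the helper name load_mqtt_client is hypothetical. It can also serve as a quick check that your firmware ships at least one of the two modules.

```python
def load_mqtt_client():
    """Return (MQTTClient class, module name), preferring umqtt.robust.

    Returns (None, None) when neither module is available, for example
    when this is run on desktop Python instead of MicroPython.
    """
    try:
        from umqtt.robust import MQTTClient
        return MQTTClient, "umqtt.robust"
    except ImportError:
        pass
    try:
        from umqtt.simple import MQTTClient
        return MQTTClient, "umqtt.simple"
    except ImportError:
        return None, None


client_class, module_name = load_mqtt_client()
print("MQTT client module:", module_name)
```

If this prints None on the board, the firmware build likely lacks the umqtt package and the client will not be able to connect.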

Setup

1. Flash MicroPython firmware

Download the latest MicroPython UF2 from micropython.org and flash it to your Pico W.

2. Copy the client library

Copy lua_device.py to your Pico W. You can use Thonny, mpremote, or rshell:
mpremote cp lua_device.py :lua_device.py

3. Connect to WiFi

Create a main.py that connects to WiFi before creating the device:
import network
import time

wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect("YOUR_SSID", "YOUR_PASSWORD")

while not wlan.isconnected():
    time.sleep(0.5)

print("WiFi connected:", wlan.ifconfig()[0])
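
The loop above waits forever if the access point is unreachable or the credentials are wrong. A bounded wait is one way to handle that. The sketch below is an illustration only: wait_for_wifi is a hypothetical helper, and the 20-second timeout is an arbitrary choice. It accepts any object with an isconnected() method, such as the wlan object created above.

```python
import time


def wait_for_wifi(wlan, timeout_s=20, poll_s=0.5):
    """Poll wlan.isconnected() until it returns True or timeout_s elapses.

    Returns True if the interface connected in time, False otherwise.
    """
    attempts = int(timeout_s / poll_s)
    for _ in range(attempts):
        if wlan.isconnected():
            return True
        time.sleep(poll_s)
    # One last check after the final sleep
    return wlan.isconnected()
```

After wlan.connect(...), you could call wait_for_wifi(wlan) and, on False, retry or reset the board rather than hang silently.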

4. Create your device

Add the device setup after WiFi connection:
from lua_device import LuaDevice

device = LuaDevice(
    agent_id="your-agent-id",
    api_key="your-api-key",
    device_name="pico-sensor",
    server="mqtt.heylua.ai",
)

5. Register commands and run

Register command handlers and start the main loop:
@device.command("read_sensor")
def read_sensor(payload):
    return {"temperature": 22.5, "humidity": 60}

device.connect()
device.run()  # blocks forever, handles commands

LuaDevice Class Reference

Constructor

LuaDevice(agent_id, api_key, device_name, server, port=8883, group=None, use_ssl=True)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| agent_id | str | required | Agent ID to connect to |
| api_key | str | required | API key for authentication |
| device_name | str | required | Unique name for this device |
| server | str | required | MQTT broker hostname (e.g., mqtt.heylua.ai) |
| port | int | 8883 | MQTT broker port |
| group | str | None | Optional device group name |
| use_ssl | bool | True | Enable TLS encryption |

Methods

| Method | Description |
| --- | --- |
| connect() | Connect to the MQTT broker. Sets up LWT and subscriptions. |
| disconnect() | Gracefully disconnect. Publishes offline status first. |
| run(check_interval_ms=100) | Main loop. Blocks forever, checks for messages and sends heartbeats. |
| trigger(name, payload=None) | Fire a trigger event to the agent. |
| on_command(name, handler) | Register a command handler (non-decorator style). |
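
The non-decorator style is convenient when handlers live in a table or a separate module. The sketch below relies only on the on_command(name, handler) signature listed above; the handler names and the register_handlers helper are hypothetical.

```python
def led_status(payload):
    # Placeholder handler: a real one would read the pin state.
    return {"status": "unknown"}


def ping(payload):
    return {"pong": True}


def register_handlers(device):
    """Register every handler in the table on an object that exposes
    on_command(name, handler). Returns the registered names, sorted."""
    handlers = {"led_status": led_status, "ping": ping}
    for name, handler in handlers.items():
        device.on_command(name, handler)
    return sorted(handlers)
```

With a real device you would call register_handlers(device) before device.connect().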

The @device.command Decorator

The preferred way to register command handlers:
import machine

@device.command("led_on")
def led_on(payload):
    # payload is a dict with whatever the agent sent
    pin = machine.Pin("LED", machine.Pin.OUT)
    pin.on()
    return {"status": "on"}
The handler receives a payload dict and must return a dict (or None). If the handler raises an exception, the error message is sent back to the agent.
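
Because a raised exception is reported back to the agent, a handler can validate its payload and raise with a descriptive message instead of failing halfway through. The following is a sketch under assumptions: parse_blink_args is a hypothetical helper, and the limits (1 to 20 blinks, 10 to 2000 ms) are arbitrary values chosen for illustration.

```python
def parse_blink_args(payload):
    """Validate a blink payload and return (count, delay_ms).

    Raises ValueError with a readable message on bad input, so the
    message can be relayed to the agent.
    """
    count = payload.get("count", 3)
    delay_ms = payload.get("delay_ms", 200)
    if not isinstance(count, int) or not 1 <= count <= 20:
        raise ValueError("count must be an integer from 1 to 20")
    if not isinstance(delay_ms, int) or not 10 <= delay_ms <= 2000:
        raise ValueError("delay_ms must be an integer from 10 to 2000")
    return count, delay_ms
```

A handler would call this first, for example count, delay = parse_blink_args(payload), and let the ValueError propagate. Bounding the values also keeps a long blink sequence from blocking the main loop for an extended time.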

Complete Example: LED + DHT22

A Pico W that controls an onboard LED and reads a DHT22 temperature/humidity sensor:
import network
import machine
import time
import dht

# -- WiFi --
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect("YOUR_SSID", "YOUR_PASSWORD")

while not wlan.isconnected():
    time.sleep(0.5)

print("WiFi connected:", wlan.ifconfig()[0])

# -- Sensor setup --
led = machine.Pin("LED", machine.Pin.OUT)
dht_sensor = dht.DHT22(machine.Pin(15))

# -- Device --
from lua_device import LuaDevice

device = LuaDevice(
    agent_id="your-agent-id",
    api_key="your-api-key",
    device_name="pico-env-sensor",
    server="mqtt.heylua.ai",
    group="office-sensors",
)

@device.command("led_on")
def led_on(payload):
    led.on()
    return {"status": "on"}

@device.command("led_off")
def led_off(payload):
    led.off()
    return {"status": "off"}

@device.command("read_environment")
def read_environment(payload):
    dht_sensor.measure()
    return {
        "temperature": dht_sensor.temperature(),
        "humidity": dht_sensor.humidity(),
        "led": "on" if led.value() else "off",
    }

@device.command("blink")
def blink(payload):
    count = payload.get("count", 3)
    delay = payload.get("delay_ms", 200)
    for _ in range(count):
        led.on()
        time.sleep_ms(delay)
        led.off()
        time.sleep_ms(delay)
    return {"blinked": count}

# -- Connect and run --
device.connect()

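# Optional: check the temperature every 60 s and fire a trigger if it is high.
# This manual loop replaces device.run() and uses LuaDevice internals
# (underscore-prefixed attributes), which are private by convention.
last_check = time.time()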

while True:
    try:
        device._client.check_msg()

        now = time.time()
        if now - device._last_heartbeat >= device._heartbeat_interval:
            device._client.publish(device._topic_prefix + "heartbeat", b"", qos=0)
            device._last_heartbeat = now

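        # Periodic temperature check (every 60 s)
        if now - last_check >= 60:
            last_check = now
            try:
                dht_sensor.measure()
                temp = dht_sensor.temperature()
            except OSError as e:
                # A failed DHT22 read also raises OSError. Handle it here so
                # it is not mistaken for a lost MQTT connection below.
                print("Sensor read failed:", e)
                temp = None
            if temp is not None and temp > 30:
                device.trigger("high_temperature", {
                    "temperature": temp,
                    "threshold": 30,
                })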

        time.sleep_ms(100)

    except OSError as e:
        print("Connection lost:", e)
        device._reconnect()
    except Exception as e:
        print("Error:", e)
        time.sleep(1)

Troubleshooting

The Pico W cannot reach the MQTT broker. Check:
  • WiFi is connected (wlan.isconnected() returns True)
  • DNS resolution works (try socket.getaddrinfo("mqtt.heylua.ai", 8883))
  • No firewall blocking port 8883
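
The DNS check above can be wrapped in a small diagnostic that reports failure instead of raising. This is a sketch only; can_resolve is a hypothetical helper name. It uses socket.getaddrinfo, which exists in both MicroPython and desktop Python.

```python
import socket


def can_resolve(host, port):
    """Return True if host resolves to at least one address, False if
    the lookup fails with OSError (DNS errors are reported that way)."""
    try:
        return len(socket.getaddrinfo(host, port)) > 0
    except OSError:
        return False
```

On the board, print(can_resolve("mqtt.heylua.ai", 8883)) run after WiFi is connected separates DNS problems from a blocked port: if it prints True, name resolution works and the firewall or broker is the more likely cause.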
Authentication failed. Verify:
  • agent_id matches your agent exactly
  • api_key is valid and not expired
  • device_name contains only lowercase letters, numbers, and hyphens
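
The device_name rule can be checked locally before connecting. The sketch below encodes only the rule stated above (lowercase letters, numbers, and hyphens). Rejecting an empty name is an added assumption, and is_valid_device_name is a hypothetical helper. It avoids the re module to stay light on MicroPython.

```python
ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-"


def is_valid_device_name(name):
    """Return True if name is non-empty and uses only lowercase ASCII
    letters, digits, and hyphens."""
    if not name:
        return False
    return all(ch in ALLOWED_NAME_CHARS for ch in name)
```

For example, is_valid_device_name("pico-sensor") is True, while "Pico_Sensor" fails on both the uppercase letters and the underscore.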
The Pico W has limited RAM. If you run out of memory, try:
  • Reduce _dedup_ttl (default 300 seconds) if you are processing many commands
  • Avoid large payloads in command responses
  • Use gc.collect() periodically in your main loop
  • Compile lua_device.py to .mpy bytecode with mpy-cross to save RAM
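
To see why a shorter _dedup_ttl saves RAM, consider how a TTL-based deduplication cache typically behaves: every command ID stays in memory until its TTL expires, so memory use grows with command rate multiplied by TTL. The class below is an illustrative sketch under that assumption. It is not the implementation in lua_device.py, and the names DedupCache and is_duplicate are hypothetical.

```python
class DedupCache:
    """Remember command IDs for ttl_s seconds to detect redelivery."""

    def __init__(self, ttl_s=300):
        self.ttl_s = ttl_s
        self._seen = {}  # command ID -> time it was first seen

    def is_duplicate(self, command_id, now):
        # Evict expired entries first so the dict does not grow unbounded.
        expired = [k for k, t in self._seen.items() if now - t >= self.ttl_s]
        for k in expired:
            del self._seen[k]
        if command_id in self._seen:
            return True
        self._seen[command_id] = now
        return False

    def __len__(self):
        return len(self._seen)


cache = DedupCache(ttl_s=10)
print(cache.is_duplicate("cmd-1", now=0))   # first delivery
print(cache.is_duplicate("cmd-1", now=5))   # redelivered within the TTL
print(len(cache))
```

In this model, a device receiving one command per second with a 300-second TTL holds about 300 IDs at steady state. Cutting the TTL to 60 seconds cuts that to about 60, at the cost of a shorter window in which redelivered commands are detected.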
If commands are not reaching your handlers, ensure:
  • device.connect() completed without error
  • You are calling device.run() or manually calling device._client.check_msg() in a loop
  • The command name in your handler matches the name the agent is using
Some MicroPython builds have limited TLS support. If the TLS connection fails, try:
  • Update to the latest MicroPython firmware
  • Set use_ssl=False temporarily for debugging (not recommended in production)

Next Steps

  • Pico W Setup Guide: step-by-step hardware setup with photos and wiring
  • Industrial Sensor Example: complete factory monitoring example on Pico W
  • MQTT Transport: MQTT topic structure and QoS details
  • Triggers: send events from your device to the agent