MQTT API
VieLang IoT hỗ trợ MQTT 3.1 và 3.1.1. Thiết bị kết nối qua MQTT để gửi telemetry, nhận lệnh RPC và đồng bộ attributes.
Thông tin kết nối
| Tham số | Giá trị |
|---|---|
| Host | localhost (hoặc domain của bạn) |
| Port | 1883 (plain) / 8883 (TLS) |
| QoS | 0 (at most once) hoặc 1 (at least once) |
| Username | Access Token hoặc MQTT username |
| Password | (để trống nếu dùng Access Token) |
Xác thực
Access Token
mosquitto_pub -u "YOUR_ACCESS_TOKEN" ...Basic MQTT Credentials
mosquitto_pub --id "client-id" -u "username" -P "password" ...X.509 Certificate (TLS)
mosquitto_pub --cafile ca.crt --cert device.crt --key device.key ...Telemetry API
Gửi telemetry
Topic: v1/devices/me/telemetry
Format 1 — Key-value đơn giản:
{"temperature": 25.5, "humidity": 68, "voltage": 3.7}Format 2 — Mảng nhiều objects:
[
{"temperature": 25.5, "humidity": 68},
{"voltage": 3.7, "current": 0.5}
]Format 3 — Có timestamp (ms):
{
"ts": 1735000000000,
"values": {"temperature": 25.5, "humidity": 68}
}Ví dụ với mosquitto:
# Đơn giản
mosquitto_pub -h localhost -p 1883 \
-u "ACCESS_TOKEN" \
-t "v1/devices/me/telemetry" \
-m '{"temperature": 25.5, "humidity": 68}'
# Nhiều điểm với timestamp
mosquitto_pub -h localhost -p 1883 \
-u "ACCESS_TOKEN" \
-t "v1/devices/me/telemetry" \
-m '[{"ts":1735000000000,"values":{"temperature":25.5}},{"ts":1735000005000,"values":{"temperature":25.8}}]'Attributes API
Gửi client-side attributes
Topic: v1/devices/me/attributes
mosquitto_pub -h localhost -p 1883 \
-u "ACCESS_TOKEN" \
-t "v1/devices/me/attributes" \
-m '{"firmware": "v2.1.0", "battery": 87, "model": "ESP32"}'Yêu cầu shared attributes
Request topic: v1/devices/me/attributes/request/{requestId}
Response topic: v1/devices/me/attributes/response/{requestId}
# Subscribe nhận phản hồi trước
mosquitto_sub -h localhost -p 1883 \
-u "ACCESS_TOKEN" \
-t "v1/devices/me/attributes/response/+"
# Gửi yêu cầu
mosquitto_pub -h localhost -p 1883 \
-u "ACCESS_TOKEN" \
-t "v1/devices/me/attributes/request/1" \
-m '{"sharedKeys": "targetFirmware,maxTemperature"}'
# Response:
# {"shared": {"targetFirmware": "v2.2.0", "maxTemperature": 35}}Subscribe cập nhật shared attributes (real-time)
mosquitto_sub -h localhost -p 1883 \
-u "ACCESS_TOKEN" \
-t "v1/devices/me/attributes"
# Khi server thay đổi shared attribute:
# {"targetFirmware": "v2.3.0"}RPC API
Nhận lệnh từ server (Server-side RPC)
# Subscribe nhận lệnh
mosquitto_sub -h localhost -p 1883 \
-u "ACCESS_TOKEN" \
-t "v1/devices/me/rpc/request/+"
# Khi nhận lệnh:
# Topic: v1/devices/me/rpc/request/1
# Payload: {"method": "setGpio", "params": {"pin": 4, "value": 1}}
# Gửi phản hồi
mosquitto_pub -h localhost -p 1883 \
-u "ACCESS_TOKEN" \
-t "v1/devices/me/rpc/response/1" \
-m '{"result": "ok", "pin": 4, "value": 1}'Gửi yêu cầu đến server (Client-side RPC)
# Gửi request
mosquitto_pub -h localhost -p 1883 \
-u "ACCESS_TOKEN" \
-t "v1/devices/me/rpc/request/1" \
-m '{"method": "getCurrentTime", "params": {}}'
# Subscribe nhận phản hồi
mosquitto_sub -h localhost -p 1883 \
-u "ACCESS_TOKEN" \
-t "v1/devices/me/rpc/response/1"
# Phản hồi từ server:
# {"currentTime": 1735000000000}Kết nối sự kiện (Connect / Disconnect)
Khi thiết bị kết nối hoặc ngắt kết nối, Rule Engine nhận event tương ứng (CONNECT_EVENT, DISCONNECT_EVENT) để xử lý logic như cập nhật trạng thái, gửi thông báo.
Code mẫu — Python
import paho.mqtt.client as mqtt
import json, time
ACCESS_TOKEN = "YOUR_ACCESS_TOKEN"
BROKER = "localhost"
PORT = 1883
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("✓ Đã kết nối")
# Subscribe nhận lệnh RPC
client.subscribe("v1/devices/me/rpc/request/+")
else:
print(f"✗ Lỗi kết nối: {rc}")
def on_message(client, userdata, msg):
# Xử lý lệnh RPC
if "rpc/request" in msg.topic:
req_id = msg.topic.split("/")[-1]
data = json.loads(msg.payload)
print(f"RPC: {data['method']} ({data['params']})")
# Phản hồi
client.publish(
f"v1/devices/me/rpc/response/{req_id}",
json.dumps({"result": "ok"})
)
client = mqtt.Client()
client.username_pw_set(ACCESS_TOKEN)
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER, PORT, 60)
client.loop_start()
try:
while True:
# Gửi telemetry mỗi 5 giây
payload = {"temperature": 25.5, "humidity": 68}
client.publish("v1/devices/me/telemetry", json.dumps(payload))
time.sleep(5)
except KeyboardInterrupt:
client.loop_stop()Error Codes
| Code | Ý nghĩa |
|---|---|
| Connection Refused (3) | Access Token sai |
| Connection Refused (5) | Không có quyền |
Custom Topics (Device Profile)
Thay vì dùng topic mặc định v1/devices/me/..., bạn có thể cấu hình topic tùy chỉnh trong Device Profile → MQTT Transport Configuration:
Telemetry: factory/{deviceId}/sensors/data
Attributes: factory/{deviceId}/config
RPC request: factory/{deviceId}/commands/in
RPC response: factory/{deviceId}/commands/out