Rule Engine
Rule Engine là hệ thống xử lý sự kiện theo luồng (flow-based processing). Mọi dữ liệu từ thiết bị đều đi qua Rule Engine trước khi được lưu trữ hoặc kích hoạt hành động.
Khái niệm
Message (Thông điệp)
Đơn vị xử lý trong Rule Engine. Mỗi message gồm:
- Originator — Thực thể nguồn (device, asset...)
- Type — Loại message (POST_TELEMETRY, POST_ATTRIBUTES, RPC...)
- Payload — Nội dung JSON
- Metadata — Thông tin bổ sung (deviceName, deviceType, ts...)
Rule Chain
Đồ thị xử lý, gồm nhiều Rule Node kết nối nhau. Mỗi tenant có một Root Rule Chain mặc định.
Rule Node
Một bước xử lý. Nhận message → xử lý → chuyển tiếp qua relation có tên (Success, Failure, True, False, Custom...).
Relation
Cạnh nối giữa các node, mang nhãn kết quả. Node tiếp theo chỉ nhận message nếu relation label khớp.
Luồng xử lý mặc định
Thiết bị gửi telemetry
↓
[Rule Chain Input]
↓
[Message Type Filter] → True → [Save Telemetry] → [Create Alarm?]
↓ False
(discard)
Message Types
| Type | Khi nào |
|---|---|
POST_TELEMETRY_REQUEST | Thiết bị gửi telemetry |
POST_ATTRIBUTES_REQUEST | Thiết bị gửi attributes |
TO_SERVER_RPC_REQUEST | Thiết bị gửi RPC request |
RPC_CALL_FROM_SERVER_TO_DEVICE | Server gửi lệnh đến thiết bị |
CONNECT_EVENT | Thiết bị kết nối |
DISCONNECT_EVENT | Thiết bị ngắt kết nối |
ACTIVITY_EVENT | Thiết bị active |
INACTIVITY_EVENT | Thiết bị inactive |
Các loại Node
Filter Nodes — Lọc message
| Node | Mô tả |
|---|---|
| Message Type Filter | Lọc theo type (POST_TELEMETRY...) |
| Script Filter | Lọc bằng JavaScript expression |
| Threshold Filter | So sánh giá trị số với ngưỡng |
| GPS Geofencing Filter | Lọc theo vị trí địa lý |
| Check Relation | Kiểm tra quan hệ giữa các thực thể |
| Originator Type Filter | Lọc theo loại originator |
Transform Nodes — Biến đổi message
| Node | Mô tả |
|---|---|
| Transform Message (Script) | Biến đổi payload bằng JavaScript |
| Rename Keys | Đổi tên key trong JSON |
| Copy Keys | Sao chép key từ payload sang metadata |
| Delete Keys | Xóa key khỏi payload |
| Math Node | Tính toán (cộng, nhân, sqrt...) |
| Change Originator | Đổi entity nguồn của message |
| To Email | Chuyển message thành email template |
Enrichment Nodes — Làm giàu dữ liệu
| Node | Mô tả |
|---|---|
| Originator Attributes | Thêm attributes của originator vào metadata |
| Originator Fields | Thêm fields (name, type) vào metadata |
| Customer Attributes | Thêm attributes của customer |
| Tenant Attributes | Thêm attributes của tenant |
| Related Attributes | Thêm attributes từ entity liên quan |
| Get Telemetry | Thêm telemetry vào metadata |
| Device Profile | Thêm thông tin device profile |
Action Nodes — Thực hiện hành động
| Node | Mô tả |
|---|---|
| Save Telemetry | Lưu telemetry vào database |
| Save Attributes | Lưu attributes |
| Create/Clear Alarm | Tạo hoặc xóa alarm |
| Send Email | Gửi email qua SMTP |
| Send SMS | Gửi SMS |
| Send Notification | Gửi thông báo hệ thống |
| REST API Call | Gọi API bên ngoài |
| MQTT Publish | Publish đến MQTT broker ngoài |
| Kafka Publish | Gửi message đến Kafka |
| RabbitMQ Publish | Gửi đến RabbitMQ |
| AWS SQS/SNS/Lambda | Tích hợp AWS |
| GCP Pub/Sub | Tích hợp Google Cloud |
| Log | Ghi log để debug |
| Send RPC Reply | Phản hồi RPC từ thiết bị |
| Assign to Customer | Giao thiết bị cho customer |
Flow Nodes — Điều hướng luồng
| Node | Mô tả |
|---|---|
| Rule Chain Input | Điểm vào của rule chain |
| Rule Chain Output | Chuyển sang rule chain khác |
| Message Type Switch | Phân nhánh theo message type |
| Device Type Switch | Phân nhánh theo device type |
| Delay | Trì hoãn xử lý |
| Checkpoint | Lưu trạng thái xử lý |
| Sync | Đồng bộ nhiều luồng song song |
Viết Script trong Node
Script dùng JavaScript (TBEL) — biến thể thiết kế cho IoT:
Script Filter
// Chỉ xử lý khi temperature > 35
return msg.temperature !== undefined && msg.temperature > 35;Transform Message
// Thêm trường tính toán, đổi tên
var newMsg = {
temp_celsius: msg.temperature,
temp_fahrenheit: msg.temperature * 9/5 + 32,
humidity: msg.humidity,
ts: metadata.ts
};
return { msg: newMsg, metadata: metadata, msgType: msgType };Enrichment Script
// Truy cập attributes từ metadata
var threshold = parseFloat(metadata.maxTemperatureThreshold);
if (msg.temperature > threshold) {
metadata.alertLevel = 'HIGH';
}
return { msg: msg, metadata: metadata, msgType: msgType };Ví dụ Rule Chain hoàn chỉnh
Yêu cầu: Khi nhiệt độ > 35°C kéo dài 5 phút → tạo alarm + gửi email
[Message Type Filter]
type == POST_TELEMETRY_REQUEST
│ True
▼
[Script Filter]
msg.temperature !== undefined
│ True
▼
[Threshold Filter]
temperature > 35
│ True
▼
[Create Alarm]
type: "High Temperature"
severity: CRITICAL
duration: 5 phút
│ Created
▼
[Originator Attributes]
server scope: contactEmail
│ Success
▼
[To Email]
to: ${metadata.contactEmail}
subject: Cảnh báo nhiệt độ cao!
body: Thiết bị ${metadata.deviceName}: ${msg.temperature}°C
│ Success
▼
[Send Email]
Quản lý Rule Chain qua API
# Lấy danh sách
GET /api/ruleChains?pageSize=20&page=0
Authorization: Bearer {JWT}
# Tạo rule chain
POST /api/ruleChain
Authorization: Bearer {JWT}
Content-Type: application/json
{ "name": "My Custom Chain", "type": "CORE" }
# Lưu metadata (nodes + connections)
POST /api/ruleChain/{ruleChainId}/metadata
Authorization: Bearer {JWT}