Compare commits

..

5 Commits

24 changed files with 661 additions and 540 deletions

49
broker/acl.conf Normal file
View File

@ -0,0 +1,49 @@
%%--------------------------------------------------------------------
%% EMQX ACL configuration
%%--------------------------------------------------------------------
%% =========================
%% Device user permissions
%% =========================
%% Devices can publish ONLY to their own namespace
{allow, {user, "device"}, publish, ["device/${clientid}/meta/#"]}.
{allow, {user, "device"}, publish, ["device/${clientid}/property/#"]}.
{allow, {user, "device"}, publish, ["device/${clientid}/command/#"]}.
%% Devices can receive commands
{allow, {user, "device"}, subscribe, ["device/${clientid}/command/#"]}.
%% =========================
%% Authenticated users
%% =========================
%% Any authenticated user can read all device topics
{allow, all, subscribe, ["device/+/meta/#"]}.
{allow, all, subscribe, ["device/+/property/#"]}.
{allow, all, subscribe, ["device/+/command/#"]}.
%% Any authenticated user can publish commands to any device
{allow, all, publish, ["device/+/command/+"]}.
%% =========================
%% Response topic mechanism
%% =========================
%% Clients can SUBSCRIBE to their own response inbox
{allow, all, subscribe, ["client/${clientid}/responses/#"]}.
%% Authenticated users can PUBLISH to any client response inbox
{allow, all, publish, ["client/+/responses/#"]}.
%% (No subscribe permission for others -> enforced by default deny)
%% =========================
%% Default deny
%% =========================
{deny, all}.

81
broker/emqx.conf Normal file
View File

@ -0,0 +1,81 @@
## Place read-only configurations in this file.
## To define configurations that can later be overridden through UI/API/CLI, add them to `etc/base.hocon`.
##
## Config precedence order:
## etc/base.hocon < cluster.hocon < emqx.conf < environment variables
##
## See https://docs.emqx.com/en/enterprise/latest/configuration/configuration.html for more information.
## Configuration full example can be found in etc/examples
node {
name = "emqx@127.0.0.1"
cookie = "emqx50elixir"
data_dir = "data"
}
cluster {
name = emqxcl
discovery_strategy = manual
}
dashboard {
listeners {
http.bind = 18083
# https.bind = 18084
https {
ssl_options {
certfile = "${EMQX_ETC_DIR}/certs/cert.pem"
keyfile = "${EMQX_ETC_DIR}/certs/key.pem"
}
}
}
}
##--------------------------------------------------------------------
## Authentication
##--------------------------------------------------------------------
## Load users from file
authn {
enable = true
sources = [
{
type = file
path = "etc/passwd"
password_hash_algorithm {
name = plain
}
}
]
}
##--------------------------------------------------------------------
## Authorization (ACL)
##--------------------------------------------------------------------
authorization {
sources = [
{
type = file
path = "etc/acl.conf"
}
]
no_match = deny
}
##--------------------------------------------------------------------
## Anonymous access
##--------------------------------------------------------------------
allow_anonymous = true
##--------------------------------------------------------------------
## Listener (basic)
##--------------------------------------------------------------------
listeners.tcp.default {
bind = "0.0.0.0:1883"
}

3
broker/passwd Normal file
View File

@ -0,0 +1,3 @@
device:devicesecret
alice:alicepass
bob:bobpass

View File

@ -1,47 +1,59 @@
<script setup>
import { ref, provide, onMounted } from 'vue'
import { useLayout } from './composables/useLayout'
import { useToast } from 'primevue/usetoast'
const toast = useToast()
const { primaryColors, surfaces, primary, surface, isDarkMode, updateColors, toggleDarkMode } =
useLayout()
import MQTTService from './services/mqtt.js'
import AppTopbar from './components/AppTopbar.vue'
import AppFooter from './components/AppFooter.vue'
import StatsWidget from './components/dashboard/StatsWidget.vue'
import SalesTrendWidget from './components/dashboard/SalesTrendWidget.vue'
import RecentActivityWidget from './components/dashboard/RecentActivityWidget.vue'
import ProductOverviewWidget from './components/dashboard/ProductOverviewWidget.vue'
import DevicesWidget from './components/dashboard/DevicesWidget.vue'
import PropertiesWidget from './components/dashboard/PropertiesWidget.vue'
import CommandsWidget from './components/dashboard/CommandsWidget.vue'
import { ref } from 'vue'
import DevicesWidget from './components/dashboard/DevicesWidget.vue'
const mqtt = ref(null)
const loggedIn = ref(false)
const selectedDevice = ref(null)
function handleLogin({ username, password }) {
// Create the shared MQTT client
mqtt.value = new MQTTService('ws://127.0.0.1:8083/mqtt', username, password)
loggedIn.value = true
}
onMounted(() => {
if(isDarkMode !== false) toggleDarkMode()
})
provide('mqtt', mqtt)
</script>
<template>
<Toast position="bottom-right" />
<div class="layout-container">
<AppTopbar />
<div class="layout-grid">
<DevicesWidget @select="selectedDevice = $event" />
<div v-if="selectedDevice" class="layout-grid-row">
<PropertiesWidget
v-if="selectedDevice"
:key="'props-' + selectedDevice"
:device-id="selectedDevice"
/>
<CommandsWidget
v-if="selectedDevice"
:key="'cmds-' + selectedDevice"
:device-id="selectedDevice"
/>
<div id="app">
<SplashPage v-if="!loggedIn" @login="handleLogin" />
<div v-else class="layout-container">
<AppTopbar />
<div class="layout-grid">
<DevicesWidget @select="selectedDevice = $event" />
<div v-if="selectedDevice" class="layout-grid-row">
<PropertiesWidget
v-if="selectedDevice"
:key="'props-' + selectedDevice"
:device-id="selectedDevice"
/>
<CommandsWidget
v-if="selectedDevice"
:key="'cmds-' + selectedDevice"
:device-id="selectedDevice"
/>
</div>
</div>
<!-- <StatsWidget /> -->
<!-- <div class="layout-grid-row">
<SalesTrendWidget />
<RecentActivityWidget />
</div>-->
<!-- <ProductOverviewWidget /> -->
<AppFooter />
</div>
<AppFooter />
</div>
</template>

View File

@ -97,6 +97,18 @@ body {
color: var(--p-primary-200);
}
.offline {
color: color-mix(in srgb, red, transparent 40%);
border-color: color-mix(in srgb, red, transparent 70%);
background-color: color-mix(in srgb, red, transparent 80%);
}
.p-dark .offline {
color: color-mix(in srgb, red, transparent 40%);
border-color: color-mix(in srgb, red, transparent 70%);
background-color: color-mix(in srgb, red, transparent 90%);
}
.stats-header {
display: flex;
align-items: flex-start;

View File

@ -4,10 +4,6 @@
const { primaryColors, surfaces, primary, surface, isDarkMode, updateColors, toggleDarkMode } =
useLayout()
onMounted(() => {
toggleDarkMode()
})
</script>
<template>

View File

@ -0,0 +1,60 @@
<script setup>
import { ref } from 'vue'
const emit = defineEmits(['login'])
const username = ref('')
const password = ref('')
function submitLogin() {
if (username.value && password.value) {
emit('login', { username: username.value, password: password.value })
} else {
alert('Please enter username and password')
}
}
</script>
<template>
<div class="splash-page">
<div class="login-card">
<h2>MQTT Login</h2>
<input v-model="username" placeholder="Username" />
<input v-model="password" type="password" placeholder="Password" />
<Button @click="submitLogin">Login</Button>
</div>
</div>
</template>
<style scoped>
.splash-page {
display: flex;
justify-content: center;
align-items: center;
height: 100vh;
background-color: var(--p-primary-color-light);
}
.login-card {
padding: 2rem;
border-radius: 12px;
box-shadow: 0 0 12px rgba(0, 0, 0, 0.15);
width: 300px;
text-align: center;
}
.login-card input {
display: block;
width: 100%;
margin: 0.5rem 0;
padding: 0.5rem;
border-radius: 6px;
border: 1px solid #ccc;
}
.login-card button {
margin-top: 1rem;
width: 100%;
padding: 0.5rem;
}
</style>

View File

@ -1,6 +1,8 @@
<script setup>
import { ref, reactive, onMounted } from 'vue'
import MQTTService from '../../services/mqtt.js'
import { ref, reactive, onMounted, onUnmounted } from 'vue'
import { inject } from 'vue'
const mqttRef = inject('mqtt')
import Accordion from 'primevue/accordion'
import AccordionPanel from 'primevue/accordionpanel'
@ -25,8 +27,6 @@
deviceId: String,
})
const mqtt2 = new MQTTService()
const filters = ref({})
const commands = ref([])
const commandMap = reactive({})
@ -65,10 +65,10 @@
var responseId = generateId()
commandByResponseId[responseId] = cmd
mqtt2.publish(topic, payload || '{}', {
mqttRef.value.publish(topic, payload || '{}', {
qos: 1,
properties: {
responseTopic: `client/${mqtt2.clientId}/responses`,
responseTopic: `client/${mqttRef.value.clientId}/responses`,
correlationData: new TextEncoder().encode(responseId),
},
})
@ -93,7 +93,7 @@
}
onMounted(() => {
mqtt2.subscribe('device/+/command/#', (payload, topic) => {
mqttRef.value.subscribe(`device/${props.deviceId}/command/#`, (payload, topic) => {
const parts = topic.split('/')
const device = parts[1]
@ -102,6 +102,8 @@
const command = parts[3]
const field = parts[4]
console.log(topic, payload)
if (field === '$description') {
updateCommand(device, command, 'description', payload)
}
@ -111,7 +113,7 @@
}
})
mqtt2.subscribe(`client/${mqtt2.clientId}/responses`, (payload, topic) => {
mqttRef.value.subscribe(`client/${mqttRef.value.clientId}/responses`, (payload, topic) => {
let response = JSON.parse(payload)
const responseId = response.correlation
@ -137,6 +139,11 @@
})
})
onUnmounted(() => {
mqttRef.value.unsubscribe(`device/${props.deviceId}/command/#`)
mqttRef.value.unsubscribe(`client/${mqttRef.value.clientId}/responses`)
})
const jsonFormsConfig = {
validationMode: 'ValidateOnTouched',
showAllErrors: false,

View File

@ -1,93 +1,95 @@
<script setup>
import { ref, onMounted } from 'vue'
import MQTTService from '../../services/mqtt.js'
import { ref, onMounted } from 'vue'
import { inject } from 'vue'
const emit = defineEmits(['select'])
const mqttRef = inject('mqtt')
const emit = defineEmits(['select'])
const mqtt = new MQTTService()
const devices = ref([])
const deviceMap = ref({}) // map deviceId => device object with all meta properties
const selected = ref(null)
const devices = ref([])
const deviceSet = new Set()
const selected = ref(null)
function upsertDevice(device, status) {
const existing = devices.value.find((d) => d.id === device)
if (existing) {
// update reactively
existing.value = status
} else {
deviceSet.add(device)
devices.value = [
...devices.value,
{
id: device,
title: device,
value: status,
subtitle: 'MQTT Device',
icon: status === 'ONLINE' ? 'pi-check-circle' : 'pi-times-circle',
},
]
// Add or update a device's meta property
function upsertDevice(deviceId, property, value) {
if (!deviceMap.value[deviceId]) {
deviceMap.value[deviceId] = {
id: deviceId,
meta: {}, // stores all meta properties
title: deviceId,
value: '', // main status display
subtitle: 'MQTT Device',
icon: 'pi-box',
}
}
function selectDevice(device) {
selected.value = device.id
emit('select', device.id)
// update the property
deviceMap.value[deviceId].meta[property] = value
// update card display values (example: show 'status' as main value)
if (property === 'status') {
deviceMap.value[deviceId].value = value.replace(/['"]+/g, '')
deviceMap.value[deviceId].icon = deviceMap.value[deviceId].value === 'ONLINE' ? 'pi-check-circle' : 'pi-times-circle'
}
onMounted(() => {
mqtt.subscribe('device/+/property/status', (payload, topic) => {
const parts = topic.split('/')
const device = parts[1]
upsertDevice(device, payload)
})
// rebuild reactive array for rendering
devices.value = Object.values(deviceMap.value)
}
function selectDevice(device) {
selected.value = device.id
emit('select', device.id)
}
onMounted(() => {
mqttRef.value.subscribe('device/+/meta/+', (payload, topic) => {
const parts = topic.split('/')
const deviceId = parts[1]
const property = parts.slice(3).join('/') // handles nested properties if any
upsertDevice(deviceId, property, payload)
})
})
</script>
<template>
<div class="layout-grid-row">
<div
v-for="(device, index) in devices"
v-for="device in devices"
:key="device.id"
class="layout-card device-card"
:class="{ 'selected-device': device.id === selected }"
@click="selectDevice(device)"
>
<div class="stats-header">
<span class="stats-title">{{ device.title }}</span>
<span class="stats-icon-box">
<span class="stats-title">{{ device.meta['type'] }}</span>
<span :class="[ device.value === 'OFFLINE' ? 'offline' : '', 'stats-icon-box']">
<i
:class="[
'pi',
device.value === 'ONLINE' ? 'pi-check-circle' : 'pi-times-circle',
device.value === 'ONLINE' ? '' : 'text-red-500',
]"
:class="['pi', device.icon]"
:style="{ color: device.value === 'ONLINE' ? 'inherit' : 'var(--p-secondary-color)' }"
></i>
</span>
</div>
<div class="stats-content">
<div class="stats-value">{{ device.value }}</div>
<div class="stats-subtitle">{{ device.subtitle }}</div>
<div class="stats-value">{{ device.meta['name'] }}</div>
<div class="stats-subtitle"><span class='italic'>{{ device.id }}</span> on <span class="font-bold">{{ device.meta['host'] }}</span></div>
<div class="stats-subtitle italic text-green-500"></div>
</div>
</div>
</div>
</template>
<style scoped>
.device-card {
cursor: pointer;
transition: all 0.2s ease;
}
.device-card {
cursor: pointer;
transition: all 0.2s ease;
}
.device-card:hover {
transform: translateY(-2px);
}
.device-card:hover {
transform: translateY(-2px);
}
/* 🔥 selected state */
.selected-device {
border: 2px solid var(--p-primary-color);
box-shadow: 0 0 8px var(--p-primary-color);
}
</style>
.selected-device {
border: 2px solid var(--p-primary-color);
box-shadow: 0 0 8px var(--p-primary-color);
}
</style>

View File

@ -1,107 +0,0 @@
<script setup>
import { ref, watch, onMounted } from 'vue'
const products = ref([
{
name: 'Laptop Pro',
category: 'Electronics',
price: 2499,
status: 'In Stock',
},
{
name: 'Wireless Mouse',
category: 'Accessories',
price: 49,
status: 'Low Stock',
},
{
name: 'Monitor 4K',
category: 'Electronics',
price: 699,
status: 'Out of Stock',
},
{ name: 'Keyboard', category: 'Accessories', price: 149, status: 'In Stock' },
])
const selectedProduct = ref(null)
const searchQuery = ref('')
const loading = ref(false)
const filteredProducts = ref([])
const searchProducts = () => {
loading.value = true
filteredProducts.value = products.value.filter(
(product) =>
product.name.toLowerCase().includes(searchQuery.value.toLowerCase()) ||
product.category.toLowerCase().includes(searchQuery.value.toLowerCase()) ||
product.status.toLowerCase().includes(searchQuery.value.toLowerCase()),
)
setTimeout(() => {
loading.value = false
}, 300)
}
watch(searchQuery, () => {
searchProducts()
})
onMounted(() => {
filteredProducts.value = [...products.value]
})
</script>
<template>
<div class="layout-card">
<div class="products-header">
<span class="products-title">Products Overview</span>
<IconField class="search-field">
<InputIcon class="pi pi-search" />
<InputText
v-model="searchQuery"
placeholder="Search products..."
class="products-search"
@keyup.enter="searchProducts"
/>
</IconField>
</div>
<div class="products-table-container">
<DataTable
:value="filteredProducts"
v-model:selection="selectedProduct"
selectionMode="single"
:loading="loading"
:rows="5"
class="products-table"
:pt="{
mask: {
class: 'products-table-mask',
},
loadingIcon: {
class: 'products-table-loading',
},
}"
>
<Column field="name" header="Name" sortable></Column>
<Column field="category" header="Category" sortable></Column>
<Column field="price" header="Price" sortable>
<template #body="{ data }"> ${{ data.price }} </template>
</Column>
<Column field="status" header="Status">
<template #body="{ data }">
<Tag
:severity="
data.status === 'In Stock'
? 'success'
: data.status === 'Low Stock'
? 'warn'
: 'danger'
"
>
{{ data.status }}
</Tag>
</template>
</Column>
</DataTable>
</div>
</div>
</template>

View File

@ -1,6 +1,9 @@
<script setup>
import { ref, watch, onMounted } from 'vue'
import MQTTService from '../../services/mqtt.js'
import mqtt from 'mqtt'
import { ref, watch, onMounted, onUnmounted } from 'vue'
import { inject } from 'vue'
const mqttRef = inject('mqtt')
const props = defineProps({
deviceId: String,
@ -10,8 +13,6 @@
const filters = ref({})
const filterMode = ref({ label: 'Lenient', value: 'lenient' })
const mqtt = new MQTTService()
const changedKeys = ref({})
let propertyTree = {}
@ -80,7 +81,7 @@
)
onMounted(() => {
mqtt.subscribe('device/+/property/#', (payload, topic) => {
mqttRef.value.subscribe(`device/${props.deviceId}/property/#`, (payload, topic) => {
const parts = topic.split('/')
const device = parts[1]
@ -91,6 +92,10 @@
insertProperty(propertyPath, payload)
})
})
onUnmounted(() => {
mqttRef.value.unsubscribe(`device/${props.deviceId}/property/#`)
})
</script>
<template>

View File

@ -1,43 +0,0 @@
<script setup>
const activities = [
{
icon: 'pi-shopping-cart',
text: 'New order #1123',
time: '2 minutes ago',
color: 'pink',
},
{
icon: 'pi-user-plus',
text: 'New customer registered',
time: '15 minutes ago',
color: 'green',
},
{
icon: 'pi-check-circle',
text: 'Payment processed',
time: '25 minutes ago',
color: 'blue',
},
{
icon: 'pi-inbox',
text: 'Inventory updated',
time: '40 minutes ago',
color: 'yellow',
},
]
</script>
<template>
<div class="layout-card col-item-2">
<span class="chart-title">Recent Activity</span>
<div class="activity-list">
<div v-for="(activity, index) in activities" :key="index" class="activity-item">
<i :class="['activity-icon', activity.color, 'pi', activity.icon]"></i>
<div class="activity-content">
<span class="activity-text">{{ activity.text }}</span>
<span class="activity-time">{{ activity.time }}</span>
</div>
</div>
</div>
</div>
</template>

View File

@ -1,94 +0,0 @@
<script setup>
import { ref, watch, onMounted } from 'vue'
import { useLayout } from '../../composables/useLayout'
const { primary, surface, isDarkMode } = useLayout()
const chartData = ref(null)
const chartOptions = ref(null)
function setChartData() {
const documentStyle = getComputedStyle(document.documentElement)
return {
labels: ['Q1', 'Q2', 'Q3', 'Q4'],
datasets: [
{
type: 'bar',
label: 'Subscriptions',
backgroundColor: documentStyle.getPropertyValue('--p-primary-400'),
data: [4000, 10000, 15000, 4000],
barThickness: 32,
},
{
type: 'bar',
label: 'Advertising',
backgroundColor: documentStyle.getPropertyValue('--p-primary-300'),
data: [2100, 8400, 2400, 7500],
barThickness: 32,
},
{
type: 'bar',
label: 'Affiliate',
backgroundColor: documentStyle.getPropertyValue('--p-primary-200'),
data: [4100, 5200, 3400, 7400],
borderRadius: {
topLeft: 8,
topRight: 8,
},
barThickness: 32,
},
],
}
}
function setChartOptions() {
return {
maintainAspectRatio: false,
responsive: true,
plugins: {
legend: {
position: 'top',
},
},
scales: {
x: {
stacked: true,
grid: {
color: 'transparent',
borderColor: 'transparent',
},
},
y: {
stacked: true,
grid: {
color: 'transparent',
borderColor: 'transparent',
drawTicks: false,
},
},
},
}
}
watch([primary, surface, isDarkMode], () => {
chartData.value = setChartData()
chartOptions.value = setChartOptions()
})
onMounted(() => {
chartData.value = setChartData()
chartOptions.value = setChartOptions()
})
</script>
<template>
<div class="layout-card col-item-2">
<div class="chart-header">
<span class="chart-title">Sales Trend</span>
</div>
<div class="chart-content">
<Chart type="bar" :data="chartData" :options="chartOptions" style="height: 300px" />
</div>
</div>
</template>

View File

@ -1,45 +0,0 @@
<script setup>
const stats = [
{
title: 'Total Orders',
icon: 'pi-shopping-cart',
value: '1,234',
subtitle: 'Last 7 days',
},
{
title: 'Active Users',
icon: 'pi-users',
value: '2,573',
subtitle: 'Last 7 days',
},
{
title: 'Revenue',
icon: 'pi-dollar',
value: '$45,200',
subtitle: 'Last 7 days',
},
{
title: 'Success Rate',
icon: 'pi-chart-line',
value: '95%',
subtitle: 'Last 7 days',
},
]
</script>
<template>
<div class="stats">
<div v-for="(stat, index) in stats" :key="index" class="layout-card">
<div class="stats-header">
<span class="stats-title">{{ stat.title }}</span>
<span class="stats-icon-box">
<i :class="['pi', stat.icon]"></i>
</span>
</div>
<div class="stats-content">
<div class="stats-value">{{ stat.value }}</div>
<div class="stats-subtitle">{{ stat.subtitle }}</div>
</div>
</div>
</div>
</template>

View File

@ -1,27 +1,79 @@
import mqtt from 'mqtt'
export default class MQTTService {
constructor(brokerUrl = 'ws://127.0.0.1:8083/mqtt', clientId = null) {
/**
* @param {string} brokerUrl - MQTT broker URL (e.g., ws://127.0.0.1:8083/mqtt)
* @param {string|null} clientId - Optional MQTT client ID
* @param {string|null} username - Optional MQTT username
* @param {string|null} password - Optional MQTT password
*/
constructor(
brokerUrl = 'ws://127.0.0.1:8083/mqtt',
username = null,
password = null,
clientId = null,
) {
this.clientId = clientId || 'vue-client-' + Math.random().toString(16).substr(2, 8)
this.client = mqtt.connect(brokerUrl, { clientId: this.clientId, protocolVersion: 5 })
this.subscriptions = [] // array of {topic, callback}
this.client.on('connect', () => console.log('Connected to MQTT broker'))
// Connect options
const options = {
clientId: this.clientId,
protocolVersion: 5,
}
if (username) options.username = username
if (password) options.password = password
this.client = mqtt.connect(brokerUrl, options)
this.subscriptions = [] // array of { topic, callback }
this.client.on('connect', () => {
console.log(`Connected to MQTT broker as ${this.clientId}`)
})
this.client.on('message', (topic, payload) => {
// iterate over subscriptions and check for matches
const message = payload.toString()
this.subscriptions.forEach(({ topic: subTopic, callback }) => {
if (mqttMatch(subTopic, topic)) {
callback(payload.toString(), topic)
callback(message, topic)
}
})
})
}
/**
* Subscribe to a topic
* @param {string} topic
* @param {function} callback
*/
subscribe(topic, callback) {
this.subscriptions.push({ topic, callback })
this.client.subscribe(topic)
}
/**
* Unsubscribe from a topic
* @param {string} topic
* @param {function|null} callback - optional, remove only this callback
*/
unsubscribe(topic, callback = null) {
this.subscriptions = this.subscriptions.filter((sub) => {
if (sub.topic !== topic) return true
if (callback && sub.callback !== callback) return true
return false
})
// Actually tell the broker to unsubscribe only if no more callbacks exist for this topic
const stillSubscribed = this.subscriptions.some((sub) => sub.topic === topic)
if (!stillSubscribed) {
this.client.unsubscribe(topic)
}
}
/**
* Publish a message
* @param {string} topic
* @param {string|Buffer} message
* @param {object} options
*/
publish(topic, message, options = {}) {
this.client.publish(topic, message, options)
}
@ -30,6 +82,6 @@ export default class MQTTService {
// helper function for MQTT wildcards
function mqttMatch(subTopic, topic) {
// replace MQTT wildcards with RegExp
const regex = '^' + subTopic.replace('+', '[^/]+').replace('#', '.+') + '$'
const regex = '^' + subTopic.replace(/\+/g, '[^/]+').replace(/#/g, '.+') + '$'
return new RegExp(regex).test(topic)
}

View File

@ -9,6 +9,10 @@ services:
- 8084:8084
- 8883:8883
- 18083:18083
volumes:
- ./broker/emqx.conf:/opt/emqx/etc/emqx.conf:z
- ./broker/acl.conf:/opt/emqx/etc/acl.conf:z
- ./broker/passwd:/opt/emqx/etc/passwd:z
mediamtx:
container_name: mediamtx

View File

@ -7,9 +7,10 @@ from dataclasses import dataclass
import aiohttp
from mqtthandler.command import command
from mqtthandler.handler import MQTTConfig, MQTTHandler, task
from mqtthandler.handler import MQTTHandler, task
from prometheus_client.parser import text_string_to_metric_families
@dataclass
class MediaMTXConfig:
host: str = "http://localhost"
@ -21,11 +22,10 @@ class MediaMTXConfig:
class MediaMTXHandler(MQTTHandler):
def __init__(
self,
mqtt_config: MQTTConfig,
handler_id: str,
name: str,
mediamtx_config: MediaMTXConfig,
):
super().__init__(mqtt_config, handler_id)
super().__init__(name)
self.config = mediamtx_config
@task
@ -33,10 +33,14 @@ class MediaMTXHandler(MQTTHandler):
cache = {}
while True:
auth = aiohttp.BasicAuth(login=self.config.username, password=self.config.password)
auth = aiohttp.BasicAuth(
login=self.config.username, password=self.config.password
)
async with aiohttp.ClientSession(auth=auth) as session:
while True:
async with session.get(f"{self.config.host}{self.config.metrics_path}") as r:
async with session.get(
f"{self.config.host}{self.config.metrics_path}"
) as r:
metrics = await r.text()
for family in text_string_to_metric_families(metrics):
@ -47,7 +51,9 @@ class MediaMTXHandler(MQTTHandler):
cache[topic] = sample.value
print(topic, sample.value)
await self.set_property(topic, sample.value, 0, True)
await self.set_property(
topic, sample.value, qos=0, retain=True
)
await asyncio.sleep(1)
@ -56,12 +62,9 @@ class MediaMTXHandler(MQTTHandler):
async def main():
handler_id = f"mediamtx-{socket.gethostname()}"
mqtt_config = MQTTConfig(host="127.0.0.1")
handler = MediaMTXHandler(mqtt_config, handler_id, MediaMTXConfig())
handler = MediaMTXHandler("mediamtx", MediaMTXConfig())
signal.signal(signal.SIGINT, lambda signum, frame: handler.stop())
await handler.run()
await handler.run("127.0.0.1", username="device", password="devicesecret")
if __name__ == "__main__":

View File

@ -64,3 +64,13 @@ class CommandResponse:
def __str__(self):
return json.dumps(asdict(self))
def enumerate_commands(obj: object):
commands = {}
for base in obj.__class__.__mro__:
for name, attr in vars(base).items():
if isinstance(attr, Command):
commands[attr.name] = attr
return commands

View File

@ -4,101 +4,147 @@ import inspect
import paho
import signal
import json
from dataclasses import dataclass
import secrets
import os
import socket
from pathlib import Path
from enum import Enum, auto
from .command import (
Command,
CommandResponse,
CommandArgumentError,
CommandExecutionError,
enumerate_commands,
)
from .property import Property
@dataclass
class MQTTConfig:
host: str
port: int = 1883
username: str | None = None
password: str | None = None
keepalive: int = 60
def get_identifier(cache_path: Path) -> str:
"""
Determine an MQTT client ID using the following order:
1. Environment variable IDENTIFIER
2. Value stored in /tmp/<handler-name>.tmp
3. Generate a new random ID using secrets.token_urlsafe
The resulting client ID is written to /tmp/mqtt_client_id.tmp for future use.
"""
client_id = os.environ.get("IDENTIFIER", None)
if not client_id and cache_path.exists():
client_id = cache_path.read_text().strip()
elif not client_id:
client_id = generate_identifier()
cache_path.write_text(client_id)
return client_id
def generate_identifier() -> str:
return secrets.token_urlsafe(6)
class Status(Enum):
ONLINE = auto()
OFFLINE = auto()
class MQTTHandler:
DEVICE = "device"
META = "meta"
PROPERTY = "property"
COMMAND = "command"
STATUS = "status"
def __init__(
self,
mqtt_config: MQTTConfig,
handler_id: str,
):
self.handler_id = handler_id
self.mqtt_config = mqtt_config
def __init__(self, name: str):
self.name = name
self.identifier = get_identifier(Path(f"/tmp/{self.name}.tmp"))
self.topic_base = f"device/{handler_id}"
self.command_topic = f"{self.topic_base}/command"
self.property_topic = f"{self.topic_base}/property"
self.topic_base = lambda: f"{MQTTHandler.DEVICE}/{self.identifier}"
self.meta_topic = lambda: f"{self.topic_base()}/{MQTTHandler.META}"
self.command_topic = lambda: f"{self.topic_base()}/{MQTTHandler.COMMAND}"
self.property_topic = lambda: f"{self.topic_base()}/{MQTTHandler.PROPERTY}"
self._shutdown_event = asyncio.Event()
will = aiomqtt.Will(
topic=f"{self.property_topic}/{MQTTHandler.STATUS}", payload="OFFLINE", qos=1, retain=True
self._mqtt_client = None
self._commands = enumerate_commands(self)
self._properties = {}
self._meta = {}
async def set_property(self, name: str, value, **kwargs):
if name in self._properties:
await self._properties[name](value, **kwargs)
else:
# print(f"Warning: proeprty {name} is unregistered")
await self._publish(f"{self.property_topic()}/{name}", value, **kwargs)
async def register_property(
self, name: str, description: str | None = None, schema: dict | None = None
):
property = self._register_property(
f"{self.property_topic()}/{name}", description, schema
)
self._properties[name] = property
self.mqtt_client = aiomqtt.Client(
self.mqtt_config.host,
port=self.mqtt_config.port,
identifier=handler_id,
protocol=paho.mqtt.client.MQTTv5,
will=will,
)
self.commands = self.get_available_commands()
def get_available_commands(self):
commands = {}
for base in self.__class__.__mro__:
for name, attr in vars(base).items():
if isinstance(attr, Command):
print(f"Registering method {type(self).__name__}.{name} as command '{attr.name}'")
commands[attr.name] = attr
return commands
async def register_commands(self):
for name, command in self.commands.items():
for k, v in {
"schema": json.dumps(command.schema),
"description": command.description,
**command.additional_properties,
}.items():
await self.mqtt_client.publish(
f"{self.command_topic}/{command.name}/${k}",
str(v),
qos=1,
retain=True,
)
async def register_property(self, property: str, description: str | None = None, schema: dict | None = None):
async def _register_property(
self, name: str, description: str | None = None, schema: dict | None = None
):
property = Property(name, description, schema, self._publish)
data = {
"schema": json.dumps(schema),
"description": description,
}
for k, v in {k:v for k,v in data.items() if v is not None}.items():
await self.mqtt_client.publish(
f"{self.property_topic}/{property}/${k}",
for k, v in {k: v for k, v in data.items() if v is not None}.items():
await self._mqtt_client.publish(
f"{name}/${k}",
str(v),
qos=1,
retain=True,
)
async def set_property(self, property: str, value, qos=0, retain=False):
await self.mqtt_client.publish(
f"{self.property_topic}/{property}",
str(value),
qos=qos,
retain=retain,
return property
async def _publish(self, name: str, value, **kwargs):
await self._mqtt_client.publish(f"{name}", value, **kwargs)
async def _register_commands(self):
for name, command in self._commands.items():
for k, v in {
"schema": json.dumps(command.schema),
"description": command.description,
**command.additional_properties,
}.items():
await self._mqtt_client.publish(
f"{self.command_topic()}/{command.name}/${k}",
str(v),
qos=1,
retain=True,
)
async def _announce(self):
# announce that we are online
await self._register_commands()
self._meta[MQTTHandler.STATUS] = await self._register_property(
f"{self.meta_topic()}/{MQTTHandler.STATUS}",
"Indicates the status of the device.",
{"type": "string", "enum": list(Status.__members__.keys())},
)
await self._meta[MQTTHandler.STATUS](
self, json.dumps(Status.ONLINE.name), qos=1, retain=True
)
async def execute_command(
await self._publish(f"{self.meta_topic()}/name", self.name, qos=1, retain=True)
await self._publish(
f"{self.meta_topic()}/type", type(self).__name__, qos=1, retain=True
)
await self._publish(
f"{self.meta_topic()}/host", socket.gethostname(), qos=1, retain=True
)
async def _execute_command(
self,
command_name: str,
payload: str,
@ -115,7 +161,7 @@ class MQTTHandler:
if hasattr(properties, "CorrelationData")
else None
)
await self.mqtt_client.publish(
await self._mqtt_client.publish(
properties.ResponseTopic,
str(CommandResponse(success, str(message), correlation)),
qos=1,
@ -123,7 +169,7 @@ class MQTTHandler:
)
try:
command = self.commands[command_name]
command = self._commands[command_name]
result = await command(self, payload)
await respond(True, result)
except (CommandArgumentError, CommandExecutionError) as e:
@ -132,39 +178,53 @@ class MQTTHandler:
print(f"Failed to execute command {command_name} with unknown cause: ", e)
await respond(False, "Unexpected error")
async def mqtt_command_writer_task(self):
async for message in self.mqtt_client.messages:
async def _command_executor(self):
await self._mqtt_client.subscribe(f"{self.command_topic()}/+")
async for message in self._mqtt_client.messages:
topic = str(message.topic)
payload = message.payload.decode("utf-8")
if topic.startswith(self.command_topic):
command_name = topic.removeprefix(f"{self.command_topic}/")
await self.execute_command(command_name, payload, message.properties)
if topic.startswith(self.command_topic()):
command_name = topic.removeprefix(f"{self.command_topic()}/")
await self._execute_command(command_name, payload, message.properties)
async def shutdown_watcher(self):
async def _shutdown_watcher(self):
await self._shutdown_event.wait()
await self.set_property(MQTTHandler.STATUS, "OFFLINE", qos=1, retain=True)
await self._meta[MQTTHandler.STATUS](
self, json.dumps(Status.OFFLINE.name), qos=1, retain=True
)
def stop(self):
self._shutdown_event.set()
signal.signal(signal.SIGINT, signal.SIG_DFL)
async def run(self):
async def run(self, host: str, **kwargs):
INTERVAL = 5
will = aiomqtt.Will(
topic=f"{self.meta_topic()}/{MQTTHandler.STATUS}",
payload=json.dumps(Status.OFFLINE.name),
qos=1,
retain=True,
)
while True:
try:
async with self.mqtt_client as client:
await client.subscribe(f"{self.command_topic}/+")
await self.register_commands()
async with aiomqtt.Client(
host,
protocol=paho.mqtt.client.MQTTv5,
will=will,
identifier=self.identifier,
**kwargs,
) as client:
self._mqtt_client = client
# announce that we are online
await self.set_property(MQTTHandler.STATUS, "ONLINE", qos=1, retain=True)
await self.register_property("status", "Indicates the status of the device.", {
"type": "string",
"enum": ["ONLINE", "OFFLINE"]
})
tasks = [self.mqtt_command_writer_task(), self.shutdown_watcher()]
tasks = [
self._command_executor(),
self._shutdown_watcher(),
self._announce(),
]
# Inspect instance methods
for attr_name in dir(self):
@ -179,11 +239,12 @@ class MQTTHandler:
await asyncio.gather(*tasks)
except aiomqtt.MqttError as e:
print(
f"[{self.handler_id}] MQTT connection error: {e}. Reconnecting in {INTERVAL}s..."
)
print(f"MQTT connection error: {e}. Reconnecting in {INTERVAL}s...")
await asyncio.sleep(INTERVAL)
finally:
self._mqtt_client = None
def task(func):
"""Decorator to mark async methods for automatic gathering."""

58
mqtthandler/property.py Normal file
View File

@ -0,0 +1,58 @@
import json
import jsonschema
class PropertyValueError(ValueError):
pass
class Property:
"""
Presumes that the handler will take the same arguments as aiomqtt.Client.publish
ie: async publish(
topic: str,
payload: str | bytes | bytearray | int | float | None = None,
qos: int = 0,
retain: bool = False,
properties: Properties | None = None,
*args: Any,
timeout: float | None = None,
**kwargs: Any
) None
"""
def __init__(
self,
name: str,
description: str | None = None,
schema: dict | None = None,
handler=None,
**kwargs,
):
self.name = name
self.description = description
self.schema = schema
self.handler = handler
self.additional_properties = kwargs
def __call__(self, handler_instance, payload, **kwargs):
if self.handler is None:
raise NotImplementedError(f"No handler bound for property '{self.name}'")
try:
value = json.loads(payload)
if self.schema is not None:
jsonschema.validate(value, self.schema)
except json.decoder.JSONDecodeError as e:
raise PropertyValueError(
f"Invalid JSON at line {e.lineno} column {e.colno}: {e.msg}"
)
except jsonschema.ValidationError as e:
raise PropertyValueError(f"Schema error in {e.json_path}: {e.message}")
try:
return self.handler(self.name, payload, **kwargs)
except Exception as e:
print("Failed to set property: ", e)
raise RuntimeError(f"Failed to set property.")

View File

@ -6,24 +6,23 @@ import asyncio
from dataclasses import dataclass
from mqtthandler.command import command
from mqtthandler.handler import MQTTConfig, MQTTHandler, task
from mqtthandler.handler import MQTTHandler, task
from streamer.fileradio import FileRadio
class RadioHandler(MQTTHandler):
def __init__(
self,
mqtt_config: MQTTConfig,
handler_id: str,
name: str,
):
super().__init__(mqtt_config, handler_id)
self.radio = FileRadio("./data/StarWars60.mp3", handler_id)
super().__init__(name)
self.radio = FileRadio("./data/StarWars60.mp3", name)
@task
async def publish_stream_path(self):
await self.set_property("path", self.radio.stream_path(), 1, True)
await self.set_property("file", self.radio.path, 1, True)
await self.set_property("path", self.radio.stream_path(), qos=1, retain=True)
await self.set_property("file", self.radio.path, qos=1, retain=True)
@command({"type": "object"}, "Start the radio stream.")
async def start(self, args):
@ -47,13 +46,11 @@ class RadioHandler(MQTTHandler):
self.radio = FileRadio(args, self.radio.name)
await self.publish_stream_path()
async def main():
handler_id = f"radio-{socket.gethostname()}"
mqtt_config = MQTTConfig(host="127.0.0.1")
handler = RadioHandler(mqtt_config, handler_id)
async def main():
handler = RadioHandler("radio")
signal.signal(signal.SIGINT, lambda signum, frame: handler.stop())
await handler.run()
await handler.run("127.0.0.1", username="device", password="devicesecret")
if __name__ == "__main__":

View File

@ -21,9 +21,9 @@ class FileRadio(Streamer):
self.playback = subprocess.Popen(
[
"/usr/bin/ffmpeg",
"-re", # http://trac.ffmpeg.org/wiki/StreamingGuide#The-reflag
"-stream_loop", # Loop the stream -
"-1", # ...indefinitely
"-re", # http://trac.ffmpeg.org/wiki/StreamingGuide#The-reflag
"-stream_loop", # Loop the stream -
"-1", # ...indefinitely
"-i",
self.path,
"-c:a",

View File

@ -1,5 +1,6 @@
from threading import Thread
def is_alive(subprocess):
return True if (subprocess and subprocess.poll() is None) else False

37
ubx.py
View File

@ -2,17 +2,15 @@
import asyncio
import aioserial
import aiomqtt
import pyubx2
import io
import logging
import aioserial
import asyncio
import socket
import signal
from mqtthandler.command import command
from mqtthandler.handler import MQTTConfig, MQTTHandler, task
from mqtthandler.handler import MQTTHandler, task
# The pyubx2 library spams the console with errors that aren't errors.
# I don't care if you failed to parse an incomplete buffer.
@ -47,11 +45,10 @@ class UBXAsyncParser:
class UBXHandler(MQTTHandler):
def __init__(
self,
mqtt_client: aiomqtt.Client,
handler_id: str,
name: str,
serial_port: aioserial.AioSerial,
):
super().__init__(mqtt_client, handler_id)
super().__init__(name)
self.serial_port = serial_port
@task
@ -77,7 +74,7 @@ class UBXHandler(MQTTHandler):
continue
property = f"{message.identity}/{name}"
await self.set_property(property, value, 1, True)
await self.set_property(property, value, qos=0, retain=True)
else:
# print("Unexpected response:", message)
pass
@ -112,7 +109,7 @@ class UBXHandler(MQTTHandler):
460800,
921600,
],
"default": 9600
"default": 9600,
},
},
"required": ["portID", "baudRate"],
@ -147,7 +144,7 @@ class UBXHandler(MQTTHandler):
"minimum": 1,
"maximum": 127,
"description": "Number of measurement cycles per navigation solution",
"default": 1
"default": 1,
},
"timeRef": {
"type": "integer",
@ -174,19 +171,19 @@ class UBXHandler(MQTTHandler):
async def main():
handler_id = f"example-gps-{socket.gethostname()}"
mqtt_config = MQTTConfig(host="127.0.0.1", port=1883)
serial_port = aioserial.AioSerial(
port="/tmp/ttyV0",
baudrate=115200,
timeout=0.05, # 50 ms
handler = UBXHandler(
"example-gps",
aioserial.AioSerial(
port="/tmp/ttyV0",
baudrate=115200,
timeout=0.05, # 50 ms
),
)
handler = UBXHandler(mqtt_config, handler_id, serial_port)
signal.signal(signal.SIGINT, lambda signum, frame: handler.stop())
await handler.run()
await handler.run(
"127.0.0.1", port=1883, username="device", password="devicesecret"
)
if __name__ == "__main__":