Compare commits

..

No commits in common. "51c27ac9fb508af9284e352573bb83698e12c1f7" and "93d7e9e5d126e9591b80a25951363753bcd33b88" have entirely different histories.

24 changed files with 535 additions and 656 deletions

View File

@ -1,49 +0,0 @@
%%--------------------------------------------------------------------
%% EMQX ACL configuration
%%--------------------------------------------------------------------
%% =========================
%% Device user permissions
%% =========================
%% Devices can publish ONLY to their own namespace
{allow, {user, "device"}, publish, ["device/${clientid}/meta/#"]}.
{allow, {user, "device"}, publish, ["device/${clientid}/property/#"]}.
{allow, {user, "device"}, publish, ["device/${clientid}/command/#"]}.
%% Devices can receive commands
{allow, {user, "device"}, subscribe, ["device/${clientid}/command/#"]}.
%% =========================
%% Authenticated users
%% =========================
%% Any authenticated user can read all device topics
{allow, all, subscribe, ["device/+/meta/#"]}.
{allow, all, subscribe, ["device/+/property/#"]}.
{allow, all, subscribe, ["device/+/command/#"]}.
%% Any authenticated user can publish commands to any device
{allow, all, publish, ["device/+/command/+"]}.
%% =========================
%% Response topic mechanism
%% =========================
%% Clients can SUBSCRIBE to their own response inbox
{allow, all, subscribe, ["client/${clientid}/responses/#"]}.
%% Authenticated users can PUBLISH to any client response inbox
{allow, all, publish, ["client/+/responses/#"]}.
%% (No subscribe permission for others -> enforced by default deny)
%% =========================
%% Default deny
%% =========================
{deny, all}.

View File

@ -1,81 +0,0 @@
## Place read-only configurations in this file.
## To define configurations that can later be overridden through UI/API/CLI, add them to `etc/base.hocon`.
##
## Config precedence order:
## etc/base.hocon < cluster.hocon < emqx.conf < environment variables
##
## See https://docs.emqx.com/en/enterprise/latest/configuration/configuration.html for more information.
## Configuration full example can be found in etc/examples
node {
name = "emqx@127.0.0.1"
cookie = "emqx50elixir"
data_dir = "data"
}
cluster {
name = emqxcl
discovery_strategy = manual
}
dashboard {
listeners {
http.bind = 18083
# https.bind = 18084
https {
ssl_options {
certfile = "${EMQX_ETC_DIR}/certs/cert.pem"
keyfile = "${EMQX_ETC_DIR}/certs/key.pem"
}
}
}
}
##--------------------------------------------------------------------
## Authentication
##--------------------------------------------------------------------
## Load users from file
authn {
enable = true
sources = [
{
type = file
path = "etc/passwd"
password_hash_algorithm {
name = plain
}
}
]
}
##--------------------------------------------------------------------
## Authorization (ACL)
##--------------------------------------------------------------------
authorization {
sources = [
{
type = file
path = "etc/acl.conf"
}
]
no_match = deny
}
##--------------------------------------------------------------------
## Anonymous access
##--------------------------------------------------------------------
allow_anonymous = true
##--------------------------------------------------------------------
## Listener (basic)
##--------------------------------------------------------------------
listeners.tcp.default {
bind = "0.0.0.0:1883"
}

View File

@ -1,3 +0,0 @@
device:devicesecret
alice:alicepass
bob:bobpass

View File

@ -1,59 +1,47 @@
<script setup> <script setup>
import { ref, provide, onMounted } from 'vue'
import { useLayout } from './composables/useLayout'
import { useToast } from 'primevue/usetoast'
const toast = useToast()
const { primaryColors, surfaces, primary, surface, isDarkMode, updateColors, toggleDarkMode } =
useLayout()
import MQTTService from './services/mqtt.js'
import AppTopbar from './components/AppTopbar.vue' import AppTopbar from './components/AppTopbar.vue'
import AppFooter from './components/AppFooter.vue' import AppFooter from './components/AppFooter.vue'
import DevicesWidget from './components/dashboard/DevicesWidget.vue' import StatsWidget from './components/dashboard/StatsWidget.vue'
import SalesTrendWidget from './components/dashboard/SalesTrendWidget.vue'
import RecentActivityWidget from './components/dashboard/RecentActivityWidget.vue'
import ProductOverviewWidget from './components/dashboard/ProductOverviewWidget.vue'
import PropertiesWidget from './components/dashboard/PropertiesWidget.vue' import PropertiesWidget from './components/dashboard/PropertiesWidget.vue'
import CommandsWidget from './components/dashboard/CommandsWidget.vue' import CommandsWidget from './components/dashboard/CommandsWidget.vue'
const mqtt = ref(null) import { ref } from 'vue'
const loggedIn = ref(false) import DevicesWidget from './components/dashboard/DevicesWidget.vue'
const selectedDevice = ref(null) const selectedDevice = ref(null)
function handleLogin({ username, password }) {
// Create the shared MQTT client
mqtt.value = new MQTTService('ws://127.0.0.1:8083/mqtt', username, password)
loggedIn.value = true
}
onMounted(() => {
if(isDarkMode !== false) toggleDarkMode()
})
provide('mqtt', mqtt)
</script> </script>
<template> <template>
<Toast position="bottom-right" /> <Toast position="bottom-right" />
<div id="app"> <div class="layout-container">
<SplashPage v-if="!loggedIn" @login="handleLogin" /> <AppTopbar />
<div v-else class="layout-container"> <div class="layout-grid">
<AppTopbar /> <DevicesWidget @select="selectedDevice = $event" />
<div class="layout-grid"> <div v-if="selectedDevice" class="layout-grid-row">
<DevicesWidget @select="selectedDevice = $event" /> <PropertiesWidget
<div v-if="selectedDevice" class="layout-grid-row"> v-if="selectedDevice"
<PropertiesWidget :key="'props-' + selectedDevice"
v-if="selectedDevice" :device-id="selectedDevice"
:key="'props-' + selectedDevice" />
:device-id="selectedDevice"
/> <CommandsWidget
<CommandsWidget v-if="selectedDevice"
v-if="selectedDevice" :key="'cmds-' + selectedDevice"
:key="'cmds-' + selectedDevice" :device-id="selectedDevice"
:device-id="selectedDevice" />
/>
</div>
</div> </div>
<AppFooter /> <!-- <StatsWidget /> -->
<!-- <div class="layout-grid-row">
<SalesTrendWidget />
<RecentActivityWidget />
</div>-->
<!-- <ProductOverviewWidget /> -->
</div> </div>
<AppFooter />
</div> </div>
</template> </template>

View File

@ -97,18 +97,6 @@ body {
color: var(--p-primary-200); color: var(--p-primary-200);
} }
.offline {
color: color-mix(in srgb, red, transparent 40%);
border-color: color-mix(in srgb, red, transparent 70%);
background-color: color-mix(in srgb, red, transparent 80%);
}
.p-dark .offline {
color: color-mix(in srgb, red, transparent 40%);
border-color: color-mix(in srgb, red, transparent 70%);
background-color: color-mix(in srgb, red, transparent 90%);
}
.stats-header { .stats-header {
display: flex; display: flex;
align-items: flex-start; align-items: flex-start;

View File

@ -4,6 +4,10 @@
const { primaryColors, surfaces, primary, surface, isDarkMode, updateColors, toggleDarkMode } = const { primaryColors, surfaces, primary, surface, isDarkMode, updateColors, toggleDarkMode } =
useLayout() useLayout()
onMounted(() => {
toggleDarkMode()
})
</script> </script>
<template> <template>

View File

@ -1,60 +0,0 @@
<script setup>
import { ref } from 'vue'
const emit = defineEmits(['login'])
const username = ref('')
const password = ref('')
function submitLogin() {
if (username.value && password.value) {
emit('login', { username: username.value, password: password.value })
} else {
alert('Please enter username and password')
}
}
</script>
<template>
<div class="splash-page">
<div class="login-card">
<h2>MQTT Login</h2>
<input v-model="username" placeholder="Username" />
<input v-model="password" type="password" placeholder="Password" />
<Button @click="submitLogin">Login</Button>
</div>
</div>
</template>
<style scoped>
.splash-page {
display: flex;
justify-content: center;
align-items: center;
height: 100vh;
background-color: var(--p-primary-color-light);
}
.login-card {
padding: 2rem;
border-radius: 12px;
box-shadow: 0 0 12px rgba(0, 0, 0, 0.15);
width: 300px;
text-align: center;
}
.login-card input {
display: block;
width: 100%;
margin: 0.5rem 0;
padding: 0.5rem;
border-radius: 6px;
border: 1px solid #ccc;
}
.login-card button {
margin-top: 1rem;
width: 100%;
padding: 0.5rem;
}
</style>

View File

@ -1,8 +1,6 @@
<script setup> <script setup>
import { ref, reactive, onMounted, onUnmounted } from 'vue' import { ref, reactive, onMounted } from 'vue'
import { inject } from 'vue' import MQTTService from '../../services/mqtt.js'
const mqttRef = inject('mqtt')
import Accordion from 'primevue/accordion' import Accordion from 'primevue/accordion'
import AccordionPanel from 'primevue/accordionpanel' import AccordionPanel from 'primevue/accordionpanel'
@ -27,6 +25,8 @@
deviceId: String, deviceId: String,
}) })
const mqtt2 = new MQTTService()
const filters = ref({}) const filters = ref({})
const commands = ref([]) const commands = ref([])
const commandMap = reactive({}) const commandMap = reactive({})
@ -65,10 +65,10 @@
var responseId = generateId() var responseId = generateId()
commandByResponseId[responseId] = cmd commandByResponseId[responseId] = cmd
mqttRef.value.publish(topic, payload || '{}', { mqtt2.publish(topic, payload || '{}', {
qos: 1, qos: 1,
properties: { properties: {
responseTopic: `client/${mqttRef.value.clientId}/responses`, responseTopic: `client/${mqtt2.clientId}/responses`,
correlationData: new TextEncoder().encode(responseId), correlationData: new TextEncoder().encode(responseId),
}, },
}) })
@ -93,7 +93,7 @@
} }
onMounted(() => { onMounted(() => {
mqttRef.value.subscribe(`device/${props.deviceId}/command/#`, (payload, topic) => { mqtt2.subscribe('device/+/command/#', (payload, topic) => {
const parts = topic.split('/') const parts = topic.split('/')
const device = parts[1] const device = parts[1]
@ -102,8 +102,6 @@
const command = parts[3] const command = parts[3]
const field = parts[4] const field = parts[4]
console.log(topic, payload)
if (field === '$description') { if (field === '$description') {
updateCommand(device, command, 'description', payload) updateCommand(device, command, 'description', payload)
} }
@ -113,7 +111,7 @@
} }
}) })
mqttRef.value.subscribe(`client/${mqttRef.value.clientId}/responses`, (payload, topic) => { mqtt2.subscribe(`client/${mqtt2.clientId}/responses`, (payload, topic) => {
let response = JSON.parse(payload) let response = JSON.parse(payload)
const responseId = response.correlation const responseId = response.correlation
@ -139,11 +137,6 @@
}) })
}) })
onUnmounted(() => {
mqttRef.value.unsubscribe(`device/${props.deviceId}/command/#`)
mqttRef.value.unsubscribe(`client/${mqttRef.value.clientId}/responses`)
})
const jsonFormsConfig = { const jsonFormsConfig = {
validationMode: 'ValidateOnTouched', validationMode: 'ValidateOnTouched',
showAllErrors: false, showAllErrors: false,

View File

@ -1,95 +1,93 @@
<script setup> <script setup>
import { ref, onMounted } from 'vue' import { ref, onMounted } from 'vue'
import { inject } from 'vue' import MQTTService from '../../services/mqtt.js'
const mqttRef = inject('mqtt') const emit = defineEmits(['select'])
const emit = defineEmits(['select'])
const devices = ref([]) const mqtt = new MQTTService()
const deviceMap = ref({}) // map deviceId => device object with all meta properties
const selected = ref(null)
// Add or update a device's meta property const devices = ref([])
function upsertDevice(deviceId, property, value) { const deviceSet = new Set()
if (!deviceMap.value[deviceId]) { const selected = ref(null)
deviceMap.value[deviceId] = {
id: deviceId, function upsertDevice(device, status) {
meta: {}, // stores all meta properties const existing = devices.value.find((d) => d.id === device)
title: deviceId,
value: '', // main status display if (existing) {
subtitle: 'MQTT Device', // update reactively
icon: 'pi-box', existing.value = status
} else {
deviceSet.add(device)
devices.value = [
...devices.value,
{
id: device,
title: device,
value: status,
subtitle: 'MQTT Device',
icon: status === 'ONLINE' ? 'pi-check-circle' : 'pi-times-circle',
},
]
} }
} }
function selectDevice(device) {
// update the property selected.value = device.id
deviceMap.value[deviceId].meta[property] = value emit('select', device.id)
// update card display values (example: show 'status' as main value)
if (property === 'status') {
deviceMap.value[deviceId].value = value.replace(/['"]+/g, '')
deviceMap.value[deviceId].icon = deviceMap.value[deviceId].value === 'ONLINE' ? 'pi-check-circle' : 'pi-times-circle'
} }
// rebuild reactive array for rendering onMounted(() => {
devices.value = Object.values(deviceMap.value) mqtt.subscribe('device/+/property/status', (payload, topic) => {
} const parts = topic.split('/')
const device = parts[1]
function selectDevice(device) { upsertDevice(device, payload)
selected.value = device.id })
emit('select', device.id)
}
onMounted(() => {
mqttRef.value.subscribe('device/+/meta/+', (payload, topic) => {
const parts = topic.split('/')
const deviceId = parts[1]
const property = parts.slice(3).join('/') // handles nested properties if any
upsertDevice(deviceId, property, payload)
}) })
})
</script> </script>
<template> <template>
<div class="layout-grid-row"> <div class="layout-grid-row">
<div <div
v-for="device in devices" v-for="(device, index) in devices"
:key="device.id" :key="device.id"
class="layout-card device-card" class="layout-card device-card"
:class="{ 'selected-device': device.id === selected }" :class="{ 'selected-device': device.id === selected }"
@click="selectDevice(device)" @click="selectDevice(device)"
> >
<div class="stats-header"> <div class="stats-header">
<span class="stats-title">{{ device.meta['type'] }}</span> <span class="stats-title">{{ device.title }}</span>
<span :class="[ device.value === 'OFFLINE' ? 'offline' : '', 'stats-icon-box']"> <span class="stats-icon-box">
<i <i
:class="['pi', device.icon]" :class="[
:style="{ color: device.value === 'ONLINE' ? 'inherit' : 'var(--p-secondary-color)' }" 'pi',
device.value === 'ONLINE' ? 'pi-check-circle' : 'pi-times-circle',
device.value === 'ONLINE' ? '' : 'text-red-500',
]"
></i> ></i>
</span> </span>
</div> </div>
<div class="stats-content"> <div class="stats-content">
<div class="stats-value">{{ device.meta['name'] }}</div> <div class="stats-value">{{ device.value }}</div>
<div class="stats-subtitle"><span class='italic'>{{ device.id }}</span> on <span class="font-bold">{{ device.meta['host'] }}</span></div> <div class="stats-subtitle">{{ device.subtitle }}</div>
<div class="stats-subtitle italic text-green-500"></div>
</div> </div>
</div> </div>
</div> </div>
</template> </template>
<style scoped> <style scoped>
.device-card { .device-card {
cursor: pointer; cursor: pointer;
transition: all 0.2s ease; transition: all 0.2s ease;
} }
.device-card:hover { .device-card:hover {
transform: translateY(-2px); transform: translateY(-2px);
} }
.selected-device { /* 🔥 selected state */
border: 2px solid var(--p-primary-color); .selected-device {
box-shadow: 0 0 8px var(--p-primary-color); border: 2px solid var(--p-primary-color);
} box-shadow: 0 0 8px var(--p-primary-color);
}
</style> </style>

View File

@ -0,0 +1,107 @@
<script setup>
import { ref, watch, onMounted } from 'vue'
const products = ref([
{
name: 'Laptop Pro',
category: 'Electronics',
price: 2499,
status: 'In Stock',
},
{
name: 'Wireless Mouse',
category: 'Accessories',
price: 49,
status: 'Low Stock',
},
{
name: 'Monitor 4K',
category: 'Electronics',
price: 699,
status: 'Out of Stock',
},
{ name: 'Keyboard', category: 'Accessories', price: 149, status: 'In Stock' },
])
const selectedProduct = ref(null)
const searchQuery = ref('')
const loading = ref(false)
const filteredProducts = ref([])
const searchProducts = () => {
loading.value = true
filteredProducts.value = products.value.filter(
(product) =>
product.name.toLowerCase().includes(searchQuery.value.toLowerCase()) ||
product.category.toLowerCase().includes(searchQuery.value.toLowerCase()) ||
product.status.toLowerCase().includes(searchQuery.value.toLowerCase()),
)
setTimeout(() => {
loading.value = false
}, 300)
}
watch(searchQuery, () => {
searchProducts()
})
onMounted(() => {
filteredProducts.value = [...products.value]
})
</script>
<template>
<div class="layout-card">
<div class="products-header">
<span class="products-title">Products Overview</span>
<IconField class="search-field">
<InputIcon class="pi pi-search" />
<InputText
v-model="searchQuery"
placeholder="Search products..."
class="products-search"
@keyup.enter="searchProducts"
/>
</IconField>
</div>
<div class="products-table-container">
<DataTable
:value="filteredProducts"
v-model:selection="selectedProduct"
selectionMode="single"
:loading="loading"
:rows="5"
class="products-table"
:pt="{
mask: {
class: 'products-table-mask',
},
loadingIcon: {
class: 'products-table-loading',
},
}"
>
<Column field="name" header="Name" sortable></Column>
<Column field="category" header="Category" sortable></Column>
<Column field="price" header="Price" sortable>
<template #body="{ data }"> ${{ data.price }} </template>
</Column>
<Column field="status" header="Status">
<template #body="{ data }">
<Tag
:severity="
data.status === 'In Stock'
? 'success'
: data.status === 'Low Stock'
? 'warn'
: 'danger'
"
>
{{ data.status }}
</Tag>
</template>
</Column>
</DataTable>
</div>
</div>
</template>

View File

@ -1,9 +1,6 @@
<script setup> <script setup>
import mqtt from 'mqtt' import { ref, watch, onMounted } from 'vue'
import { ref, watch, onMounted, onUnmounted } from 'vue' import MQTTService from '../../services/mqtt.js'
import { inject } from 'vue'
const mqttRef = inject('mqtt')
const props = defineProps({ const props = defineProps({
deviceId: String, deviceId: String,
@ -13,6 +10,8 @@ import { ref, watch, onMounted, onUnmounted } from 'vue'
const filters = ref({}) const filters = ref({})
const filterMode = ref({ label: 'Lenient', value: 'lenient' }) const filterMode = ref({ label: 'Lenient', value: 'lenient' })
const mqtt = new MQTTService()
const changedKeys = ref({}) const changedKeys = ref({})
let propertyTree = {} let propertyTree = {}
@ -81,7 +80,7 @@ import { ref, watch, onMounted, onUnmounted } from 'vue'
) )
onMounted(() => { onMounted(() => {
mqttRef.value.subscribe(`device/${props.deviceId}/property/#`, (payload, topic) => { mqtt.subscribe('device/+/property/#', (payload, topic) => {
const parts = topic.split('/') const parts = topic.split('/')
const device = parts[1] const device = parts[1]
@ -92,10 +91,6 @@ import { ref, watch, onMounted, onUnmounted } from 'vue'
insertProperty(propertyPath, payload) insertProperty(propertyPath, payload)
}) })
}) })
onUnmounted(() => {
mqttRef.value.unsubscribe(`device/${props.deviceId}/property/#`)
})
</script> </script>
<template> <template>

View File

@ -0,0 +1,43 @@
<script setup>
const activities = [
{
icon: 'pi-shopping-cart',
text: 'New order #1123',
time: '2 minutes ago',
color: 'pink',
},
{
icon: 'pi-user-plus',
text: 'New customer registered',
time: '15 minutes ago',
color: 'green',
},
{
icon: 'pi-check-circle',
text: 'Payment processed',
time: '25 minutes ago',
color: 'blue',
},
{
icon: 'pi-inbox',
text: 'Inventory updated',
time: '40 minutes ago',
color: 'yellow',
},
]
</script>
<template>
<div class="layout-card col-item-2">
<span class="chart-title">Recent Activity</span>
<div class="activity-list">
<div v-for="(activity, index) in activities" :key="index" class="activity-item">
<i :class="['activity-icon', activity.color, 'pi', activity.icon]"></i>
<div class="activity-content">
<span class="activity-text">{{ activity.text }}</span>
<span class="activity-time">{{ activity.time }}</span>
</div>
</div>
</div>
</div>
</template>

View File

@ -0,0 +1,94 @@
<script setup>
import { ref, watch, onMounted } from 'vue'
import { useLayout } from '../../composables/useLayout'
const { primary, surface, isDarkMode } = useLayout()
const chartData = ref(null)
const chartOptions = ref(null)
function setChartData() {
const documentStyle = getComputedStyle(document.documentElement)
return {
labels: ['Q1', 'Q2', 'Q3', 'Q4'],
datasets: [
{
type: 'bar',
label: 'Subscriptions',
backgroundColor: documentStyle.getPropertyValue('--p-primary-400'),
data: [4000, 10000, 15000, 4000],
barThickness: 32,
},
{
type: 'bar',
label: 'Advertising',
backgroundColor: documentStyle.getPropertyValue('--p-primary-300'),
data: [2100, 8400, 2400, 7500],
barThickness: 32,
},
{
type: 'bar',
label: 'Affiliate',
backgroundColor: documentStyle.getPropertyValue('--p-primary-200'),
data: [4100, 5200, 3400, 7400],
borderRadius: {
topLeft: 8,
topRight: 8,
},
barThickness: 32,
},
],
}
}
function setChartOptions() {
return {
maintainAspectRatio: false,
responsive: true,
plugins: {
legend: {
position: 'top',
},
},
scales: {
x: {
stacked: true,
grid: {
color: 'transparent',
borderColor: 'transparent',
},
},
y: {
stacked: true,
grid: {
color: 'transparent',
borderColor: 'transparent',
drawTicks: false,
},
},
},
}
}
watch([primary, surface, isDarkMode], () => {
chartData.value = setChartData()
chartOptions.value = setChartOptions()
})
onMounted(() => {
chartData.value = setChartData()
chartOptions.value = setChartOptions()
})
</script>
<template>
<div class="layout-card col-item-2">
<div class="chart-header">
<span class="chart-title">Sales Trend</span>
</div>
<div class="chart-content">
<Chart type="bar" :data="chartData" :options="chartOptions" style="height: 300px" />
</div>
</div>
</template>

View File

@ -0,0 +1,45 @@
<script setup>
const stats = [
{
title: 'Total Orders',
icon: 'pi-shopping-cart',
value: '1,234',
subtitle: 'Last 7 days',
},
{
title: 'Active Users',
icon: 'pi-users',
value: '2,573',
subtitle: 'Last 7 days',
},
{
title: 'Revenue',
icon: 'pi-dollar',
value: '$45,200',
subtitle: 'Last 7 days',
},
{
title: 'Success Rate',
icon: 'pi-chart-line',
value: '95%',
subtitle: 'Last 7 days',
},
]
</script>
<template>
<div class="stats">
<div v-for="(stat, index) in stats" :key="index" class="layout-card">
<div class="stats-header">
<span class="stats-title">{{ stat.title }}</span>
<span class="stats-icon-box">
<i :class="['pi', stat.icon]"></i>
</span>
</div>
<div class="stats-content">
<div class="stats-value">{{ stat.value }}</div>
<div class="stats-subtitle">{{ stat.subtitle }}</div>
</div>
</div>
</div>
</template>

View File

@ -1,79 +1,27 @@
import mqtt from 'mqtt' import mqtt from 'mqtt'
export default class MQTTService { export default class MQTTService {
/** constructor(brokerUrl = 'ws://127.0.0.1:8083/mqtt', clientId = null) {
* @param {string} brokerUrl - MQTT broker URL (e.g., ws://127.0.0.1:8083/mqtt)
* @param {string|null} clientId - Optional MQTT client ID
* @param {string|null} username - Optional MQTT username
* @param {string|null} password - Optional MQTT password
*/
constructor(
brokerUrl = 'ws://127.0.0.1:8083/mqtt',
username = null,
password = null,
clientId = null,
) {
this.clientId = clientId || 'vue-client-' + Math.random().toString(16).substr(2, 8) this.clientId = clientId || 'vue-client-' + Math.random().toString(16).substr(2, 8)
this.client = mqtt.connect(brokerUrl, { clientId: this.clientId, protocolVersion: 5 })
this.subscriptions = [] // array of {topic, callback}
// Connect options this.client.on('connect', () => console.log('Connected to MQTT broker'))
const options = {
clientId: this.clientId,
protocolVersion: 5,
}
if (username) options.username = username
if (password) options.password = password
this.client = mqtt.connect(brokerUrl, options)
this.subscriptions = [] // array of { topic, callback }
this.client.on('connect', () => {
console.log(`Connected to MQTT broker as ${this.clientId}`)
})
this.client.on('message', (topic, payload) => { this.client.on('message', (topic, payload) => {
const message = payload.toString() // iterate over subscriptions and check for matches
this.subscriptions.forEach(({ topic: subTopic, callback }) => { this.subscriptions.forEach(({ topic: subTopic, callback }) => {
if (mqttMatch(subTopic, topic)) { if (mqttMatch(subTopic, topic)) {
callback(message, topic) callback(payload.toString(), topic)
} }
}) })
}) })
} }
/**
* Subscribe to a topic
* @param {string} topic
* @param {function} callback
*/
subscribe(topic, callback) { subscribe(topic, callback) {
this.subscriptions.push({ topic, callback }) this.subscriptions.push({ topic, callback })
this.client.subscribe(topic) this.client.subscribe(topic)
} }
/**
* Unsubscribe from a topic
* @param {string} topic
* @param {function|null} callback - optional, remove only this callback
*/
unsubscribe(topic, callback = null) {
this.subscriptions = this.subscriptions.filter((sub) => {
if (sub.topic !== topic) return true
if (callback && sub.callback !== callback) return true
return false
})
// Actually tell the broker to unsubscribe only if no more callbacks exist for this topic
const stillSubscribed = this.subscriptions.some((sub) => sub.topic === topic)
if (!stillSubscribed) {
this.client.unsubscribe(topic)
}
}
/**
* Publish a message
* @param {string} topic
* @param {string|Buffer} message
* @param {object} options
*/
publish(topic, message, options = {}) { publish(topic, message, options = {}) {
this.client.publish(topic, message, options) this.client.publish(topic, message, options)
} }
@ -82,6 +30,6 @@ export default class MQTTService {
// helper function for MQTT wildcards // helper function for MQTT wildcards
function mqttMatch(subTopic, topic) { function mqttMatch(subTopic, topic) {
// replace MQTT wildcards with RegExp // replace MQTT wildcards with RegExp
const regex = '^' + subTopic.replace(/\+/g, '[^/]+').replace(/#/g, '.+') + '$' const regex = '^' + subTopic.replace('+', '[^/]+').replace('#', '.+') + '$'
return new RegExp(regex).test(topic) return new RegExp(regex).test(topic)
} }

View File

@ -9,10 +9,6 @@ services:
- 8084:8084 - 8084:8084
- 8883:8883 - 8883:8883
- 18083:18083 - 18083:18083
volumes:
- ./broker/emqx.conf:/opt/emqx/etc/emqx.conf:z
- ./broker/acl.conf:/opt/emqx/etc/acl.conf:z
- ./broker/passwd:/opt/emqx/etc/passwd:z
mediamtx: mediamtx:
container_name: mediamtx container_name: mediamtx

View File

@ -7,10 +7,9 @@ from dataclasses import dataclass
import aiohttp import aiohttp
from mqtthandler.command import command from mqtthandler.command import command
from mqtthandler.handler import MQTTHandler, task from mqtthandler.handler import MQTTConfig, MQTTHandler, task
from prometheus_client.parser import text_string_to_metric_families from prometheus_client.parser import text_string_to_metric_families
@dataclass @dataclass
class MediaMTXConfig: class MediaMTXConfig:
host: str = "http://localhost" host: str = "http://localhost"
@ -22,10 +21,11 @@ class MediaMTXConfig:
class MediaMTXHandler(MQTTHandler): class MediaMTXHandler(MQTTHandler):
def __init__( def __init__(
self, self,
name: str, mqtt_config: MQTTConfig,
handler_id: str,
mediamtx_config: MediaMTXConfig, mediamtx_config: MediaMTXConfig,
): ):
super().__init__(name) super().__init__(mqtt_config, handler_id)
self.config = mediamtx_config self.config = mediamtx_config
@task @task
@ -33,14 +33,10 @@ class MediaMTXHandler(MQTTHandler):
cache = {} cache = {}
while True: while True:
auth = aiohttp.BasicAuth( auth = aiohttp.BasicAuth(login=self.config.username, password=self.config.password)
login=self.config.username, password=self.config.password
)
async with aiohttp.ClientSession(auth=auth) as session: async with aiohttp.ClientSession(auth=auth) as session:
while True: while True:
async with session.get( async with session.get(f"{self.config.host}{self.config.metrics_path}") as r:
f"{self.config.host}{self.config.metrics_path}"
) as r:
metrics = await r.text() metrics = await r.text()
for family in text_string_to_metric_families(metrics): for family in text_string_to_metric_families(metrics):
@ -51,9 +47,7 @@ class MediaMTXHandler(MQTTHandler):
cache[topic] = sample.value cache[topic] = sample.value
print(topic, sample.value) print(topic, sample.value)
await self.set_property( await self.set_property(topic, sample.value, 0, True)
topic, sample.value, qos=0, retain=True
)
await asyncio.sleep(1) await asyncio.sleep(1)
@ -62,9 +56,12 @@ class MediaMTXHandler(MQTTHandler):
async def main(): async def main():
handler = MediaMTXHandler("mediamtx", MediaMTXConfig()) handler_id = f"mediamtx-{socket.gethostname()}"
mqtt_config = MQTTConfig(host="127.0.0.1")
handler = MediaMTXHandler(mqtt_config, handler_id, MediaMTXConfig())
signal.signal(signal.SIGINT, lambda signum, frame: handler.stop()) signal.signal(signal.SIGINT, lambda signum, frame: handler.stop())
await handler.run("127.0.0.1", username="device", password="devicesecret") await handler.run()
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -64,13 +64,3 @@ class CommandResponse:
def __str__(self): def __str__(self):
return json.dumps(asdict(self)) return json.dumps(asdict(self))
def enumerate_commands(obj: object):
commands = {}
for base in obj.__class__.__mro__:
for name, attr in vars(base).items():
if isinstance(attr, Command):
commands[attr.name] = attr
return commands

View File

@ -4,147 +4,101 @@ import inspect
import paho import paho
import signal import signal
import json import json
import secrets from dataclasses import dataclass
import os
import socket
from pathlib import Path
from enum import Enum, auto
from .command import ( from .command import (
Command, Command,
CommandResponse, CommandResponse,
CommandArgumentError, CommandArgumentError,
CommandExecutionError, CommandExecutionError,
enumerate_commands,
) )
from .property import Property
@dataclass
def get_identifier(cache_path: Path) -> str: class MQTTConfig:
""" host: str
Determine an MQTT client ID using the following order: port: int = 1883
1. Environment variable IDENTIFIER username: str | None = None
2. Value stored in /tmp/<handler-name>.tmp password: str | None = None
3. Generate a new random ID using secrets.token_urlsafe keepalive: int = 60
The resulting client ID is written to /tmp/mqtt_client_id.tmp for future use.
"""
client_id = os.environ.get("IDENTIFIER", None)
if not client_id and cache_path.exists():
client_id = cache_path.read_text().strip()
elif not client_id:
client_id = generate_identifier()
cache_path.write_text(client_id)
return client_id
def generate_identifier() -> str:
return secrets.token_urlsafe(6)
class Status(Enum):
ONLINE = auto()
OFFLINE = auto()
class MQTTHandler: class MQTTHandler:
DEVICE = "device"
META = "meta"
PROPERTY = "property"
COMMAND = "command"
STATUS = "status" STATUS = "status"
def __init__(self, name: str): def __init__(
self.name = name self,
self.identifier = get_identifier(Path(f"/tmp/{self.name}.tmp")) mqtt_config: MQTTConfig,
handler_id: str,
):
self.handler_id = handler_id
self.mqtt_config = mqtt_config
self.topic_base = lambda: f"{MQTTHandler.DEVICE}/{self.identifier}" self.topic_base = f"device/{handler_id}"
self.meta_topic = lambda: f"{self.topic_base()}/{MQTTHandler.META}" self.command_topic = f"{self.topic_base}/command"
self.command_topic = lambda: f"{self.topic_base()}/{MQTTHandler.COMMAND}" self.property_topic = f"{self.topic_base}/property"
self.property_topic = lambda: f"{self.topic_base()}/{MQTTHandler.PROPERTY}"
self._shutdown_event = asyncio.Event() self._shutdown_event = asyncio.Event()
will = aiomqtt.Will(
self._mqtt_client = None topic=f"{self.property_topic}/{MQTTHandler.STATUS}", payload="OFFLINE", qos=1, retain=True
self._commands = enumerate_commands(self)
self._properties = {}
self._meta = {}
async def set_property(self, name: str, value, **kwargs):
if name in self._properties:
await self._properties[name](value, **kwargs)
else:
# print(f"Warning: proeprty {name} is unregistered")
await self._publish(f"{self.property_topic()}/{name}", value, **kwargs)
async def register_property(
self, name: str, description: str | None = None, schema: dict | None = None
):
property = self._register_property(
f"{self.property_topic()}/{name}", description, schema
) )
self._properties[name] = property
async def _register_property( self.mqtt_client = aiomqtt.Client(
self, name: str, description: str | None = None, schema: dict | None = None self.mqtt_config.host,
): port=self.mqtt_config.port,
property = Property(name, description, schema, self._publish) identifier=handler_id,
data = { protocol=paho.mqtt.client.MQTTv5,
"schema": json.dumps(schema), will=will,
"description": description, )
}
for k, v in {k: v for k, v in data.items() if v is not None}.items():
await self._mqtt_client.publish(
f"{name}/${k}",
str(v),
qos=1,
retain=True,
)
return property self.commands = self.get_available_commands()
async def _publish(self, name: str, value, **kwargs): def get_available_commands(self):
await self._mqtt_client.publish(f"{name}", value, **kwargs) commands = {}
for base in self.__class__.__mro__:
for name, attr in vars(base).items():
if isinstance(attr, Command):
print(f"Registering method {type(self).__name__}.{name} as command '{attr.name}'")
commands[attr.name] = attr
async def _register_commands(self): return commands
for name, command in self._commands.items():
async def register_commands(self):
for name, command in self.commands.items():
for k, v in { for k, v in {
"schema": json.dumps(command.schema), "schema": json.dumps(command.schema),
"description": command.description, "description": command.description,
**command.additional_properties, **command.additional_properties,
}.items(): }.items():
await self._mqtt_client.publish( await self.mqtt_client.publish(
f"{self.command_topic()}/{command.name}/${k}", f"{self.command_topic}/{command.name}/${k}",
str(v), str(v),
qos=1, qos=1,
retain=True, retain=True,
) )
async def _announce(self): async def register_property(self, property: str, description: str | None = None, schema: dict | None = None):
# announce that we are online data = {
await self._register_commands() "schema": json.dumps(schema),
"description": description,
}
for k, v in {k:v for k,v in data.items() if v is not None}.items():
await self.mqtt_client.publish(
f"{self.property_topic}/{property}/${k}",
str(v),
qos=1,
retain=True,
)
self._meta[MQTTHandler.STATUS] = await self._register_property( async def set_property(self, property: str, value, qos=0, retain=False):
f"{self.meta_topic()}/{MQTTHandler.STATUS}", await self.mqtt_client.publish(
"Indicates the status of the device.", f"{self.property_topic}/{property}",
{"type": "string", "enum": list(Status.__members__.keys())}, str(value),
) qos=qos,
await self._meta[MQTTHandler.STATUS]( retain=retain,
self, json.dumps(Status.ONLINE.name), qos=1, retain=True
) )
await self._publish(f"{self.meta_topic()}/name", self.name, qos=1, retain=True) async def execute_command(
await self._publish(
f"{self.meta_topic()}/type", type(self).__name__, qos=1, retain=True
)
await self._publish(
f"{self.meta_topic()}/host", socket.gethostname(), qos=1, retain=True
)
async def _execute_command(
self, self,
command_name: str, command_name: str,
payload: str, payload: str,
@ -161,7 +115,7 @@ class MQTTHandler:
if hasattr(properties, "CorrelationData") if hasattr(properties, "CorrelationData")
else None else None
) )
await self._mqtt_client.publish( await self.mqtt_client.publish(
properties.ResponseTopic, properties.ResponseTopic,
str(CommandResponse(success, str(message), correlation)), str(CommandResponse(success, str(message), correlation)),
qos=1, qos=1,
@ -169,7 +123,7 @@ class MQTTHandler:
) )
try: try:
command = self._commands[command_name] command = self.commands[command_name]
result = await command(self, payload) result = await command(self, payload)
await respond(True, result) await respond(True, result)
except (CommandArgumentError, CommandExecutionError) as e: except (CommandArgumentError, CommandExecutionError) as e:
@ -178,53 +132,39 @@ class MQTTHandler:
print(f"Failed to execute command {command_name} with unknown cause: ", e) print(f"Failed to execute command {command_name} with unknown cause: ", e)
await respond(False, "Unexpected error") await respond(False, "Unexpected error")
async def _command_executor(self): async def mqtt_command_writer_task(self):
await self._mqtt_client.subscribe(f"{self.command_topic()}/+") async for message in self.mqtt_client.messages:
async for message in self._mqtt_client.messages:
topic = str(message.topic) topic = str(message.topic)
payload = message.payload.decode("utf-8") payload = message.payload.decode("utf-8")
if topic.startswith(self.command_topic()): if topic.startswith(self.command_topic):
command_name = topic.removeprefix(f"{self.command_topic()}/") command_name = topic.removeprefix(f"{self.command_topic}/")
await self._execute_command(command_name, payload, message.properties) await self.execute_command(command_name, payload, message.properties)
async def _shutdown_watcher(self): async def shutdown_watcher(self):
await self._shutdown_event.wait() await self._shutdown_event.wait()
await self._meta[MQTTHandler.STATUS]( await self.set_property(MQTTHandler.STATUS, "OFFLINE", qos=1, retain=True)
self, json.dumps(Status.OFFLINE.name), qos=1, retain=True
)
def stop(self): def stop(self):
self._shutdown_event.set() self._shutdown_event.set()
signal.signal(signal.SIGINT, signal.SIG_DFL) signal.signal(signal.SIGINT, signal.SIG_DFL)
async def run(self, host: str, **kwargs): async def run(self):
INTERVAL = 5 INTERVAL = 5
will = aiomqtt.Will(
topic=f"{self.meta_topic()}/{MQTTHandler.STATUS}",
payload=json.dumps(Status.OFFLINE.name),
qos=1,
retain=True,
)
while True: while True:
try: try:
async with aiomqtt.Client( async with self.mqtt_client as client:
host, await client.subscribe(f"{self.command_topic}/+")
protocol=paho.mqtt.client.MQTTv5, await self.register_commands()
will=will,
identifier=self.identifier,
**kwargs,
) as client:
self._mqtt_client = client
tasks = [ # announce that we are online
self._command_executor(), await self.set_property(MQTTHandler.STATUS, "ONLINE", qos=1, retain=True)
self._shutdown_watcher(), await self.register_property("status", "Indicates the status of the device.", {
self._announce(), "type": "string",
] "enum": ["ONLINE", "OFFLINE"]
})
tasks = [self.mqtt_command_writer_task(), self.shutdown_watcher()]
# Inspect instance methods # Inspect instance methods
for attr_name in dir(self): for attr_name in dir(self):
@ -239,12 +179,11 @@ class MQTTHandler:
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
except aiomqtt.MqttError as e: except aiomqtt.MqttError as e:
print(f"MQTT connection error: {e}. Reconnecting in {INTERVAL}s...") print(
f"[{self.handler_id}] MQTT connection error: {e}. Reconnecting in {INTERVAL}s..."
)
await asyncio.sleep(INTERVAL) await asyncio.sleep(INTERVAL)
finally:
self._mqtt_client = None
def task(func): def task(func):
"""Decorator to mark async methods for automatic gathering.""" """Decorator to mark async methods for automatic gathering."""

View File

@ -1,58 +0,0 @@
import json
import jsonschema
class PropertyValueError(ValueError):
pass
class Property:
"""
Presumes that the handler will take the same arguments as aiomqtt.Client.publish
ie: async publish(
topic: str,
payload: str | bytes | bytearray | int | float | None = None,
qos: int = 0,
retain: bool = False,
properties: Properties | None = None,
*args: Any,
timeout: float | None = None,
**kwargs: Any
) None
"""
def __init__(
self,
name: str,
description: str | None = None,
schema: dict | None = None,
handler=None,
**kwargs,
):
self.name = name
self.description = description
self.schema = schema
self.handler = handler
self.additional_properties = kwargs
def __call__(self, handler_instance, payload, **kwargs):
if self.handler is None:
raise NotImplementedError(f"No handler bound for property '{self.name}'")
try:
value = json.loads(payload)
if self.schema is not None:
jsonschema.validate(value, self.schema)
except json.decoder.JSONDecodeError as e:
raise PropertyValueError(
f"Invalid JSON at line {e.lineno} column {e.colno}: {e.msg}"
)
except jsonschema.ValidationError as e:
raise PropertyValueError(f"Schema error in {e.json_path}: {e.message}")
try:
return self.handler(self.name, payload, **kwargs)
except Exception as e:
print("Failed to set property: ", e)
raise RuntimeError(f"Failed to set property.")

View File

@ -6,23 +6,24 @@ import asyncio
from dataclasses import dataclass from dataclasses import dataclass
from mqtthandler.command import command from mqtthandler.command import command
from mqtthandler.handler import MQTTHandler, task from mqtthandler.handler import MQTTConfig, MQTTHandler, task
from streamer.fileradio import FileRadio from streamer.fileradio import FileRadio
class RadioHandler(MQTTHandler): class RadioHandler(MQTTHandler):
def __init__( def __init__(
self, self,
name: str, mqtt_config: MQTTConfig,
handler_id: str,
): ):
super().__init__(name) super().__init__(mqtt_config, handler_id)
self.radio = FileRadio("./data/StarWars60.mp3", name)
self.radio = FileRadio("./data/StarWars60.mp3", handler_id)
@task @task
async def publish_stream_path(self): async def publish_stream_path(self):
await self.set_property("path", self.radio.stream_path(), qos=1, retain=True) await self.set_property("path", self.radio.stream_path(), 1, True)
await self.set_property("file", self.radio.path, qos=1, retain=True) await self.set_property("file", self.radio.path, 1, True)
@command({"type": "object"}, "Start the radio stream.") @command({"type": "object"}, "Start the radio stream.")
async def start(self, args): async def start(self, args):
@ -46,11 +47,13 @@ class RadioHandler(MQTTHandler):
self.radio = FileRadio(args, self.radio.name) self.radio = FileRadio(args, self.radio.name)
await self.publish_stream_path() await self.publish_stream_path()
async def main(): async def main():
handler = RadioHandler("radio") handler_id = f"radio-{socket.gethostname()}"
mqtt_config = MQTTConfig(host="127.0.0.1")
handler = RadioHandler(mqtt_config, handler_id)
signal.signal(signal.SIGINT, lambda signum, frame: handler.stop()) signal.signal(signal.SIGINT, lambda signum, frame: handler.stop())
await handler.run("127.0.0.1", username="device", password="devicesecret") await handler.run()
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -21,9 +21,9 @@ class FileRadio(Streamer):
self.playback = subprocess.Popen( self.playback = subprocess.Popen(
[ [
"/usr/bin/ffmpeg", "/usr/bin/ffmpeg",
"-re", # http://trac.ffmpeg.org/wiki/StreamingGuide#The-reflag "-re", # http://trac.ffmpeg.org/wiki/StreamingGuide#The-reflag
"-stream_loop", # Loop the stream - "-stream_loop", # Loop the stream -
"-1", # ...indefinitely "-1", # ...indefinitely
"-i", "-i",
self.path, self.path,
"-c:a", "-c:a",

View File

@ -1,6 +1,5 @@
from threading import Thread from threading import Thread
def is_alive(subprocess): def is_alive(subprocess):
return True if (subprocess and subprocess.poll() is None) else False return True if (subprocess and subprocess.poll() is None) else False

37
ubx.py
View File

@ -2,15 +2,17 @@
import asyncio import asyncio
import aioserial import aioserial
import aiomqtt
import pyubx2 import pyubx2
import io import io
import logging import logging
import aioserial import aioserial
import asyncio import asyncio
import socket
import signal import signal
from mqtthandler.command import command from mqtthandler.command import command
from mqtthandler.handler import MQTTHandler, task from mqtthandler.handler import MQTTConfig, MQTTHandler, task
# The pyubx2 library spams the console with errors that aren't errors. # The pyubx2 library spams the console with errors that aren't errors.
# I don't care if you failed to parse an incomplete buffer. # I don't care if you failed to parse an incomplete buffer.
@ -45,10 +47,11 @@ class UBXAsyncParser:
class UBXHandler(MQTTHandler): class UBXHandler(MQTTHandler):
def __init__( def __init__(
self, self,
name: str, mqtt_client: aiomqtt.Client,
handler_id: str,
serial_port: aioserial.AioSerial, serial_port: aioserial.AioSerial,
): ):
super().__init__(name) super().__init__(mqtt_client, handler_id)
self.serial_port = serial_port self.serial_port = serial_port
@task @task
@ -74,7 +77,7 @@ class UBXHandler(MQTTHandler):
continue continue
property = f"{message.identity}/{name}" property = f"{message.identity}/{name}"
await self.set_property(property, value, qos=0, retain=True) await self.set_property(property, value, 1, True)
else: else:
# print("Unexpected response:", message) # print("Unexpected response:", message)
pass pass
@ -109,7 +112,7 @@ class UBXHandler(MQTTHandler):
460800, 460800,
921600, 921600,
], ],
"default": 9600, "default": 9600
}, },
}, },
"required": ["portID", "baudRate"], "required": ["portID", "baudRate"],
@ -144,7 +147,7 @@ class UBXHandler(MQTTHandler):
"minimum": 1, "minimum": 1,
"maximum": 127, "maximum": 127,
"description": "Number of measurement cycles per navigation solution", "description": "Number of measurement cycles per navigation solution",
"default": 1, "default": 1
}, },
"timeRef": { "timeRef": {
"type": "integer", "type": "integer",
@ -171,20 +174,20 @@ class UBXHandler(MQTTHandler):
async def main(): async def main():
handler = UBXHandler( handler_id = f"example-gps-{socket.gethostname()}"
"example-gps", mqtt_config = MQTTConfig(host="127.0.0.1", port=1883)
aioserial.AioSerial(
port="/tmp/ttyV0",
baudrate=115200,
timeout=0.05, # 50 ms
),
)
signal.signal(signal.SIGINT, lambda signum, frame: handler.stop())
await handler.run( serial_port = aioserial.AioSerial(
"127.0.0.1", port=1883, username="device", password="devicesecret" port="/tmp/ttyV0",
baudrate=115200,
timeout=0.05, # 50 ms
) )
handler = UBXHandler(mqtt_config, handler_id, serial_port)
signal.signal(signal.SIGINT, lambda signum, frame: handler.stop())
await handler.run()
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main()) asyncio.run(main())