Compare commits

...

5 Commits

24 changed files with 661 additions and 540 deletions

49
broker/acl.conf Normal file
View File

@ -0,0 +1,49 @@
%%--------------------------------------------------------------------
%% EMQX ACL configuration
%%--------------------------------------------------------------------
%% =========================
%% Device user permissions
%% =========================
%% Devices can publish ONLY to their own namespace
{allow, {user, "device"}, publish, ["device/${clientid}/meta/#"]}.
{allow, {user, "device"}, publish, ["device/${clientid}/property/#"]}.
{allow, {user, "device"}, publish, ["device/${clientid}/command/#"]}.
%% Devices can receive commands
{allow, {user, "device"}, subscribe, ["device/${clientid}/command/#"]}.
%% =========================
%% Authenticated users
%% =========================
%% Any authenticated user can read all device topics
{allow, all, subscribe, ["device/+/meta/#"]}.
{allow, all, subscribe, ["device/+/property/#"]}.
{allow, all, subscribe, ["device/+/command/#"]}.
%% Any authenticated user can publish commands to any device
{allow, all, publish, ["device/+/command/+"]}.
%% =========================
%% Response topic mechanism
%% =========================
%% Clients can SUBSCRIBE to their own response inbox
{allow, all, subscribe, ["client/${clientid}/responses/#"]}.
%% Authenticated users can PUBLISH to any client response inbox
{allow, all, publish, ["client/+/responses/#"]}.
%% (No subscribe permission for others -> enforced by default deny)
%% =========================
%% Default deny
%% =========================
{deny, all}.

81
broker/emqx.conf Normal file
View File

@ -0,0 +1,81 @@
## Place read-only configurations in this file.
## To define configurations that can later be overridden through UI/API/CLI, add them to `etc/base.hocon`.
##
## Config precedence order:
## etc/base.hocon < cluster.hocon < emqx.conf < environment variables
##
## See https://docs.emqx.com/en/enterprise/latest/configuration/configuration.html for more information.
## Configuration full example can be found in etc/examples
node {
name = "emqx@127.0.0.1"
cookie = "emqx50elixir"
data_dir = "data"
}
cluster {
name = emqxcl
discovery_strategy = manual
}
dashboard {
listeners {
http.bind = 18083
# https.bind = 18084
https {
ssl_options {
certfile = "${EMQX_ETC_DIR}/certs/cert.pem"
keyfile = "${EMQX_ETC_DIR}/certs/key.pem"
}
}
}
}
##--------------------------------------------------------------------
## Authentication
##--------------------------------------------------------------------
## Load users from file
authn {
enable = true
sources = [
{
type = file
path = "etc/passwd"
password_hash_algorithm {
name = plain
}
}
]
}
##--------------------------------------------------------------------
## Authorization (ACL)
##--------------------------------------------------------------------
authorization {
sources = [
{
type = file
path = "etc/acl.conf"
}
]
no_match = deny
}
##--------------------------------------------------------------------
## Anonymous access
##--------------------------------------------------------------------
allow_anonymous = true
##--------------------------------------------------------------------
## Listener (basic)
##--------------------------------------------------------------------
listeners.tcp.default {
bind = "0.0.0.0:1883"
}

3
broker/passwd Normal file
View File

@ -0,0 +1,3 @@
device:devicesecret
alice:alicepass
bob:bobpass

View File

@ -1,47 +1,59 @@
<script setup> <script setup>
import { ref, provide, onMounted } from 'vue'
import { useLayout } from './composables/useLayout'
import { useToast } from 'primevue/usetoast'
const toast = useToast()
const { primaryColors, surfaces, primary, surface, isDarkMode, updateColors, toggleDarkMode } =
useLayout()
import MQTTService from './services/mqtt.js'
import AppTopbar from './components/AppTopbar.vue' import AppTopbar from './components/AppTopbar.vue'
import AppFooter from './components/AppFooter.vue' import AppFooter from './components/AppFooter.vue'
import StatsWidget from './components/dashboard/StatsWidget.vue' import DevicesWidget from './components/dashboard/DevicesWidget.vue'
import SalesTrendWidget from './components/dashboard/SalesTrendWidget.vue'
import RecentActivityWidget from './components/dashboard/RecentActivityWidget.vue'
import ProductOverviewWidget from './components/dashboard/ProductOverviewWidget.vue'
import PropertiesWidget from './components/dashboard/PropertiesWidget.vue' import PropertiesWidget from './components/dashboard/PropertiesWidget.vue'
import CommandsWidget from './components/dashboard/CommandsWidget.vue' import CommandsWidget from './components/dashboard/CommandsWidget.vue'
import { ref } from 'vue' const mqtt = ref(null)
import DevicesWidget from './components/dashboard/DevicesWidget.vue' const loggedIn = ref(false)
const selectedDevice = ref(null) const selectedDevice = ref(null)
function handleLogin({ username, password }) {
// Create the shared MQTT client
mqtt.value = new MQTTService('ws://127.0.0.1:8083/mqtt', username, password)
loggedIn.value = true
}
onMounted(() => {
if(isDarkMode !== false) toggleDarkMode()
})
provide('mqtt', mqtt)
</script> </script>
<template> <template>
<Toast position="bottom-right" /> <Toast position="bottom-right" />
<div class="layout-container"> <div id="app">
<AppTopbar /> <SplashPage v-if="!loggedIn" @login="handleLogin" />
<div class="layout-grid"> <div v-else class="layout-container">
<DevicesWidget @select="selectedDevice = $event" /> <AppTopbar />
<div v-if="selectedDevice" class="layout-grid-row"> <div class="layout-grid">
<PropertiesWidget <DevicesWidget @select="selectedDevice = $event" />
v-if="selectedDevice" <div v-if="selectedDevice" class="layout-grid-row">
:key="'props-' + selectedDevice" <PropertiesWidget
:device-id="selectedDevice" v-if="selectedDevice"
/> :key="'props-' + selectedDevice"
:device-id="selectedDevice"
<CommandsWidget />
v-if="selectedDevice" <CommandsWidget
:key="'cmds-' + selectedDevice" v-if="selectedDevice"
:device-id="selectedDevice" :key="'cmds-' + selectedDevice"
/> :device-id="selectedDevice"
/>
</div>
</div> </div>
<!-- <StatsWidget /> --> <AppFooter />
<!-- <div class="layout-grid-row">
<SalesTrendWidget />
<RecentActivityWidget />
</div>-->
<!-- <ProductOverviewWidget /> -->
</div> </div>
<AppFooter />
</div> </div>
</template> </template>

View File

@ -97,6 +97,18 @@ body {
color: var(--p-primary-200); color: var(--p-primary-200);
} }
.offline {
color: color-mix(in srgb, red, transparent 40%);
border-color: color-mix(in srgb, red, transparent 70%);
background-color: color-mix(in srgb, red, transparent 80%);
}
.p-dark .offline {
color: color-mix(in srgb, red, transparent 40%);
border-color: color-mix(in srgb, red, transparent 70%);
background-color: color-mix(in srgb, red, transparent 90%);
}
.stats-header { .stats-header {
display: flex; display: flex;
align-items: flex-start; align-items: flex-start;

View File

@ -4,10 +4,6 @@
const { primaryColors, surfaces, primary, surface, isDarkMode, updateColors, toggleDarkMode } = const { primaryColors, surfaces, primary, surface, isDarkMode, updateColors, toggleDarkMode } =
useLayout() useLayout()
onMounted(() => {
toggleDarkMode()
})
</script> </script>
<template> <template>

View File

@ -0,0 +1,60 @@
<script setup>
import { ref } from 'vue'
const emit = defineEmits(['login'])
const username = ref('')
const password = ref('')
function submitLogin() {
if (username.value && password.value) {
emit('login', { username: username.value, password: password.value })
} else {
alert('Please enter username and password')
}
}
</script>
<template>
<div class="splash-page">
<div class="login-card">
<h2>MQTT Login</h2>
<input v-model="username" placeholder="Username" />
<input v-model="password" type="password" placeholder="Password" />
<Button @click="submitLogin">Login</Button>
</div>
</div>
</template>
<style scoped>
.splash-page {
display: flex;
justify-content: center;
align-items: center;
height: 100vh;
background-color: var(--p-primary-color-light);
}
.login-card {
padding: 2rem;
border-radius: 12px;
box-shadow: 0 0 12px rgba(0, 0, 0, 0.15);
width: 300px;
text-align: center;
}
.login-card input {
display: block;
width: 100%;
margin: 0.5rem 0;
padding: 0.5rem;
border-radius: 6px;
border: 1px solid #ccc;
}
.login-card button {
margin-top: 1rem;
width: 100%;
padding: 0.5rem;
}
</style>

View File

@ -1,6 +1,8 @@
<script setup> <script setup>
import { ref, reactive, onMounted } from 'vue' import { ref, reactive, onMounted, onUnmounted } from 'vue'
import MQTTService from '../../services/mqtt.js' import { inject } from 'vue'
const mqttRef = inject('mqtt')
import Accordion from 'primevue/accordion' import Accordion from 'primevue/accordion'
import AccordionPanel from 'primevue/accordionpanel' import AccordionPanel from 'primevue/accordionpanel'
@ -25,8 +27,6 @@
deviceId: String, deviceId: String,
}) })
const mqtt2 = new MQTTService()
const filters = ref({}) const filters = ref({})
const commands = ref([]) const commands = ref([])
const commandMap = reactive({}) const commandMap = reactive({})
@ -65,10 +65,10 @@
var responseId = generateId() var responseId = generateId()
commandByResponseId[responseId] = cmd commandByResponseId[responseId] = cmd
mqtt2.publish(topic, payload || '{}', { mqttRef.value.publish(topic, payload || '{}', {
qos: 1, qos: 1,
properties: { properties: {
responseTopic: `client/${mqtt2.clientId}/responses`, responseTopic: `client/${mqttRef.value.clientId}/responses`,
correlationData: new TextEncoder().encode(responseId), correlationData: new TextEncoder().encode(responseId),
}, },
}) })
@ -93,7 +93,7 @@
} }
onMounted(() => { onMounted(() => {
mqtt2.subscribe('device/+/command/#', (payload, topic) => { mqttRef.value.subscribe(`device/${props.deviceId}/command/#`, (payload, topic) => {
const parts = topic.split('/') const parts = topic.split('/')
const device = parts[1] const device = parts[1]
@ -102,6 +102,8 @@
const command = parts[3] const command = parts[3]
const field = parts[4] const field = parts[4]
console.log(topic, payload)
if (field === '$description') { if (field === '$description') {
updateCommand(device, command, 'description', payload) updateCommand(device, command, 'description', payload)
} }
@ -111,7 +113,7 @@
} }
}) })
mqtt2.subscribe(`client/${mqtt2.clientId}/responses`, (payload, topic) => { mqttRef.value.subscribe(`client/${mqttRef.value.clientId}/responses`, (payload, topic) => {
let response = JSON.parse(payload) let response = JSON.parse(payload)
const responseId = response.correlation const responseId = response.correlation
@ -137,6 +139,11 @@
}) })
}) })
onUnmounted(() => {
mqttRef.value.unsubscribe(`device/${props.deviceId}/command/#`)
mqttRef.value.unsubscribe(`client/${mqttRef.value.clientId}/responses`)
})
const jsonFormsConfig = { const jsonFormsConfig = {
validationMode: 'ValidateOnTouched', validationMode: 'ValidateOnTouched',
showAllErrors: false, showAllErrors: false,

View File

@ -1,93 +1,95 @@
<script setup> <script setup>
import { ref, onMounted } from 'vue' import { ref, onMounted } from 'vue'
import MQTTService from '../../services/mqtt.js' import { inject } from 'vue'
const emit = defineEmits(['select']) const mqttRef = inject('mqtt')
const emit = defineEmits(['select'])
const mqtt = new MQTTService() const devices = ref([])
const deviceMap = ref({}) // map deviceId => device object with all meta properties
const selected = ref(null)
const devices = ref([]) // Add or update a device's meta property
const deviceSet = new Set() function upsertDevice(deviceId, property, value) {
const selected = ref(null) if (!deviceMap.value[deviceId]) {
deviceMap.value[deviceId] = {
function upsertDevice(device, status) { id: deviceId,
const existing = devices.value.find((d) => d.id === device) meta: {}, // stores all meta properties
title: deviceId,
if (existing) { value: '', // main status display
// update reactively subtitle: 'MQTT Device',
existing.value = status icon: 'pi-box',
} else {
deviceSet.add(device)
devices.value = [
...devices.value,
{
id: device,
title: device,
value: status,
subtitle: 'MQTT Device',
icon: status === 'ONLINE' ? 'pi-check-circle' : 'pi-times-circle',
},
]
} }
} }
function selectDevice(device) {
selected.value = device.id // update the property
emit('select', device.id) deviceMap.value[deviceId].meta[property] = value
// update card display values (example: show 'status' as main value)
if (property === 'status') {
deviceMap.value[deviceId].value = value.replace(/['"]+/g, '')
deviceMap.value[deviceId].icon = deviceMap.value[deviceId].value === 'ONLINE' ? 'pi-check-circle' : 'pi-times-circle'
} }
onMounted(() => { // rebuild reactive array for rendering
mqtt.subscribe('device/+/property/status', (payload, topic) => { devices.value = Object.values(deviceMap.value)
const parts = topic.split('/') }
const device = parts[1]
upsertDevice(device, payload) function selectDevice(device) {
}) selected.value = device.id
emit('select', device.id)
}
onMounted(() => {
mqttRef.value.subscribe('device/+/meta/+', (payload, topic) => {
const parts = topic.split('/')
const deviceId = parts[1]
const property = parts.slice(3).join('/') // handles nested properties if any
upsertDevice(deviceId, property, payload)
}) })
})
</script> </script>
<template> <template>
<div class="layout-grid-row"> <div class="layout-grid-row">
<div <div
v-for="(device, index) in devices" v-for="device in devices"
:key="device.id" :key="device.id"
class="layout-card device-card" class="layout-card device-card"
:class="{ 'selected-device': device.id === selected }" :class="{ 'selected-device': device.id === selected }"
@click="selectDevice(device)" @click="selectDevice(device)"
> >
<div class="stats-header"> <div class="stats-header">
<span class="stats-title">{{ device.title }}</span> <span class="stats-title">{{ device.meta['type'] }}</span>
<span class="stats-icon-box"> <span :class="[ device.value === 'OFFLINE' ? 'offline' : '', 'stats-icon-box']">
<i <i
:class="[ :class="['pi', device.icon]"
'pi', :style="{ color: device.value === 'ONLINE' ? 'inherit' : 'var(--p-secondary-color)' }"
device.value === 'ONLINE' ? 'pi-check-circle' : 'pi-times-circle',
device.value === 'ONLINE' ? '' : 'text-red-500',
]"
></i> ></i>
</span> </span>
</div> </div>
<div class="stats-content"> <div class="stats-content">
<div class="stats-value">{{ device.value }}</div> <div class="stats-value">{{ device.meta['name'] }}</div>
<div class="stats-subtitle">{{ device.subtitle }}</div> <div class="stats-subtitle"><span class='italic'>{{ device.id }}</span> on <span class="font-bold">{{ device.meta['host'] }}</span></div>
<div class="stats-subtitle italic text-green-500"></div>
</div> </div>
</div> </div>
</div> </div>
</template> </template>
<style scoped> <style scoped>
.device-card { .device-card {
cursor: pointer; cursor: pointer;
transition: all 0.2s ease; transition: all 0.2s ease;
} }
.device-card:hover { .device-card:hover {
transform: translateY(-2px); transform: translateY(-2px);
} }
/* 🔥 selected state */ .selected-device {
.selected-device { border: 2px solid var(--p-primary-color);
border: 2px solid var(--p-primary-color); box-shadow: 0 0 8px var(--p-primary-color);
box-shadow: 0 0 8px var(--p-primary-color); }
}
</style> </style>

View File

@ -1,107 +0,0 @@
<script setup>
import { ref, watch, onMounted } from 'vue'
const products = ref([
{
name: 'Laptop Pro',
category: 'Electronics',
price: 2499,
status: 'In Stock',
},
{
name: 'Wireless Mouse',
category: 'Accessories',
price: 49,
status: 'Low Stock',
},
{
name: 'Monitor 4K',
category: 'Electronics',
price: 699,
status: 'Out of Stock',
},
{ name: 'Keyboard', category: 'Accessories', price: 149, status: 'In Stock' },
])
const selectedProduct = ref(null)
const searchQuery = ref('')
const loading = ref(false)
const filteredProducts = ref([])
const searchProducts = () => {
loading.value = true
filteredProducts.value = products.value.filter(
(product) =>
product.name.toLowerCase().includes(searchQuery.value.toLowerCase()) ||
product.category.toLowerCase().includes(searchQuery.value.toLowerCase()) ||
product.status.toLowerCase().includes(searchQuery.value.toLowerCase()),
)
setTimeout(() => {
loading.value = false
}, 300)
}
watch(searchQuery, () => {
searchProducts()
})
onMounted(() => {
filteredProducts.value = [...products.value]
})
</script>
<template>
<div class="layout-card">
<div class="products-header">
<span class="products-title">Products Overview</span>
<IconField class="search-field">
<InputIcon class="pi pi-search" />
<InputText
v-model="searchQuery"
placeholder="Search products..."
class="products-search"
@keyup.enter="searchProducts"
/>
</IconField>
</div>
<div class="products-table-container">
<DataTable
:value="filteredProducts"
v-model:selection="selectedProduct"
selectionMode="single"
:loading="loading"
:rows="5"
class="products-table"
:pt="{
mask: {
class: 'products-table-mask',
},
loadingIcon: {
class: 'products-table-loading',
},
}"
>
<Column field="name" header="Name" sortable></Column>
<Column field="category" header="Category" sortable></Column>
<Column field="price" header="Price" sortable>
<template #body="{ data }"> ${{ data.price }} </template>
</Column>
<Column field="status" header="Status">
<template #body="{ data }">
<Tag
:severity="
data.status === 'In Stock'
? 'success'
: data.status === 'Low Stock'
? 'warn'
: 'danger'
"
>
{{ data.status }}
</Tag>
</template>
</Column>
</DataTable>
</div>
</div>
</template>

View File

@ -1,6 +1,9 @@
<script setup> <script setup>
import { ref, watch, onMounted } from 'vue' import mqtt from 'mqtt'
import MQTTService from '../../services/mqtt.js' import { ref, watch, onMounted, onUnmounted } from 'vue'
import { inject } from 'vue'
const mqttRef = inject('mqtt')
const props = defineProps({ const props = defineProps({
deviceId: String, deviceId: String,
@ -10,8 +13,6 @@
const filters = ref({}) const filters = ref({})
const filterMode = ref({ label: 'Lenient', value: 'lenient' }) const filterMode = ref({ label: 'Lenient', value: 'lenient' })
const mqtt = new MQTTService()
const changedKeys = ref({}) const changedKeys = ref({})
let propertyTree = {} let propertyTree = {}
@ -80,7 +81,7 @@
) )
onMounted(() => { onMounted(() => {
mqtt.subscribe('device/+/property/#', (payload, topic) => { mqttRef.value.subscribe(`device/${props.deviceId}/property/#`, (payload, topic) => {
const parts = topic.split('/') const parts = topic.split('/')
const device = parts[1] const device = parts[1]
@ -91,6 +92,10 @@
insertProperty(propertyPath, payload) insertProperty(propertyPath, payload)
}) })
}) })
onUnmounted(() => {
mqttRef.value.unsubscribe(`device/${props.deviceId}/property/#`)
})
</script> </script>
<template> <template>

View File

@ -1,43 +0,0 @@
<script setup>
const activities = [
{
icon: 'pi-shopping-cart',
text: 'New order #1123',
time: '2 minutes ago',
color: 'pink',
},
{
icon: 'pi-user-plus',
text: 'New customer registered',
time: '15 minutes ago',
color: 'green',
},
{
icon: 'pi-check-circle',
text: 'Payment processed',
time: '25 minutes ago',
color: 'blue',
},
{
icon: 'pi-inbox',
text: 'Inventory updated',
time: '40 minutes ago',
color: 'yellow',
},
]
</script>
<template>
<div class="layout-card col-item-2">
<span class="chart-title">Recent Activity</span>
<div class="activity-list">
<div v-for="(activity, index) in activities" :key="index" class="activity-item">
<i :class="['activity-icon', activity.color, 'pi', activity.icon]"></i>
<div class="activity-content">
<span class="activity-text">{{ activity.text }}</span>
<span class="activity-time">{{ activity.time }}</span>
</div>
</div>
</div>
</div>
</template>

View File

@ -1,94 +0,0 @@
<script setup>
import { ref, watch, onMounted } from 'vue'
import { useLayout } from '../../composables/useLayout'
const { primary, surface, isDarkMode } = useLayout()
const chartData = ref(null)
const chartOptions = ref(null)
function setChartData() {
const documentStyle = getComputedStyle(document.documentElement)
return {
labels: ['Q1', 'Q2', 'Q3', 'Q4'],
datasets: [
{
type: 'bar',
label: 'Subscriptions',
backgroundColor: documentStyle.getPropertyValue('--p-primary-400'),
data: [4000, 10000, 15000, 4000],
barThickness: 32,
},
{
type: 'bar',
label: 'Advertising',
backgroundColor: documentStyle.getPropertyValue('--p-primary-300'),
data: [2100, 8400, 2400, 7500],
barThickness: 32,
},
{
type: 'bar',
label: 'Affiliate',
backgroundColor: documentStyle.getPropertyValue('--p-primary-200'),
data: [4100, 5200, 3400, 7400],
borderRadius: {
topLeft: 8,
topRight: 8,
},
barThickness: 32,
},
],
}
}
function setChartOptions() {
return {
maintainAspectRatio: false,
responsive: true,
plugins: {
legend: {
position: 'top',
},
},
scales: {
x: {
stacked: true,
grid: {
color: 'transparent',
borderColor: 'transparent',
},
},
y: {
stacked: true,
grid: {
color: 'transparent',
borderColor: 'transparent',
drawTicks: false,
},
},
},
}
}
watch([primary, surface, isDarkMode], () => {
chartData.value = setChartData()
chartOptions.value = setChartOptions()
})
onMounted(() => {
chartData.value = setChartData()
chartOptions.value = setChartOptions()
})
</script>
<template>
<div class="layout-card col-item-2">
<div class="chart-header">
<span class="chart-title">Sales Trend</span>
</div>
<div class="chart-content">
<Chart type="bar" :data="chartData" :options="chartOptions" style="height: 300px" />
</div>
</div>
</template>

View File

@ -1,45 +0,0 @@
<script setup>
const stats = [
{
title: 'Total Orders',
icon: 'pi-shopping-cart',
value: '1,234',
subtitle: 'Last 7 days',
},
{
title: 'Active Users',
icon: 'pi-users',
value: '2,573',
subtitle: 'Last 7 days',
},
{
title: 'Revenue',
icon: 'pi-dollar',
value: '$45,200',
subtitle: 'Last 7 days',
},
{
title: 'Success Rate',
icon: 'pi-chart-line',
value: '95%',
subtitle: 'Last 7 days',
},
]
</script>
<template>
<div class="stats">
<div v-for="(stat, index) in stats" :key="index" class="layout-card">
<div class="stats-header">
<span class="stats-title">{{ stat.title }}</span>
<span class="stats-icon-box">
<i :class="['pi', stat.icon]"></i>
</span>
</div>
<div class="stats-content">
<div class="stats-value">{{ stat.value }}</div>
<div class="stats-subtitle">{{ stat.subtitle }}</div>
</div>
</div>
</div>
</template>

View File

@ -1,27 +1,79 @@
import mqtt from 'mqtt' import mqtt from 'mqtt'
export default class MQTTService { export default class MQTTService {
constructor(brokerUrl = 'ws://127.0.0.1:8083/mqtt', clientId = null) { /**
* @param {string} brokerUrl - MQTT broker URL (e.g., ws://127.0.0.1:8083/mqtt)
* @param {string|null} clientId - Optional MQTT client ID
* @param {string|null} username - Optional MQTT username
* @param {string|null} password - Optional MQTT password
*/
constructor(
brokerUrl = 'ws://127.0.0.1:8083/mqtt',
username = null,
password = null,
clientId = null,
) {
this.clientId = clientId || 'vue-client-' + Math.random().toString(16).substr(2, 8) this.clientId = clientId || 'vue-client-' + Math.random().toString(16).substr(2, 8)
this.client = mqtt.connect(brokerUrl, { clientId: this.clientId, protocolVersion: 5 })
this.subscriptions = [] // array of {topic, callback}
this.client.on('connect', () => console.log('Connected to MQTT broker')) // Connect options
const options = {
clientId: this.clientId,
protocolVersion: 5,
}
if (username) options.username = username
if (password) options.password = password
this.client = mqtt.connect(brokerUrl, options)
this.subscriptions = [] // array of { topic, callback }
this.client.on('connect', () => {
console.log(`Connected to MQTT broker as ${this.clientId}`)
})
this.client.on('message', (topic, payload) => { this.client.on('message', (topic, payload) => {
// iterate over subscriptions and check for matches const message = payload.toString()
this.subscriptions.forEach(({ topic: subTopic, callback }) => { this.subscriptions.forEach(({ topic: subTopic, callback }) => {
if (mqttMatch(subTopic, topic)) { if (mqttMatch(subTopic, topic)) {
callback(payload.toString(), topic) callback(message, topic)
} }
}) })
}) })
} }
/**
* Subscribe to a topic
* @param {string} topic
* @param {function} callback
*/
subscribe(topic, callback) { subscribe(topic, callback) {
this.subscriptions.push({ topic, callback }) this.subscriptions.push({ topic, callback })
this.client.subscribe(topic) this.client.subscribe(topic)
} }
/**
* Unsubscribe from a topic
* @param {string} topic
* @param {function|null} callback - optional, remove only this callback
*/
unsubscribe(topic, callback = null) {
this.subscriptions = this.subscriptions.filter((sub) => {
if (sub.topic !== topic) return true
if (callback && sub.callback !== callback) return true
return false
})
// Actually tell the broker to unsubscribe only if no more callbacks exist for this topic
const stillSubscribed = this.subscriptions.some((sub) => sub.topic === topic)
if (!stillSubscribed) {
this.client.unsubscribe(topic)
}
}
/**
* Publish a message
* @param {string} topic
* @param {string|Buffer} message
* @param {object} options
*/
publish(topic, message, options = {}) { publish(topic, message, options = {}) {
this.client.publish(topic, message, options) this.client.publish(topic, message, options)
} }
@ -30,6 +82,6 @@ export default class MQTTService {
// helper function for MQTT wildcards // helper function for MQTT wildcards
function mqttMatch(subTopic, topic) { function mqttMatch(subTopic, topic) {
// replace MQTT wildcards with RegExp // replace MQTT wildcards with RegExp
const regex = '^' + subTopic.replace('+', '[^/]+').replace('#', '.+') + '$' const regex = '^' + subTopic.replace(/\+/g, '[^/]+').replace(/#/g, '.+') + '$'
return new RegExp(regex).test(topic) return new RegExp(regex).test(topic)
} }

View File

@ -9,6 +9,10 @@ services:
- 8084:8084 - 8084:8084
- 8883:8883 - 8883:8883
- 18083:18083 - 18083:18083
volumes:
- ./broker/emqx.conf:/opt/emqx/etc/emqx.conf:z
- ./broker/acl.conf:/opt/emqx/etc/acl.conf:z
- ./broker/passwd:/opt/emqx/etc/passwd:z
mediamtx: mediamtx:
container_name: mediamtx container_name: mediamtx

View File

@ -7,9 +7,10 @@ from dataclasses import dataclass
import aiohttp import aiohttp
from mqtthandler.command import command from mqtthandler.command import command
from mqtthandler.handler import MQTTConfig, MQTTHandler, task from mqtthandler.handler import MQTTHandler, task
from prometheus_client.parser import text_string_to_metric_families from prometheus_client.parser import text_string_to_metric_families
@dataclass @dataclass
class MediaMTXConfig: class MediaMTXConfig:
host: str = "http://localhost" host: str = "http://localhost"
@ -21,11 +22,10 @@ class MediaMTXConfig:
class MediaMTXHandler(MQTTHandler): class MediaMTXHandler(MQTTHandler):
def __init__( def __init__(
self, self,
mqtt_config: MQTTConfig, name: str,
handler_id: str,
mediamtx_config: MediaMTXConfig, mediamtx_config: MediaMTXConfig,
): ):
super().__init__(mqtt_config, handler_id) super().__init__(name)
self.config = mediamtx_config self.config = mediamtx_config
@task @task
@ -33,10 +33,14 @@ class MediaMTXHandler(MQTTHandler):
cache = {} cache = {}
while True: while True:
auth = aiohttp.BasicAuth(login=self.config.username, password=self.config.password) auth = aiohttp.BasicAuth(
login=self.config.username, password=self.config.password
)
async with aiohttp.ClientSession(auth=auth) as session: async with aiohttp.ClientSession(auth=auth) as session:
while True: while True:
async with session.get(f"{self.config.host}{self.config.metrics_path}") as r: async with session.get(
f"{self.config.host}{self.config.metrics_path}"
) as r:
metrics = await r.text() metrics = await r.text()
for family in text_string_to_metric_families(metrics): for family in text_string_to_metric_families(metrics):
@ -47,7 +51,9 @@ class MediaMTXHandler(MQTTHandler):
cache[topic] = sample.value cache[topic] = sample.value
print(topic, sample.value) print(topic, sample.value)
await self.set_property(topic, sample.value, 0, True) await self.set_property(
topic, sample.value, qos=0, retain=True
)
await asyncio.sleep(1) await asyncio.sleep(1)
@ -56,12 +62,9 @@ class MediaMTXHandler(MQTTHandler):
async def main(): async def main():
handler_id = f"mediamtx-{socket.gethostname()}" handler = MediaMTXHandler("mediamtx", MediaMTXConfig())
mqtt_config = MQTTConfig(host="127.0.0.1")
handler = MediaMTXHandler(mqtt_config, handler_id, MediaMTXConfig())
signal.signal(signal.SIGINT, lambda signum, frame: handler.stop()) signal.signal(signal.SIGINT, lambda signum, frame: handler.stop())
await handler.run() await handler.run("127.0.0.1", username="device", password="devicesecret")
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -64,3 +64,13 @@ class CommandResponse:
def __str__(self): def __str__(self):
return json.dumps(asdict(self)) return json.dumps(asdict(self))
def enumerate_commands(obj: object):
commands = {}
for base in obj.__class__.__mro__:
for name, attr in vars(base).items():
if isinstance(attr, Command):
commands[attr.name] = attr
return commands

View File

@ -4,101 +4,147 @@ import inspect
import paho import paho
import signal import signal
import json import json
from dataclasses import dataclass import secrets
import os
import socket
from pathlib import Path
from enum import Enum, auto
from .command import ( from .command import (
Command, Command,
CommandResponse, CommandResponse,
CommandArgumentError, CommandArgumentError,
CommandExecutionError, CommandExecutionError,
enumerate_commands,
) )
from .property import Property
@dataclass
class MQTTConfig: def get_identifier(cache_path: Path) -> str:
host: str """
port: int = 1883 Determine an MQTT client ID using the following order:
username: str | None = None 1. Environment variable IDENTIFIER
password: str | None = None 2. Value stored in /tmp/<handler-name>.tmp
keepalive: int = 60 3. Generate a new random ID using secrets.token_urlsafe
The resulting client ID is written to /tmp/mqtt_client_id.tmp for future use.
"""
client_id = os.environ.get("IDENTIFIER", None)
if not client_id and cache_path.exists():
client_id = cache_path.read_text().strip()
elif not client_id:
client_id = generate_identifier()
cache_path.write_text(client_id)
return client_id
def generate_identifier() -> str:
return secrets.token_urlsafe(6)
class Status(Enum):
ONLINE = auto()
OFFLINE = auto()
class MQTTHandler: class MQTTHandler:
DEVICE = "device"
META = "meta"
PROPERTY = "property"
COMMAND = "command"
STATUS = "status" STATUS = "status"
def __init__( def __init__(self, name: str):
self, self.name = name
mqtt_config: MQTTConfig, self.identifier = get_identifier(Path(f"/tmp/{self.name}.tmp"))
handler_id: str,
):
self.handler_id = handler_id
self.mqtt_config = mqtt_config
self.topic_base = f"device/{handler_id}" self.topic_base = lambda: f"{MQTTHandler.DEVICE}/{self.identifier}"
self.command_topic = f"{self.topic_base}/command" self.meta_topic = lambda: f"{self.topic_base()}/{MQTTHandler.META}"
self.property_topic = f"{self.topic_base}/property" self.command_topic = lambda: f"{self.topic_base()}/{MQTTHandler.COMMAND}"
self.property_topic = lambda: f"{self.topic_base()}/{MQTTHandler.PROPERTY}"
self._shutdown_event = asyncio.Event() self._shutdown_event = asyncio.Event()
will = aiomqtt.Will(
topic=f"{self.property_topic}/{MQTTHandler.STATUS}", payload="OFFLINE", qos=1, retain=True self._mqtt_client = None
self._commands = enumerate_commands(self)
self._properties = {}
self._meta = {}
async def set_property(self, name: str, value, **kwargs):
if name in self._properties:
await self._properties[name](value, **kwargs)
else:
# print(f"Warning: proeprty {name} is unregistered")
await self._publish(f"{self.property_topic()}/{name}", value, **kwargs)
async def register_property(
self, name: str, description: str | None = None, schema: dict | None = None
):
property = self._register_property(
f"{self.property_topic()}/{name}", description, schema
) )
self._properties[name] = property
self.mqtt_client = aiomqtt.Client( async def _register_property(
self.mqtt_config.host, self, name: str, description: str | None = None, schema: dict | None = None
port=self.mqtt_config.port, ):
identifier=handler_id, property = Property(name, description, schema, self._publish)
protocol=paho.mqtt.client.MQTTv5,
will=will,
)
self.commands = self.get_available_commands()
def get_available_commands(self):
commands = {}
for base in self.__class__.__mro__:
for name, attr in vars(base).items():
if isinstance(attr, Command):
print(f"Registering method {type(self).__name__}.{name} as command '{attr.name}'")
commands[attr.name] = attr
return commands
async def register_commands(self):
for name, command in self.commands.items():
for k, v in {
"schema": json.dumps(command.schema),
"description": command.description,
**command.additional_properties,
}.items():
await self.mqtt_client.publish(
f"{self.command_topic}/{command.name}/${k}",
str(v),
qos=1,
retain=True,
)
async def register_property(self, property: str, description: str | None = None, schema: dict | None = None):
data = { data = {
"schema": json.dumps(schema), "schema": json.dumps(schema),
"description": description, "description": description,
} }
for k, v in {k:v for k,v in data.items() if v is not None}.items(): for k, v in {k: v for k, v in data.items() if v is not None}.items():
await self.mqtt_client.publish( await self._mqtt_client.publish(
f"{self.property_topic}/{property}/${k}", f"{name}/${k}",
str(v), str(v),
qos=1, qos=1,
retain=True, retain=True,
) )
async def set_property(self, property: str, value, qos=0, retain=False): return property
await self.mqtt_client.publish(
f"{self.property_topic}/{property}", async def _publish(self, name: str, value, **kwargs):
str(value), await self._mqtt_client.publish(f"{name}", value, **kwargs)
qos=qos,
retain=retain, async def _register_commands(self):
for name, command in self._commands.items():
for k, v in {
"schema": json.dumps(command.schema),
"description": command.description,
**command.additional_properties,
}.items():
await self._mqtt_client.publish(
f"{self.command_topic()}/{command.name}/${k}",
str(v),
qos=1,
retain=True,
)
async def _announce(self):
# announce that we are online
await self._register_commands()
self._meta[MQTTHandler.STATUS] = await self._register_property(
f"{self.meta_topic()}/{MQTTHandler.STATUS}",
"Indicates the status of the device.",
{"type": "string", "enum": list(Status.__members__.keys())},
)
await self._meta[MQTTHandler.STATUS](
self, json.dumps(Status.ONLINE.name), qos=1, retain=True
) )
async def execute_command( await self._publish(f"{self.meta_topic()}/name", self.name, qos=1, retain=True)
await self._publish(
f"{self.meta_topic()}/type", type(self).__name__, qos=1, retain=True
)
await self._publish(
f"{self.meta_topic()}/host", socket.gethostname(), qos=1, retain=True
)
async def _execute_command(
self, self,
command_name: str, command_name: str,
payload: str, payload: str,
@ -115,7 +161,7 @@ class MQTTHandler:
if hasattr(properties, "CorrelationData") if hasattr(properties, "CorrelationData")
else None else None
) )
await self.mqtt_client.publish( await self._mqtt_client.publish(
properties.ResponseTopic, properties.ResponseTopic,
str(CommandResponse(success, str(message), correlation)), str(CommandResponse(success, str(message), correlation)),
qos=1, qos=1,
@ -123,7 +169,7 @@ class MQTTHandler:
) )
try: try:
command = self.commands[command_name] command = self._commands[command_name]
result = await command(self, payload) result = await command(self, payload)
await respond(True, result) await respond(True, result)
except (CommandArgumentError, CommandExecutionError) as e: except (CommandArgumentError, CommandExecutionError) as e:
@ -132,39 +178,53 @@ class MQTTHandler:
print(f"Failed to execute command {command_name} with unknown cause: ", e) print(f"Failed to execute command {command_name} with unknown cause: ", e)
await respond(False, "Unexpected error") await respond(False, "Unexpected error")
async def mqtt_command_writer_task(self): async def _command_executor(self):
async for message in self.mqtt_client.messages: await self._mqtt_client.subscribe(f"{self.command_topic()}/+")
async for message in self._mqtt_client.messages:
topic = str(message.topic) topic = str(message.topic)
payload = message.payload.decode("utf-8") payload = message.payload.decode("utf-8")
if topic.startswith(self.command_topic): if topic.startswith(self.command_topic()):
command_name = topic.removeprefix(f"{self.command_topic}/") command_name = topic.removeprefix(f"{self.command_topic()}/")
await self.execute_command(command_name, payload, message.properties) await self._execute_command(command_name, payload, message.properties)
async def shutdown_watcher(self): async def _shutdown_watcher(self):
await self._shutdown_event.wait() await self._shutdown_event.wait()
await self.set_property(MQTTHandler.STATUS, "OFFLINE", qos=1, retain=True) await self._meta[MQTTHandler.STATUS](
self, json.dumps(Status.OFFLINE.name), qos=1, retain=True
)
def stop(self): def stop(self):
self._shutdown_event.set() self._shutdown_event.set()
signal.signal(signal.SIGINT, signal.SIG_DFL) signal.signal(signal.SIGINT, signal.SIG_DFL)
async def run(self): async def run(self, host: str, **kwargs):
INTERVAL = 5 INTERVAL = 5
will = aiomqtt.Will(
topic=f"{self.meta_topic()}/{MQTTHandler.STATUS}",
payload=json.dumps(Status.OFFLINE.name),
qos=1,
retain=True,
)
while True: while True:
try: try:
async with self.mqtt_client as client: async with aiomqtt.Client(
await client.subscribe(f"{self.command_topic}/+") host,
await self.register_commands() protocol=paho.mqtt.client.MQTTv5,
will=will,
identifier=self.identifier,
**kwargs,
) as client:
self._mqtt_client = client
# announce that we are online tasks = [
await self.set_property(MQTTHandler.STATUS, "ONLINE", qos=1, retain=True) self._command_executor(),
await self.register_property("status", "Indicates the status of the device.", { self._shutdown_watcher(),
"type": "string", self._announce(),
"enum": ["ONLINE", "OFFLINE"] ]
})
tasks = [self.mqtt_command_writer_task(), self.shutdown_watcher()]
# Inspect instance methods # Inspect instance methods
for attr_name in dir(self): for attr_name in dir(self):
@ -179,11 +239,12 @@ class MQTTHandler:
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
except aiomqtt.MqttError as e: except aiomqtt.MqttError as e:
print( print(f"MQTT connection error: {e}. Reconnecting in {INTERVAL}s...")
f"[{self.handler_id}] MQTT connection error: {e}. Reconnecting in {INTERVAL}s..."
)
await asyncio.sleep(INTERVAL) await asyncio.sleep(INTERVAL)
finally:
self._mqtt_client = None
def task(func): def task(func):
"""Decorator to mark async methods for automatic gathering.""" """Decorator to mark async methods for automatic gathering."""

58
mqtthandler/property.py Normal file
View File

@ -0,0 +1,58 @@
import json
import jsonschema
class PropertyValueError(ValueError):
pass
class Property:
"""
Presumes that the handler will take the same arguments as aiomqtt.Client.publish
ie: async publish(
topic: str,
payload: str | bytes | bytearray | int | float | None = None,
qos: int = 0,
retain: bool = False,
properties: Properties | None = None,
*args: Any,
timeout: float | None = None,
**kwargs: Any
) None
"""
def __init__(
self,
name: str,
description: str | None = None,
schema: dict | None = None,
handler=None,
**kwargs,
):
self.name = name
self.description = description
self.schema = schema
self.handler = handler
self.additional_properties = kwargs
def __call__(self, handler_instance, payload, **kwargs):
if self.handler is None:
raise NotImplementedError(f"No handler bound for property '{self.name}'")
try:
value = json.loads(payload)
if self.schema is not None:
jsonschema.validate(value, self.schema)
except json.decoder.JSONDecodeError as e:
raise PropertyValueError(
f"Invalid JSON at line {e.lineno} column {e.colno}: {e.msg}"
)
except jsonschema.ValidationError as e:
raise PropertyValueError(f"Schema error in {e.json_path}: {e.message}")
try:
return self.handler(self.name, payload, **kwargs)
except Exception as e:
print("Failed to set property: ", e)
raise RuntimeError(f"Failed to set property.")

View File

@ -6,24 +6,23 @@ import asyncio
from dataclasses import dataclass from dataclasses import dataclass
from mqtthandler.command import command from mqtthandler.command import command
from mqtthandler.handler import MQTTConfig, MQTTHandler, task from mqtthandler.handler import MQTTHandler, task
from streamer.fileradio import FileRadio from streamer.fileradio import FileRadio
class RadioHandler(MQTTHandler): class RadioHandler(MQTTHandler):
def __init__( def __init__(
self, self,
mqtt_config: MQTTConfig, name: str,
handler_id: str,
): ):
super().__init__(mqtt_config, handler_id) super().__init__(name)
self.radio = FileRadio("./data/StarWars60.mp3", name)
self.radio = FileRadio("./data/StarWars60.mp3", handler_id)
@task @task
async def publish_stream_path(self): async def publish_stream_path(self):
await self.set_property("path", self.radio.stream_path(), 1, True) await self.set_property("path", self.radio.stream_path(), qos=1, retain=True)
await self.set_property("file", self.radio.path, 1, True) await self.set_property("file", self.radio.path, qos=1, retain=True)
@command({"type": "object"}, "Start the radio stream.") @command({"type": "object"}, "Start the radio stream.")
async def start(self, args): async def start(self, args):
@ -47,13 +46,11 @@ class RadioHandler(MQTTHandler):
self.radio = FileRadio(args, self.radio.name) self.radio = FileRadio(args, self.radio.name)
await self.publish_stream_path() await self.publish_stream_path()
async def main():
handler_id = f"radio-{socket.gethostname()}"
mqtt_config = MQTTConfig(host="127.0.0.1")
handler = RadioHandler(mqtt_config, handler_id)
async def main():
handler = RadioHandler("radio")
signal.signal(signal.SIGINT, lambda signum, frame: handler.stop()) signal.signal(signal.SIGINT, lambda signum, frame: handler.stop())
await handler.run() await handler.run("127.0.0.1", username="device", password="devicesecret")
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -21,9 +21,9 @@ class FileRadio(Streamer):
self.playback = subprocess.Popen( self.playback = subprocess.Popen(
[ [
"/usr/bin/ffmpeg", "/usr/bin/ffmpeg",
"-re", # http://trac.ffmpeg.org/wiki/StreamingGuide#The-reflag "-re", # http://trac.ffmpeg.org/wiki/StreamingGuide#The-reflag
"-stream_loop", # Loop the stream - "-stream_loop", # Loop the stream -
"-1", # ...indefinitely "-1", # ...indefinitely
"-i", "-i",
self.path, self.path,
"-c:a", "-c:a",

View File

@ -1,5 +1,6 @@
from threading import Thread from threading import Thread
def is_alive(subprocess): def is_alive(subprocess):
return True if (subprocess and subprocess.poll() is None) else False return True if (subprocess and subprocess.poll() is None) else False

37
ubx.py
View File

@ -2,17 +2,15 @@
import asyncio import asyncio
import aioserial import aioserial
import aiomqtt
import pyubx2 import pyubx2
import io import io
import logging import logging
import aioserial import aioserial
import asyncio import asyncio
import socket
import signal import signal
from mqtthandler.command import command from mqtthandler.command import command
from mqtthandler.handler import MQTTConfig, MQTTHandler, task from mqtthandler.handler import MQTTHandler, task
# The pyubx2 library spams the console with errors that aren't errors. # The pyubx2 library spams the console with errors that aren't errors.
# I don't care if you failed to parse an incomplete buffer. # I don't care if you failed to parse an incomplete buffer.
@ -47,11 +45,10 @@ class UBXAsyncParser:
class UBXHandler(MQTTHandler): class UBXHandler(MQTTHandler):
def __init__( def __init__(
self, self,
mqtt_client: aiomqtt.Client, name: str,
handler_id: str,
serial_port: aioserial.AioSerial, serial_port: aioserial.AioSerial,
): ):
super().__init__(mqtt_client, handler_id) super().__init__(name)
self.serial_port = serial_port self.serial_port = serial_port
@task @task
@ -77,7 +74,7 @@ class UBXHandler(MQTTHandler):
continue continue
property = f"{message.identity}/{name}" property = f"{message.identity}/{name}"
await self.set_property(property, value, 1, True) await self.set_property(property, value, qos=0, retain=True)
else: else:
# print("Unexpected response:", message) # print("Unexpected response:", message)
pass pass
@ -112,7 +109,7 @@ class UBXHandler(MQTTHandler):
460800, 460800,
921600, 921600,
], ],
"default": 9600 "default": 9600,
}, },
}, },
"required": ["portID", "baudRate"], "required": ["portID", "baudRate"],
@ -147,7 +144,7 @@ class UBXHandler(MQTTHandler):
"minimum": 1, "minimum": 1,
"maximum": 127, "maximum": 127,
"description": "Number of measurement cycles per navigation solution", "description": "Number of measurement cycles per navigation solution",
"default": 1 "default": 1,
}, },
"timeRef": { "timeRef": {
"type": "integer", "type": "integer",
@ -174,19 +171,19 @@ class UBXHandler(MQTTHandler):
async def main(): async def main():
handler_id = f"example-gps-{socket.gethostname()}" handler = UBXHandler(
mqtt_config = MQTTConfig(host="127.0.0.1", port=1883) "example-gps",
aioserial.AioSerial(
serial_port = aioserial.AioSerial( port="/tmp/ttyV0",
port="/tmp/ttyV0", baudrate=115200,
baudrate=115200, timeout=0.05, # 50 ms
timeout=0.05, # 50 ms ),
) )
handler = UBXHandler(mqtt_config, handler_id, serial_port)
signal.signal(signal.SIGINT, lambda signum, frame: handler.stop()) signal.signal(signal.SIGINT, lambda signum, frame: handler.stop())
await handler.run()
await handler.run(
"127.0.0.1", port=1883, username="device", password="devicesecret"
)
if __name__ == "__main__": if __name__ == "__main__":