mqttdevicemanager/scripts/ubxsimulator
2026-03-18 15:39:27 +10:30

43 lines
833 B
Python
Executable File

#! /usr/bin/env python3
import asyncio
import aioserial
import pyubxutils
import pyubx2
SERIAL_PORT = "/tmp/ttyV1"
BAUD = 115200
async def simulator_to_serial(stream, ser):
ubr = pyubx2.UBXReader(stream)
for raw, parsed in ubr:
await ser.write_async(raw)
async def serial_to_simulator(stream, ser):
while True:
data = await ser.read_async(1024)
if data:
stream.write(data)
async def main():
ser = aioserial.AioSerial(port=SERIAL_PORT, baudrate=BAUD)
with pyubxutils.UBXSimulator(
configfile="./ubxsimulator.json",
interval=1000,
timeout=3,
) as stream:
await asyncio.gather(
simulator_to_serial(stream, ser),
serial_to_simulator(stream, ser),
)
if __name__ == "__main__":
asyncio.run(main())