#! /usr/bin/env python3 import asyncio import aioserial import pyubxutils import pyubx2 SERIAL_PORT = "/tmp/ttyV1" BAUD = 115200 async def simulator_to_serial(stream, ser): ubr = pyubx2.UBXReader(stream) for raw, parsed in ubr: await ser.write_async(raw) async def serial_to_simulator(stream, ser): while True: data = await ser.read_async(1024) if data: stream.write(data) async def main(): ser = aioserial.AioSerial(port=SERIAL_PORT, baudrate=BAUD) with pyubxutils.UBXSimulator( configfile="./ubxsimulator.json", interval=1000, timeout=3, ) as stream: await asyncio.gather( simulator_to_serial(stream, ser), serial_to_simulator(stream, ser), ) if __name__ == "__main__": asyncio.run(main())