#!/usr/bin/env python3
import yaml, json
from pymodbus.client.asynchronous.serial import AsyncModbusSerialClient
from pymodbus.client.asynchronous import schedulers
# Create the asyncio-scheduled Modbus RTU serial client. The factory call is
# unpacked into the event loop and the client object used by update().
loop, client = AsyncModbusSerialClient(
    schedulers.ASYNC_IO,
    method='rtu',
    port='/dev/cu.usbserial-AL063QQD',
    baudrate=115200,
)

# Load the register map. The context manager closes the file handle as soon
# as parsing finishes instead of leaving it to the garbage collector.
# NOTE(review): yaml.Loader can construct arbitrary Python objects; switch to
# yaml.safe_load if registers.yml can ever come from an untrusted source.
with open('registers.yml') as _registers_file:
    registers = yaml.load(_registers_file, Loader=yaml.Loader)
async def update():
    """Read every configured sensor once and return the scaled values.

    Iterates over ``registers['sensors']``; each entry must provide ``name``
    and ``address`` and may provide ``scale`` (default 1) and ``unit``
    (default empty string). One holding register is read per sensor from
    unit 1 (presumably the Modbus slave id -- confirm against the device).

    Each successful reading is printed as ``name<TAB>value unit``; a failed
    reading is printed as an error and left out of the result.

    Returns:
        dict mapping sensor name to its scaled reading.
    """
    data = {}
    for sensor in registers['sensors']:
        rr = await client.protocol.read_holding_registers(
            unit=1, address=sensor['address'], count=1
        )
        if rr.isError():
            print('error: ', rr)
            continue

        raw = rr.registers[0]
        scale = sensor.get('scale', 1)
        # Dividing by the reciprocal (rather than multiplying) keeps decimal
        # scales tidy: 3 / (1 / 0.1) == 0.3, whereas 3 * 0.1 gives
        # 0.30000000000000004. A zero scale has no reciprocal, so fall back
        # to plain multiplication instead of raising ZeroDivisionError.
        value = raw / (1 / scale) if scale else raw * scale

        # Store and print the same number so the log always matches the data.
        data[sensor['name']] = value
        print('%s\t%.1f %s' % (sensor['name'], value, sensor.get('unit', '')))
    return data
|