systemair.py

Pasted by zyp on Tue Dec 24 11:01:26 2019 UTC as Python
#!/usr/bin/env python3

import yaml, json

from pymodbus.client.asynchronous.serial import AsyncModbusSerialClient
from pymodbus.client.asynchronous import schedulers

loop, client = AsyncModbusSerialClient(schedulers.ASYNC_IO, method='rtu', port='/dev/cu.usbserial-AL063QQD', baudrate = 115200)

registers = yaml.load(open('registers.yml'), Loader = yaml.Loader)

async def update():
	data = {}
	for sensor in registers['sensors']:
		rr = await client.protocol.read_holding_registers(unit = 1, address = sensor['address'], count = 1)
		if not rr.isError():
			data[sensor['name']] = rr.registers[0] / (1 / sensor.get('scale', 1))
			print('%s\t%.1f %s' % (sensor['name'], rr.registers[0] * sensor.get('scale', 1), sensor['unit']))
		else:
			print('error: ', rr)
	return data