class Client: __schema = 'https://zyp.no/schema/foo/0.1/ila.json' def __init__(self, analyzer): self._analyzer = analyzer assert self.__schema in analyzer.annotations self._metadata = analyzer.annotations[self.__schema] async def start(self): await self._analyzer.cmd.write(1) async def stop(self): await self._analyzer.cmd.write(2) async def is_done(self): return await self._analyzer.status.read() == 3 async def level(self): return await self._analyzer.level.read() async def download(self): samples = [] for _ in range(await self.level()): sample = 0 for shift in range(0, self._metadata['buffer_width'], self._analyzer.data.width): sample |= await self._analyzer.data.read() << shift samples.append(sample) return Data(samples, self._metadata) @click.command() @click.version_option(package_name = 'katsuo-ila') @options.add_common @options.csr_bridge @click.option('--analyzer', type = str) @click.option('--vcd', type = click.Path(dir_okay = False, path_type = pathlib.Path)) @async_command async def main(csr_bridge, analyzer, vcd): # Look up the analyzer CSR block. analyzer_periph = csr_bridge if analyzer is not None: for name in analyzer.split('.'): analyzer_periph = analyzer_periph[name] client = Client(analyzer_periph) print(time.time(), 'Starting capture...') # Start the capture. await client.start() print(time.time(), 'Waiting for capture to finish...') # Wait for the capture to finish. while not await client.is_done(): pass print(time.time(), 'Downloading data...') # Read the captured data. data = await client.download() print(time.time(), 'Done.') if vcd is not None: data.save_vcd(vcd)