| class Client:
__schema = 'https://zyp.no/schema/foo/0.1/ila.json'
def __init__(self, analyzer):
self._analyzer = analyzer
assert self.__schema in analyzer.annotations
self._metadata = analyzer.annotations[self.__schema]
async def start(self):
await self._analyzer.cmd.write(1)
async def stop(self):
await self._analyzer.cmd.write(2)
async def is_done(self):
return await self._analyzer.status.read() == 3
async def level(self):
return await self._analyzer.level.read()
async def download(self):
samples = []
for _ in range(await self.level()):
sample = 0
for shift in range(0, self._metadata['buffer_width'], self._analyzer.data.width):
sample |= await self._analyzer.data.read() << shift
samples.append(sample)
return Data(samples, self._metadata)
@click.command()
@click.version_option(package_name = 'katsuo-ila')
@options.add_common
@options.csr_bridge
@click.option('--analyzer', type = str)
@click.option('--vcd', type = click.Path(dir_okay = False, path_type = pathlib.Path))
@async_command
async def main(csr_bridge, analyzer, vcd):
# Look up the analyzer CSR block.
analyzer_periph = csr_bridge
if analyzer is not None:
for name in analyzer.split('.'):
analyzer_periph = analyzer_periph[name]
client = Client(analyzer_periph)
print(time.time(), 'Starting capture...')
# Start the capture.
await client.start()
print(time.time(), 'Waiting for capture to finish...')
# Wait for the capture to finish.
while not await client.is_done():
pass
print(time.time(), 'Downloading data...')
# Read the captured data.
data = await client.download()
print(time.time(), 'Done.')
if vcd is not None:
data.save_vcd(vcd)
|