def tb_with_bridge_client(sim: Simulator, dut, *, input_stream_fc = 0, output_stream_fc = 0):
    """Return a decorator that registers a coroutine as a bridge-client testbench.

    The decorated coroutine ``f`` is awaited as ``f(ctx, client)``, where
    ``client`` is ``bridge.Client(transport)`` and ``transport`` moves bytes
    between the client and ``dut.input`` / ``dut.output``.

    Args:
        sim: Simulator on which the two testbenches are registered.
        dut: Design under test; must expose ``input`` and ``output``.
        input_stream_fc: Number of extra clock ticks awaited before each item
            is put on ``dut.input``. ``0`` disables the delay.
        output_stream_fc: Number of extra clock ticks awaited before each item
            is taken from ``dut.output``. ``0`` disables the delay.

    Returns:
        A decorator taking ``f``. The decorator itself returns ``None``, so
        the decorated name is rebound to ``None``; it is used purely for its
        side effect of registering testbenches on ``sim``.
    """
    # Local import keeps this helper self-contained with respect to the
    # file's import block.
    from collections import deque

    def wrapper(f):
        class Transport:
            """Byte transport handed to ``bridge.Client``.

            ``ctx`` is only assigned once ``_output_testbench`` starts, so
            ``recv`` must not be awaited before then.
            """

            def __init__(self):
                # A deque gives O(1) removal from the front; draining a list
                # with pop(0) is quadratic in the number of queued bytes.
                self.input_queue = deque()
                # The feeder runs in the background so that it does not keep
                # the simulation alive once the main testbench has finished.
                sim.add_testbench(self._input_testbench, background = True)
                sim.add_testbench(self._output_testbench)

            async def send(self, data: bytes):
                """Queue every item of ``data`` for delivery to ``dut.input``."""
                self.input_queue.extend(data)

            async def recv(self, n = 1):
                """Collect ``n`` items from ``dut.output`` into a ``bytearray``."""
                received = bytearray()
                for _ in range(n):
                    # Guarded so that repeat() is never called with 0;
                    # presumably it requires a positive count -- confirm.
                    if output_stream_fc:
                        await self.ctx.tick().repeat(output_stream_fc)
                    received.append(await stream_get(self.ctx, dut.output))
                return received

            async def _input_testbench(self, ctx: SimulatorContext):
                """Poll the queue every tick and push queued items to ``dut.input``."""
                while True:
                    await ctx.tick()
                    if not self.input_queue:
                        continue
                    # NOTE(review): critical() presumably prevents this
                    # background testbench from being stopped while queued
                    # data is still being delivered -- confirm.
                    with ctx.critical():
                        while self.input_queue:
                            if input_stream_fc:
                                await ctx.tick().repeat(input_stream_fc)
                            await stream_put(ctx, dut.input, self.input_queue.popleft())

            async def _output_testbench(self, ctx: SimulatorContext):
                """Publish ``ctx`` for ``recv`` and run ``f`` with a bridge client."""
                # Stored on the instance rather than through the enclosing
                # ``transport`` name, which is only bound after the class body.
                self.ctx = ctx
                await f(ctx, bridge.Client(self))

        # Instantiating the transport is what registers both testbenches.
        transport = Transport()

    return wrapper