Untitled

Pasted by zyp on Wed Apr 9 20:28:24 2025 UTC as Python
def tb_with_bridge_client(sim: Simulator, dut, *, input_stream_fc = 0, output_stream_fc = 0):
    def wrapper(f):
        class Transport:
            def __init__(self):
                sim.add_testbench(self._input_testbench, background = True)
                sim.add_testbench(self._output_testbench)
                self.input_queue = []
            
            async def send(self, data: bytes):
                self.input_queue.extend(data)

            async def recv(self, n = 1):
                buf = bytearray()
                for _ in range(n):
                    if output_stream_fc:
                        await self.ctx.tick().repeat(output_stream_fc)
                    buf.append(await stream_get(self.ctx, dut.output))
                return buf
            
            async def _input_testbench(self, ctx: SimulatorContext):
                while True:
                    await ctx.tick()
                    if self.input_queue:
                        with ctx.critical():
                            while self.input_queue:
                                if input_stream_fc:
                                    await ctx.tick().repeat(input_stream_fc)
                                await stream_put(ctx, dut.input, self.input_queue.pop(0))

            async def _output_testbench(self, ctx: SimulatorContext):
                transport.ctx = ctx
                await f(ctx, bridge.Client(transport))

        transport = Transport()
    
    return wrapper