class StreamInterface(Interface):
    """Interface created by :class:`Stream`, with async send/receive helpers.

    The helpers drive and sample the ``payload``, ``valid`` and ``ready``
    members through their awaitable ``set()`` / ``get()`` methods, and step
    the surrounding environment with ``Tick()`` and ``Settle()``.
    """

    def __init__(self, signature, *, path):
        """Build the interface and, if backpressure is disabled, pin ``ready``.

        Args:
            signature: The signature this interface is created from; its
                ``backpressure`` attribute is read here.
            path: Forwarded unchanged to the base-class constructor.
        """
        super().__init__(signature, path=path)
        # Deliberately an equality test against ``False`` (not ``not ...``),
        # so values such as ``None`` leave ``ready`` untouched.
        if signature.backpressure == False:  # noqa: E712
            # Presumably a constant 1, i.e. "always ready" -- TODO confirm.
            # NOTE(review): recv() still calls ``self.ready.set(...)``; verify
            # that is valid when ``ready`` has been replaced like this.
            self.ready = C(1)

    async def recv(self):
        """Receive one payload from the stream and return it.

        Sets ``ready`` to 1, polls ``valid`` (one ``Tick()`` + ``Settle()``
        per unsuccessful poll) until it reads truthy, samples ``payload``,
        awaits one more ``Tick()``, sets ``ready`` back to 0 and returns the
        sampled value.
        """
        await self.ready.set(1)
        await Settle()
        while True:
            if await self.valid.get():
                break
            await Tick()
            await Settle()
        # Sample before the final tick, then release ``ready`` after it.
        received = await self.payload.get()
        await Tick()
        await self.ready.set(0)
        return received

    async def send(self, value):
        """Send one payload over the stream.

        Sets ``payload`` to ``value`` and ``valid`` to 1, polls ``ready``
        (one ``Tick()`` + ``Settle()`` per unsuccessful poll) until it reads
        truthy, awaits one more ``Tick()`` and then sets ``valid`` back to 0.

        Args:
            value: The value written to ``payload``.
        """
        await self.payload.set(value)
        await self.valid.set(1)
        await Settle()
        while True:
            if await self.ready.get():
                break
            await Tick()
            await Settle()
        await Tick()
        await self.valid.set(0)


class Stream(Signature):
    """Signature with ``payload`` and ``valid`` outputs and a ``ready`` input."""

    def __init__(self, payload_shape, *, backpressure=True):
        """Declare the stream members and record the backpressure setting.

        Args:
            payload_shape: Passed to ``Out(...)`` for the ``payload`` member.
            backpressure: Stored on the signature; read by
                :class:`StreamInterface` when an interface is created.
        """
        members = {
            'payload': Out(payload_shape),
            'valid': Out(1),
            'ready': In(1),
        }
        super().__init__(members)
        self.backpressure = backpressure

    def create(self, *, path=()):
        """Return a new :class:`StreamInterface` for this signature.

        Args:
            path: Forwarded unchanged to :class:`StreamInterface`.
        """
        return StreamInterface(self, path=path)