| import deps
import os
import luna.usb2
from luna.gateware.architecture.car import PHYResetController
import nmigen
from nmigen.back import verilog
import migen
from litex.soc.interconnect.stream import Endpoint
class NMigenWrapper(nmigen.Elaboratable):
def __init__(self, name = None):
self.name = name
self.submodules = []
self.signals = []
self.comb = []
self.connections = {}
def elaborate(self, platform):
m = nmigen.Module()
m.domains += nmigen.ClockDomain('sync')
#m.domains += nmigen.ClockDomain('usb')
#m.domains.sync = nmigen.ClockDomain()
#m.domains.usb = nmigen.ClockDomain()
m.submodules += self.submodules
m.d.comb += self.comb
return m
def get_instance(self):
return migen.Instance(self.name, **self.connections)
def generate_verilog(self):
ports = []
for s in self.signals:
ports.extend(s._lhs_signals())
return verilog.convert(self, name = self.name, ports = ports)
class USBDevice(migen.Module):
def __init__(self, platform, *args, **kwargs):
self.platform = platform
self._wrapper = NMigenWrapper('USBDevice')
sync_clk = nmigen.Signal(name = 'sync_clk')
sync_rst = nmigen.Signal(name = 'sync_rst')
#usb_clk = nmigen.Signal(name = 'usb_clk')
self._wrapper.signals.extend([
sync_clk,
sync_rst,
#usb_clk,
])
self._wrapper.comb.append([
nmigen.ClockSignal('sync').eq(sync_clk),
nmigen.ResetSignal('sync').eq(sync_rst),
#nmigen.ClockSignal('usb').eq(usb_clk),
])
phy_reset_controller = PHYResetController()
self._wrapper.submodules.append(phy_reset_controller)
self._wrapper.comb.append([
nmigen.ResetSignal('usb').eq(phy_reset_controller.phy_reset),
])
ulpi_pads = platform.request('ulpi')
ulpi_data = migen.TSTriple(8)
#ulpi_rst = migen.Signal()
self.specials += ulpi_data.get_tristate(ulpi_pads.data)
#self.comb += ulpi_pads.rst.eq(~ulpi_rst)
self._wrapper.connections.update(dict(
i_sync_clk = migen.ClockSignal(),
i_sync_rst = migen.ResetSignal(),
#i_usb_clk = migen.ClockSignal(),
o_ulpi__data__o = ulpi_data.o,
o_ulpi__data__oe = ulpi_data.oe,
i_ulpi__data__i = ulpi_data.i,
i_ulpi__clk__i = ulpi_pads.clk,
o_ulpi__stp = ulpi_pads.stp,
i_ulpi__nxt__i = ulpi_pads.nxt,
i_ulpi__dir__i = ulpi_pads.dir,
o_ulpi__rst = ulpi_pads.rst,
#o_ulpi__rst = ulpi_rst,
))
from nmigen.hdl.rec import DIR_FANIN, DIR_FANOUT, DIR_NONE
self._ulpi = nmigen.Record(
[
('data', [('i', 8, DIR_FANIN), ('o', 8, DIR_FANOUT), ('oe', 1, DIR_FANOUT)]),
('clk', [('i', 1, DIR_FANIN)]),
('stp', 1, DIR_FANOUT),
('nxt', [('i', 1, DIR_FANIN)]),
('dir', [('i', 1, DIR_FANIN)]),
('rst', 1, DIR_FANOUT),
],
name = 'ulpi',
)
self._wrapper.signals.append(self._ulpi)
self.usb = luna.usb2.USBDevice(bus = self._ulpi, *args, **kwargs)
self._wrapper.submodules.append(self.usb)
self._wrapper.comb.append([
self.usb.connect.eq(1),
])
def add_endpoint(self, ep):
self.usb.add_endpoint(ep._ep)
ep.wrap(self._wrapper)
def do_finalize(self):
verilog_filename = os.path.join(self.platform.output_dir, 'gateware', 'USBDevice.v')
with open(verilog_filename, 'w') as f:
f.write(self._wrapper.generate_verilog())
self.platform.add_source(verilog_filename)
self.specials += self._wrapper.get_instance()
class USBStreamOutEndpoint:
def __init__(self, *, endpoint_number, **kwargs):
self._ep = luna.usb2.USBStreamOutEndpoint(endpoint_number = endpoint_number, **kwargs)
self.prefix = f'ep_{endpoint_number}_out'
self.source = Endpoint([('data', 8)])
def wrap(self, wrapper):
stream = nmigen.Record(self._ep.stream.layout, name = self.prefix)
wrapper.comb.append([
stream.payload.eq(self._ep.stream.payload),
stream.first.eq(self._ep.stream.first),
stream.last.eq(self._ep.stream.last),
stream.valid.eq(self._ep.stream.valid),
self._ep.stream.ready.eq(stream.ready),
])
wrapper.signals.append(stream)
wrapper.connections.update({
f'o_{self.prefix}__payload': self.source.data,
f'o_{self.prefix}__first': self.source.first,
f'o_{self.prefix}__last': self.source.last,
f'o_{self.prefix}__valid': self.source.valid,
f'i_{self.prefix}__ready': self.source.ready,
})
class USBStreamInEndpoint:
def __init__(self, *, endpoint_number, **kwargs):
self._ep = luna.usb2.USBStreamInEndpoint(endpoint_number = endpoint_number, **kwargs)
self.prefix = f'ep_{endpoint_number}_in'
self.sink = Endpoint([('data', 8)])
def wrap(self, wrapper):
stream = nmigen.Record(self._ep.stream.layout, name = self.prefix)
wrapper.comb.append([
self._ep.stream.payload.eq(stream.payload),
self._ep.stream.first.eq(stream.first),
self._ep.stream.last.eq(stream.last),
self._ep.stream.valid.eq(stream.valid),
stream.ready.eq(self._ep.stream.ready),
])
wrapper.signals.append(stream)
wrapper.connections.update({
f'i_{self.prefix}__payload': self.sink.data,
f'i_{self.prefix}__first': self.sink.first,
f'i_{self.prefix}__last': self.sink.last,
f'i_{self.prefix}__valid': self.sink.valid,
f'o_{self.prefix}__ready': self.sink.ready,
})
class USBMultibyteStreamInEndpoint:
def __init__(self, *, endpoint_number, byte_width, **kwargs):
self._ep = luna.usb2.USBMultibyteStreamInEndpoint(endpoint_number = endpoint_number, byte_width = byte_width, **kwargs)
self.prefix = f'ep_{endpoint_number}_in'
self.sink = Endpoint([('data', 8 * byte_width)])
def wrap(self, wrapper):
stream = nmigen.Record(self._ep.stream.layout, name = self.prefix)
wrapper.comb.append([
self._ep.stream.payload.eq(stream.payload),
self._ep.stream.first.eq(stream.first),
self._ep.stream.last.eq(stream.last),
self._ep.stream.valid.eq(stream.valid),
stream.ready.eq(self._ep.stream.ready),
])
wrapper.signals.append(stream)
wrapper.connections.update({
f'i_{self.prefix}__payload': self.sink.data,
f'i_{self.prefix}__first': self.sink.first,
f'i_{self.prefix}__last': self.sink.last,
f'i_{self.prefix}__valid': self.sink.valid,
f'o_{self.prefix}__ready': self.sink.ready,
})
|