# Untitled
#
# Pasted by Anonymous on Wed Mar 10 22:28:03 2021 UTC as Python
import deps
import os

import luna.usb2
from luna.gateware.architecture.car import PHYResetController

import nmigen
from nmigen.back import verilog

import migen
from litex.soc.interconnect.stream import Endpoint

class NMigenWrapper(nmigen.Elaboratable):
    """Container that packages nMigen logic as one named module.

    Users fill the public collections directly:

    * ``submodules`` -- elaboratables added as submodules on elaboration,
    * ``comb``       -- statements added to the combinatorial domain,
    * ``signals``    -- values whose signals are passed as ports to the
      Verilog converter,
    * ``connections`` -- keyword arguments handed to the migen ``Instance``.
    """

    def __init__(self, name=None):
        """Create an empty wrapper; ``name`` is used as the module name."""
        self.name = name

        self.connections = {}

        self.comb = []
        self.signals = []
        self.submodules = []

    def elaborate(self, platform):
        """Build the nMigen module from the collected submodules and logic."""
        module = nmigen.Module()

        # Only the 'sync' domain is declared here; the alternatives below
        # are disabled.
        module.domains += nmigen.ClockDomain('sync')
        #module.domains += nmigen.ClockDomain('usb')
        #module.domains.sync = nmigen.ClockDomain()
        #module.domains.usb  = nmigen.ClockDomain()

        module.submodules += self.submodules
        module.d.comb += self.comb

        return module

    def get_instance(self):
        """Return a migen ``Instance`` of this module wired per ``connections``."""
        return migen.Instance(self.name, **self.connections)

    def generate_verilog(self):
        """Convert this wrapper to Verilog, using every signal of ``signals`` as a port."""
        # Each entry may be a single signal or a record; expand to plain signals.
        port_signals = [
            signal
            for entry in self.signals
            for signal in entry._lhs_signals()
        ]
        return verilog.convert(self, name=self.name, ports=port_signals)

class USBDevice(migen.Module):
    """Migen module that embeds a LUNA (nMigen) USB device behind a ULPI PHY.

    The nMigen parts (a ``PHYResetController`` and a ``luna.usb2.USBDevice``)
    are collected in an ``NMigenWrapper``. On finalization the wrapper is
    converted to Verilog, written to ``<output_dir>/gateware/USBDevice.v``,
    registered with the platform and instantiated as a migen ``Instance``.

    Parameters
    ----------
    platform
        Object providing ``request('ulpi')``, ``output_dir`` and
        ``add_source()``.
    *args, **kwargs
        Forwarded unchanged to ``luna.usb2.USBDevice``.
    """

    def __init__(self, platform, *args, **kwargs):
        self.platform = platform

        self._wrapper = NMigenWrapper('USBDevice')

        # Clock/reset ports of the generated module; they drive the
        # wrapper's 'sync' domain.
        sync_clk = nmigen.Signal(name='sync_clk')
        sync_rst = nmigen.Signal(name='sync_rst')
        #usb_clk = nmigen.Signal(name = 'usb_clk')

        self._wrapper.signals.extend([
            sync_clk,
            sync_rst,
            #usb_clk,
        ])

        self._wrapper.comb.append([
            nmigen.ClockSignal('sync').eq(sync_clk),
            nmigen.ResetSignal('sync').eq(sync_rst),
            #nmigen.ClockSignal('usb').eq(usb_clk),
        ])

        phy_reset_controller = PHYResetController()
        self._wrapper.submodules.append(phy_reset_controller)

        # NOTE(review): the 'usb' domain reset is driven here, but the wrapper
        # only declares 'sync' and no usb clock port is connected below --
        # confirm how the 'usb' domain is meant to be clocked.
        self._wrapper.comb.append([
            nmigen.ResetSignal('usb').eq(phy_reset_controller.phy_reset),
        ])

        ulpi_pads = platform.request('ulpi')
        # 8-bit tristate buffer for the ULPI data pins.
        ulpi_data = migen.TSTriple(8)
        #ulpi_rst = migen.Signal()

        self.specials += ulpi_data.get_tristate(ulpi_pads.data)
        #self.comb += ulpi_pads.rst.eq(~ulpi_rst)

        # Port map for the migen Instance of the generated Verilog module.
        # The names after the i_/o_ prefix presumably have to match the
        # signal names nMigen derives from the 'ulpi' record below -- verify
        # against the generated Verilog.
        self._wrapper.connections.update(dict(
            i_sync_clk=migen.ClockSignal(),
            i_sync_rst=migen.ResetSignal(),
            #i_usb_clk = migen.ClockSignal(),

            o_ulpi__data__o=ulpi_data.o,
            o_ulpi__data__oe=ulpi_data.oe,
            i_ulpi__data__i=ulpi_data.i,
            i_ulpi__clk__i=ulpi_pads.clk,
            o_ulpi__stp=ulpi_pads.stp,
            i_ulpi__nxt__i=ulpi_pads.nxt,
            i_ulpi__dir__i=ulpi_pads.dir,
            o_ulpi__rst=ulpi_pads.rst,
            #o_ulpi__rst = ulpi_rst,
        ))

        from nmigen.hdl.rec import DIR_FANIN, DIR_FANOUT
        # nMigen-side ULPI bus handed to the LUNA device.
        self._ulpi = nmigen.Record(
            [
                ('data', [('i', 8, DIR_FANIN), ('o', 8, DIR_FANOUT), ('oe', 1, DIR_FANOUT)]),
                ('clk', [('i', 1, DIR_FANIN)]),
                ('stp', 1, DIR_FANOUT),
                ('nxt', [('i', 1, DIR_FANIN)]),
                ('dir', [('i', 1, DIR_FANIN)]),
                ('rst', 1, DIR_FANOUT),
            ],
            name='ulpi',
        )

        self._wrapper.signals.append(self._ulpi)

        self.usb = luna.usb2.USBDevice(*args, bus=self._ulpi, **kwargs)
        self._wrapper.submodules.append(self.usb)

        # Keep the device's `connect` input permanently asserted.
        self._wrapper.comb.append([
            self.usb.connect.eq(1),
        ])

    def add_endpoint(self, ep):
        """Register an endpoint adapter.

        ``ep`` must expose the LUNA endpoint as ``ep._ep`` and a
        ``wrap(wrapper)`` method that adds its ports to the wrapper.
        """
        self.usb.add_endpoint(ep._ep)
        ep.wrap(self._wrapper)

    def do_finalize(self):
        """Emit the wrapper as Verilog and instantiate it in this module.

        Writes ``<platform.output_dir>/gateware/USBDevice.v``, adds it to the
        platform sources and appends the matching ``Instance`` to
        ``self.specials``.
        """
        gateware_dir = os.path.join(self.platform.output_dir, 'gateware')
        verilog_filename = os.path.join(gateware_dir, 'USBDevice.v')

        # Convert before touching the filesystem so a failed conversion does
        # not leave an empty or truncated file behind.
        verilog_source = self._wrapper.generate_verilog()

        # The gateware directory may not exist yet when finalization runs.
        os.makedirs(gateware_dir, exist_ok=True)
        with open(verilog_filename, 'w') as f:
            f.write(verilog_source)

        self.platform.add_source(verilog_filename)

        self.specials += self._wrapper.get_instance()

class USBStreamOutEndpoint:
    """Adapter pairing a LUNA ``USBStreamOutEndpoint`` with a LiteX stream source.

    ``_ep`` holds the LUNA endpoint, ``source`` is a LiteX ``Endpoint`` with a
    single 8-bit ``data`` field, and ``prefix`` (``ep_<number>_out``) names the
    ports added to the wrapper.
    """

    def __init__(self, *, endpoint_number, **kwargs):
        """Create the LUNA endpoint; extra keyword arguments are forwarded to it."""
        self._ep = luna.usb2.USBStreamOutEndpoint(
            endpoint_number=endpoint_number,
            **kwargs,
        )
        self.prefix = f'ep_{endpoint_number}_out'

        self.source = Endpoint([('data', 8)])

    def wrap(self, wrapper):
        """Add this endpoint's stream ports and Instance connections to ``wrapper``."""
        usb_stream = self._ep.stream
        port = nmigen.Record(usb_stream.layout, name=self.prefix)

        # Payload and framing are driven by the USB endpoint's stream; only
        # `ready` goes the other way.
        assignments = [
            getattr(port, field).eq(getattr(usb_stream, field))
            for field in ('payload', 'first', 'last', 'valid')
        ]
        assignments.append(usb_stream.ready.eq(port.ready))

        wrapper.comb.append(assignments)
        wrapper.signals.append(port)

        prefix = self.prefix
        source = self.source
        wrapper.connections.update({
            f'o_{prefix}__payload': source.data,
            f'o_{prefix}__first': source.first,
            f'o_{prefix}__last': source.last,
            f'o_{prefix}__valid': source.valid,
            f'i_{prefix}__ready': source.ready,
        })

class USBStreamInEndpoint:
    """Adapter pairing a LUNA ``USBStreamInEndpoint`` with a LiteX stream sink.

    ``_ep`` holds the LUNA endpoint, ``sink`` is a LiteX ``Endpoint`` with a
    single 8-bit ``data`` field, and ``prefix`` (``ep_<number>_in``) names the
    ports added to the wrapper.
    """

    def __init__(self, *, endpoint_number, **kwargs):
        """Create the LUNA endpoint; extra keyword arguments are forwarded to it."""
        self._ep = luna.usb2.USBStreamInEndpoint(
            endpoint_number=endpoint_number,
            **kwargs,
        )
        self.prefix = f'ep_{endpoint_number}_in'

        self.sink = Endpoint([('data', 8)])

    def wrap(self, wrapper):
        """Add this endpoint's stream ports and Instance connections to ``wrapper``."""
        usb_stream = self._ep.stream
        port = nmigen.Record(usb_stream.layout, name=self.prefix)

        # Payload and framing are driven from the port into the USB
        # endpoint's stream; only `ready` comes back out.
        assignments = [
            getattr(usb_stream, field).eq(getattr(port, field))
            for field in ('payload', 'first', 'last', 'valid')
        ]
        assignments.append(port.ready.eq(usb_stream.ready))

        wrapper.comb.append(assignments)
        wrapper.signals.append(port)

        prefix = self.prefix
        sink = self.sink
        wrapper.connections.update({
            f'i_{prefix}__payload': sink.data,
            f'i_{prefix}__first': sink.first,
            f'i_{prefix}__last': sink.last,
            f'i_{prefix}__valid': sink.valid,
            f'o_{prefix}__ready': sink.ready,
        })

class USBMultibyteStreamInEndpoint:
    """Adapter pairing a LUNA ``USBMultibyteStreamInEndpoint`` with a LiteX sink.

    ``_ep`` holds the LUNA endpoint, ``sink`` is a LiteX ``Endpoint`` whose
    ``data`` field is ``8 * byte_width`` bits wide, and ``prefix``
    (``ep_<number>_in``) names the ports added to the wrapper.
    """

    def __init__(self, *, endpoint_number, byte_width, **kwargs):
        """Create the LUNA endpoint; extra keyword arguments are forwarded to it."""
        self._ep = luna.usb2.USBMultibyteStreamInEndpoint(
            endpoint_number=endpoint_number,
            byte_width=byte_width,
            **kwargs,
        )
        self.prefix = f'ep_{endpoint_number}_in'

        data_bits = 8 * byte_width
        self.sink = Endpoint([('data', data_bits)])

    def wrap(self, wrapper):
        """Add this endpoint's stream ports and Instance connections to ``wrapper``."""
        core = self._ep.stream
        port = nmigen.Record(core.layout, name=self.prefix)

        # The port feeds payload/first/last/valid into the endpoint's stream...
        into_core = [
            core.payload.eq(port.payload),
            core.first.eq(port.first),
            core.last.eq(port.last),
            core.valid.eq(port.valid),
        ]
        # ...and the endpoint's `ready` is reflected back on the port.
        back_pressure = [port.ready.eq(core.ready)]

        wrapper.comb.append(into_core + back_pressure)
        wrapper.signals.append(port)

        prefix = self.prefix
        sink = self.sink
        port_map = {
            f'i_{prefix}__payload': sink.data,
            f'i_{prefix}__first': sink.first,
            f'i_{prefix}__last': sink.last,
            f'i_{prefix}__valid': sink.valid,
            f'o_{prefix}__ready': sink.ready,
        }
        wrapper.connections.update(port_map)