# Untitled
#
# Pasted by zyp on Mon Feb 12 18:56:45 2024 UTC as Python
from amaranth import *

from amaranth.lib.wiring import Component, Signature, In, Out, connect, flipped
from amaranth.lib.fixed import SQ
from amaranth.lib import data

from signal_types import DQEnc, ABCEnc, ABCVector, DQVector, Current

from zyp_amaranth_libs.stream import StreamSignature

class TorquePI(Component):
    """PI regulator with feed-forward, sequenced over one shared multiply-add.

    Every sample accepted on ``input`` is processed for vector elements 0 and
    1 (the ``_D`` / ``_Q`` suffixes of the FSM state names) in three passes.
    All passes go through the same registered ``gain * signal + addend`` unit
    and the same clamp to ``+/-out_max``:

    1. integrate:    ``i_acc[n]       <- clamp(gain_ki  * error[n]    + i_acc[n])``
    2. proportional: ``output.data[n] <- clamp(gain_kp  * error[n]    + i_acc[n])``
    3. feed-forward: ``output.data[n] <- clamp(gain_ff0 * setpoint[n] + output.data[n])``

    ``error[n] = setpoint[n] - input.data[n]`` is captured when the sample is
    accepted; the feed-forward pass reads ``setpoint`` again at that later
    point, it is not latched. After the last pass ``output.valid`` is held
    high until ``output.ready`` is seen. While ``enable`` is low, ``i_acc``
    and ``output.data`` are cleared on every clock edge.
    """

    # Control and data ports.
    enable: In(1)
    setpoint: In(DQVector)
    input: In(StreamSignature(DQVector))
    output: Out(StreamSignature(DQVector))

    # Gains and output limit.
    gain_kp: In(SQ(8, 8))
    gain_ki: In(SQ(8, 8))
    gain_ff0: In(SQ(8, 8))
    out_max: In(SQ(16, 0))

    def elaborate(self, platform):
        m = Module()

        # Per-sample state: captured error and the running integrator.
        error = Signal(DQVector)
        i_acc = Signal(DQVector)

        # Shared multiply-add unit. The active FSM state drives the three
        # operands combinationally; the result is registered, so it is
        # consumed by the state that follows.
        mul_gain = Signal(SQ(8, 8))
        mul_sig = Signal(SQ(16, 0))
        mul_add = Signal(SQ(16, 0))
        mul_res = Signal(SQ(24, 0))
        mul_res_clamped = Signal(SQ(16, 0))

        m.d.sync += mul_res.eq(mul_gain * mul_sig + mul_add)

        # Clamp stage: pass-through by default, overridden by the limit
        # checks below. These stay two independent Ifs (not If/Elif) so that
        # the lower-limit assignment, being last, wins if both conditions hold.
        m.d.comb += mul_res_clamped.eq(mul_res)

        with m.If(mul_res > self.out_max):
            m.d.comb += mul_res_clamped.eq(self.out_max)

        with m.If(mul_res < -self.out_max):
            m.d.comb += mul_res_clamped.eq(-self.out_max)

        def feed_mac(gain, operand, addend):
            """Drive the multiply-add operands from the current FSM state."""
            m.d.comb += [
                mul_gain.eq(gain),
                mul_sig.eq(operand),
                mul_add.eq(addend),
            ]

        # Sequencer: each state stores the result launched by the previous
        # state and launches the next multiply-add.
        with m.FSM():
            with m.State('IDLE'):
                m.d.comb += self.input.ready.eq(1)
                with m.If(self.input.valid):
                    for axis in (0, 1):
                        m.d.sync += error[axis].eq(
                            self.setpoint[axis] - self.input.data[axis]
                        )
                    m.next = 'START'

            with m.State('START'):
                feed_mac(self.gain_ki, error[0], i_acc[0])
                m.next = 'I_D'

            with m.State('I_D'):
                m.d.sync += i_acc[0].eq(mul_res_clamped)
                feed_mac(self.gain_ki, error[1], i_acc[1])
                m.next = 'I_Q'

            with m.State('I_Q'):
                m.d.sync += i_acc[1].eq(mul_res_clamped)
                feed_mac(self.gain_kp, error[0], i_acc[0])
                m.next = 'P_D'

            with m.State('P_D'):
                m.d.sync += self.output.data[0].eq(mul_res_clamped)
                feed_mac(self.gain_kp, error[1], i_acc[1])
                m.next = 'P_Q'

            with m.State('P_Q'):
                m.d.sync += self.output.data[1].eq(mul_res_clamped)
                feed_mac(self.gain_ff0, self.setpoint[0], self.output.data[0])
                m.next = 'FF0_D'

            with m.State('FF0_D'):
                m.d.sync += self.output.data[0].eq(mul_res_clamped)
                feed_mac(self.gain_ff0, self.setpoint[1], self.output.data[1])
                m.next = 'FF0_Q'

            with m.State('FF0_Q'):
                m.d.sync += self.output.data[1].eq(mul_res_clamped)
                m.next = 'DONE'

            with m.State('DONE'):
                m.d.comb += self.output.valid.eq(1)
                with m.If(self.output.ready):
                    m.next = 'IDLE'

        # Must stay after the FSM: being the last assignments to these
        # registers, they take priority over the FSM's writes while disabled.
        with m.If(self.enable == 0):
            m.d.sync += [
                i_acc.eq(0),
                self.output.data.eq(0),
            ]

        return m