Source code for usbcore.tx.crc

#!/usr/bin/env python3

import functools
import operator
import unittest

from migen import *

from migen.fhdl.decorators import CEInserter, ResetInserter

from ..utils.CrcMoose3 import CrcAlgorithm
from ..utils.packet import crc16, encode_data, b
from .shifter import TxShifter
from .tester import module_tester
from ..test.common import BaseUsbTestCase


@CEInserter()
@ResetInserter()
class TxSerialCrcGenerator(Module):
    """
    Transmit CRC Generator

    TxSerialCrcGenerator generates a running CRC over a serial bit stream
    using a bit-wise linear feedback shift register.

    https://www.pjrc.com/teensy/beta/usb20.pdf, USB2 Spec, 8.3.5
    https://en.wikipedia.org/wiki/Cyclic_redundancy_check

    Parameters
    ----------
    Parameters are passed in via the constructor.

    width : int
        Width of the CRC.

    polynomial : int
        CRC polynomial in integer form. Bit 0 is not consulted; stage 0 of
        the register always receives the feedback bit.

    initial : int
        Initial value of the CRC register before data starts shifting in.

    Input Ports
    ------------
    i_data : Signal(1)
        Serial data to generate CRC for.

    Output Ports
    ------------
    o_crc : Signal(width)
        Current CRC value: the internal register with its bit order
        reversed and every bit inverted.
    """
    def __init__(self, width, polynomial, initial):
        self.i_data = Signal()

        crc = Signal(width, reset=initial)
        crc_invert = Signal(1)

        # Feedback bit: incoming data bit XOR the top bit of the register.
        self.comb += [
            crc_invert.eq(self.i_data ^ crc[width - 1]),
        ]

        def next_bit(i):
            """Return the expression loaded into register stage ``i``."""
            if i == 0:
                return crc_invert
            previous = crc[i - 1]
            # Stages with a polynomial tap also mix in the feedback bit.
            return previous ^ crc_invert if (polynomial >> i) & 1 else previous

        self.sync += [crc[i].eq(next_bit(i)) for i in range(width)]

        self.o_crc = Signal(width)
        # Output bit i is the complement of register bit (width - 1 - i).
        self.comb += [
            self.o_crc[i].eq(1 ^ crc[width - i - 1]) for i in range(width)
        ]
def bytes_to_int(d):
    """Combine a sequence of byte values into a single integer.

    The first element is the least significant byte; element ``i`` is
    shifted left by ``8 * i`` bits and OR-ed into the result. An empty
    sequence yields 0.

    >>> hex(bytes_to_int([0, 1]))
    '0x100'
    >>> hex(bytes_to_int([1, 2]))
    '0x201'
    """
    shifted = (byte << (position * 8) for position, byte in enumerate(d))
    return functools.reduce(operator.or_, shifted, 0)
def cols(rows):
    """Transpose a list of equal-length rows into a list of columns.

    Every row must have the same length as the first row; this is checked
    with an assertion (skipped when the first row is empty, in which case
    the result is an empty list).

    >>> a = [
    ...  [1, 2],
    ...  ['a', 'b'],
    ...  [4, 5],
    ... ]
    >>> for c in cols(a):
    ...   print(c)
    [1, 'a', 4]
    [2, 'b', 5]
    >>> a = [
    ...  [1, 2, 3],
    ...  ['a', 'b', 'c'],
    ... ]
    >>> for c in cols(a):
    ...   print(c)
    [1, 'a']
    [2, 'b']
    [3, 'c']

    """
    width = len(rows[0])
    if width:
        # Reject ragged input before any column is assembled.
        for row in rows:
            assert len(row) == width, "len(%r) != %i" % (row, width)
    return [[row[ci] for row in rows] for ci in range(width)]
def lfsr_serial_shift_crc(lfsr_poly, lfsr_cur, data):
    """Shift ``data`` through an LFSR one bit at a time.

    ``lfsr_poly`` and ``data`` are reversed before processing, ``lfsr_cur``
    is used in the order given, and the final register contents are
    returned in reversed order. One shift is performed per entry in
    ``data``. The first entry of the reversed polynomial is never examined;
    stage 0 always receives the feedback bit.

    >>> for i in range(5):
    ...   l = [0]*5; l[i] = 1
    ...   r = lfsr_serial_shift_crc(
    ...      lfsr_poly=[0,0,1,0,1], # (5, 2, 0)
    ...      lfsr_cur=l,
    ...      data=[0,0,0,0],
    ...   )
    ...   print("Min[%i] =" % i, r)
    Min[0] = [1, 0, 0, 0, 0]
    Min[1] = [0, 0, 1, 0, 1]
    Min[2] = [0, 1, 0, 1, 0]
    Min[3] = [1, 0, 1, 0, 0]
    Min[4] = [0, 1, 1, 0, 1]
    >>> for i in range(4):
    ...   d = [0]*4; d[i] = 1
    ...   r = lfsr_serial_shift_crc(
    ...      lfsr_poly=[0,0,1,0,1], # (5, 2, 0)
    ...      lfsr_cur=[0,0,0,0,0],
    ...      data=d,
    ...   )
    ...   print("Nin[%i] =" % i, r)
    Nin[0] = [0, 0, 1, 0, 1]
    Nin[1] = [0, 1, 0, 1, 0]
    Nin[2] = [1, 0, 1, 0, 0]
    Nin[3] = [0, 1, 1, 0, 1]

    """
    taps = lfsr_poly[::-1]
    bits = data[::-1]

    size = len(taps)
    assert size > 1
    assert len(lfsr_cur) == size

    state = list(lfsr_cur)
    for bit in bits:
        # Feedback is the outgoing top bit combined with the incoming bit.
        feedback = state[-1] ^ bit
        # Stage i takes the old stage i-1, mixed with the feedback where
        # the polynomial has a tap; zip() drops the old top stage.
        state = [feedback] + [
            lower ^ feedback if tap else lower
            for lower, tap in zip(state, taps[1:])
        ]
    return state[::-1]
def build_matrix(lfsr_poly, data_width):
    """Build the XOR matrices for a parallel (multi-bit per step) CRC.

    The serial LFSR is evaluated once for every one-hot data input (with a
    zero register) and once for every one-hot register state (with zero
    data). The transposed results say, for each output bit ``Mout[i]``,
    which data bits and which current register bits are XOR-ed together.

    Parameters
    ----------
    lfsr_poly : list of int
        Polynomial bits, passed unchanged to ``lfsr_serial_shift_crc``.
        Its length is the CRC width.

    data_width : int
        Number of data bits consumed per step.

    Returns
    -------
    (info, cols_nin, cols_min)
        ``info`` is a list of human readable strings describing the
        computation. ``cols_nin[i]`` (length ``data_width``) and
        ``cols_min[i]`` (length ``len(lfsr_poly)``) are the selection
        vectors for output bit ``i``.

    >>> print("\\n".join(build_matrix([0,0,1,0,1], 4)[0]))
    lfsr([0, 0, 1, 0, 1], [0, 0, 0, 0, 0], [1, 0, 0, 0]) = [0, 0, 1, 0, 1]
    lfsr([0, 0, 1, 0, 1], [0, 0, 0, 0, 0], [0, 1, 0, 0]) = [0, 1, 0, 1, 0]
    lfsr([0, 0, 1, 0, 1], [0, 0, 0, 0, 0], [0, 0, 1, 0]) = [1, 0, 1, 0, 0]
    lfsr([0, 0, 1, 0, 1], [0, 0, 0, 0, 0], [0, 0, 0, 1]) = [0, 1, 1, 0, 1]
    <BLANKLINE>
    lfsr([0, 0, 1, 0, 1], [1, 0, 0, 0, 0], [0, 0, 0, 0]) = [1, 0, 0, 0, 0]
    lfsr([0, 0, 1, 0, 1], [0, 1, 0, 0, 0], [0, 0, 0, 0]) = [0, 0, 1, 0, 1]
    lfsr([0, 0, 1, 0, 1], [0, 0, 1, 0, 0], [0, 0, 0, 0]) = [0, 1, 0, 1, 0]
    lfsr([0, 0, 1, 0, 1], [0, 0, 0, 1, 0], [0, 0, 0, 0]) = [1, 0, 1, 0, 0]
    lfsr([0, 0, 1, 0, 1], [0, 0, 0, 0, 1], [0, 0, 0, 0]) = [0, 1, 1, 0, 1]
    <BLANKLINE>
    Mout[4] = [0, 0, 1, 0] [1, 0, 0, 1, 0]
    Mout[3] = [0, 1, 0, 1] [0, 0, 1, 0, 1]
    Mout[2] = [1, 0, 1, 1] [0, 1, 0, 1, 1]
    Mout[1] = [0, 1, 0, 0] [0, 0, 1, 0, 0]
    Mout[0] = [1, 0, 0, 1] [0, 1, 0, 0, 1]

    A data width larger than the CRC width is supported as well:

    >>> len(build_matrix([0,0,1,0,1], 8)[0])
    20
    """
    lfsr_poly_size = len(lfsr_poly)
    info = []

    # (a) Data contribution: lfsr(Min=0, Nin=one-hot).
    #  - One row per data bit, each row is lfsr_poly_size wide.
    #  - After transposing, each column describes one output bit Mout[i]
    #    as a function of Nin.
    rows_nin = []
    for i in range(data_width):
        # lfsr_cur = [0,...,0] = Min
        lfsr_cur = [0] * lfsr_poly_size
        # data = [0,..,1,..,0] = Nin
        data = [0] * data_width
        data[i] = 1
        rows_nin.append(lfsr_serial_shift_crc(lfsr_poly, lfsr_cur, data))
        info.append("lfsr(%r, %r, %r) = %r" % (lfsr_poly, lfsr_cur, data, rows_nin[-1]))
    assert len(rows_nin) == data_width
    cols_nin = cols(rows_nin)[::-1]

    # (b) Register contribution: lfsr(Min=one-hot, Nin=0).
    #  - One row per register bit, giving a square matrix.
    info.append("")
    rows_min = []
    for i in range(lfsr_poly_size):
        # lfsr_cur = [0,..,1,...,0] = Min
        lfsr_cur = [0] * lfsr_poly_size
        lfsr_cur[i] = 1
        # data = [0,..,0] = Nin
        data = [0] * data_width
        rows_min.append(lfsr_serial_shift_crc(lfsr_poly, lfsr_cur, data))
        info.append("lfsr(%r, %r, %r) = %r" % (lfsr_poly, lfsr_cur, data, rows_min[-1]))
    assert len(rows_min) == lfsr_poly_size
    cols_min = cols(rows_min)[::-1]

    # (c) Summarise every output bit. cols_nin and cols_min both hold one
    # entry per CRC bit, so iterate over the CRC width. Iterating over
    # data_width raised IndexError when data_width >= lfsr_poly_size and
    # dropped rows when data_width was smaller than lfsr_poly_size - 1.
    info.append("")
    for i in range(lfsr_poly_size - 1, -1, -1):
        info.append("Mout[%i] = %r %r" % (i, cols_nin[i], cols_min[i]))

    return info, cols_nin, cols_min
@ResetInserter()
class TxParallelCrcGenerator(Module):
    """
    Transmit CRC Generator

    TxParallelCrcGenerator generates a running CRC, taking in
    ``data_width`` payload bits per strobe. The next-state logic is derived
    from the matrices returned by ``build_matrix``.

    https://www.pjrc.com/teensy/beta/usb20.pdf, USB2 Spec, 8.3.5
    https://en.wikipedia.org/wiki/Cyclic_redundancy_check

    Parameters
    ----------
    Parameters are passed in via the constructor.

    data_width : int
        Width of the payload input.

    crc_width : int
        Width of the CRC.

    polynomial : int
        CRC polynomial in integer form.

    initial : int
        Initial value of the CRC register before data starts shifting in.
        Defaults to 0.

    Input Ports
    ------------
    i_data_payload : Signal(data_width)
        Data to generate CRC for.

    i_data_strobe : Signal(1)
        Strobe signal for the payload; the CRC register only updates while
        it is asserted.

    Output Ports
    ------------
    o_crc : Signal(crc_width)
        Current CRC value: the CRC register with its bit order reversed,
        XOR-ed with ``initial``.
    """
    def __init__(self, data_width, crc_width, polynomial, initial=0):
        self.i_data_payload = Signal(data_width)
        self.i_data_strobe = Signal()
        self.o_crc = Signal(crc_width)

        crc_dat = Signal(data_width)
        crc_cur = Signal(crc_width, reset=initial)
        crc_next = Signal(crc_width, reset_less=True)

        # Reset value of crc_cur as a list of bits, least significant first.
        reset_bit_string = "{0:0{width}b}".format(
            crc_cur.reset.value, width=crc_width)
        crc_cur_reset_bits = [int(ch) for ch in reversed(reset_bit_string)]

        self.comb += [
            crc_dat.eq(self.i_data_payload[::-1]),
            # FIXME: Is XOR ^ initial actually correct here?
            self.o_crc.eq(crc_cur[::-1] ^ initial),
        ]

        self.sync += [
            If(self.i_data_strobe,
                crc_cur.eq(crc_next),
            ),
        ]

        # Polynomial as a list of bits, most significant first.
        poly_list = [
            polynomial >> i & 0x1 for i in reversed(range(crc_width))
        ]
        assert len(poly_list) == crc_width

        _, cols_nin, cols_min = build_matrix(poly_list, data_width)

        crc_next_reset_bits = list(crc_cur_reset_bits)
        for i in range(crc_width):
            # Indices of the data bits / register bits feeding output bit i.
            data_taps = [j for j, use in enumerate(cols_nin[i]) if use]
            state_taps = [j for j, use in enumerate(cols_min[i]) if use]

            to_xor = (
                [crc_dat[j] for j in data_taps]
                + [crc_cur[j] for j in state_taps]
            )
            # Data bits count as 0 when working out the reset value.
            crc_next_reset_bit_i = (
                [0] * len(data_taps)
                + [crc_cur_reset_bits[j] for j in state_taps]
            )

            crc_next_reset_bits[i] = functools.reduce(
                operator.xor, crc_next_reset_bit_i)
            self.comb += [
                crc_next[i].eq(functools.reduce(operator.xor, to_xor)),
            ]

        # Store, as crc_next's reset value, what the next-state logic yields
        # from crc_cur's reset value with all-zero data.
        crc_next_reset_value = int(
            "0b" + "".join(str(bit) for bit in crc_next_reset_bits[::-1]), 2)
        crc_next.reset.value = crc_next_reset_value
class TxCrcPipeline(Module):
    """Byte-wide payload serialised by a TxShifter into a CRC16 generator.

    ``i_data_payload`` is loaded into an 8 bit wide ``TxShifter`` whose
    serial output drives a 16 bit ``TxSerialCrcGenerator`` (polynomial
    0x8005, initial value 0xFFFF).

    The shifter is reset directly by ``reset``; the CRC generator is reset
    by ``reset`` delayed through two registers. Both share ``ce``.

    Input Ports
    ------------
    i_data_payload : Signal(8)
        Data byte handed to the shifter.

    reset : Signal(1)
        Reset for the shifter and, two register stages later, the CRC.

    ce : Signal(1)
        Clock enable for the shifter and the CRC generator.

    Output Ports
    ------------
    o_data_ack : Signal(1)
        Mirrors the shifter's ``o_get`` output.

    o_crc16 : Signal(16)
        Mirrors the CRC generator's ``o_crc`` output.
    """
    def __init__(self):
        self.i_data_payload = Signal(8)
        self.o_data_ack = Signal()
        self.o_crc16 = Signal(16)

        self.reset = reset = Signal()
        reset_n1 = Signal()
        reset_n2 = Signal()
        self.ce = ce = Signal()

        # Two-stage register chain delaying reset for the CRC generator.
        self.sync += [
            reset_n2.eq(reset_n1),
            reset_n1.eq(reset),
        ]

        self.submodules.shifter = shifter = TxShifter(width=8)

        # Shifter inputs.
        self.comb += [
            shifter.i_data.eq(self.i_data_payload),
            shifter.reset.eq(reset),
            shifter.ce.eq(ce),
        ]
        # Shifter output.
        self.comb += [
            self.o_data_ack.eq(shifter.o_get),
        ]

        self.submodules.crc = crc_calc = TxSerialCrcGenerator(
            width=16,
            polynomial=0x8005,
            initial=0xFFFF,
        )

        # CRC generator inputs.
        self.comb += [
            crc_calc.i_data.eq(shifter.o_data),
            crc_calc.reset.eq(reset_n2),
            crc_calc.ce.eq(ce),
        ]
        # CRC generator output.
        self.comb += [
            self.o_crc16.eq(crc_calc.o_crc),
        ]