Source code for usbcore.cpu.epmem

#!/usr/bin/env python3

from enum import IntEnum

from migen import *
from migen.genlib import fifo
from migen.genlib import cdc

from litex.soc.interconnect import stream
from litex.soc.interconnect import wishbone
from litex.soc.interconnect import csr_eventmanager as ev
from litex.soc.interconnect.csr import *

from litex.soc.cores.gpio import GPIOOut

from ..pid import PID, PIDTypes
from ..sm.transfer import UsbTransfer


class MemInterface(Module, AutoCSR):
    """
    Interfaces the USB state machine core to the soft CPU.

    This interface has two memory regions:

      * Output memory. Writable by USB Core, readable by CPU.
      * Input memory. Writable by CPU, readable by USB Core.

    Each endpoint has:

      * A current pointer
      * A current length
      * Control bits
      * A pending flag

    Pointers are all relative to the start of the memory.

    On output endpoints, the pointer is read only.
    On input endpoints, the pointer and length are writable.

    To accept / send data from an endpoint you set the arm bit. The USB core
    will then respond to the next request and update the pointer / length.

    After a packet has been sent or received, the pending flag will be
    raised. While the pending flag is raised, the USB core will respond with
    NAK.

    The `arm`, `dtb`, and `sta` registers are bitmasks.  They are packed in
    pairs of IO.  If you only have one endpoint, then `arm`, `dtb`, and `sta`
    are packed like this:

        IO

    Where Bit 1 is set to affect EP0 IN, and Bit 0 is set to affect EP0 OUT.

    Likewise, if you have 3 endpoints, they are packed as:

        IOIOIO
        |||||\\- EP0 OUT
        ||||\\-- EP0 IN
        |||\\--- EP1 OUT
        ||\\---- EP1 IN
        |\\----- EP2 OUT
        \\------ EP2 IN

    The bit index of an endpoint is therefore ``(endpoint << 1) | is_in``.
    To ARM the EP1 IN endpoint, set bit ``(1 << 1) | 1``:

        arm_write(1 << ((1 << 1) | 1));

    Or for EP2 IN:

        arm_write(1 << ((2 << 1) | 1));

    (OR in the bits of any other endpoint that has to stay armed.)
    """

    def csr_bits(self, csr):
        """
        Work around the lack of bit-addressability in CSRs by creating an
        array of signals that are aliases of the various CSR storage values.

        :param csr: a CSR object exposing a ``storage`` signal.
        :return: a migen ``Array`` with one 1-bit ``Signal`` per storage bit,
            LSB first, each combinatorially driven from ``csr.storage``.
        """
        width = len(csr.storage)
        bits = [Signal() for _ in range(width)]
        self.comb += [bit.eq(csr.storage[i]) for i, bit in enumerate(bits)]
        return Array(bits)

    def __init__(self, iobuf, num_endpoints=3, depth=512):
        """
        :param iobuf: USB I/O buffer handed to ``UsbTransfer``; its
            ``usb_pullup`` signal is read here.
        :param num_endpoints: number of endpoint pairs (IN + OUT) to expose.
        :param depth: size, in bytes, of each of the two packet memories.
        :raises ValueError: if ``num_endpoints`` cannot be addressed by the
            4-bit endpoint field of ``eps_idx``.
        """
        if not 1 <= num_endpoints <= 16:
            raise ValueError(
                "num_endpoints must be between 1 and 16, got {!r}".format(
                    num_endpoints))

        # Same width as Signal(max=depth); 9 bits for the default depth.
        ptr_width = bits_for(depth - 1)

        self.submodules.usb_core = usb_core = UsbTransfer(iobuf)

        self.submodules.pullup = GPIOOut(usb_core.iobuf.usb_pullup)
        self.iobuf = usb_core.iobuf

        # Event sources
        # -----------------------
        # One pulse event per endpoint direction.  `trig` is ordered like
        # eps_idx (OUT at even indices, IN at odd indices).
        all_trig = []
        trig = []

        self.submodules.ev = ev.EventManager()
        for i in range(num_endpoints):
            # NOTE(review): exec is kept on purpose here.  EventSourcePulse()
            # is constructed without an explicit name and may derive one from
            # the assignment statement itself -- confirm before replacing
            # these with setattr().  The formatted text only ever contains the
            # integer loop index.
            exec("self.ev.oep{} = ev.EventSourcePulse()".format(i))
            t = getattr(self.ev, "oep{}".format(i)).trigger
            all_trig.append(t.eq(1))
            trig.append(t)

            exec("self.ev.iep{} = ev.EventSourcePulse()".format(i))
            t = getattr(self.ev, "iep{}".format(i)).trigger
            all_trig.append(t.eq(1))
            trig.append(t)

        self.ev.finalize()

        # eps_idx is the result of the last IN/OUT/SETUP token, and
        # therefore describes the current EP that the USB core sees.
        # The register is of the form:
        #     EEEEI
        # Where:
        #     E: The last endpoint number
        #     I: True if the current endpoint is an IN endpoint
        self.eps_idx = eps_idx = Signal(5)
        self.comb += [
            # Sic. The Cat() function places the first argument in the LSB,
            # and the second argument in the MSB.
            self.eps_idx.eq(Cat(usb_core.tok == PID.IN, usb_core.endp)),
        ]

        signal_bits = num_endpoints * 2

        # Keep a copy of the control bits for each endpoint

        # Stall endpoint
        self.sta = CSRStorage(signal_bits, write_from_dev=True)

        # Data toggle bit
        self.dtb = CSRStorage(signal_bits, write_from_dev=True)

        # Endpoint is ready
        self.arm = CSRStorage(signal_bits, write_from_dev=True)

        # Wire up the USB core control bits to the currently-active
        # endpoint bit.
        sta_bits = self.csr_bits(self.sta)
        arm_bits = self.csr_bits(self.arm)
        dtb_bits = self.csr_bits(self.dtb)
        self.comb += [
            usb_core.sta.eq(sta_bits[eps_idx]),
            usb_core.arm.eq(arm_bits[eps_idx]),
            # The core receives the inverse of the stored toggle bit.
            usb_core.dtb.eq(~dtb_bits[eps_idx]),
            If(~iobuf.usb_pullup,
                # Pull-up released: fire every endpoint event.
                *all_trig,
            ).Else(
                # Otherwise only the active endpoint fires, on commit.
                Array(trig)[eps_idx].eq(usb_core.commit),
            ),
        ]

        # Output pathway
        # -----------------------
        self.specials.obuf = Memory(8, depth)
        self.specials.oport_wr = self.obuf.get_port(
            write_capable=True, clock_domain="usb_12")
        self.specials.oport_rd = self.obuf.get_port(clock_domain="sys")

        optrs = []
        for i in range(num_endpoints):
            name = "optr_ep{}".format(i)
            setattr(self, name, CSRStatus(ptr_width, name=name))
            optrs.append(getattr(self, name).status)

        self.obuf_ptr = Signal(ptr_width)
        self.comb += [
            self.oport_wr.adr.eq(self.obuf_ptr),
            self.oport_wr.dat_w.eq(usb_core.data_recv_payload),
            self.oport_wr.we.eq(usb_core.data_recv_put),
        ]

        # On a commit, copy the current obuf_ptr to the CSR register.
        self.sync.usb_12 += [
            If(usb_core.commit,
                If((usb_core.tok == PID.OUT) | (usb_core.tok == PID.SETUP),
                    Array(optrs)[usb_core.endp].eq(self.obuf_ptr),
                ),
            ),
        ]

        # Declared for resetting EP0 on a SETUP packet; nothing in this
        # class drives it at the moment.
        self.usb_ep0_reset = Signal()

        # One-cycle delayed write strobes for the control registers: dat_w is
        # loaded in one cycle and the matching `we` is pulsed in the next.
        self.update_dtb = Signal()
        self.update_ctrl = Signal()
        # Set on `start` so the token type is examined one cycle later.
        self.should_check_ep0 = Signal()

        self.sync.usb_12 += [
            # Defaults; overridden further down when an update is pending.
            self.arm.we.eq(0),
            self.sta.we.eq(0),
            self.dtb.we.eq(0),
            If(usb_core.data_recv_put,
                self.obuf_ptr.eq(self.obuf_ptr + 1)
            ),
            # If the EP0 needs resetting, then clear the EP0 IN and OUT bits,
            # which are stored in the lower two bits of the three control
            # registers.
            If(usb_core.start,
                self.should_check_ep0.eq(1),
            ).Elif(self.should_check_ep0,
                self.should_check_ep0.eq(0),
                If(usb_core.tok == PID.SETUP,
                    self.update_ctrl.eq(1),
                    self.update_dtb.eq(1),
                    self.arm.dat_w.eq(self.arm.storage & ~0b11),
                    self.sta.dat_w.eq(self.sta.storage & ~0b11),
                    self.dtb.dat_w.eq(self.dtb.storage & ~0b11),
                ),
            ).Elif(usb_core.commit,
                # Transfer done: disarm the endpoint and flip its toggle bit.
                self.update_ctrl.eq(1),
                self.update_dtb.eq(1),
                self.arm.dat_w.eq((self.arm.storage & ~(1 << eps_idx))),
                self.sta.dat_w.eq(self.sta.storage),
                self.dtb.dat_w.eq((self.dtb.storage ^ (1 << eps_idx))),
            ),
            If(self.update_ctrl,
                self.update_ctrl.eq(0),
                self.arm.we.eq(1),
                self.sta.we.eq(1),
            ),
            If(self.update_dtb,
                self.update_dtb.eq(0),
                self.dtb.we.eq(1),
            )
        ]

        # Input pathway
        # -----------------------
        self.specials.ibuf = Memory(8, depth)
        self.specials.iport_wr = self.ibuf.get_port(
            write_capable=True, clock_domain="sys")
        self.specials.iport_rd = self.ibuf.get_port(clock_domain="usb_12")

        # Per-endpoint start pointer and end pointer into the input memory.
        # The CSR names are passed explicitly because the attributes are
        # created with setattr().
        iptrs = []
        ilens = []
        for i in range(num_endpoints):
            iptr_name = "iptr_ep{}".format(i)
            setattr(self, iptr_name, CSRStorage(ptr_width, name=iptr_name))
            iptrs.append(getattr(self, iptr_name).storage)

            ilen_name = "ilen_ep{}".format(i)
            setattr(self, ilen_name, CSRStorage(ptr_width, name=ilen_name))
            ilens.append(getattr(self, ilen_name).storage)

        self.ibuf_ptr = Signal(ptr_width)
        self.comb += [
            self.iport_rd.adr.eq(self.ibuf_ptr),
            usb_core.data_send_payload.eq(self.iport_rd.dat_r),
        ]

        self.sync.usb_12 += [
            # On a transfer start, copy the CSR register into ibuf_ptr.
            If(usb_core.start,
                self.ibuf_ptr.eq(Array(iptrs)[usb_core.endp]),
            ),
            # Listed after the load, as in the original statement order.
            If(usb_core.data_send_get,
                self.ibuf_ptr.eq(self.ibuf_ptr + 1),
            ),
        ]

        # There is data to send until the read pointer reaches the value
        # stored in the endpoint's ilen register.
        self.comb += [
            usb_core.data_send_have.eq(
                self.ibuf_ptr != Array(ilens)[usb_core.endp]),
        ]