Source code for usbcore.cpu.usbwishbonebridge

from migen import *

from migen.genlib.misc import chooser, WaitTimer
from migen.genlib.record import Record
from migen.genlib.fsm import FSM, NextState

from litex.soc.interconnect import wishbone
from litex.soc.interconnect import stream

from ..pid import PID, PIDTypes

[docs]class USBWishboneBridge(Module): def __init__(self, usb_core, clk_freq=12000000, magic_packet=0x43): self.wishbone = wishbone.Interface() # # # byte_counter = Signal(3, reset_less=True) byte_counter_reset = Signal() byte_counter_ce = Signal() self.sync += \ If(byte_counter_reset, byte_counter.eq(0) ).Elif(byte_counter_ce, byte_counter.eq(byte_counter + 1) ) # Unlike the UART or Ethernet bridges, we explicitly only # support two commands: reading and writing. This gets # integrated into the USB protocol, so it's not really a # state. 1 is "USB Device to Host", and is therefore a "read", # while 0 is "USB Host to Device", and is therefore a "write". cmd = Signal(1, reset_less=True) cmd_ce = Signal() # Instead of self.source and self.sink, we let the wrapping # module handle packing and unpacking the data. self.sink_data = Signal(8) # True when the "sink" value has data self.sink_valid = Signal() self.send_ack = Signal() # Indicates whether a "debug" packet is currently being processed self.n_debug_in_progress = Signal(reset=1) address = Signal(32, reset_less=True) address_ce = Signal() data = Signal(32, reset_less=True) rx_data_ce = Signal() tx_data_ce = Signal() self.sync += [ If(cmd_ce, cmd.eq(usb_core.data_recv_payload[7:8])), If(address_ce, address.eq(Cat(address[8:32], usb_core.data_recv_payload))), If(rx_data_ce, data.eq(Cat(data[8:32], usb_core.data_recv_payload)) ).Elif(tx_data_ce, data.eq(self.wishbone.dat_r) ) ] fsm = ResetInserter()(FSM(reset_state="IDLE")) self.submodules += fsm fsm.act("IDLE", self.n_debug_in_progress.eq(1), If(usb_core.data_recv_put, If(usb_core.tok == PID.SETUP, If(usb_core.endp == 0, # If we get a SETUP packet with a "Vendor" type # going to this device, treat that as a DEBUG packet. cmd_ce.eq(1), byte_counter_reset.eq(1), If(usb_core.data_recv_payload[0:7] == magic_packet, NextState("RECEIVE_ADDRESS"), ).Else( # Wait for the end of the packet, to avoid # messing with normal USB operation NextState("WAIT_PKT_END"), ), ) ) ) ) # The target address comes as the wValue and wIndex in the SETUP # packet. Once we get that data, we're ready to do the operation. fsm.act("RECEIVE_ADDRESS", self.n_debug_in_progress.eq(0), If(usb_core.data_recv_put, byte_counter_ce.eq(1), If((byte_counter >= 1), If((byte_counter <= 4), address_ce.eq(1), ), ), ), # We don't need to explicitly ACK the SETUP packet, because # they're always acknowledged implicitly. Wait until the # packet ends (i.e. until we've sent the ACK packet) before # moving to the next state. If(usb_core.end, byte_counter_reset.eq(1), If(cmd, NextState("READ_DATA") ).Else( NextState("RECEIVE_DATA") ), ), ) fsm.act("RECEIVE_DATA", # Set the "ACK" bit to 1, so we acknowledge the packet # once it comes in, and so that we're in a position to # receive data. self.send_ack.eq(usb_core.endp == 0), self.n_debug_in_progress.eq(0), If(usb_core.endp == 0, If(usb_core.data_recv_put, rx_data_ce.eq(1), byte_counter_ce.eq(1), If(byte_counter == 3, NextState("WAIT_RECEIVE_DATA_END"), ).Elif(usb_core.end, NextState("WRITE_DATA"), ) ) ) ) fsm.act("WAIT_RECEIVE_DATA_END", self.n_debug_in_progress.eq(0), self.send_ack.eq(1), # Wait for the end of the USB packet, if # it hasn't come already. If(usb_core.end, NextState("WRITE_DATA") ) ) self.comb += [ # Trim off the last two bits of the address, because wishbone addresses # are word-based, and a word is 32-bits. Therefore, the last two bits # should always be zero. self.wishbone.adr.eq(address[2:]), self.wishbone.dat_w.eq(data), self.wishbone.sel.eq(2**len(self.wishbone.sel) - 1) ] fsm.act("WRITE_DATA", self.n_debug_in_progress.eq(0), self.wishbone.stb.eq(1), self.wishbone.we.eq(1), self.wishbone.cyc.eq(1), If(self.wishbone.ack | self.wishbone.err, NextState("WAIT_SEND_ACK_START"), ) ) fsm.act("READ_DATA", self.n_debug_in_progress.eq(0), self.wishbone.stb.eq(1), self.wishbone.we.eq(0), self.wishbone.cyc.eq(1), If(self.wishbone.ack | self.wishbone.err, tx_data_ce.eq(1), NextState("SEND_DATA_WAIT_START") ) ) fsm.act("SEND_DATA_WAIT_START", self.n_debug_in_progress.eq(0), byte_counter_reset.eq(1), If(usb_core.start, NextState("SEND_DATA"), ), ) self.comb += \ chooser(data, byte_counter, self.sink_data, n=4, reverse=False) fsm.act("SEND_DATA", self.n_debug_in_progress.eq(0), If(usb_core.endp != 0, NextState("SEND_DATA_WAIT_START"), ), # Keep sink_valid high during the packet, which indicates we have data # to send. This also causes an "ACK" to be transmitted. self.sink_valid.eq(usb_core.endp == 0), If(usb_core.data_send_get, byte_counter_ce.eq(1), ), If(byte_counter == 4, NextState("WAIT_SEND_ACK_START") ), If(usb_core.end, NextState("WAIT_SEND_ACK_START") ) ) # To validate the transaction was successful, the host will now # send an "IN" request. Acknowledge that by setting # self.send_ack, without putting anything in self.sink_data. fsm.act("WAIT_SEND_ACK_START", self.n_debug_in_progress.eq(0), If(usb_core.start, NextState("SEND_ACK") ), ) # Send the ACK. If the endpoint number is incorrect, go back and # wait again. fsm.act("SEND_ACK", self.n_debug_in_progress.eq(0), If(usb_core.endp != 0, NextState("WAIT_SEND_ACK_START") ), # If(usb_core.retry, # If(cmd, # byte_counter_reset.eq(1), # NextState("SEND_DATA"), # ), # ), self.send_ack.eq(usb_core.endp == 0), If(usb_core.end, NextState("IDLE"), ) ) fsm.act("WAIT_PKT_END", self.n_debug_in_progress.eq(1), If(usb_core.end, NextState("IDLE"), ) )