#!/usr/bin/env python3
from enum import IntEnum
from migen import *
from migen.genlib import fsm
from ..endpoint import EndpointType, EndpointResponse
from ..pid import PID, PIDTypes
from ..sm.transfer import UsbTransfer
from .usbwishbonebridge import USBWishboneBridge
[docs]class DummyUsb(Module):
"""
Implements a device that simply responds to the most common SETUP packets.
It is intended to be used alongside the Wishbone debug bridge.
"""
def __init__(self, iobuf, debug=False, vid=0x1209, pid=0x5bf0,
product="Fomu Bridge",
manufacturer="Foosn"):
# USB Core
self.submodules.usb_core = usb_core = UsbTransfer(iobuf)
if usb_core.iobuf.usb_pullup is not None:
self.comb += usb_core.iobuf.usb_pullup.eq(1)
self.iobuf = usb_core.iobuf
# SETUP packets contain a DATA segment that is always 8 bytes.
# However, we're only ever interested in the first 4 bytes, plus
# the last byte.
usbPacket = Signal(32)
wRequestAndType = Signal(16)
wValue = Signal(16)
wLength = Signal(8)
self.comb += [
wRequestAndType.eq(usbPacket[16:32]),
wValue.eq(usbPacket[0:16]),
]
setup_index = Signal(4)
address = Signal(7, reset=0)
self.comb += usb_core.addr.eq(address),
def make_usbstr(s):
usbstr = bytearray(2)
# The first byte is the number of characters in the string.
# Because strings are utf_16_le, each character is two-bytes.
# That leaves 126 bytes as the maximum length
assert(len(s) <= 126)
usbstr[0] = (len(s)*2)+2
usbstr[1] = 3
usbstr.extend(bytes(s, 'utf_16_le'))
return list(usbstr)
# Start with 0x8006
descriptors = {
# Config descriptor
# 80 06 00 02
0x0002: [
0x09, 0x02, 0x12, 0x00, 0x01, 0x01, 0x01, 0x80,
0x32, 0x09, 0x04, 0x00, 0x00, 0x00, 0xfe, 0x00,
0x00, 0x02,
],
# Device descriptor
# 80 06 00 01
0x0001: [
0x12, 0x01, 0x00, 0x02, 0x00, 0x00, 0x00, 0x40,
(vid>>0)&0xff, (vid>>8)&0xff,
(pid>>0)&0xff, (pid>>8)&0xff,
0x01, 0x01, 0x01, 0x02,
0x00, 0x01,
],
# String 0
0x0003: [
0x04, 0x03, 0x09, 0x04,
],
# String 1 (manufacturer)
0x0103: make_usbstr(manufacturer),
# String 2 (Product)
0x0203: make_usbstr(product),
# BOS descriptor
0x000f: [
0x05, 0x0f, 0x1d, 0x00, 0x01, 0x18, 0x10, 0x05,
0x00, 0x38, 0xb6, 0x08, 0x34, 0xa9, 0x09, 0xa0,
0x47, 0x8b, 0xfd, 0xa0, 0x76, 0x88, 0x15, 0xb6,
0x65, 0x00, 0x01, 0x02, 0x01,
],
0xee03: [
0x12, 0x03, 0x4d, 0x53, 0x46, 0x54, 0x31, 0x30,
0x30, 0x7e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
],
}
# Starts with 0xc07e or 0xc17e
usb_ms_compat_id_descriptor = [
0x28, 0x00, 0x00, 0x00, 0x00, 0x01, 0x04, 0x00,
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x01, 0x57, 0x49, 0x4e, 0x55, 0x53, 0x42,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
]
class MemoryContents:
def __init__(self):
self.contents = [0x00]
self.offsets = {}
self.lengths = {}
def add(self, wRequestAndType, wValue, mem):
self.offsets[wRequestAndType << 16 | wValue] = len(self.contents)
self.lengths[wRequestAndType << 16 | wValue] = len(mem)
self.contents = self.contents + mem
mem = MemoryContents()
for key, value in descriptors.items():
mem.add(0x8006, key, value)
mem.add(0xc07e, 0x0000, usb_ms_compat_id_descriptor)
mem.add(0x8000, 0x0000, [0, 0]) # Get device status
mem.add(0x0009, 0x0100, []) # Set configuration 1
out_buffer = self.specials.out_buffer = Memory(8, len(mem.contents), init=mem.contents)
self.specials.out_buffer_rd = out_buffer_rd = out_buffer.get_port(write_capable=False, clock_domain="usb_12")
last_start = Signal()
# Set to 1 if we have a response that matches the requested descriptor
have_response = self.have_response = Signal()
# Needs to be able to index Memory
response_addr = Signal(9)
response_len = Signal(6)
response_ack = Signal()
bytes_remaining = Signal(6)
bytes_addr = Signal(9)
# Respond to various descriptor requests
cases = {}
for key in mem.offsets:
cases[key] = [
response_len.eq(mem.lengths[key]),
response_addr.eq(mem.offsets[key]),
]
self.comb += Case(usbPacket, cases)
# Used to respond to Transaction stage
transaction_queued = Signal()
new_address = Signal(7)
configuration = Signal(8)
# Generate debug signals, in case debug is enabled.
debug_packet_detected = Signal()
debug_sink_data = Signal(8)
debug_sink_data_ready = Signal()
debug_ack_response = Signal()
# Delay the "put" signal (and corresponding data) by one cycle, to allow
# the debug system to inhibit this write. In practice, this doesn't
# impact our latency at all as this signal runs at a rate of ~1 MHz.
data_recv_put_delayed = self.data_recv_put_delayed = Signal()
data_recv_payload_delayed = self.data_recv_payload_delayed = Signal(8)
self.sync += [
data_recv_put_delayed.eq(usb_core.data_recv_put),
data_recv_payload_delayed.eq(usb_core.data_recv_payload),
]
# Wire up debug signals if required
if debug:
debug_bridge = USBWishboneBridge(usb_core)
self.submodules.debug_bridge = ClockDomainsRenamer("usb_12")(debug_bridge)
self.comb += [
debug_packet_detected.eq(~self.debug_bridge.n_debug_in_progress),
debug_sink_data.eq(self.debug_bridge.sink_data),
debug_sink_data_ready.eq(self.debug_bridge.sink_valid),
debug_ack_response.eq(self.debug_bridge.send_ack | self.debug_bridge.sink_valid),
]
self.comb += [
usb_core.dtb.eq(1),
If(debug_packet_detected,
usb_core.sta.eq(0),
usb_core.arm.eq(debug_ack_response),
usb_core.data_send_payload.eq(debug_sink_data),
have_response.eq(debug_sink_data_ready),
).Else(
usb_core.sta.eq(~(have_response | response_ack)),
usb_core.arm.eq(have_response | response_ack),
usb_core.data_send_payload.eq(out_buffer_rd.dat_r),
have_response.eq(bytes_remaining > 0),
),
out_buffer_rd.adr.eq(bytes_addr),
usb_core.data_send_have.eq(have_response),
]
self.sync += [
usb_core.reset.eq(usb_core.error),
last_start.eq(usb_core.start),
If(last_start,
If(usb_core.tok == PID.SETUP,
setup_index.eq(0),
bytes_remaining.eq(0),
).Elif(transaction_queued,
response_ack.eq(1),
transaction_queued.eq(0),
address.eq(new_address),
)
),
If(usb_core.tok == PID.SETUP,
If(data_recv_put_delayed,
setup_index.eq(setup_index + 1),
Case(setup_index, {
0: usbPacket.eq(Cat(data_recv_payload_delayed, usbPacket[0:24], )),
1: usbPacket.eq(Cat(data_recv_payload_delayed, usbPacket[0:24], )),
2: usbPacket.eq(Cat(data_recv_payload_delayed, usbPacket[0:24], )),
3: usbPacket.eq(Cat(data_recv_payload_delayed, usbPacket[0:24], )),
# 4: wIndex.eq(data_recv_payload_delayed),
# 5: wIndex.eq(Cat(wIndex[0:8], data_recv_payload_delayed)),
6: wLength.eq(data_recv_payload_delayed),
# 7: wLength.eq(Cat(wLength[0:8], data_recv_payload_delayed)),
}),
),
),
# After a SETUP's DATA packet has come in, figure out if we need
# to respond to any special requests.
If(usb_core.setup,
# Set Address / Configuration
If(wRequestAndType == 0x0005,
# Set Address
new_address.eq(wValue[8:15]),
response_ack.eq(1),
).Elif(wRequestAndType == 0x0009,
configuration.eq(wValue[8:15]),
response_ack.eq(1),
),
If(response_len > wLength,
bytes_remaining.eq(wLength),
).Else(
bytes_remaining.eq(response_len),
),
bytes_addr.eq(response_addr),
),
If(usb_core.data_send_get,
response_ack.eq(1),
bytes_addr.eq(bytes_addr + 1),
If(bytes_remaining,
bytes_remaining.eq(bytes_remaining - 1),
),
),
If(self.data_recv_put_delayed,
response_ack.eq(0),
transaction_queued.eq(1),
),
]