Source code for usbcore.sm.header_test
#!/usr/bin/env python3
import unittest
from migen import *
from ..pid import PIDTypes
from ..rx.pipeline import RxPipeline
from ..utils.packet import *
from ..test.common import BaseUsbTestCase
from .header import PacketHeaderDecode
[docs]class TestPacketHeaderDecode(BaseUsbTestCase):
[docs] def sim(self, stim):
rx = RxPipeline()
dut = PacketHeaderDecode(rx)
run_simulation(
dut, stim(dut),
vcd_name=self.make_vcd_name(),
clocks={"sys": 12, "usb_48": 48, "usb_12": 192},
)
[docs] def recv_packet(self, dut, bits, tick):
if not tick:
def tick():
if False:
yield
for i in range(len(bits)):
b = bits[i]
if b == ' ':
continue
elif b == '_':
# SE0 - both lines pulled low
yield dut.rx.i_usbp.eq(0)
yield dut.rx.i_usbn.eq(0)
elif b == 'J':
yield dut.rx.i_usbp.eq(1)
yield dut.rx.i_usbn.eq(0)
elif b == 'K':
yield dut.rx.i_usbp.eq(0)
yield dut.rx.i_usbn.eq(1)
else:
assert False, "Unknown value: %s" % v
# four 48MHz cycles = 1 bit time
for t in range(0, 4):
yield
continue_sim = yield from tick(dut)
if not continue_sim:
break
MAX_ITER=10000
if continue_sim:
for i in range(0, MAX_ITER):
continue_sim = yield from tick(dut)
if not continue_sim:
break
yield
self.assertFalse(continue_sim)
self.assertLess(i, MAX_ITER-1)
[docs] def check_packet(self, expected_pid, expected_addr, expected_endp, packet):
def stim(dut):
for i in range(100):
yield
def tick(dut):
return not (yield dut.o_decoded)
yield from self.recv_packet(
dut,
packet,
tick,
)
decoded = yield dut.o_decoded
self.assertTrue(decoded)
actual_pid = yield dut.o_pid
self.assertEqual(expected_pid, actual_pid)
actual_addr = yield dut.o_addr
self.assertEqual(expected_addr, actual_addr)
actual_endp = yield dut.o_endp
self.assertEqual(expected_endp, actual_endp)
for i in range(100):
yield
self.sim(stim)
[docs] def check_token(self, expected_pid, expected_addr, expected_endp):
self.check_packet(
expected_pid, expected_addr, expected_endp,
wrap_packet(token_packet(expected_pid, expected_addr, expected_endp)),
)
[docs] def check_data(self, expected_pid, data):
self.check_packet(
expected_pid, 0, 0,
wrap_packet(data_packet(expected_pid, data)),
)
[docs] def check_status(self, expected_pid):
# Status packet is a data_packet with no data.
self.check_packet(
expected_pid, 0, 0,
wrap_packet(data_packet(expected_pid, [])),
)
[docs] def check_handshake(self, expected_pid):
self.check_packet(
expected_pid, 0, 0,
wrap_packet(handshake_packet(expected_pid)),
)
if __name__ == "__main__":
unittest.main()