Source code for usbcore.utils.packet

#!/usr/bin/env python3

from ..pid import PID
from . import CrcMoose3 as crc


[docs]def b(s): """Byte string with LSB first into an integer. >>> b("1") 1 >>> b("01") 2 >>> b("101") 5 """ return int(s[::-1], 2)
[docs]def encode_data(data): """ Converts array of 8-bit ints into string of 0s and 1s. """ output = "" for b in data: output += ("{0:08b}".format(b))[::-1] return output
[docs]def encode_pid(value): if not isinstance(value, PID): value = PID(value) assert isinstance(value, PID), repr(value) return encode_data([value.byte()])
# width=5 poly=0x05 init=0x1f refin=true refout=true xorout=0x1f check=0x19 residue=0x06 name="CRC-5/USB"
[docs]def crc5(nibbles): """ >>> hex(crc5([0, 0])) '0x1' >>> hex(crc5([3, 0])) '0x13' """ reg = crc.CrcRegister(crc.CRC5_USB) for n in nibbles: reg.takeWord(n, 4) return reg.getFinalValue() & 0x1f
[docs]def crc5_token(addr, ep): """ >>> hex(crc5_token(0, 0)) '0x2' >>> hex(crc5_token(92, 0)) '0x1c' >>> hex(crc5_token(3, 0)) '0xa' >>> hex(crc5_token(56, 4)) '0xb' """ reg = crc.CrcRegister(crc.CRC5_USB) reg.takeWord(addr, 7) reg.takeWord(ep, 4) return reg.getFinalValue()
[docs]def crc5_sof(v): """ >>> hex(crc5_sof(1429)) '0x1' >>> hex(crc5_sof(1013)) '0x5' """ reg = crc.CrcRegister(crc.CRC5_USB) reg.takeWord(v, 11) return eval('0b' + bin(reg.getFinalValue() | 0x10000000)[::-1][:5])
[docs]def crc16(input_data): # width=16 poly=0x8005 init=0xffff refin=true refout=true xorout=0xffff check=0xb4c8 residue=0xb001 name="CRC-16/USB" # CRC appended low byte first. reg = crc.CrcRegister(crc.CRC16_USB) for d in input_data: assert d <= 0xff, input_data reg.takeWord(d, 8) crc16 = reg.getFinalValue() return [crc16 & 0xff, (crc16 >> 8) & 0xff]
[docs]def nrzi(data, cycles=4, init="J"): """Converts string of 0s and 1s into NRZI encoded string. >>> nrzi("11 00000001", 1) 'JJ KJKJKJKK' It will do bit stuffing. >>> nrzi("1111111111", 1) 'JJJJJJKKKKK' Support single ended zero >>> nrzi("1111111__", 1) 'JJJJJJKK__' Support pre-encoded mixing. >>> nrzi("11kkj11__", 1) 'JJKKJJJ__' Supports wider clock widths >>> nrzi("101", 4) 'JJJJKKKKKKKK' """ def toggle_state(state): if state == 'J': return 'K' if state == 'K': return 'J' return state state = init output = "" stuffed = [] i = 0 for bit in data: stuffed.append(bit) if bit == '1': i += 1 else: i = 0 if i > 5: stuffed.append('0') i = 0 for bit in stuffed: if bit == ' ': output += bit continue # only toggle the state on '0' if bit == '0': state = toggle_state(state) elif bit == '1': pass elif bit in "jk_": state = bit.upper() else: assert False, "Unknown bit %s in %r" % (bit, data) output += (state * cycles) return output
[docs]def sync(): return "kjkjkjkk"
[docs]def eop(): return "__j"
[docs]def wrap_packet(data, cycles=4): """Add the sync + eop sections and do nrzi encoding. >>> wrap_packet(handshake_packet(PID.ACK), cycles=1) 'KJKJKJKKJJKJJKKK__J' >>> wrap_packet(token_packet(PID.SETUP, 0, 0), cycles=1) 'KJKJKJKKKJJJKKJKJKJKJKJKJKJKKJKJ__J' >>> wrap_packet(data_packet(PID.DATA0, [5, 6]), cycles=1) 'KJKJKJKKKKJKJKKKKJJKJKJKJJJKJKJKKJJJJJJKKJJJJKJK__J' >>> wrap_packet(data_packet(PID.DATA0, [0x1]), cycles=1) 'KJKJKJKKKKJKJKKKKJKJKJKJJKJKJKJJJJJJJKKKJ__J' """ return nrzi(sync() + data + eop(), cycles)
[docs]def token_packet(pid, addr, endp): """Create a token packet for testing. sync, pid, addr (7bit), endp(4bit), crc5(5bit), eop >>> token_packet(PID.SETUP, 0x0, 0x0) '101101000000000000001000' PPPPPPPP - 8 bits - PID AAAAAAA - 7 bits - ADDR EEEE - 4 bits - EP CCCCC - 5 bits - CRC >>> token_packet(PID.IN, 0x3, 0x0) # 0x0A '100101101100000000001010' >>> token_packet(PID.OUT, 0x3a, 0xa) '100001110101110010111100' >>> token_packet(PID.SETUP, 0x70, 0xa) '101101000000111010110101' >>> token_packet(PID.SETUP, 40, 2) '101101000001010010000011' >>> token_packet(PID.SETUP, 28, 2) '101101000011100010001001' PPPPPPPP - 8 bits - PID AAAAAAA - 7 bits - ADDR EEEE - 4 bits - EP CCCCC - 5 bits - CRC """ assert addr < 128, addr assert endp < 2**4, endp assert pid in (PID.OUT, PID.IN, PID.SETUP), pid token = encode_pid(pid) token += "{0:07b}".format(addr)[::-1] # 7 bits address token += "{0:04b}".format(endp)[::-1] # 4 bits endpoint token += "{0:05b}".format(crc5_token(addr, endp))[::-1] # 5 bits CRC5 assert len(token) == 24, token return token
[docs]def data_packet(pid, payload): """Create a data packet for testing. sync, pid, data, crc16, eop FIXME: data should be multiples of 8? >>> data_packet(PID.DATA0, [0x80, 0x06, 0x03, 0x03, 0x09, 0x04, 0x00, 0x02]) '1100001100000001011000001100000011000000100100000010000000000000010000000110101011011100' >>> data_packet(PID.DATA1, []) '110100100000000000000000' """ assert pid in (PID.DATA0, PID.DATA1), pid payload = list(payload) return encode_pid(pid) + encode_data(payload + crc16(payload))
[docs]def handshake_packet(pid): """ Create a handshake packet for testing. sync, pid, eop ack / nak / stall / nyet (high speed only) >>> handshake_packet(PID.ACK) '01001011' >>> handshake_packet(PID.NAK) '01011010' """ assert pid in (PID.ACK, PID.NAK, PID.STALL), pid return encode_pid(pid)
[docs]def sof_packet(frame): """Create a SOF packet for testing. sync, pid, frame no (11bits), crc5(5bits), eop >>> sof_packet(1) '101001011000000000010111' >>> sof_packet(100) '101001010010011000011111' >>> sof_packet(257) '101001011000000010000011' >>> sof_packet(1429) '101001011010100110110000' >>> sof_packet(2**11 - 2) '101001010111111111111101' """ def rev_byte(x): return int("{0:08b}".format(x)[:8][::-1], 2) assert frame < 2**11, (frame, '<', 2**11) frame_rev = int("{0:011b}".format(frame)[:11][::-1], 2) data = [frame_rev >> 3, (frame_rev & 0b111) << 5] data[-1] = data[-1] | crc5_sof(frame) data[0] = rev_byte(data[0]) data[1] = rev_byte(data[1]) return encode_pid(PID.SOF) + encode_data(data)
[docs]def diff(value): """Convert J/K encoding into bits for P/N diff pair. >>> diff('KJ_') ('010', '100') >>> # Convert ACK handshake packet >>> p,n = diff('KJKJKJKKJJKJJKKK__J') >>> p '0101010011011000001' >>> n '1010101100100111000' """ usbp = "" usbn = "" for i in range(len(value)): v = value[i] if v == ' ': continue elif v == '_': # SE0 - both lines pulled low usbp += "0" usbn += "0" elif v == 'J': usbp += "1" usbn += "0" elif v == 'K': usbp += "0" usbn += "1" else: assert False, "Unknown value: %s" % v return usbp, usbn
[docs]def undiff(usbp, usbn): """Convert P/N diff pair bits into J/K encoding. >>> from usbcore.utils.pprint import pp_packet >>> undiff( ... #EJK_ ... '1100', # p ... '1010', # n ... ) 'EJK_' >>> print(pp_packet(undiff( ... #KJKJKJKKJJKJJKKK__J - ACK handshake packet ... '0101010011011000001', # p ... '1010101100100111000', # n ... ), cycles=1)) - K 1 Sync J 2 Sync K 3 Sync J 4 Sync K 5 Sync J 6 Sync K 7 Sync K 8 Sync - J 1 PID (PID.ACK) J 2 PID K 3 PID J 4 PID J 5 PID K 6 PID K 7 PID K 8 PID - _ SE0 _ SE0 J END >>> print(pp_packet(undiff(*diff(wrap_packet(sof_packet(0)))))) ---- KKKK 1 Sync JJJJ 2 Sync KKKK 3 Sync JJJJ 4 Sync KKKK 5 Sync JJJJ 6 Sync KKKK 7 Sync KKKK 8 Sync ---- KKKK 1 PID (PID.SOF) JJJJ 2 PID JJJJ 3 PID KKKK 4 PID JJJJ 5 PID JJJJ 6 PID KKKK 7 PID KKKK 8 PID ---- JJJJ 1 Frame # KKKK 2 Frame # JJJJ 3 Frame # KKKK 4 Frame # JJJJ 5 Frame # KKKK 6 Frame # JJJJ 7 Frame # KKKK 8 Frame # ---- JJJJ 9 Frame # JJJJ 10 Frame # KKKK 11 Frame # JJJJ 1 CRC5 KKKK 2 CRC5 JJJJ 3 CRC5 KKKK 4 CRC5 JJJJ 5 CRC5 ---- ____ SE0 ____ SE0 JJJJ END """ assert len(usbp) == len(usbn), "Sequence different lengths!\n%s\n%s\n" % ( usbp, usbn) value = [] for i in range(0, len(usbp)): p = usbp[i] n = usbn[i] value.append({ #pn '00': '_', '11': 'E', '10': 'J', '01': 'K', }[p+n]) return "".join(value)
if __name__ == "__main__": import doctest doctest.testmod()