Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore(cdc): polish descriptor and improve rx handling
of OUT Endpoint. There was also a bug in the control itf
union descriptor.
  • Loading branch information
hoihu committed Aug 15, 2023
commit 3411ce14bd48704c0cc1ac2f59e656afe387cb2d
117 changes: 89 additions & 28 deletions micropython/usbd/cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
)
from .utils import (
endpoint_descriptor,
split_bmRequestType,
STAGE_SETUP,
REQ_TYPE_STANDARD,
REQ_TYPE_CLASS,
EP_IN_FLAG
)
from micropython import const
Expand All @@ -17,9 +21,20 @@
_ITF_ASSOCIATION_DESC_TYPE = const(0xb) # Interface Association descriptor

# CDC control interface definitions
_CDC_ITF_CONTROL_CLASS = const(2)
_CDC_ITF_CONTROL_SUBCLASS = const(2) # Abstract Control Mode
_CDC_ITF_CONTROL_PROT = const(0) # no protocol
_INTERFACE_CLASS_CDC = const(2)
_INTERFACE_SUBCLASS_CDC = const(2) # Abstract Control Mode
_PROTOCOL_NONE = const(0) # no protocol

# CDC descriptor subtype
# see also CDC120.pdf, table 13
_CDC_FUNC_DESC_HEADER = const(0)
_CDC_FUNC_DESC_CALL_MANAGEMENT = const(1)
_CDC_FUNC_DESC_ABSTRACT_CONTROL = const(2)
_CDC_FUNC_DESC_UNION = const(6)

# Other definitions
_CDC_VERSION = const(0x0120) # release number in binary-coded decimal


# CDC data interface definitions
_CDC_ITF_DATA_CLASS = const(0xa)
Expand All @@ -41,47 +56,84 @@ def setup_CDC_device():
class CDCControlInterface(USBInterface):
# Implements the CDC Control Interface

def __init__(self, interface_str):
super().__init__(_CDC_ITF_CONTROL_CLASS, _CDC_ITF_CONTROL_SUBCLASS,
_CDC_ITF_CONTROL_PROT)
def __init__(self, _):
super().__init__(_INTERFACE_CLASS_CDC, _INTERFACE_SUBCLASS_CDC, _PROTOCOL_NONE)

def get_itf_descriptor(self, num_eps, itf_idx, str_idx):
# CDC needs a Interface Association Descriptor (IAD)
# first interface is zero, two interfaces in total
desc = ustruct.pack("<BBBBBBBB", 8, _ITF_ASSOCIATION_DESC_TYPE, itf_idx, 2,
_CDC_ITF_CONTROL_CLASS, _CDC_ITF_CONTROL_SUBCLASS,
_CDC_ITF_CONTROL_PROT, 0) # "IAD"
# two interfaces in total
desc = ustruct.pack("<BBBBBBBB",
8,
_ITF_ASSOCIATION_DESC_TYPE,
itf_idx,
2,
_INTERFACE_CLASS_CDC,
_INTERFACE_SUBCLASS_CDC,
_PROTOCOL_NONE,
0)

itf, strs = super().get_itf_descriptor(num_eps, itf_idx, str_idx)
desc += itf
# Append the CDC class-specific interface descriptor
# see also USB spec document CDC120-track, p20
desc += ustruct.pack("<BBBH", 5, _CS_DESC_TYPE, 0, 0x0120) # "Header"
desc += ustruct.pack("<BBBBB", 5, _CS_DESC_TYPE, 1, 0, 1) # "Call Management"
desc += ustruct.pack("<BBBB", 4, _CS_DESC_TYPE, 2, 2) # "Abstract Control"
desc += ustruct.pack("<BBBH", 5, _CS_DESC_TYPE, 6, itf_idx, 1) # "Union"
# see CDC120-track, p20
desc += ustruct.pack("<BBBH",
5, # bFunctionLength
_CS_DESC_TYPE, # bDescriptorType
_CDC_FUNC_DESC_HEADER, # bDescriptorSubtype
_CDC_VERSION) # cdc version

# CDC-PSTN table3 "Call Management"
# set to No
desc += ustruct.pack("<BBBBB",
5, # bFunctionLength
_CS_DESC_TYPE, # bDescriptorType
_CDC_FUNC_DESC_CALL_MANAGEMENT, # bDescriptorSubtype
0, # bmCapabilities - XXX no call managment so far
1) # bDataInterface - interface 1

# CDC-PSTN table4 "Abstract Control"
# set to support line_coding and send_break
desc += ustruct.pack("<BBBB",
4, # bFunctionLength
_CS_DESC_TYPE, # bDescriptorType
_CDC_FUNC_DESC_ABSTRACT_CONTROL, # bDescriptorSubtype
0x6) # bmCapabilities D1, D2
# CDC-PSTN "Union"
# set control interface / data interface number
desc += ustruct.pack("<BBBH",
5, # bFunctionLength
_CS_DESC_TYPE, # bDescriptorType
_CDC_FUNC_DESC_UNION, # bDescriptorSubtype
itf_idx, # bControlInterface
itf_idx+1) # bSubordinateInterface0 (data class itf number)
return desc, strs

def get_endpoint_descriptors(self, ep_addr, str_idx):
self.ep_in = endpoint_descriptor((ep_addr + 1) | EP_IN_FLAG, "interrupt", 8, 16)
return (self.ep_in, [], ((ep_addr+1) | EP_IN_FLAG,))

def handle_interface_control_xfer(self, stage, request):
# Handle standard and class-specific interface control transfers for HID devices.
bmRequestType, bRequest, wValue, _, _ = request
recipient, req_type, _ = split_bmRequestType(bmRequestType)

print(f'itf cntrl: {recipient}, {req_type}')
super().handle_interface_control_xfer(stage, request)


class CDCDataInterface(USBInterface):
# Implements the CDC Data Interface

def __init__(self, interface_str, timeout=1):
super().__init__(_CDC_ITF_DATA_CLASS, _CDC_ITF_DATA_SUBCLASS,
_CDC_ITF_DATA_PROT)
self.rx_buf = bytearray(256)
self.rx_buf = bytearray(64)
self.mv_buf = memoryview(self.rx_buf)
self.rx_done = False
self.rx_nbytes = 0
self.timeout = timeout

def get_endpoint_descriptors(self, ep_addr, str_idx):
self.ep_in = (ep_addr + 2) | EP_IN_FLAG
self.ep_out = (ep_addr + 2) & ~EP_IN_FLAG
self.ep_in = (ep_addr + 1) | EP_IN_FLAG
self.ep_out = (ep_addr + 2)
# one IN / OUT Endpoint
e_out = endpoint_descriptor(self.ep_out, "bulk", 64, 0)
e_in = endpoint_descriptor(self.ep_in, "bulk", 64, 0)
Expand All @@ -90,17 +142,26 @@ def get_endpoint_descriptors(self, ep_addr, str_idx):
def write(self, data):
super().submit_xfer(self.ep_in, data)

def _poll_rx_endpoint(self, cb):
super().submit_xfer(self.ep_out, self.rx_buf, cb)

def read(self, nbytes=0):
# XXX PoC.. When returning, it should probably
# copy it to a ringbuffer instead of leaving it here
super().submit_xfer(self.ep_out, self.rx_buf, self._cb_rx)
now = time.time()
self.rx_done = False
self.rx_nbytes = 0
while ((time.time() - now) < self.timeout) and not self.rx_done:
time.sleep_ms(10)
return bytes(self.mv_buf[:self.rx_nbytes]) if self.rx_done else None
self.rx_nbytes_requested = nbytes
self.total_rx = bytearray()
self._poll_rx_endpoint(self._cb_rx)
now = time.time()
while ((time.time() - now) < self.timeout):
if self.rx_nbytes >= nbytes:
break
time.sleep_ms(10) # XXX blocking.. could be async'd
return self.total_rx

def _cb_rx(self, ep, res, num_bytes):
self.rx_done = True
self.rx_nbytes = num_bytes
self.total_rx.extend(self.mv_buf[:num_bytes])
self.rx_nbytes += num_bytes
if self.rx_nbytes < self.rx_nbytes_requested:
# try to get more from endpoint
self._poll_rx_endpoint(self._cb_rx)