Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
usbd: Add cdc example and a basic read function.
  • Loading branch information
hoihu committed Jul 19, 2023
commit d61f000f9519a7e7f24998499c6e57b811e5354d
31 changes: 26 additions & 5 deletions micropython/usbd/cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
)
from micropython import const
import ustruct
import time

_DEV_CLASS_MISC = const(0xef)
_CS_DESC_TYPE = const(0x24) # CS Interface type communication descriptor
Expand Down Expand Up @@ -69,18 +70,38 @@ def get_endpoint_descriptors(self, ep_addr, str_idx):
class CDCDataInterface(USBInterface):
# Implements the CDC Data Interface

def __init__(self, interface_str):
def __init__(self, interface_str, timeout=1):
super().__init__(_CDC_ITF_DATA_CLASS, _CDC_ITF_DATA_SUBCLASS,
_CDC_ITF_DATA_PROT)
self.rx_buf = bytearray(256)
self.mv_buf = memoryview(self.rx_buf)
self.rx_done = False
self.rx_nbytes = 0
self.timeout = timeout

def get_endpoint_descriptors(self, ep_addr, str_idx):
# XXX OUT = 0x00 but is defined as 0x80?
self.ep_in = (ep_addr + 2) | EP_OUT_FLAG
self.ep_out = (ep_addr + 2) & ~EP_OUT_FLAG
print("cdc in={} out={}".format(self.ep_in, self.ep_out))
# one IN / OUT Endpoint
e_out = endpoint_descriptor(self.ep_out, "bulk", 64, 0)
e_in = endpoint_descriptor(self.ep_in, "bulk", 64, 0)
desc = e_out + e_in
return (desc, [], (self.ep_out, self.ep_in))

return (e_out + e_in, [], (self.ep_out, self.ep_in))

def write(self, data):
super().submit_xfer(self.ep_in, data)

def read(self, nbytes=0):
# XXX PoC.. When returning, it should probably
# copy it to a ringbuffer instead of leaving it here
super().submit_xfer(self.ep_out, self.rx_buf, self._cb_rx)
now = time.time()
self.rx_done = False
self.rx_nbytes = 0
while ((time.time() - now) < self.timeout) and not self.rx_done:
time.sleep_ms(10)
return bytes(self.mv_buf[:self.rx_nbytes]) if self.rx_done else None

def _cb_rx(self, ep, res, num_bytes):
self.rx_done = True
self.rx_nbytes = num_bytes
14 changes: 14 additions & 0 deletions micropython/usbd/cdc_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from usbd import device, cdc

ud = device.get_usbdevice()
cdc.setup_CDC_device()
ctrl_cdc = cdc.CDCControlInterface('')
data_cdc = cdc.CDCDataInterface('')
ud.add_interface(ctrl_cdc)
ud.add_interface(data_cdc)
ud.reenumerate()

# sending something over CDC
data_cdc.write(b'Hello World')
# receiving something..
print(data_cdc.read(10))