diff --git a/micropython/usbd/__init__.py b/micropython/usbd/__init__.py new file mode 100644 index 000000000..90739c27e --- /dev/null +++ b/micropython/usbd/__init__.py @@ -0,0 +1,4 @@ +from .device import get_usbdevice, USBInterface +from .hid import HIDInterface, MouseInterface +from .midi import DummyAudioInterface, MIDIInterface, MidiUSB +from . import utils diff --git a/micropython/usbd/device.py b/micropython/usbd/device.py new file mode 100644 index 000000000..492b42ea3 --- /dev/null +++ b/micropython/usbd/device.py @@ -0,0 +1,625 @@ +# MicroPython USB device module +# MIT license; Copyright (c) 2022 Angus Gratton +from micropython import const +import machine +import struct + +from .utils import split_bmRequestType, EP_IN_FLAG + +# USB descriptor types +_STD_DESC_DEVICE_TYPE = const(0x1) +_STD_DESC_CONFIG_TYPE = const(0x2) +_STD_DESC_STRING_TYPE = const(0x3) +_STD_DESC_INTERFACE_TYPE = const(0x4) +_STD_DESC_INTERFACE_ASSOC = const(0xB) + +# Standard USB descriptor lengths +_STD_DESC_CONFIG_LEN = const(9) +_STD_DESC_INTERFACE_LEN = const(9) + +# Standard control request bmRequest fields, can extract by calling split_bmRequestType() +_REQ_RECIPIENT_DEVICE = const(0x0) +_REQ_RECIPIENT_INTERFACE = const(0x1) +_REQ_RECIPIENT_ENDPOINT = const(0x2) +_REQ_RECIPIENT_OTHER = const(0x3) + +# Offsets into the standard configuration descriptor, to fixup +_OFFS_CONFIG_iConfiguration = const(6) + + +# Singleton _USBDevice instance +_inst = None + + +def get_usbdevice(): + # Access the singleton instance of the MicroPython _USBDevice object. + # + # TODO: It might be better to factor this as a module-level interface? + global _inst + if not _inst: + _inst = _USBDevice() + return _inst + + +class _USBDevice: + # Class that implements the Python parts of the MicroPython USBDevice. + # + # This object represents any interfaces on the USB device that are implemented + # in Python, and also allows disabling the 'static' USB interfaces that are + # implemented in Python (if include_static property is set to False). + # + # Should be accessed via the singleton getter module function get_usbdevice(), + # not instantiated directly.. + def __init__(self): + self._eps = {} # Mapping from endpoint address to interface object + self._ep_cbs = {} # Mapping from endpoint address to Optional[xfer callback] + self._itfs = [] # List of interfaces + self.include_static = True # Include static devices when enumerating? + + # Device properties, set non-NULL to override static values + self.manufacturer_str = None + self.product_str = None + self.serial_str = None + self.id_vendor = None + self.id_product = None + self.device_class = None + self.device_subclass = None + self.device_protocol = None + self.bcd_device = None + + # Configuration properties + self.config_str = None + self.max_power_ma = 50 + + self._strs = self._get_device_strs() + + usbd = self._usbd = machine.USBD() + usbd.init( + descriptor_device_cb=self._descriptor_device_cb, + descriptor_config_cb=self._descriptor_config_cb, + descriptor_string_cb=self._descriptor_string_cb, + open_cb=self._open_cb, + reset_cb=self._reset_cb, + control_xfer_cb=self._control_xfer_cb, + xfer_cb=self._xfer_cb, + ) + + def add_interface(self, itf): + # Add an instance of USBInterface to the USBDevice. + # + # The next time USB is reenumerated (by calling .reenumerate() or + # otherwise), this interface will appear to the host. + self._itfs.append(itf) + + def remove_interface(self, itf): + # Remove an instance of USBInterface from the USBDevice. + # + # If the USB device is currently enumerated to a host, and in particular + # if any endpoint transfers are pending, then this may cause it to + # misbehave as these transfers are not cancelled. + self._itfs.remove(itf) + + def reenumerate(self): + # Disconnect the USB device and then reconnect it, causing the host to + # reenumerate it. + # + # Any open USB interfaces (for example USB-CDC serial connection) will be + # temporarily terminated. + # + # This is the only way to change the composition of an existing USB device + # from the device side without disconnecting/reconnecting the port. + self._usbd.reenumerate() + + def _descriptor_device_cb(self): + # Singleton callback from TinyUSB to read the USB device descriptor. + # + # This function will build a new device descriptor based on the 'static' + # USB device values compiled into MicroPython, but many values can be + # optionally overriden by setting properties of this object. + + FMT = "= 0 # index shouldn't be in the static range + try: + return self._itfs[index] + except IndexError: + return None # host has old mappings for interfaces + + def _descriptor_config_cb(self): + # Singleton callback from TinyUSB to read the configuration descriptor. + # + # Each time this function is called (in response to a GET DESCRIPTOR - + # CONFIGURATION request from the host), it rebuilds the full configuration + # descriptor and also the list of strings stored in self._strs. + # + # This normally only happens during enumeration, but may happen more than + # once (the host will first ask for a minimum length descriptor, and then + # use the length field request to request the whole thing). + static = self._usbd.static + + # Rebuild the _strs list as we build the configuration descriptor + strs = self._get_device_strs() + + if self.include_static: + desc = bytearray(static.desc_cfg) + else: + desc = bytearray(_STD_DESC_CONFIG_LEN) + + self._eps = {} # rebuild endpoint mapping as we enumerate each interface + self._ep_cbs = {} + itf_idx = static.itf_max + ep_addr = static.ep_max + str_idx = static.str_max + len(strs) + for itf in self._itfs: + # Get the endpoint descriptors first so we know how many endpoints there are + ep_desc, ep_strs, ep_addrs = itf.get_endpoint_descriptors(ep_addr, str_idx) + strs += ep_strs + str_idx += len(ep_strs) + + # Now go back and get the interface descriptor + itf_desc, itf_strs = itf.get_itf_descriptor(len(ep_addrs), itf_idx, str_idx) + desc += itf_desc + strs += itf_strs + itf_idx += 1 + str_idx += len(itf_strs) + + desc += ep_desc + for e in ep_addrs: + self._eps[e] = itf + self._ep_cbs[e] = None # no pending callback + # TODO: check if always incrementing leaves too many gaps + ep_addr = max((e & ~EP_IN_FLAG) + 1, ep_addr) + + self._update_configuration_descriptor(desc) + + self._strs = strs + return desc + + def _update_configuration_descriptor(self, desc): + # Utility function to update the Standard Configuration Descriptor + # header supplied in the argument with values based on the current state + # of the device. + # + # See USB 2.0 specification section 9.6.3 p264 for details. + # + # Currently only one configuration per device is supported. + bmAttributes = ( + (1 << 7) # Reserved + | (0 if self.max_power_ma else (1 << 6)) # Self-Powered + # Remote Wakeup not currently supported + ) + + iConfiguration = self._get_str_index(self.config_str) + if self.include_static and not iConfiguration: + iConfiguration = desc[_OFFS_CONFIG_iConfiguration] + + bNumInterfaces = self._usbd.static.itf_max if self.include_static else 0 + bNumInterfaces += len(self._itfs) + + struct.pack_into( + "= 0 + ) # Shouldn't get any calls here where index is less than first dynamic string index + try: + return self._strs[index] + except IndexError: + return None + + def _open_cb(self, interface_desc_view): + # Singleton callback from TinyUSB custom class driver, when USB host does + # Set Configuration. The "runtime class device" accepts all interfaces that + # it has sent in descriptors, and calls this callback. + + # Walk the view of the "claimed" descriptor data provided in the + # callback and call handle_open() on each claimed interface + # + # ... this may be unnecessary at the moment, as only one configuration is supported so we + # can probably assume all the interfaces will be included. + i = 0 + while i < len(interface_desc_view): + # descriptor length, type, and index (if it's an interface descriptor) + dl, dt, di = interface_desc_view[i : i + 3] + if dt == _STD_DESC_INTERFACE_TYPE: + if di >= self._usbd.static.itf_max: + di -= self._usbd.static.itf_max + self._itfs[di].handle_open() + i += dl + assert dl + + def _reset_cb(self): + # Callback when the USB device is reset by the host + + # Cancel outstanding transfer callbacks + for k in self._ep_cbs.keys(): + self._ep_cbs[k] = None + + # Allow interfaces to respond to the reset + for itf in self._itfs: + itf.handle_reset() + + def _submit_xfer(self, ep_addr, data, done_cb=None): + # Singleton function to submit a USB transfer (of any type except control). + # + # Generally, drivers should call USBInterface.submit_xfer() instead. See + # that function for documentation about the possible parameter values. + cb = self._ep_cbs[ep_addr] + if cb: + raise RuntimeError(f"Pending xfer on EP {ep_addr}") + + # USBD callback may be called immediately, before Python execution + # continues + self._ep_cbs[ep_addr] = done_cb + + if not self._usbd.submit_xfer(ep_addr, data): + self._ep_cbs[ep_addr] = None + return False + return True + + def _xfer_cb(self, ep_addr, result, xferred_bytes): + # Singleton callback from TinyUSB custom class driver when a transfer completes. + cb = self._ep_cbs.get(ep_addr, None) + if cb: + self._ep_cbs[ep_addr] = None + cb(ep_addr, result, xferred_bytes) + + def _control_xfer_cb(self, stage, request): + # Singleton callback from TinyUSB custom class driver when a control + # transfer is in progress. + # + # stage determines appropriate responses (possible values + # utils.STAGE_SETUP, utils.STAGE_DATA, utils.STAGE_ACK). + # + # The TinyUSB class driver framework only calls this function for + # particular types of control transfer, other standard control transfers + # are handled by TinyUSB itself. + bmRequestType, _, _, wIndex, _ = request + recipient, _, _ = split_bmRequestType(bmRequestType) + + itf = None + result = None + + if recipient == _REQ_RECIPIENT_DEVICE: + itf = self._get_interface(wIndex & 0xFFFF) + if itf: + result = itf.handle_device_control_xfer(stage, request) + elif recipient == _REQ_RECIPIENT_INTERFACE: + itf = self._get_interface(wIndex & 0xFFFF) + if itf: + result = itf.handle_interface_control_xfer(stage, request) + elif recipient == _REQ_RECIPIENT_ENDPOINT: + ep_num = wIndex & 0xFFFF + itf = self._eps.get(ep_num, None) + if itf: + result = itf.handle_endpoint_control_xfer(stage, request) + + if not itf: + # At time this code was written, only the control transfers shown + # above are passed to the class driver callback. See + # invoke_class_control() in tinyusb usbd.c + print(f"Unexpected control request type {bmRequestType:#x}") + return False + + # Accept the following possible replies from handle_NNN_control_xfer(): + # + # True - Continue transfer, no data + # False - STALL transfer + # Object with buffer interface - submit this data for the control transfer + if type(result) == bool: + return result + + return self._usbd.control_xfer(request, result) + + +class USBInterface: + # Abstract base class to implement a USBInterface (and associated endpoints) in Python + + def __init__( + self, + bInterfaceClass=0xFF, + bInterfaceSubClass=0, + bInterfaceProtocol=0xFF, + interface_str=None, + ): + # Create a new USBInterface object. Optionally can set bInterfaceClass, + # bInterfaceSubClass, bInterfaceProtocol values to specify the interface + # type. Can also optionally set a string descriptor value interface_str to describe this + # interface. + # + # The defaults are to set 'vendor' class and protocol values, the host + # will not attempt to use any standard class driver to talk to this + # interface. + + # Defaults set "vendor" class and protocol + self.bInterfaceClass = bInterfaceClass + self.bInterfaceSubClass = bInterfaceSubClass + self.bInterfaceProtocol = bInterfaceProtocol + self.interface_str = interface_str + self._open = False + + def get_itf_descriptor(self, num_eps, itf_idx, str_idx): + # Return the interface descriptor binary data and associated other + # descriptors for the interface (not including endpoint descriptors), plus + # associated string descriptor data. + # + # For most types of USB interface, this function doesn't need to be + # overriden. Only override if you need to append interface-specific + # descriptors before the first endpoint descriptor. To return an Interface + # Descriptor Association, on the first interface this function should + # return the IAD descriptor followed by the Interface descriptor. + # + # Parameters: + # + # - num_eps - number of endpoints in the interface, as returned by + # get_endpoint_descriptors() which is actually called before this + # function. + # + # - itf_idx - Interface index number for this interface. + # + # - str_idx - First string index number to assign for any string + # descriptor indexes included in the result. + # + # Result: + # + # Should be a 2-tuple: + # + # - Interface descriptor binary data, to return as part of the + # configuration descriptor. + # + # - List of any strings referenced in the interface descriptor data + # (indexes in the descriptor data should start from 'str_idx'.) + # + # See USB 2.0 specification section 9.6.5 p267 for standard interface descriptors. + desc = struct.pack( + "<" + "B" * _STD_DESC_INTERFACE_LEN, + _STD_DESC_INTERFACE_LEN, # bLength + _STD_DESC_INTERFACE_TYPE, # bDescriptorType + itf_idx, # bInterfaceNumber + 0, # bAlternateSetting, not currently supported + num_eps, + self.bInterfaceClass, + self.bInterfaceSubClass, + self.bInterfaceProtocol, + str_idx if self.interface_str else 0, # iInterface + ) + strs = [self.interface_str] if self.interface_str else [] + + return (desc, strs) + + def get_endpoint_descriptors(self, ep_addr, str_idx): + # Similar to get_itf_descriptor, returns descriptors for any endpoints + # in this interface, plus associated other configuration descriptor data. + # + # The base class returns no endpoints, so usually this is overriden in the subclass. + # + # This function is called any time the host asks for a configuration + # descriptor. It is actually called before get_itf_descriptor(), so that + # the number of endpoints is known. + # + # Parameters: + # + # - ep_addr - Address for this endpoint, without any utils.EP_IN_FLAG (0x80) bit set. + # - str_idx - Index to use for the first string descriptor in the result, if any. + # + # Result: + # + # Should be a 3-tuple: + # + # - Endpoint descriptor binary data and associated other descriptors for + # the endpoint, to return as part of the configuration descriptor. + # + # - List of any strings referenced in the descriptor data (indexes in the + # descriptor data should start from 'str_idx'.) + # + # - List of endpoint addresses referenced in the descriptor data (should + # start from ep_addr, optionally with the utils.EP_IN_FLAG bit set.) + return (b"", [], []) + + def handle_open(self): + # Callback called when the USB host accepts the device configuration. + # + # Override this function to initiate any operations that the USB interface + # should do when the USB device is configured to the host. + self._open = True + + def handle_reset(self): + # Callback called on every registered interface when the USB device is + # reset by the host. This can happen when the USB device is unplugged, + # or if the host triggers a reset for some other reason. + # + # Override this function to cancel any pending operations specific to + # the interface (outstanding USB transfers are already cancelled). + # + # At this point, no USB functionality is available - handle_open() will + # be called later if/when the USB host re-enumerates and configures the + # interface. + self._open = False + + def is_open(self): + # Returns True if the interface is in use + return self._open + + def handle_device_control_xfer(self, stage, request): + # Control transfer callback. Override to handle a non-standard device + # control transfer where bmRequestType Recipient is Device, Type is + # utils.REQ_TYPE_CLASS, and the lower byte of wIndex indicates this interface. + # + # (See USB 2.0 specification 9.4 Standard Device Requests, p250). + # + # This particular request type seems pretty uncommon for a device class + # driver to need to handle, most hosts will not send this so most + # implementations won't need to override it. + # + # Parameters: + # + # - stage is one of utils.STAGE_SETUP, utils.STAGE_DATA, utils.STAGE_ACK. + # - request is a tuple of (bmRequestType, bRequest, wValue, wIndex, + # - wLength), as per USB 2.0 specification 9.3 USB Device Requests, p250. + # + # The function can call split_bmRequestType() to split bmRequestType into + # (Recipient, Type, Direction). + # + # Result, any of: + # + # - True to continue the request, False to STALL the endpoint. + # - Buffer interface object to provide a buffer to the host as part of the + # transfer, if possible. + return False + + def handle_interface_control_xfer(self, stage, request): + # Control transfer callback. Override to handle a device control + # transfer where bmRequestType Recipient is Interface, and the lower byte + # of wIndex indicates this interface. + # + # (See USB 2.0 specification 9.4 Standard Device Requests, p250). + # + # bmRequestType Type field may have different values. It's not necessary + # to handle the mandatory Standard requests (bmRequestType Type == + # utils.REQ_TYPE_STANDARD), if the driver returns False in these cases then + # TinyUSB will provide the necessary responses. + # + # See handle_device_control_xfer() for a description of the arguments and + # possible return values. + return False + + def handle_endpoint_control_xfer(self, stage, request): + # Control transfer callback. Override to handle a device + # control transfer where bmRequestType Recipient is Endpoint and + # the lower byte of wIndex indicates an endpoint address associated + # with this interface. + # + # bmRequestType Type will generally have any value except + # utils.REQ_TYPE_STANDARD, as Standard endpoint requests are handled by + # TinyUSB. The exception is the the Standard "Set Feature" request. This + # is handled by Tiny USB but also passed through to the driver in case it + # needs to change any internal state, but most drivers can ignore and + # return False in this case. + # + # (See USB 2.0 specification 9.4 Standard Device Requests, p250). + # + # See handle_device_control_xfer() for a description of the parameters and + # possible return values. + return False + + def submit_xfer(self, ep_addr, data, done_cb=None): + # Submit a USB transfer (of any type except control) + # + # Parameters: + # + # - ep_addr. Address of the endpoint to submit the transfer on. Caller is + # responsible for ensuring that ep_addr is correct and belongs to this + # interface. Only one transfer can be active at a time on each endpoint. + # + # - data. Buffer containing data to send, or for data to be read into + # (depending on endpoint direction). + # + # - done_cb. Optional callback function for when the transfer + # completes. The callback is called with arguments (ep_addr, result, + # xferred_bytes) where result is one of xfer_result_t enum (see top of + # this file), and xferred_bytes is an integer. + # + # Note that done_cb may be called immediately, possibly before this + # function has returned to the caller. + if not self._open: + raise RuntimeError + return get_usbdevice()._submit_xfer(ep_addr, data, done_cb) + + def set_ep_stall(self, ep_addr, stall): + # Set or clear endpoint STALL state, according to the bool "stall" parameter. + # + # Generally endpoint STALL is handled automatically by TinyUSB, but + # there are some device classes that need to explicitly stall or unstall + # an endpoint under certain conditions. + if not self._open or ep_addr not in get_usbdevice()._eps: + raise RuntimeError + get_usbdevice()._usbd.set_ep_stall(ep_addr, stall) + + def get_ep_stall(self, ep_addr): + # Get the current endpoint STALL state. + # + # Endpoint can be stalled/unstalled by host, TinyUSB stack, or calls to + # set_ep_stall(). + if not self._open or ep_addr not in get_usbdevice()._eps: + raise RuntimeError + return get_usbdevice()._usbd.get_ep_stall(ep_addr) diff --git a/micropython/usbd/hid.py b/micropython/usbd/hid.py new file mode 100644 index 000000000..a80f4613b --- /dev/null +++ b/micropython/usbd/hid.py @@ -0,0 +1,314 @@ +# MicroPython USB hid module +# MIT license; Copyright (c) 2023 Angus Gratton +from .device import ( + USBInterface, +) +from .utils import ( + endpoint_descriptor, + split_bmRequestType, + EP_IN_FLAG, + STAGE_SETUP, + STAGE_DATA, + REQ_TYPE_STANDARD, + REQ_TYPE_CLASS, +) +from micropython import const +import struct + +_DESC_HID_TYPE = const(0x21) +_DESC_REPORT_TYPE = const(0x22) +_DESC_PHYSICAL_TYPE = const(0x23) + +_INTERFACE_CLASS = const(0x03) +_INTERFACE_SUBCLASS_NONE = const(0x00) +_INTERFACE_SUBCLASS_BOOT = const(0x01) + +_INTERFACE_PROTOCOL_NONE = const(0x00) +_INTERFACE_PROTOCOL_KEYBOARD = const(0x01) +_INTERFACE_PROTOCOL_MOUSE = const(0x02) + +# bRequest values for HID control requests +_REQ_CONTROL_GET_REPORT = const(0x01) +_REQ_CONTROL_GET_IDLE = const(0x02) +_REQ_CONTROL_GET_PROTOCOL = const(0x03) +_REQ_CONTROL_GET_DESCRIPTOR = const(0x06) +_REQ_CONTROL_SET_REPORT = const(0x09) +_REQ_CONTROL_SET_IDLE = const(0x0A) +_REQ_CONTROL_SET_PROTOCOL = const(0x0B) + + +class HIDInterface(USBInterface): + # Abstract base class to implement a USB device HID interface in Python. + + def __init__( + self, + report_descriptor, + extra_descriptors=[], + set_report_buf=None, + protocol=_INTERFACE_PROTOCOL_NONE, + interface_str=None, + use_out_ep=False, + ): + # Construct a new HID interface. + # + # - report_descriptor is the only mandatory argument, which is the binary + # data consisting of the HID Report Descriptor. See Device Class + # Definition for Human Interface Devices (HID) v1.11 section 6.2.2 Report + # Descriptor, p23. + # + # - extra_descriptors is an optional argument holding additional HID + # descriptors, to append after the mandatory report descriptor. Most + # HID devices do not use these. + # + # - set_report_buf is an optional writable buffer object (i.e. + # bytearray), where SET_REPORT requests from the host can be + # written. Only necessary if the report_descriptor contains Output + # entries. If set, the size must be at least the size of the largest + # Output entry. + # + # - protocol can be set to a specific value as per HID v1.11 section 4.3 Protocols, p9. + # + # - interface_str is an optional string descriptor to associate with the HID USB interface. + # + # - use_out_ep needs to be set to True if you're using the OUT endpoint, e.g. to get + # keyboard LEDs + super().__init__(_INTERFACE_CLASS, _INTERFACE_SUBCLASS_NONE, protocol, interface_str) + self.extra_descriptors = extra_descriptors + self.report_descriptor = report_descriptor + self._set_report_buf = set_report_buf + self._int_ep = None # set during enumeration + self._out_ep = None + self.use_out_ep = use_out_ep + + def get_report(self): + return False + + def handle_set_report(self, report_data, report_id, report_type): + # Override this function in order to handle SET REPORT requests from the host, + # where it sends data to the HID device. + # + # This function will only be called if the Report descriptor contains at least one Output entry, + # and the set_report_buf argument is provided to the constructor. + # + # Return True to complete the control transfer normally, False to abort it. + return True + + def send_report(self, report_data): + # Helper function to send a HID report in the typical USB interrupt + # endpoint associated with a HID interface. return + self.submit_xfer(self._int_ep, report_data) + + def get_endpoint_descriptors(self, ep_addr, str_idx): + # Return the typical single USB interrupt endpoint descriptor associated + # with a HID interface. + # + # As per HID v1.11 section 7.1 Standard Requests, return the contents of + # the standard HID descriptor before the associated endpoint descriptor. + desc = self.get_hid_descriptor() + self._int_ep = ep_addr | EP_IN_FLAG + ep_addrs = [self._int_ep] + + desc += endpoint_descriptor(self._int_ep, "interrupt", 8, 8) + + if self.use_out_ep: + self._out_ep = (ep_addr + 1) & ~EP_IN_FLAG + desc += endpoint_descriptor(self._out_ep, "interrupt", 8, 8) + ep_addrs.append(self._out_ep) + + self.idle_rate = 0 + self.protocol = 0 + return (desc, [], ep_addrs) + + def get_hid_descriptor(self): + # Generate a full USB HID descriptor from the object's report descriptor + # and optional additional descriptors. + # + # See HID Specification Version 1.1, Section 6.2.1 HID Descriptor p22 + result = struct.pack( + "> 8 + if desc_type == _DESC_HID_TYPE: + return self.get_hid_descriptor() + if desc_type == _DESC_REPORT_TYPE: + return self.report_descriptor + elif req_type == REQ_TYPE_CLASS: + # HID Spec p50: 7.2 Class-Specific Requests + if bRequest == _REQ_CONTROL_GET_REPORT: + print("GET_REPORT?") + return False # Unsupported for now + if bRequest == _REQ_CONTROL_GET_IDLE: + return bytes([self.idle_rate]) + if bRequest == _REQ_CONTROL_GET_PROTOCOL: + return bytes([self.protocol]) + if bRequest == _REQ_CONTROL_SET_IDLE: + self.idle_rate = wValue >> 8 + return b"" + if bRequest == _REQ_CONTROL_SET_PROTOCOL: + self.protocol = wValue + return b"" + if bRequest == _REQ_CONTROL_SET_REPORT: + # Return the _set_report_buf to be filled with the + # report data + if not self._set_report_buf: + return False + elif wLength >= len(self._set_report_buf): + # Saves an allocation if the size is exactly right (or will be a short read) + return self._set_report_buf + else: + # Otherwise, need to wrap the buffer in a memoryview of the correct length + # + # TODO: check this is correct, maybe TinyUSB won't mind if we ask for more + # bytes than the host has offered us. + return memoryview(self._set_report_buf)[:wLength] + return False # Unsupported + + if stage == STAGE_DATA: + if req_type == REQ_TYPE_CLASS: + if bRequest == _REQ_CONTROL_SET_REPORT and self._set_report_buf: + report_id = wValue & 0xFF + report_type = wValue >> 8 + report_data = self._set_report_buf + if wLength < len(report_data): + # as above, need to truncate the buffer if we read less + # bytes than what was provided + report_data = memoryview(self._set_report_buf)[:wLength] + self.handle_set_report(report_data, report_id, report_type) + + return True # allow DATA/ACK stages to complete normally + + +# Basic 3-button mouse HID Report Descriptor. +# This is cribbed from Appendix E.10 of the HID v1.11 document. +_MOUSE_REPORT_DESC = bytes( + [ + 0x05, + 0x01, # Usage Page (Generic Desktop) + 0x09, + 0x02, # Usage (Mouse) + 0xA1, + 0x01, # Collection (Application) + 0x09, + 0x01, # Usage (Pointer) + 0xA1, + 0x00, # Collection (Physical) + 0x05, + 0x09, # Usage Page (Buttons) + 0x19, + 0x01, # Usage Minimum (01), + 0x29, + 0x03, # Usage Maximun (03), + 0x15, + 0x00, # Logical Minimum (0), + 0x25, + 0x01, # Logical Maximum (1), + 0x95, + 0x03, # Report Count (3), + 0x75, + 0x01, # Report Size (1), + 0x81, + 0x02, # Input (Data, Variable, Absolute), ;3 button bits + 0x95, + 0x01, # Report Count (1), + 0x75, + 0x05, # Report Size (5), + 0x81, + 0x01, # Input (Constant), ;5 bit padding + 0x05, + 0x01, # Usage Page (Generic Desktop), + 0x09, + 0x30, # Usage (X), + 0x09, + 0x31, # Usage (Y), + 0x15, + 0x81, # Logical Minimum (-127), + 0x25, + 0x7F, # Logical Maximum (127), + 0x75, + 0x08, # Report Size (8), + 0x95, + 0x02, # Report Count (2), + 0x81, + 0x06, # Input (Data, Variable, Relative), ;2 position bytes (X & Y) + 0xC0, # End Collection, + 0xC0, # End Collection + ] +) + + +class MouseInterface(HIDInterface): + # Very basic synchronous USB mouse HID interface + # TODO: This should be in a different package or an example + + def __init__(self): + super().__init__( + _MOUSE_REPORT_DESC, + protocol=_INTERFACE_PROTOCOL_MOUSE, + interface_str="MP Mouse!", + ) + self._l = False # Left button + self._m = False # Middle button + self._r = False # Right button + + def send_report(self, dx=0, dy=0): + b = 0 + if self._l: + b |= 1 << 0 + if self._r: + b |= 1 << 1 + if self._m: + b |= 1 << 2 + # Note: This allocates the bytes object 'report' each time a report is + # sent. + # + # However, at the moment the base class doesn't keep track of each + # transfer after it's submitted. So reusing a bytearray() creates a risk + # of a race condition if a new report transfer is submitted using the + # same buffer, before the previous one has completed. + report = struct.pack("Bbb", b, dx, dy) + + super().send_report(report) + + def click_left(self, down=True): + self._l = down + self.send_report() + + def click_middle(self, down=True): + self._m = down + self.send_report() + + def click_right(self, down=True): + self._r = down + self.send_report() + + def move_by(self, dx, dy): + # dx, dy are -127, 127 in range + self.send_report(dx, dy) diff --git a/micropython/usbd/hid_keypad.py b/micropython/usbd/hid_keypad.py new file mode 100644 index 000000000..9c5f4769a --- /dev/null +++ b/micropython/usbd/hid_keypad.py @@ -0,0 +1,96 @@ +# MicroPython USB keypad module +# MIT license; Copyright (c) 2023 Dave Wickham, Angus Gratton + +from .hid import HIDInterface +from micropython import const + +_INTERFACE_PROTOCOL_KEYBOARD = const(0x01) + +# See HID Usages and Descriptions 1.4, section 10 Keyboard/Keypad Page (0x07) +# +# This keypad example has a contiguous series of keys (KEYPAD_KEY_IDS) starting +# from the NumLock/Clear keypad key (0x53), but you can send any Key IDs from +# the table in the HID Usages specification. +_KEYPAD_KEY_OFFS = const(0x53) + +_KEYPAD_KEY_IDS = [ + "", + "/", + "*", + "-", + "+", + "", + "1", + "2", + "3", + "4", + "5", + "6", + "7", + "8", + "9", + "0", + ".", +] + + +def _key_to_id(key): + # This is a little slower than making a dict for lookup, but uses + # less memory and O(n) can be fast enough when n is small. + return _KEYPAD_KEY_IDS.index(key) + _KEYPAD_KEY_OFFS + + +# fmt: off +_KEYPAD_REPORT_DESC = bytes( + [ + 0x05, 0x01, # Usage Page (Generic Desktop) + 0x09, 0x07, # Usage (Keypad) + 0xA1, 0x01, # Collection (Application) + 0x05, 0x07, # Usage Page (Keypad) + 0x19, 0x00, # Usage Minimum (0) + 0x29, 0xFF, # Usage Maximum (ff) + 0x15, 0x00, # Logical Minimum (0) + 0x25, 0xFF, # Logical Maximum (ff) + 0x95, 0x01, # Report Count (1), + 0x75, 0x08, # Report Size (8), + 0x81, 0x00, # Input (Data, Array, Absolute) + 0x05, 0x08, # Usage page (LEDs) + 0x19, 0x01, # Usage Minimum (1) + 0x29, 0x01, # Usage Maximum (1), + 0x95, 0x01, # Report Count (1), + 0x75, 0x01, # Report Size (1), + 0x91, 0x02, # Output (Data, Variable, Absolute) + 0x95, 0x01, # Report Count (1), + 0x75, 0x07, # Report Size (7), + 0x91, 0x01, # Output (Constant) - padding bits + 0xC0, # End Collection + ] +) +# fmt: on + + +class KeypadInterface(HIDInterface): + # Very basic synchronous USB keypad HID interface + + def __init__(self): + self.numlock = False + self.set_report_initialised = False + super().__init__( + _KEYPAD_REPORT_DESC, + set_report_buf=bytearray(1), + protocol=_INTERFACE_PROTOCOL_KEYBOARD, + interface_str="MicroPython Keypad", + ) + + def handle_set_report(self, report_data, _report_id, _report_type): + report = report_data[0] + b = bool(report & 1) + if b != self.numlock: + print("Numlock: ", b) + self.numlock = b + + def send_key(self, key=None): + if key is None: + self.send_report(b"\x00") + else: + self.send_report(_key_to_id(key).to_bytes(1, "big")) diff --git a/micropython/usbd/hidkeypad.py b/micropython/usbd/hidkeypad.py new file mode 100644 index 000000000..bd0319fe5 --- /dev/null +++ b/micropython/usbd/hidkeypad.py @@ -0,0 +1,93 @@ +# MicroPython USB keypad module +# MIT license; Copyright (c) 2023 Dave Wickham + +from .hid import HIDInterface +from .keycodes import KEYPAD_KEYS_TO_KEYCODES +from .utils import STAGE_SETUP, split_bmRequestType +from micropython import const +import micropython + +_INTERFACE_PROTOCOL_KEYBOARD = const(0x01) +_REQ_CONTROL_SET_REPORT = const(0x09) +_REQ_CONTROL_SET_IDLE = const(0x0A) + +# fmt: off +_KEYPAD_REPORT_DESC = bytes( + [ + 0x05, 0x01, # Usage Page (Generic Desktop) + 0x09, 0x07, # Usage (Keypad) + 0xA1, 0x01, # Collection (Application) + 0x05, 0x07, # Usage Page (Keypad) + 0x19, 0x00, # Usage Minimum (00), + 0x29, 0xFF, # Usage Maximum (ff), + 0x15, 0x00, # Logical Minimum (0), + 0x25, 0xFF, # Logical Maximum (ff), + 0x95, 0x01, # Report Count (1), + 0x75, 0x08, # Report Size (8), + 0x81, 0x00, # Input (Data, Array, Absolute) + 0x05, 0x08, # Usage page (LEDs) + 0x19, 0x01, # Usage minimum (1) + 0x29, 0x05, # Usage Maximum (5), + 0x95, 0x05, # Report Count (5), + 0x75, 0x01, # Report Size (1), + 0x91, 0x02, # Output (Data, Variable, Absolute) + 0x95, 0x01, # Report Count (1), + 0x75, 0x03, # Report Size (3), + 0x91, 0x01, # Output (Constant) + 0xC0, # End Collection + ] +) +# fmt: on + + +class KeypadInterface(HIDInterface): + # Very basic synchronous USB keypad HID interface + + def __init__(self): + self.numlock = None + self.capslock = None + self.scrolllock = None + self.compose = None + self.kana = None + self.set_report_initialised = False + super().__init__( + _KEYPAD_REPORT_DESC, + protocol=_INTERFACE_PROTOCOL_KEYBOARD, + interface_str="MicroPython Keypad!", + use_out_ep=True, + ) + + def handle_interface_control_xfer(self, stage, request): + if request[1] == _REQ_CONTROL_SET_IDLE and not self.set_report_initialised: + # Hacky initialisation goes here + self.set_report() + self.set_report_initialised = True + + if stage == STAGE_SETUP: + return super().handle_interface_control_xfer(stage, request) + + bmRequestType, bRequest, wValue, _, _ = request + recipient, req_type, _ = split_bmRequestType(bmRequestType) + + return True + + def set_report(self, args=None): + self.out_buffer = bytearray(1) + self.submit_xfer(self._out_ep, self.out_buffer, self.set_report_cb) + return True + + def set_report_cb(self, ep_addr, result, xferred_bytes): + buf_result = int(self.out_buffer[0]) + self.numlock = buf_result & 1 + self.capslock = (buf_result >> 1) & 1 + self.scrolllock = (buf_result >> 2) & 1 + self.compose = (buf_result >> 3) & 1 + self.kana = (buf_result >> 4) & 1 + + micropython.schedule(self.set_report, None) + + def send_report(self, key=None): + if key is None: + super().send_report(bytes(1)) + else: + super().send_report(KEYPAD_KEYS_TO_KEYCODES[key].to_bytes(1, "big")) diff --git a/micropython/usbd/keycodes.py b/micropython/usbd/keycodes.py new file mode 100644 index 000000000..63732fda3 --- /dev/null +++ b/micropython/usbd/keycodes.py @@ -0,0 +1,24 @@ +# Keypad keycodes for use with USB HID +# MIT license; Copyright (c) 2023 Dave Wickham +_KEYPAD_KEYS = [ + "", + "/", + "*", + "-", + "+", + "", + "1", + "2", + "3", + "4", + "5", + "6", + "7", + "8", + "9", + "0", + ".", +] + +KEYPAD_KEYCODES_TO_KEYS = {k + 0x53: v for k, v in enumerate(_KEYPAD_KEYS)} +KEYPAD_KEYS_TO_KEYCODES = {v: k for k, v in KEYPAD_KEYCODES_TO_KEYS.items()} diff --git a/micropython/usbd/manifest.py b/micropython/usbd/manifest.py new file mode 100644 index 000000000..78b2c69fb --- /dev/null +++ b/micropython/usbd/manifest.py @@ -0,0 +1,15 @@ +metadata(version="0.1.0") + +# TODO: split this up into sub-packages, and some code in example subdirectory +package( + "usbd", + files=( + "__init__.py", + "device.py", + "hid.py", + "hid_keypad.py", + "midi.py", + "utils.py", + ), + base_path="..", +) diff --git a/micropython/usbd/midi.py b/micropython/usbd/midi.py new file mode 100644 index 000000000..4e2243716 --- /dev/null +++ b/micropython/usbd/midi.py @@ -0,0 +1,306 @@ +# MicroPython USB MIDI module +# MIT license; Copyright (c) 2023 Angus Gratton, Paul Hamshere +from micropython import const +import struct + +from .device import USBInterface +from .utils import endpoint_descriptor, EP_IN_FLAG + +_INTERFACE_CLASS_AUDIO = const(0x01) +_INTERFACE_SUBCLASS_AUDIO_CONTROL = const(0x01) +_INTERFACE_SUBCLASS_AUDIO_MIDISTREAMING = const(0x03) +_PROTOCOL_NONE = const(0x00) + +_JACK_TYPE_EMBEDDED = const(0x01) +_JACK_TYPE_EXTERNAL = const(0x02) + + +class RingBuf: + def __init__(self, size): + self.data = bytearray(size) + self.size = size + self.index_put = 0 + self.index_get = 0 + + def put(self, value): + next_index = (self.index_put + 1) % self.size + # check for overflow + if self.index_get != next_index: + self.data[self.index_put] = value + self.index_put = next_index + return value + else: + return None + + def get(self): + if self.index_get == self.index_put: + return None # buffer empty + else: + value = self.data[self.index_get] + self.index_get = (self.index_get + 1) % self.size + return value + + def is_empty(self): + return self.index_get == self.index_put + + +class DummyAudioInterface(USBInterface): + # An Audio Class interface is mandatory for MIDI Interfaces as well, this + # class implements the minimum necessary for this. + def __init__(self): + super().__init__(_INTERFACE_CLASS_AUDIO, _INTERFACE_SUBCLASS_AUDIO_CONTROL, _PROTOCOL_NONE) + + def get_itf_descriptor(self, num_eps, itf_idx, str_idx): + # Return the MIDI USB interface descriptors. + + # Get the parent interface class + desc, strs = super().get_itf_descriptor(num_eps, itf_idx, str_idx) + + # Append the class-specific AudioControl interface descriptor + desc += struct.pack( + "Device) + # * Data goes via an Embedded MIDI IN Jack ("into" the USB-MIDI device) + # * Data goes out via a virtual External MIDI OUT Jack ("out" of the + # USB-MIDI device and into the world). This "out" jack may be + # theoretical, and only exists in the USB descriptor. + # + # - For each tx (total _num_tx), we have data flowing from the USB MIDI + # device to the USB host: + # * Data comes in via a virtual External MIDI IN Jack (from the + # outside world, theoretically) + # * Data goes via an Embedded MIDI OUT Jack ("out" of the USB-MIDI + # device). + # * Data goes into the host via MIDI IN Endpoint (Device->Host) + + # rx side + for idx in range(self._num_rx): + emb_id = self._emb_id(False, idx) + ext_id = emb_id + 1 + pin = idx + 1 + jacks += jack_in_desc(_JACK_TYPE_EMBEDDED, emb_id) # bJackID) + jacks += jack_out_desc( + _JACK_TYPE_EXTERNAL, + ext_id, # bJackID + emb_id, # baSourceID(1) + pin, # baSourcePin(1) + ) + + # tx side + for idx in range(self._num_tx): + emb_id = self._emb_id(True, idx) + ext_id = emb_id + 1 + pin = idx + 1 + + jacks += jack_in_desc( + _JACK_TYPE_EXTERNAL, + ext_id, # bJackID + ) + jacks += jack_out_desc( + _JACK_TYPE_EMBEDDED, + emb_id, + ext_id, # baSourceID(1) + pin, # baSourcePin(1) + ) + + iface = desc + cs_ms_interface + jacks + return (iface, strs) + + def _emb_id(self, is_tx, idx): + # Given a direction (False==rx, True==tx) and a 0-index + # of the MIDI connection, return the embedded JackID value. + # + # Embedded JackIDs take odd numbers 1,3,5,etc with all + # 'RX' jack numbers first and then all 'TX' jack numbers + # (see long comment above for explanation of RX, TX in + # this context.) + # + # This is used to keep jack IDs in sync between + # get_itf_descriptor() and get_endpoint_descriptors() + return 1 + 2 * (idx + (is_tx * self._num_rx)) + + def get_endpoint_descriptors(self, ep_addr, str_idx): + # One MIDI endpoint in each direction, plus the + # associated CS descriptors + + # The following implementation is *very* memory inefficient + # and needs optimising + + self.ep_out = ep_addr + 1 + self.ep_in = ep_addr + 2 | EP_IN_FLAG + + # rx side, USB "in" endpoint and embedded MIDI IN Jacks + e_out = endpoint_descriptor(self.ep_in, "bulk", 64, 0) + cs_out = struct.pack( + "= 0x80 + + def __bytes__(self): + return ustruct.pack( + "CSW->CBW chain here + if self.stage is None: + self.prepare_cbw() + return retval + + return False + + def reset(self): + """Perform a Reset Revovery""" + self.log("reset()") + self.stage = type(self).MSC_STAGE_CMD + self.transferred_length = 0 + self.storage_device.reset() + self.set_ep_stall(self.ep_in, False) + self.set_ep_stall(self.ep_out, False) + self.prepare_cbw() + return True + + def prepare_for_csw(self, status=CSW.STATUS_PASSED): + """Set up the variables for a CSW""" + self.log("prepare_for_csw()") + self.csw.bCSWStatus = int(status) + self.stage = type(self).MSC_STAGE_STATUS + return True + + def handle_endpoint_control_xfer(self, stage, request): + # This isn't currently being invoked at all + self.log("handle_endpoint_control_xfer") + if stage != STAGE_SETUP: + self.log(f"Got {stage}, only dealing with setup") + return True + + bmRequestType, bRequest, wValue, wIndex, _ = request + recipient, req_type, _ = split_bmRequestType(bmRequestType) + + ep_addr = wIndex & 0xFFFF + + if self.stage == type(self).MSC_STAGE_NEED_RESET: + # TODO: stall endpoint? + self.log("Needs reset") + return True + + if ep_addr == self.ep_in and self.stage == type(self).MSC_STAGE_STATUS: + return self.send_csw() + + if ep_addr == self.ep_out and self.stage == type(self).MSC_STAGE_CMD: + self.log("Preparing CBW") + self.prepare_cbw() + + return True + + def prepare_cbw(self, args=None): + """Prepare to have an incoming CBW""" + self.log("prepare_cbw()") + try: + self.stage = type(self).MSC_STAGE_CMD + self.transferred_length = 0 + self.rx_data = bytearray(31) + self.log("About to submit xfer for CBW") + self.submit_xfer(self.ep_out, self.rx_data, self.receive_cbw_callback) + except Exception as exc: + self.log(str(exc)) + raise + + def receive_cbw_callback(self, ep_addr, result, xferred_bytes): + """Callback stub to schedule actual CBW processing""" + self.log("receive_cbw_callback") + micropython.schedule(self.proc_receive_cbw_callback, (ep_addr, result, xferred_bytes)) + + def proc_receive_cbw_callback(self, args): + """Invoke CBW processing""" + (ep_addr, result, xferred_bytes) = args + if self.stage == type(self).MSC_STAGE_CMD: + self.cbw.from_binary(self.rx_data) + return self.handle_cbw() + + def handle_cbw(self): + """Deal with an incoming CBW""" + self.log("handle_cbw") + self.csw.dCSWTag = self.cbw.dCBWTag + self.csw.dCSWDataResidue = 0 + self.csw.bCSWStatus = CSW.STATUS_PASSED + + try: + status = int(self.validate_cbw()) + except BadCbw as exc: + self.log(str(exc)) + self.set_ep_stall(self.ep_in, True) + self.set_ep_stall(self.ep_out, True) + return False + + if status != CSW.STATUS_PASSED: + self.log(f"Didn't pass: {status}") + self.prepare_for_csw(status=status) + return micropython.schedule(self.send_csw, None) + + self.stage = type(self).MSC_STAGE_DATA + + cmd = self.cbw.CBWCB[0 : self.cbw.bCBWCBLength] + + try: + response = self.storage_device.handle_cmd(cmd) + except StorageDevice.StorageError as exc: + self.log(f"Error: {exc}") + self.prepare_for_csw(status=exc.status) + return micropython.schedule(self.send_csw, None) + + if response is None: + self.log("None response") + self.prepare_for_csw() + return micropython.schedule(self.send_csw, None) + + if len(response) > self.cbw.dCBWDataTransferLength: + self.log("Wrong size") + self.prepare_for_csw(status=CSW.STATUS_FAILED) + return micropython.schedule(self.send_csw, None) + + if len(response) == 0: + self.log("Empty response") + self.prepare_for_csw() + return micropython.schedule(self.send_csw, None) + + try: + self.data = bytearray(response) + self.proc_transfer_data((self.ep_in, None, 0)) + except Exception as exc: + self.log(str(exc)) + + self.log("Exiting handle_cbw") + return True + + def transfer_data(self, ep_addr, result, xferred_bytes): + """Callback function for scheduling transferring data function""" + self.log("transfer_data") + micropython.schedule(self.proc_transfer_data, (ep_addr, result, xferred_bytes)) + + def proc_transfer_data(self, args): + """Actual handler for transferring non-CSW data""" + (ep_addr, result, xferred_bytes) = args + self.log("proc_transfer_data") + self.transferred_length += xferred_bytes + + if self.stage != type(self).MSC_STAGE_DATA: + self.log("Wrong stage") + return False + + if len(self.data) > xferred_bytes: + self.data = self.data[xferred_bytes:] + else: + self.data = bytearray() + + if not self.data and self.storage_device.long_operation: + self.data = self.storage_device.long_operation["operation"]() + + # The above call will have cleared this if it was the last bit of data to send + if not self.storage_device.long_operation: + # We don't have more data to fetch... + if not self.data: + # We've already sent our final actual data packet + self.log("We're done") + self.prepare_for_csw() + return micropython.schedule(self.send_csw, None) + + # This is the last data we're sending, pad it out + residue = self.cbw.dCBWDataTransferLength - (self.transferred_length + len(self.data)) + if residue: + self.log(f"Adding {residue} bytes of padding for residue") + self.csw.dCSWDataResidue = residue + self.data.extend("\0" * residue) + + self.log(f"Preparing to submit data transfer, {len(self.data)} bytes") + self.submit_xfer(ep_addr, self.data, self.transfer_data) + + def validate_cbw(self) -> bool: + """Perform Valid and Meaningful checks on a CBW""" + self.log("validate_cbw") + # Valid checks (6.2.1) + if self.stage != type(self).MSC_STAGE_CMD: + self.log("Wrong stage") + return CSW.STATUS_PHASE_ERROR + + if len(self.rx_data) != 31: + raise BadCbw("Invalid: Wrong CBW length") + + if self.cbw.dCBWSignature != type(self).CBW_SIGNATURE: + raise BadCbw(f"Invalid: Wrong sig: {str(self.cbw.dCBWSignature)}") + + # Meaningful checks (6.2.2) + if self.cbw.bCBWLUN > 15 or not 0 < self.cbw.bCBWCBLength < 17: + raise BadCbw("Not meaningful: Wrong length command or invalid LUN") + + if self.cbw.bCBWLUN != self.lun: + raise BadCbw("Not meaningful: Wrong LUN") + + # Check if this is a valid SCSI command + try: + # The storage layer doesn't know about USB, it'll return True for valid and False for invalid + return not self.storage_device.validate_cmd(self.cbw.CBWCB[0 : self.cbw.bCBWCBLength]) + except Exception as exc: + self.log(str(exc)) + raise + + def padding_sent(self, ep_addr, result, xferred_bytes): + """Reschedule send_csw having sent some padding""" + micropython.schedule(self.send_csw, None) + + def send_csw(self, args): + """Send a CSW to the host""" + self.log("send_csw") + if self.stage == type(self).MSC_STAGE_STATUS_SENT: + self.log("Wrong status here") + + if self.csw.dCSWDataResidue == 0: + self.csw.dCSWDataResidue = int(self.cbw.dCBWDataTransferLength) - int( + self.transferred_length + ) + + # If the host sent a command that was expecting more than just a CSW, we may have to send them some nothing in the absence of being able to STALL + if self.transferred_length == 0 and self.csw.dCSWDataResidue != 0: + self.log(f"Sending {self.csw.dCSWDataResidue} bytes of nothing to pad it out") + self.transferred_length = self.csw.dCSWDataResidue + self.submit_xfer(self.ep_in, bytearray(self.csw.dCSWDataResidue), self.padding_sent) + # The flow from sending the CSW happens in the callback, not in whatever called us, so we can just return and re-call from the padding callback + return + + self.log( + f"Sending CSW for {hex(self.csw.dCSWTag)}, data residue {self.csw.dCSWDataResidue}, status {self.csw.bCSWStatus}" + ) + + self.stage = type(self).MSC_STAGE_STATUS_SENT + + self.submit_xfer(self.ep_in, self.csw.__bytes__(), self.send_csw_callback) + return True + + def send_csw_callback(self, ep_addr, result, xferred_bytes): + """Schedule the preparation for the next CBW on having sent a CSW""" + self.log("send_csw_callback") + micropython.schedule(self.prepare_cbw, None) + + +class StorageDevice: + """Storage Device - holds the SCSI parts + + Properties: + filesystem -- a bytes-like thing representing the data this device is handling. If set to None, then the + object will behave as if there is no medium inserted. This can be changed at runtime. + block_size -- what size the blocks are for SCSI commands. This should probably be left as-is, at 512. If + the device provides its own block size, that will be used instead + """ + + class StorageError(OSError): + def __init__(self, message, status): + super().__init__(message) + self.status = status + + NO_SENSE = const(0x00) + MEDIUM_NOT_PRESENT = const(0x01) + INVALID_COMMAND = const(0x02) + + def __init__(self, filesystem): + """Create a StorageDevice object + + filesystem -- either None or a bytes-like object to represent the filesystem being presented + """ + self.filesystem = filesystem + self.block_size = 512 + self.sense = None + self.additional_sense_code = None + self.long_operation = {} + + # A dict of SCSI commands and their handlers; the key is the opcode for the command + self.scsi_commands = { + 0x00: {"name": "TEST_UNIT_READY", "handler": self.handle_test_unit_ready}, + 0x03: {"name": "REQUEST_SENSE", "handler": self.handle_request_sense}, + 0x12: {"name": "INQUIRY", "handler": self.handle_inquiry}, + 0x15: {"name": "MODE_SELECT_6"}, + 0x1A: {"name": "MODE_SENSE_6", "handler": self.handle_mode_sense6}, + 0x1B: {"name": "START_STOP_UNIT"}, + 0x1E: {"name": "PREVENT_ALLOW_MEDIUM_REMOVAL"}, + 0x25: {"name": "READ_CAPACITY_10", "handler": self.handle_read_capacity_10}, + 0x23: { + "name": "READ_FORMAT_CAPCITY", + "handler": self.handle_read_format_capacity, + }, + 0x28: {"name": "READ_10", "handler": self.handle_read10}, + 0x2A: {"name": "WRITE_10"}, + 0x5A: {"name": "MODE_SENSE_10", "handler": self.handle_mode_sense10}, + } + + # KCQ values for different sense states + self.sense_values = { + # Key, code, qualifier + type(self).NO_SENSE: [0x00, 0x00, 0x00], + type(self).MEDIUM_NOT_PRESENT: [0x02, 0x3A, 0x00], + type(self).INVALID_COMMAND: [0x05, 0x20, 0x00], + } + + def reset(self): + self.sense_key = None + + def validate_cmd(self, cmd): + """Ensure that this is a command we can handle""" + if cmd[0] not in self.scsi_commands: + # We don't know about the command at all + self.sense = type(self).INVALID_COMMAND + return False + + if "handler" not in self.scsi_commands[cmd[0]]: + # We do know about the command, but not what to do with it + self.sense = type(self).INVALID_COMMAND + return False + + if self.scsi_commands[cmd[0]]["name"] != "REQUEST_SENSE": + self.sense = type(self).NO_SENSE + + # Windows seems to possibly send oversized CBDs by these rules in some circumstances? + return True + + # 0x00 to 0x1F should have 6-byte CBDs + if cmd[0] < 0x20: + return len(cmd) == 6 + + # 0x20 to 0x5F should have 10-byte CBDs + if cmd[0] < 0x60: + return len(cmd) == 10 + + # Other lengths exist, but aren't supported by us + + def fail_scsi(self, status): + """If we need to report a failure""" + raise StorageDevice.StorageError("Failing SCSI", CSW.STATUS_FAILED) + + def handle_cmd(self, cmd): + try: + return self.scsi_commands[cmd[0]]["handler"](cmd) + except Exception as exc: + raise StorageDevice.StorageError( + f"Error handling command {self.scsi_commands[cmd[0]]['name']}: {str(exc)}", + CSW.STATUS_FAILED, + ) + + # Below here are the SCSI command handlers + + def handle_mode_sense6(self, cmd): + return ustruct.pack( + ">BBBB", + 3, # Data length + 0x00, # What medium? + 0x80, # Write protected + 0x00, # Nope + ) + + def handle_mode_sense10(self, cmd): + return ustruct.pack( + ">HBBBBH", + 6, # Data length + 0x00, # What medium? + 0x80, # Write protected + 0x00, # Nope + 0x00, + 0x00, + ) + + def handle_test_unit_ready(self, cmd): + if self.filesystem is not None: + self.sense = type(self).NO_SENSE + return None + + self.sense = type(self).MEDIUM_NOT_PRESENT + raise StorageDevice.StorageError("No filesystem", status=CSW.STATUS_FAILED) + + def handle_read_capacity_10(self, cmd): + if self.filesystem is None: + self.sense = type(self).MEDIUM_NOT_PRESENT + raise StorageDevice.StorageError("No filesystem", status=CSW.STATUS_FAILED) + + # Do we have an AbstractBlockDev? + if getattr(self.filesystem, "ioctl", False): + max_lba = self.filesystem.ioctl(4, None) - 1 + block_size = self.filesystem.ioctl(5, None) or 512 + else: + max_lba = int(len(bytes(self.filesystem)) / self.block_size) - 1 + block_size = self.block_size + + return ustruct.pack(">LL", max_lba, block_size) + + def handle_read_format_capacity(self, cmd): + block_num = 0 + list_length = 8 + descriptor_type = 3 # 3 = no media present + block_size = self.block_size + if self.filesystem is not None: + descriptor_type = 2 # 2 = formatted media + # Do we have an AbstractBlockDev? + if getattr(self.filesystem, "ioctl", False): + block_num = self.filesystem.ioctl(4, None) + block_size = self.filesystem.ioctl(5, None) or 512 + else: + block_num = int(len(bytes(self.filesystem)) / self.block_size) + + return ustruct.pack( + ">BBBBLBBH", + 0x00, # Reserved + 0x00, # Reserved + 0x00, # Reserved + list_length, + block_num, + descriptor_type, + 0x00, # Reserved + block_size, + ) + + def handle_read10(self, cmd=None): + if cmd is None: + if not self.long_operation: + raise StorageDevice.StorageError( + "handle_read10 called with no cmd, but we are not in an existing command" + ) + + length = self.long_operation["remaining_length"] + lba = self.long_operation["current_lba"] + else: + (read10, flags, lba, group, length, control) = ustruct.unpack(">BBLBHB", cmd) + + # Do we have an AbstractBlockDev? + if getattr(self.filesystem, "readblocks", False): + gc.collect() + # Will we be able to comfortably fit this in RAM? + block_size = self.filesystem.ioctl(5, None) or 512 + max_size = int((gc.mem_free() / block_size) / 10) or 1 + if length > max_size: + self.long_operation["remaining_length"] = length - max_size + length = max_size + self.long_operation["current_lba"] = lba + max_size + self.long_operation["operation"] = self.handle_read10 + else: + self.long_operation = {} + + read_data = bytearray(length * block_size) + self.filesystem.readblocks(lba, read_data) + return read_data + + return self.filesystem[ + lba * self.block_size : lba * self.block_size + length * self.block_size + ] + + def handle_request_sense(self, cmd): + return ustruct.pack( + ">BBBLBLBBB3s", + 0x70, # Response code (+invalid INFORMATION) + 0, # Obsolete + self.sense_values[self.sense][0], # Sense key + 0, # Information + 9, # Additional sense length + 0, # Command specific information + self.sense_values[self.sense][1], # Additional sense code + self.sense_values[self.sense][2], # Additional sense code qualifier + 0, + ) + + def handle_inquiry(self, cmd): + (_, evpd, page_code, allocation_length, control) = ustruct.unpack(">BBBBB", cmd) + if evpd == 0: + return ustruct.pack( + ">BBBBBBBB8s16s4s", + 0x00, # SBC-4 device type, Windows may not like RBC? + # 0x0E, # RBC device type + 0x80, # set the top-most bit to say it's removable + 0x00, # Definitely not claiming to conform to any SCSI standard + 0x02, # Response data format of 2, other bits set to 0 + 32, # Extra length + 0x00, # Don't support any of this + 0x00, # Likewise + 0x00, # And again + "MPython", # Vendor + "MicroPython MSC", # Procut + "0000", # Revision level + ) + + if page_code == 0x80: + return ustruct.pack( + ">BBBB10s", + 0x00, # SBC-4 device type, Windows may not like RBC? + 0x80, # Page code + 0x00, # Reserved + 0x0A, # Randomly choose ten characters for a serial + "\0", + ) + + self.sense = type(self).INVALID_COMMAND + raise StorageDevice.StorageError("EVPD not implemented", status=CSW.STATUS_FAILED) diff --git a/micropython/usbd/utils.py b/micropython/usbd/utils.py new file mode 100644 index 000000000..017019575 --- /dev/null +++ b/micropython/usbd/utils.py @@ -0,0 +1,77 @@ +# MicroPython USB utility functions +# MIT license; Copyright (c) 2023 Angus Gratton +# +# Some constants and stateless utility functions for working with USB descriptors and requests. +from micropython import const +import ustruct + +# Shared constants +# +# It's a tough decision of when to make a constant "shared" like this. "Private" constants have no resource use, but these will take up flash space for the name. Suggest deciding on basis of: +# +# - Is this constant used in a lot of places, including potentially by users +# of this package? +# +# Otherwise, it's not the greatest sin to be copy-pasting "private" constants +# in a couple of places. I guess. :/ + +EP_IN_FLAG = const(1 << 7) + +# Control transfer stages +STAGE_IDLE = const(0) +STAGE_SETUP = const(1) +STAGE_DATA = const(2) +STAGE_ACK = const(3) + +# Request types +REQ_TYPE_STANDARD = const(0x0) +REQ_TYPE_CLASS = const(0x1) +REQ_TYPE_VENDOR = const(0x2) +REQ_TYPE_RESERVED = const(0x3) + +# TinyUSB xfer_result_t enum +RESULT_SUCCESS = const(0) +RESULT_FAILED = const(1) +RESULT_STALLED = const(2) +RESULT_TIMEOUT = const(3) +RESULT_INVALID = const(4) + + +# Non-shared constants, used in this function only +_STD_DESC_ENDPOINT_LEN = const(7) +_STD_DESC_ENDPOINT_TYPE = const(0x5) + + +def endpoint_descriptor(bEndpointAddress, bmAttributes, wMaxPacketSize, bInterval=1): + # Utility function to generate a standard Endpoint descriptor bytes object, with + # the properties specified in the parameter list. + # + # See USB 2.0 specification section 9.6.6 Endpoint p269 + # + # As well as a numeric value, bmAttributes can be a string value to represent + # common endpoint types: "control", "bulk", "interrupt". + bmAttributes = {"control": 0, "bulk": 2, "interrupt": 3}.get(bmAttributes, bmAttributes) + return ustruct.pack( + "> 5) & 0x03, + (bmRequestType >> 7) & 0x01, + )