|
129 | 129 | WL_AP_FAILED = const(9)
|
130 | 130 | # pylint: enable=bad-whitespace
|
131 | 131 |
|
132 |
| -class ESP_I2Ccontrol: # pylint: disable=too-many-public-methods |
| 132 | +class Protocol: |
| 133 | + params = None |
| 134 | + |
| 135 | + # Protocols should override the init for any additional init they need to do. |
| 136 | + def __init__(self, dict_of_params, ready_pin=None, reset_pin=None, gpio0_pin=None, *, debug=False): |
| 137 | + self._params = dict_of_params |
| 138 | + self._ready = ready_pin |
| 139 | + self._reset = reset_pin |
| 140 | + self._gpio0 = gpio0_pin |
| 141 | + self._debug = debug |
| 142 | + self._setup_pins() |
| 143 | + # FIXME |
| 144 | + # There should be a reset of the ESP32 after the pins are set up. |
| 145 | + # self.reset() |
| 146 | + |
| 147 | + # Protocols should override this to set up any additional |
| 148 | + # pins from the *pins* list. |
| 149 | + def _setup_pins(self): |
| 150 | + if self._gpio0: |
| 151 | + self._gpio0.direction = Direction.INPUT |
| 152 | + if self._ready: |
| 153 | + self._ready.direction = Direction.INPUT |
| 154 | + if self._reset: |
| 155 | + self._reset.direction = Direction.OUTPUT |
| 156 | + |
| 157 | + # Every protocol must implement a way to send a buffer. |
| 158 | + def _send_buffer(self, buffer, start, end): |
| 159 | + pass |
| 160 | + |
| 161 | +class SPI(Protocol): |
| 162 | + def _setup_pins(self): |
| 163 | + if self._debug > 3: |
| 164 | + print("SPI Calling super._setup_pins()") |
| 165 | + self._cs = self._params['CS'] |
| 166 | + self._ready = proto._ready |
| 167 | + self._reset = proto._reset |
| 168 | + self._gpio0 = proto._gpio |
| 169 | + self._cs.direction = Direction.OUTPUT |
| 170 | + self._ready.direction = Direction.INPUT |
| 171 | + self._spi_device = SPIDevice(spi, cs_pin, baudrate=8000000) |
| 172 | + def send_buffer(self, buffer, start, end): |
| 173 | + spi.write(buffer, start=start, end=end) # pylint: disable=no-member |
| 174 | + |
| 175 | +class I2C(Protocol): |
| 176 | + def _setup_pins(self): |
| 177 | + if self._debug > 3: |
| 178 | + print("I2C Calling super._setup_pins()") |
| 179 | + super()._setup_pins() |
| 180 | + self._addr = self._params['address'] |
| 181 | + self._scl = self._params['SCL'] |
| 182 | + self._sda = self._params['SDA'] |
| 183 | + self._i2c = busio.I2C(self._scl, self._sda) |
| 184 | + self._device = I2CDevice(self._i2c, self._addr) |
| 185 | + |
| 186 | + def send_buffer(self, buffer, start, end): |
| 187 | + with self._device: |
| 188 | + self._device.write(buffer, start=start, end=end, stop=True) |
| 189 | + |
| 190 | + |
| 191 | +class ESP_Control: # pylint: disable=too-many-public-methods |
133 | 192 | """A class that will talk to an ESP32 module programmed with special firmware
|
134 | 193 | that lets it act as a fast an efficient WiFi co-processor"""
|
135 | 194 | TCP_MODE = const(0)
|
136 | 195 | UDP_MODE = const(1)
|
137 | 196 | TLS_MODE = const(2)
|
138 | 197 |
|
139 | 198 | # pylint: disable=too-many-arguments
|
140 |
| - def __init__(self, spi, cs_pin, ready_pin, reset_pin, gpio0_pin=None, *, debug=False): |
| 199 | + def __init__(self, proto): |
| 200 | + self._buffer = bytearray(10) |
| 201 | + self._pbuf = bytearray(1) # buffer for param read |
| 202 | + self._sendbuf = bytearray(256) # buffer for command sending |
| 203 | + self._socknum_ll = [[0]] # pre-made list of list of socket # |
141 | 204 |
|
142 |
| - # FIXME mcj Hack |
143 |
| - self._addr = 0x2A |
144 |
| - self._i2c = busio.I2C(board.SCL, board.SDA) |
145 |
| - self._device = I2CDevice(self._i2c, self._addr) |
146 |
| - |
147 |
| - self._debug = debug |
148 |
| - self._buffer = bytearray(10) |
149 |
| - self._pbuf = bytearray(1) # buffer for param read |
150 |
| - self._sendbuf = bytearray(256) # buffer for command sending |
151 |
| - self._socknum_ll = [[0]] # pre-made list of list of socket # |
| 205 | + # FIXME: These should not be copied out of the proto object, |
| 206 | + # really. However, it saves modifying more code. |
| 207 | + self._proto = proto |
| 208 | + self._debug = proto._debug |
| 209 | + self._ready = proto._ready |
| 210 | + self._reset = proto._reset |
| 211 | + self._gpio0 = proto._gpio0 |
152 | 212 |
|
153 |
| - self._spi_device = SPIDevice(spi, cs_pin, baudrate=8000000) |
154 |
| - self._cs = cs_pin |
155 |
| - self._ready = ready_pin |
156 |
| - self._reset = reset_pin |
157 |
| - self._gpio0 = gpio0_pin |
158 |
| - #self._cs.direction = Direction.OUTPUT |
159 |
| - #self._ready.direction = Direction.INPUT |
160 |
| - #self._reset.direction = Direction.OUTPUT |
161 |
| - #if self._gpio0: |
162 |
| - # self._gpio0.direction = Direction.INPUT |
163 |
| - # self.reset() |
164 | 213 | # pylint: enable=too-many-arguments
|
165 | 214 |
|
166 | 215 | def reset(self):
|
@@ -235,18 +284,19 @@ def _send_command(self, cmd, params=None, *, param_len_16=False):
|
235 | 284 | # Add the wait_for_ready back in.
|
236 | 285 | # self._wait_for_ready()
|
237 | 286 | # with self._spi_device as spi:
|
238 |
| - with self._device: |
239 |
| - times = time.monotonic() |
240 |
| - while (time.monotonic() - times) < 1: # wait up to 1000ms |
241 |
| - if 1: # FIXME self._ready.value: # ok ready to send! |
242 |
| - break |
243 |
| - else: |
244 |
| - raise RuntimeError("ESP32 timed out on SPI select") |
245 |
| - # spi.write(self._sendbuf, start=0, end=packet_len) # pylint: disable=no-member |
246 |
| - self._device.write(self._sendbuf, start = 0, end = packet_len, stop = True) |
| 287 | + # with self._device: |
| 288 | + times = time.monotonic() |
| 289 | + while (time.monotonic() - times) < 1: # wait up to 1000ms |
| 290 | + if 1: # FIXME self._ready.value: # ok ready to send! |
| 291 | + break |
| 292 | + else: |
| 293 | + raise RuntimeError("ESP32 timed out on SPI select") |
| 294 | + # spi.write(self._sendbuf, start=0, end=packet_len) # pylint: disable=no-member |
| 295 | + # self._device.write(self._sendbuf, start = 0, end = packet_len, stop = True) |
| 296 | + self._proto.send_buffer(self._sendbuf, start=0, end=packet_len) |
247 | 297 |
|
248 |
| - if self._debug >= 3: |
249 |
| - print("Wrote: ", [hex(b) for b in self._sendbuf[0:packet_len]]) |
| 298 | + if self._debug >= 3: |
| 299 | + print("Wrote: ", [hex(b) for b in self._sendbuf[0:packet_len]]) |
250 | 300 | # pylint: disable=too-many-branches
|
251 | 301 |
|
252 | 302 | def _read_byte(self, spi):
|
|
0 commit comments