Skip to content

Commit a7717f2

Browse files
committed
lib/opta: Add support for running on CPython.
Signed-off-by: iabdalkader <i.abdalkader@gmail.com>
1 parent 8312406 commit a7717f2

File tree

2 files changed

+70
-15
lines changed

2 files changed

+70
-15
lines changed

lib/opta/example.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
level=logging.INFO # Switch to DEBUG to see raw commands
1515
)
1616

17+
# On CPython, something like the following should be used:
18+
# opta = opta.Opta(bus_id=3, pin_id=("/dev/gpiochip1", 5))
1719
opta = opta.Opta(bus_id=3)
1820

1921
# enum_devices initializes the bus, resets all expansions, and returns a list of

lib/opta/opta.py

Lines changed: 68 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,22 @@
55
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
66
import struct
77
import logging
8-
from time import sleep_ms
9-
from machine import I2C
10-
from machine import Pin
11-
from micropython import const
8+
from time import sleep
9+
import sys
10+
11+
_is_micropython = sys.implementation.name == "micropython"
12+
13+
if _is_micropython:
14+
from machine import Pin
15+
from machine import I2C
16+
from micropython import const
17+
else:
18+
import gpiod
19+
from gpiod.line import Direction
20+
from gpiod.line import Value
21+
from smbus2 import SMBus, i2c_msg
22+
def const(x): return x
23+
1224

1325
_MIN_ADDRESS = const(0x0B)
1426
_TMP_ADDRESS = const(0x0A)
@@ -70,6 +82,47 @@
7082
_CHANNEL_TYPES = const(("adc", "dac", "rtd", "pwm", "hiz", "din"))
7183

7284

85+
class IOPin:
86+
def __init__(self, pin_id):
87+
if _is_micropython:
88+
self.pin = Pin(pin_id, Pin.IN, Pin.PULL_UP)
89+
else:
90+
self.pin = gpiod.request_lines(
91+
pin_id[0],
92+
consumer="Opta",
93+
config={pin_id[1]: gpiod.LineSettings(direction=Direction.INPUT)},
94+
)
95+
96+
def read(self):
97+
if _is_micropython:
98+
return self.pin.value()
99+
else:
100+
return self.pin.get_values()[0] == Value.ACTIVE
101+
102+
103+
class I2CBus:
104+
def __init__(self, bus_id, freq):
105+
if _is_micropython:
106+
self.bus = I2C(bus_id, freq=freq)
107+
else:
108+
self.bus = SMBus(bus_id)
109+
110+
def read(self, addr, buf):
111+
if _is_micropython:
112+
self.bus.readfrom_into(addr, buf)
113+
else:
114+
msg = i2c_msg.read(addr, len(buf))
115+
self.bus.i2c_rdwr(msg)
116+
buf[:] = msg.buf[0:len(buf)]
117+
118+
def write(self, addr, buf):
119+
if _is_micropython:
120+
self.bus.writeto(addr, buf)
121+
else:
122+
msg = i2c_msg.write(addr, buf)
123+
self.bus.i2c_rdwr(msg)
124+
125+
73126
class Expansion:
74127
def __init__(self, opta, type, addr, name):
75128
self.opta = opta
@@ -274,7 +327,7 @@ def get_bool(k, d):
274327
if "default_value" in kwargs:
275328
value = kwargs["default_value"]
276329
self.opta._cmd(self.addr, _CMD_SET_ANALOG_DAC_DEF, "<BHB", channel, value, 1)
277-
sleep_ms(250) # DAC requires at leas 250ms to update after a write.
330+
sleep(0.250) # DAC requires at leas 250ms to update after a write.
278331
elif channel_type == "din":
279332
deb_mode = kwargs.get("debounce_mode", "simple")
280333
self.opta._cmd(
@@ -309,18 +362,18 @@ def get_bool(k, d):
309362

310363

311364
class Opta:
312-
def __init__(self, bus_id, freq=400_000, det=None):
365+
def __init__(self, bus_id, freq=400_000, pin_id="BUS_DETECT"):
313366
"""
314367
Initializes an Opta controller.
315368
316369
Parameters:
317370
- bus_id : The I2C bus identifier.
318371
- freq : I2C bus frequency (default=400_000).
319-
- det : GPIO pin used for bus detection (default is a PULL_UP input pin named "BUS_DETECT").
372+
- pin_id : GPIO pin ID used for bus detection (default="BUS_DETECT").
320373
"""
321-
self.bus = I2C(bus_id, freq=freq)
374+
self.pin = IOPin(pin_id)
375+
self.bus = I2CBus(bus_id, freq)
322376
self.cmd_buf = memoryview(bytearray(256 + 2))
323-
self.det = Pin("BUS_DETECT", Pin.IN, Pin.PULL_UP) if det is None else det
324377
self.exp_types = {
325378
0x02: ("digital", "Opta Digital Mechanical"),
326379
0x03: ("digital", "Opta Digital Solid State"),
@@ -336,14 +389,14 @@ def _log_enabled(self, level):
336389
return logging.getLogger().isEnabledFor(level)
337390

338391
def _bus_read(self, addr, buf):
339-
self.bus.readfrom_into(addr, buf)
392+
self.bus.read(addr, buf)
340393
if self._log_enabled(logging.DEBUG):
341394
self._log_debug("Recv: " + " ".join(["%02X" % (a) for a in buf]))
342395

343396
def _bus_write(self, addr, buf):
344397
if self._log_enabled(logging.DEBUG):
345398
self._log_debug("Send: " + " ".join(["%02X" % (a) for a in buf]))
346-
self.bus.writeto(addr, buf)
399+
self.bus.write(addr, buf)
347400

348401
def _crc8(self, buf, poly=0x07, crc=0x00):
349402
for byte in buf:
@@ -382,7 +435,7 @@ def _cmd(self, addr, cmd, fmt=None, *args):
382435

383436
def _reset_bus(self, addr):
384437
self._cmd(addr, _CMD_CHIP_RESET, "B", 0x56)
385-
sleep_ms(2000)
438+
sleep(2)
386439

387440
def _set_address(self, addr, addr_new=None):
388441
if addr_new is not None:
@@ -413,9 +466,9 @@ def enum_devices(self):
413466

414467
addr = _MAX_ADDRESS
415468
# Assign temp I2C addresses to expansions.
416-
while not self.det.value():
469+
while not self.pin.read():
417470
self._set_address(0x0A, addr_new=addr)
418-
sleep_ms(100)
471+
sleep(0.100)
419472
try:
420473
xaddr, xtype = self._set_address(addr)
421474
if xaddr == addr:
@@ -428,7 +481,7 @@ def enum_devices(self):
428481
# Assign final I2C addresses to expansions.
429482
for addr_new in range(_MIN_ADDRESS, _MIN_ADDRESS + addr - _MAX_ADDRESS):
430483
self._set_address(addr - 1, addr_new)
431-
sleep_ms(100)
484+
sleep(0.100)
432485
try:
433486
xaddr, xtype = self._set_address(addr_new)
434487
addr -= 1

0 commit comments

Comments
 (0)