summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJonathan Hogg <me@jonathanhogg.com>2022-09-10 16:47:19 +0100
committerDamien George <damien@micropython.org>2025-08-01 23:45:18 +1000
commit327655905e9f523070301f2f35459197d46db4fb (patch)
tree414e2968af1dd7ce62828f7ddc4401cad6f6fecb
parente54553c496bf915ed0263c2d2a61d31d61032dea (diff)
esp32/modules/machine.py: Add Counter and Encoder classes.
Adds a Python override of the `machine` module, which delegates to the built-in module and adds an implementation of `Counter` and `Encoder`, based on the `esp32.PCNT` class. Original implementation by: Jonathan Hogg <me@jonathanhogg.com> Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
-rw-r--r--ports/esp32/modules/machine.py192
1 files changed, 192 insertions, 0 deletions
diff --git a/ports/esp32/modules/machine.py b/ports/esp32/modules/machine.py
new file mode 100644
index 000000000..9cfda12f1
--- /dev/null
+++ b/ports/esp32/modules/machine.py
@@ -0,0 +1,192 @@
+import sys
+
+_path = sys.path
+sys.path = ()
+try:
+ import machine as _machine
+finally:
+ sys.path = _path
+ del _path
+ del sys
+
+
+from micropython import const
+import esp32
+
+if hasattr(esp32, "PCNT"):
+ _PCNT_RANGE = const(32000)
+
+ class _CounterBase:
+ _PCNT = esp32.PCNT
+ # Singletons, keyed by PCNT unit_id (shared by both Counter & Encoder).
+ _INSTANCES = {}
+
+ # Use __new__ to implement a singleton rather than a factory function,
+ # because we need to be able to provide class attributes, e.g.
+ # Counter.RISING, which is not possible if Counter was a function
+ # (functions cannot have attributes in MicroPython).
+ def __new__(cls, unit_id, *_args, **_kwargs):
+ # Find an existing instance for this PCNT unit id.
+ self = cls._INSTANCES.get(unit_id)
+
+ if self:
+ # Verify that this PCNT is being used for the same type
+ # (Encoder or Counter).
+ if not isinstance(self, cls):
+ raise ValueError("PCNT in use")
+ else:
+ # Previously unused PCNT unit.
+ self = object.__new__(cls)
+ cls._INSTANCES[unit_id] = self
+
+ # __init__ will now be called with the same args.
+ return self
+
+ def __init__(self, unit_id, *args, filter_ns=0, **kwargs):
+ self._unit_id = unit_id
+
+ if not hasattr(self, "_pcnt"):
+ # New instance, or previously deinit-ed.
+ self._pcnt = self._PCNT(unit_id, min=-_PCNT_RANGE, max=_PCNT_RANGE)
+ elif not (args or kwargs):
+ # Existing instance, and no args, so accessing the existing
+ # singleton without reconfiguring. Note: This means that
+ # Counter/Encoder cannot be partially re-initalised. Either
+ # you get the existing instance as-is (by passing no arguments
+ # other than the id), or you must pass all the necessary
+ # arguments to additionally re-configure it.
+ return
+
+ # Counter- or Encoder-specific configuration of self._pcnt.
+ self._configure(*args, **kwargs)
+
+ # Common unit configuration.
+ self._pcnt.init(
+ filter=min(max(0, filter_ns * 80 // 1000), 1023),
+ value=0,
+ )
+
+ # Note: We track number-of-overflows rather than the actual count in
+ # order to avoid the IRQ handler overflowing MicroPython's "small int"
+ # range. This gives an effective range of 2**30 overflows. User code
+ # should use counter.value(0) to reset the overflow count.
+ # The ESP32 PCNT resets to zero on under/overflow (i.e. it does not wrap
+ # around to the opposite limit), so each overflow corresponds to exactly
+ # _PCNT_RANGE counts.
+
+ # Reset counter state.
+ self._overflows = 0
+ self._offset = 0
+
+ # Install IRQ handler to handle under/overflow.
+ self._pcnt.irq(self._overflow, self._PCNT.IRQ_MIN | self._PCNT.IRQ_MAX)
+
+ # Start counting.
+ self._pcnt.start()
+
+ # Handle counter under/overflow.
+ def _overflow(self, pcnt):
+ mask = pcnt.irq().flags()
+ if mask & self._PCNT.IRQ_MIN:
+ self._overflows -= 1
+ elif mask & self._PCNT.IRQ_MAX:
+ self._overflows += 1
+
+ # Public machine.Counter & machine.Encoder API.
+ def init(self, *args, **kwargs):
+ self.__init__(self._unit_id, *args, **kwargs)
+
+ # Public machine.Counter & machine.Encoder API.
+ def deinit(self):
+ if hasattr(self, "_pcnt"):
+ self._pcnt.deinit()
+ del self._pcnt
+
+ # Public machine.Counter & machine.Encoder API.
+ def value(self, value=None):
+ if not hasattr(self, "_pcnt"):
+ raise RuntimeError("not initialised")
+
+ # This loop deals with the possibility that a PCNT overflow occurs
+ # between retrieving self._overflows and self._pcnt.value().
+ while True:
+ overflows = self._overflows
+ current = self._pcnt.value()
+ # Calling PCNT.value() forces any pending interrupts to run
+ # for this PCNT unit. So self._overflows must now be the the
+ # value corresponding to the value we read.
+ if self._overflows == overflows:
+ break
+
+ # Compute the result including the number of times we've cycled
+ # through the range, and any applied offset.
+ result = overflows * _PCNT_RANGE + current + self._offset
+
+ # If a new value is specified, then zero out the overflows, and set
+ # self._offset so that it zeros out the current PCNT value. The
+ # mutation to self._overflows is atomic w.r.t. the overflow IRQ
+ # handler because the scheduler only runs on branch instructions.
+ if value is not None:
+ self._overflows -= overflows
+ self._offset = value - current
+
+ return result
+
+ class Counter(_CounterBase):
+ # Public machine.Counter API.
+ RISING = 1
+ FALLING = 2
+ UP = _CounterBase._PCNT.INCREMENT
+ DOWN = _CounterBase._PCNT.DECREMENT
+
+ # Counter-specific configuration.
+ def _configure(self, src, edge=RISING, direction=UP):
+ # Only use the first channel.
+ self._pcnt.init(
+ channel=0,
+ pin=src,
+ rising=direction if edge & Counter.RISING else self._PCNT.IGNORE,
+ falling=direction if edge & Counter.FALLING else self._PCNT.IGNORE,
+ )
+
+ class Encoder(_CounterBase):
+ # Encoder-specific configuration.
+ def _configure(self, phase_a, phase_b, phases=1):
+ if phases not in (1, 2, 4):
+ raise ValueError("phases")
+ # Configure the first channel.
+ self._pcnt.init(
+ channel=0,
+ pin=phase_a,
+ falling=self._PCNT.INCREMENT,
+ rising=self._PCNT.DECREMENT,
+ mode_pin=phase_b,
+ mode_low=self._PCNT.HOLD if phases == 1 else self._PCNT.REVERSE,
+ )
+ if phases == 4:
+ # For 4x quadrature, enable the second channel.
+ self._pcnt.init(
+ channel=1,
+ pin=phase_b,
+ falling=self._PCNT.DECREMENT,
+ rising=self._PCNT.INCREMENT,
+ mode_pin=phase_a,
+ mode_low=self._PCNT.REVERSE,
+ )
+ else:
+ # For 1x and 2x quadrature, disable the second channel.
+ self._pcnt.init(channel=1, pin=None, rising=self._PCNT.IGNORE)
+ self._phases = phases
+
+ def phases(self):
+ return self._phases
+
+ del _CounterBase
+
+
+del esp32
+
+
+# Delegate to built-in machine module.
+def __getattr__(attr):
+ return getattr(_machine, attr)