summaryrefslogtreecommitdiff
path: root/ports/esp32/modules/machine.py
blob: 9cfda12f17753bcf71dafcd1a8c63ea68ccf9103 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import sys

_path = sys.path
sys.path = ()
try:
    import machine as _machine
finally:
    sys.path = _path
    del _path
    del sys


from micropython import const
import esp32

if hasattr(esp32, "PCNT"):
    _PCNT_RANGE = const(32000)

    class _CounterBase:
        _PCNT = esp32.PCNT
        # Singletons, keyed by PCNT unit_id (shared by both Counter & Encoder).
        _INSTANCES = {}

        # Use __new__ to implement a singleton rather than a factory function,
        # because we need to be able to provide class attributes, e.g.
        # Counter.RISING, which is not possible if Counter was a function
        # (functions cannot have attributes in MicroPython).
        def __new__(cls, unit_id, *_args, **_kwargs):
            # Find an existing instance for this PCNT unit id.
            self = cls._INSTANCES.get(unit_id)

            if self:
                # Verify that this PCNT is being used for the same type
                # (Encoder or Counter).
                if not isinstance(self, cls):
                    raise ValueError("PCNT in use")
            else:
                # Previously unused PCNT unit.
                self = object.__new__(cls)
                cls._INSTANCES[unit_id] = self

            # __init__ will now be called with the same args.
            return self

        def __init__(self, unit_id, *args, filter_ns=0, **kwargs):
            self._unit_id = unit_id

            if not hasattr(self, "_pcnt"):
                # New instance, or previously deinit-ed.
                self._pcnt = self._PCNT(unit_id, min=-_PCNT_RANGE, max=_PCNT_RANGE)
            elif not (args or kwargs):
                # Existing instance, and no args, so accessing the existing
                # singleton without reconfiguring. Note: This means that
                # Counter/Encoder cannot be partially re-initalised. Either
                # you get the existing instance as-is (by passing no arguments
                # other than the id), or you must pass all the necessary
                # arguments to additionally re-configure it.
                return

            # Counter- or Encoder-specific configuration of self._pcnt.
            self._configure(*args, **kwargs)

            # Common unit configuration.
            self._pcnt.init(
                filter=min(max(0, filter_ns * 80 // 1000), 1023),
                value=0,
            )

            # Note: We track number-of-overflows rather than the actual count in
            # order to avoid the IRQ handler overflowing MicroPython's "small int"
            # range. This gives an effective range of 2**30 overflows. User code
            # should use counter.value(0) to reset the overflow count.
            # The ESP32 PCNT resets to zero on under/overflow (i.e. it does not wrap
            # around to the opposite limit), so each overflow corresponds to exactly
            # _PCNT_RANGE counts.

            # Reset counter state.
            self._overflows = 0
            self._offset = 0

            # Install IRQ handler to handle under/overflow.
            self._pcnt.irq(self._overflow, self._PCNT.IRQ_MIN | self._PCNT.IRQ_MAX)

            # Start counting.
            self._pcnt.start()

        # Handle counter under/overflow.
        def _overflow(self, pcnt):
            mask = pcnt.irq().flags()
            if mask & self._PCNT.IRQ_MIN:
                self._overflows -= 1
            elif mask & self._PCNT.IRQ_MAX:
                self._overflows += 1

        # Public machine.Counter & machine.Encoder API.
        def init(self, *args, **kwargs):
            self.__init__(self._unit_id, *args, **kwargs)

        # Public machine.Counter & machine.Encoder API.
        def deinit(self):
            if hasattr(self, "_pcnt"):
                self._pcnt.deinit()
                del self._pcnt

        # Public machine.Counter & machine.Encoder API.
        def value(self, value=None):
            if not hasattr(self, "_pcnt"):
                raise RuntimeError("not initialised")

            # This loop deals with the possibility that a PCNT overflow occurs
            # between retrieving self._overflows and self._pcnt.value().
            while True:
                overflows = self._overflows
                current = self._pcnt.value()
                # Calling PCNT.value() forces any pending interrupts to run
                # for this PCNT unit. So self._overflows must now be the the
                # value corresponding to the value we read.
                if self._overflows == overflows:
                    break

            # Compute the result including the number of times we've cycled
            # through the range, and any applied offset.
            result = overflows * _PCNT_RANGE + current + self._offset

            # If a new value is specified, then zero out the overflows, and set
            # self._offset so that it zeros out the current PCNT value. The
            # mutation to self._overflows is atomic w.r.t. the overflow IRQ
            # handler because the scheduler only runs on branch instructions.
            if value is not None:
                self._overflows -= overflows
                self._offset = value - current

            return result

    class Counter(_CounterBase):
        # Public machine.Counter API.
        RISING = 1
        FALLING = 2
        UP = _CounterBase._PCNT.INCREMENT
        DOWN = _CounterBase._PCNT.DECREMENT

        # Counter-specific configuration.
        def _configure(self, src, edge=RISING, direction=UP):
            # Only use the first channel.
            self._pcnt.init(
                channel=0,
                pin=src,
                rising=direction if edge & Counter.RISING else self._PCNT.IGNORE,
                falling=direction if edge & Counter.FALLING else self._PCNT.IGNORE,
            )

    class Encoder(_CounterBase):
        # Encoder-specific configuration.
        def _configure(self, phase_a, phase_b, phases=1):
            if phases not in (1, 2, 4):
                raise ValueError("phases")
            # Configure the first channel.
            self._pcnt.init(
                channel=0,
                pin=phase_a,
                falling=self._PCNT.INCREMENT,
                rising=self._PCNT.DECREMENT,
                mode_pin=phase_b,
                mode_low=self._PCNT.HOLD if phases == 1 else self._PCNT.REVERSE,
            )
            if phases == 4:
                # For 4x quadrature, enable the second channel.
                self._pcnt.init(
                    channel=1,
                    pin=phase_b,
                    falling=self._PCNT.DECREMENT,
                    rising=self._PCNT.INCREMENT,
                    mode_pin=phase_a,
                    mode_low=self._PCNT.REVERSE,
                )
            else:
                # For 1x and 2x quadrature, disable the second channel.
                self._pcnt.init(channel=1, pin=None, rising=self._PCNT.IGNORE)
            self._phases = phases

        def phases(self):
            return self._phases

    del _CounterBase


del esp32


# Delegate to built-in machine module.
def __getattr__(attr):
    return getattr(_machine, attr)