Skip to content

Commit 064e171

Browse files
authored
Add MultiSparkline
And rewrite Sparkline to be a subclass of MultiSparkline
1 parent d1e96bf commit 064e171

File tree

2 files changed

+340
-227
lines changed

2 files changed

+340
-227
lines changed
Lines changed: 332 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,332 @@
1+
# SPDX-FileCopyrightText: 2020 Kevin Matocha
2+
#
3+
# SPDX-License-Identifier: MIT
4+
5+
"""
6+
`multisparkline`
7+
================================================================================
8+
9+
Various common shapes for use with displayio - Multiple Sparklines on one chart!
10+
11+
12+
* Author(s): Kevin Matocha, Maciej Sokolowski
13+
14+
Implementation Notes
15+
--------------------
16+
17+
**Software and Dependencies:**
18+
19+
* Adafruit CircuitPython firmware for the supported boards:
20+
https://github.com/adafruit/circuitpython/releases
21+
22+
"""
23+
24+
try:
25+
from typing import Optional, List
26+
except ImportError:
27+
pass
28+
import displayio
29+
from adafruit_display_shapes.polygon import Polygon
30+
31+
32+
class _CyclicBuffer:
33+
def __init__(self, size: int) -> None:
34+
self._buffer = [0.0] * size
35+
self._start = 0 # between 0 and size-1
36+
self._end = 0 # between 0 and 2*size-1
37+
38+
def push(self, value: int | float) -> None:
39+
"""Pushes value at the end of the buffer.
40+
41+
:param int|float value: value to be pushed
42+
43+
"""
44+
45+
if self.len() == len(self._buffer):
46+
raise RuntimeError("Trying to push to full buffer")
47+
self._buffer[self._end % len(self._buffer)] = value
48+
self._end += 1
49+
50+
def pop(self) -> int | float:
51+
"""Pop value from the start of the buffer and returns it."""
52+
53+
if self.len() == 0:
54+
raise RuntimeError("Trying to pop from empty buffer")
55+
result = self._buffer[self._start]
56+
self._start += 1
57+
if self._start == len(self._buffer):
58+
self._start -= len(self._buffer)
59+
self._end -= len(self._buffer)
60+
return result
61+
62+
def len(self) -> int:
63+
"""Returns count of valid data in the buffer."""
64+
65+
return self._end - self._start
66+
67+
def clear(self) -> None:
68+
"""Marks all data as invalid."""
69+
70+
self._start = 0
71+
self._end = 0
72+
73+
def values(self) -> List[int | float]:
74+
"""Returns valid data from the buffer."""
75+
76+
if self.len() == 0:
77+
return []
78+
start = self._start
79+
end = self._end % len(self._buffer)
80+
if start < end:
81+
return self._buffer[start:end]
82+
return self._buffer[start:] + self._buffer[:end]
83+
84+
85+
class MultiSparkline(displayio.TileGrid):
86+
"""A multiple sparkline graph.
87+
88+
:param int width: Width of the multisparkline graph in pixels
89+
:param int height: Height of the multisparkline graph in pixels
90+
:param int max_items: Maximum number of values housed in each sparkline
91+
:param bool dyn_xpitch: (Optional) Dynamically change xpitch (True)
92+
:param list y_mins: Lower range for the y-axis per line.
93+
Set each to None for autorange of respective line.
94+
Set to None for autorange of all lines.
95+
:param list y_maxs: Upper range for the y-axis per line.
96+
Set each to None for autorange of respective line.
97+
Set to None for autorange of all lines.
98+
:param int x: X-position on the screen, in pixels
99+
:param int y: Y-position on the screen, in pixels
100+
:param list colors: Each line color. Number of items in this list determines maximum
101+
number of sparklines
102+
103+
Note: If dyn_xpitch is True (default), each sparkline will allways span
104+
the complete width. Otherwise, each sparkline will grow when you
105+
add values. Once the line has reached the full width, each sparkline
106+
will scroll to the left.
107+
"""
108+
109+
def __init__(
110+
self,
111+
width: int,
112+
height: int,
113+
max_items: int,
114+
colors: List[int], # each line color
115+
dyn_xpitch: Optional[bool] = True, # True = dynamic pitch size
116+
y_mins: Optional[List[Optional[int]]] = None, # None = autoscaling
117+
y_maxs: Optional[List[Optional[int]]] = None, # None = autoscaling
118+
x: int = 0,
119+
y: int = 0,
120+
) -> None:
121+
# define class instance variables
122+
self._max_items = max_items # maximum number of items in the list
123+
self._lines = len(colors)
124+
self._buffers = [
125+
_CyclicBuffer(self._max_items) for i in range(self._lines)
126+
] # values per sparkline
127+
self._points = [
128+
_CyclicBuffer(self._max_items) for i in range(self._lines)
129+
] # _points: all points of sparkline
130+
self.dyn_xpitch = dyn_xpitch
131+
if not dyn_xpitch:
132+
self._xpitch = (width - 1) / (self._max_items - 1)
133+
self.y_mins = (
134+
[None] * self._lines if y_mins is None else y_mins
135+
) # minimum of each y-axis (None: autoscale)
136+
self.y_maxs = (
137+
[None] * self._lines if y_maxs is None else y_maxs
138+
) # maximum of each y-axis (None: autoscale)
139+
self.y_bottoms = self.y_mins.copy()
140+
# y_bottom: The actual minimum value of the vertical scale, will be
141+
# updated if autorange
142+
self.y_tops = self.y_maxs.copy()
143+
# y_top: The actual minimum value of the vertical scale, will be
144+
# updated if autorange
145+
self._palette = displayio.Palette(self._lines + 1)
146+
self._palette.make_transparent(0)
147+
for (i, color) in enumerate(colors):
148+
self._palette[i + 1] = color
149+
self._bitmap = displayio.Bitmap(width, height, self._lines + 1)
150+
151+
super().__init__(self._bitmap, pixel_shader=self._palette, x=x, y=y)
152+
153+
def clear_values(self) -> None:
154+
"""Clears _buffer and removes all lines in the group"""
155+
self._bitmap.fill(0)
156+
for buffer in self._buffers:
157+
buffer.clear()
158+
159+
def add_values(self, values: List[float], update: bool = True) -> None:
160+
"""Add a value to each sparkline.
161+
162+
:param list values: The values to be added, one per sparkline
163+
:param bool update: trigger recreation of primitives
164+
165+
Note: when adding multiple values per sparkline it is more efficient to call
166+
this method with parameter 'update=False' and then to manually
167+
call the update()-method
168+
"""
169+
170+
for (i, value) in enumerate(values):
171+
if value is not None:
172+
top = self.y_tops[i]
173+
bottom = self.y_bottoms[i]
174+
if (
175+
self._buffers[i].len() >= self._max_items
176+
): # if list is full, remove the first item
177+
first = self._buffers[i].pop()
178+
# check if boundaries have to be updated
179+
if self.y_mins[i] is None and first == bottom:
180+
bottom = min(self._buffers[i].values())
181+
if self.y_maxs[i] is None and first == self.y_tops[i]:
182+
top = max(self._buffers[i].values())
183+
self._buffers[i].push(value)
184+
185+
if self.y_mins[i] is None:
186+
bottom = value if not bottom else min(value, bottom)
187+
if self.y_maxs[i] is None:
188+
top = value if not top else max(value, top)
189+
190+
self.y_tops[i] = top
191+
self.y_bottoms[i] = bottom
192+
193+
if update:
194+
self.update(i)
195+
196+
@staticmethod
197+
def _xintercept(
198+
x_1: float,
199+
y_1: float,
200+
x_2: float,
201+
y_2: float,
202+
horizontal_y: float,
203+
) -> Optional[
204+
int
205+
]: # finds intercept of the line and a horizontal line at horizontalY
206+
slope = (y_2 - y_1) / (x_2 - x_1)
207+
b = y_1 - slope * x_1
208+
209+
if slope == 0 and y_1 != horizontal_y: # does not intercept horizontalY
210+
return None
211+
else:
212+
xint = (
213+
horizontal_y - b
214+
) / slope # calculate the x-intercept at position y=horizontalY
215+
return int(xint)
216+
217+
def _add_point(
218+
self,
219+
line: int,
220+
x: int,
221+
value: float,
222+
) -> None:
223+
# Guard for y_top and y_bottom being the same
224+
top = self.y_tops[line]
225+
bottom = self.y_bottoms[line]
226+
if top == bottom:
227+
y = int(0.5 * self.height)
228+
else:
229+
y = int((self.height - 1) * (top - value) / (top - bottom))
230+
self._points[line].push((x, y))
231+
232+
def _draw(self) -> None:
233+
self._bitmap.fill(0)
234+
for i in range(self._lines):
235+
Polygon.draw(self._bitmap, self._points[i].values(), i + 1, close=False)
236+
237+
def update_line(self, line: int = None) -> None:
238+
"""Update the drawing of the sparkline.
239+
param int|None line: Line to update. Set to None for updating all (default).
240+
"""
241+
242+
if line is None:
243+
lines = range(self._lines)
244+
else:
245+
lines = [line]
246+
247+
redraw = False
248+
for l in lines:
249+
# bail out early if we only have a single point
250+
n_points = self._buffers[l].len()
251+
if n_points < 2:
252+
continue
253+
254+
redraw = True
255+
if self.dyn_xpitch:
256+
# this is a float, only make int when plotting the line
257+
xpitch = (self.width - 1) / (n_points - 1)
258+
else:
259+
xpitch = self._xpitch
260+
261+
self._points[l].clear() # remove all points
262+
263+
for count, value in enumerate(self._buffers[l].values()):
264+
if count == 0:
265+
self._add_point(l, 0, value)
266+
else:
267+
x = int(xpitch * count)
268+
last_x = int(xpitch * (count - 1))
269+
top = self.y_tops[l]
270+
bottom = self.y_bottoms[l]
271+
272+
if (bottom <= last_value <= top) and (
273+
bottom <= value <= top
274+
): # both points are in range, plot the line
275+
self._add_point(l, x, value)
276+
277+
else: # at least one point is out of range, clip one or both ends the line
278+
if ((last_value > top) and (value > top)) or (
279+
(last_value < bottom) and (value < bottom)
280+
):
281+
# both points are on the same side out of range: don't draw anything
282+
pass
283+
else:
284+
xint_bottom = self._xintercept(
285+
last_x, last_value, x, value, bottom
286+
) # get possible new x intercept points
287+
xint_top = self._xintercept(
288+
last_x, last_value, x, value, top
289+
) # on the top and bottom of range
290+
if (xint_bottom is None) or (
291+
xint_top is None
292+
): # out of range doublecheck
293+
pass
294+
else:
295+
# Initialize the adjusted values as the baseline
296+
adj_x = x
297+
adj_value = value
298+
299+
if value > last_value: # slope is positive
300+
if xint_top <= x: # top is clipped
301+
adj_x = xint_top
302+
adj_value = top # y
303+
else: # slope is negative
304+
if xint_bottom <= x: # bottom is clipped
305+
adj_x = xint_bottom
306+
adj_value = bottom # y
307+
308+
self._add_point(l, adj_x, adj_value)
309+
310+
last_value = value # store value for the next iteration
311+
312+
if redraw:
313+
self._draw()
314+
315+
def values(self, line: int) -> List[float]:
316+
"""Returns the values displayed on the sparkline at given index."""
317+
318+
return self._buffers[line].values()
319+
320+
@property
321+
def width(self) -> int:
322+
"""
323+
:return: the width of the graph in pixels
324+
"""
325+
return self._bitmap.width
326+
327+
@property
328+
def height(self) -> int:
329+
"""
330+
:return: the height of the graph in pixels
331+
"""
332+
return self._bitmap.height

0 commit comments

Comments
 (0)