Skip to content

Commit 80f4f6b

Browse files
committed
add ObservableDataStream and DataStreamObserver pattern implementation
1 parent 2e7436a commit 80f4f6b

File tree

3 files changed

+232
-0
lines changed

3 files changed

+232
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
1414
- SoftwareSerial. That took a while.
1515
- Queue template implementation
1616
- Table template implementation
17+
- ObservableDataStream and DataStreamObserver pattern implementation
1718

1819
### Changed
1920
- Unit test executables print to STDERR just in case there are segfaults. Uh, just in case I ever write any.
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
#include <Arduino.h>
2+
#include <ArduinoUnitTests.h>
3+
#include <ci/ObservableDataStream.h>
4+
5+
class Source : public ObservableDataStream {
6+
public:
7+
Source() : ObservableDataStream() {}
8+
9+
// expose protected functions
10+
void doBit(bool val) { advertiseBit(val); }
11+
void doByte(unsigned char val) { advertiseByte(val); }
12+
};
13+
14+
class Sink : public DataStreamObserver {
15+
public:
16+
bool lastBit;
17+
unsigned char lastByte;
18+
19+
Sink() : DataStreamObserver(false, false) {}
20+
21+
virtual String observerName() const { return "Sink"; }
22+
virtual void onBit(bool val) { lastBit = val; }
23+
virtual void onByte(unsigned char val) { lastByte = val; }
24+
};
25+
26+
class BitpackSink : public DataStreamObserver {
27+
public:
28+
bool lastBit;
29+
unsigned char lastByte;
30+
31+
BitpackSink() : DataStreamObserver(true, true) {}
32+
33+
virtual String observerName() const { return "BitpackSink"; }
34+
virtual void onBit(bool val) { lastBit = val; }
35+
virtual void onByte(unsigned char val) { lastByte = val; }
36+
};
37+
38+
unittest(attach_sink_to_src)
39+
{
40+
Source src = Source();
41+
Sink dst = Sink();
42+
43+
dst.lastByte = 'z';
44+
src.addObserver("foo", &dst);
45+
src.doByte('a');
46+
assertEqual('a', dst.lastByte);
47+
src.removeObserver("foo");
48+
src.doByte('b');
49+
assertEqual('a', dst.lastByte);
50+
}
51+
52+
unittest(attach_src_to_sink)
53+
{
54+
Source src = Source();
55+
Sink dst = Sink();
56+
57+
dst.attach(&src);
58+
src.doByte('f');
59+
assertEqual('f', dst.lastByte);
60+
}
61+
62+
// 01010100 T if bigendian
63+
unittest(bitpack)
64+
{
65+
Source src = Source();
66+
Sink dst = Sink();
67+
BitpackSink bst = BitpackSink();
68+
69+
bool message[8] = {0, 1, 0, 1, 0, 1, 0, 0};
70+
71+
bst.lastByte = 'f';
72+
dst.lastByte = 'f';
73+
bst.attach(&src);
74+
dst.attach(&src);
75+
76+
for (int i = 0; i < 8; ++i) {
77+
src.doBit(message[i]);
78+
assertEqual(message[i], bst.lastBit);
79+
assertEqual(message[i], dst.lastBit);
80+
}
81+
82+
assertEqual('f', dst.lastByte); // not doing bitpacking
83+
assertEqual('T', bst.lastByte); // should have formed a binary T char by now
84+
assertNotEqual('*', bst.lastByte); // backwards endianness
85+
}
86+
87+
// 01010100 T if bigendian
88+
unittest(from_pinhistory)
89+
{
90+
GodmodeState* state = GODMODE();
91+
state->reset();
92+
93+
BitpackSink bst = BitpackSink();
94+
bst.attach(&state->digitalPin[2]);
95+
bst.lastByte = 'f';
96+
97+
bool message[8] = {0, 1, 0, 1, 0, 1, 0, 0};
98+
for (int i = 0; i < 8; ++i) {
99+
digitalWrite(2, message[i]);
100+
assertEqual(message[i], bst.lastBit);
101+
}
102+
103+
assertEqual('T', bst.lastByte); // should have formed a binary T char by now
104+
assertNotEqual('*', bst.lastByte); // backwards endianness
105+
}
106+
107+
unittest_main()

cpp/arduino/ci/ObservableDataStream.h

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
#pragma once
2+
3+
#include "Table.h"
4+
#include <WString.h>
5+
6+
7+
// This pair of classes defines an Observer pattern for bits and bytes.
8+
// This would allow us to create "devices" that respond in "real" time
9+
// to Arduino outputs, in the form of altering the Arduino inputs
10+
//
11+
// e.g. replying to a serial output with serial input
12+
class ObservableDataStream;
13+
14+
// datastream observers handle deliveries of bits and bytes.
15+
// optionally, they can turn bit events into byte events with a given endianness
16+
class DataStreamObserver {
17+
private:
18+
unsigned int mBitPosition; // for building the byte (mask helper)
19+
unsigned char mBuildingByte; // for storing incoming bits
20+
bool mAutoBitPack; // whether to report the packed bits
21+
bool mBigEndian; // bit order for byte
22+
23+
protected:
24+
// functions that are up to the implementer to provide.
25+
virtual void onBit(bool aBit) {}
26+
virtual void onByte(unsigned char aByte) {}
27+
virtual String observerName() const = 0;
28+
29+
public:
30+
DataStreamObserver(bool autoBitPack, bool bigEndian)
31+
{
32+
mBitPosition = 0;
33+
mBuildingByte = 0x00;
34+
mAutoBitPack = autoBitPack;
35+
mBigEndian = bigEndian;
36+
}
37+
38+
virtual ~DataStreamObserver() {}
39+
40+
// entry point for byte-related handler
41+
void handleByte(unsigned char aByte) {
42+
onByte(aByte);
43+
}
44+
45+
// entry poitn for bit-related handler
46+
void handleBit(bool aBit) {
47+
onBit(aBit);
48+
49+
if (!mAutoBitPack) return;
50+
51+
// build the next value
52+
int shift = mBigEndian ? 7 - mBitPosition : mBitPosition;
53+
unsigned char val = aBit ? 0x1 : 0x0;
54+
mBuildingByte |= (val << shift);
55+
56+
// if we roll over after incrementing, the byte is ready to ship
57+
mBitPosition = (mBitPosition + 1) % 8;
58+
if (mBitPosition == 0) {
59+
handleByte(mBuildingByte);
60+
mBuildingByte = 0x00;
61+
};
62+
}
63+
64+
// inlined after ObservableDataStream definition to fake out the compiler
65+
bool attach(ObservableDataStream* source);
66+
bool detach(ObservableDataStream* source);
67+
};
68+
69+
// Inheritable interface for things that produce data, like pins or serial ports
70+
// this class allows others to subscribe for updates on these values and trigger actions
71+
// e.g. if you "turn on" a motor with one pin and expect to see a change in an analog pin
72+
class ObservableDataStream
73+
{
74+
private:
75+
Table<String, DataStreamObserver*> mObservers;
76+
bool mAdvertisingBit;
77+
unsigned char mAdvertisingByte;
78+
79+
protected:
80+
// to allow both member and non-member functions to be called, we need to trick the compiler
81+
// into getting the (this) of a static function. So the default is a work function signature
82+
// that takes a second optional argument. in this case, we use the argument.
83+
84+
static void workAdvertiseBit(ObservableDataStream* that, String _, DataStreamObserver* val) {
85+
val->handleBit(that->mAdvertisingBit);
86+
}
87+
88+
static void workAdvertiseByte(ObservableDataStream* that, String _, DataStreamObserver* val) {
89+
val->handleByte(that->mAdvertisingByte);
90+
}
91+
92+
// advertise functions allow the data stream to publish to observers
93+
94+
// update all observers with a byte value
95+
void advertiseByte(unsigned char aByte) {
96+
// save the value to a class variable. then use the static method with this instance
97+
mAdvertisingByte = aByte;
98+
mObservers.iterate(workAdvertiseByte, this);
99+
}
100+
101+
// update all observers with a byte value
102+
// build up a byte
103+
// if requested, advertise the byte
104+
void advertiseBit(bool aBit) {
105+
// do the named thing first, of course
106+
mAdvertisingBit = aBit;
107+
mObservers.iterate(workAdvertiseBit, this);
108+
}
109+
110+
public:
111+
ObservableDataStream() : mObservers() {
112+
mAdvertisingBit = false; // we'll re-init on demand though
113+
mAdvertisingByte = 0x07; // we'll re-init on demand though
114+
}
115+
116+
virtual ~ObservableDataStream() {}
117+
118+
bool addObserver(String name, DataStreamObserver* obs) { return mObservers.add(name, obs); }
119+
bool removeObserver(String name) { return mObservers.remove(name); }
120+
};
121+
122+
inline bool DataStreamObserver::attach(ObservableDataStream* source) { return source->addObserver(observerName(), this); }
123+
124+
inline bool DataStreamObserver::detach(ObservableDataStream* source) { return source->removeObserver(observerName()); }

0 commit comments

Comments
 (0)