Skip to content

Commit 8effef0

Browse files
cmagliefacchinm
authored andcommitted
Implementation of Source/Sinks and connected blocks paradigm
1 parent 2cf1739 commit 8effef0

File tree

5 files changed

+219
-0
lines changed

5 files changed

+219
-0
lines changed

examples/Blocks/Blocks.ino

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* This examples demonstrates the SOURCE/SINK abstraction.
3+
* Each thread may have any number of SOURCES and SINKS that can be connected
4+
* together using the "connectTo" method.
5+
*/
6+
7+
void setup() {
8+
data_reader.out.connectTo(data_writer.in);
9+
data_reader.start();
10+
data_writer.start();
11+
12+
// put your setup code here, to run once:
13+
pinMode(LEDR, OUTPUT);
14+
}
15+
16+
void loop() {
17+
// put your main code here, to run repeatedly:
18+
digitalWrite(LEDR, HIGH);
19+
delay(1000);
20+
digitalWrite(LEDR, LOW);
21+
delay(1000);
22+
}

examples/Blocks/SharedVariables.h

Whitespace-only changes.

examples/Blocks/data_reader.inot

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
2+
/* The output SOURCE, it sends 'int' */
3+
SOURCE(out, int)
4+
5+
void setup() {
6+
}
7+
8+
// a '1' is sent every 100 ms
9+
void loop() {
10+
out.send(1);
11+
delay(100);
12+
}

examples/Blocks/data_writer.inot

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
2+
/*
3+
* An 'int' SINK with a size of '0'. This kind of SINK has no buffer so the reading thread
4+
* will block until the writing thread has written something, or viceversa.
5+
*/
6+
SINK(in, int, 0)
7+
8+
void setup() {
9+
pinMode(LEDB, OUTPUT);
10+
}
11+
12+
void loop() {
13+
// Read an 'int' from the SINK and discards it. Since there is basically no delay in the loop
14+
// this call will surely block until something comes from the connected SOURCE. In this case
15+
// the pace is dictated by the SOURCE that sends data every 100 ms.
16+
in.read();
17+
digitalWrite(LEDB, HIGH);
18+
19+
in.read();
20+
digitalWrite(LEDB, LOW);
21+
}

src/Arduino_Threads.h

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,170 @@
33

44
#include <MemoryPool.h>
55

6+
#define SOURCE(name, type) \
7+
public: \
8+
Source<type> name; \
9+
private:
10+
11+
#define SINK(name, type, size) \
12+
public: \
13+
Sink<type> name{size}; \
14+
private:
15+
// we need to call the Sink<T>(int size) non-default constructor using size as parameter.
16+
// This is done by writing
17+
// Sink<type> name{size};
18+
// instead of:
19+
// Sink<type> name(size);
20+
// otherwise the compiler will read it as a declaration of a method called "name" and we
21+
// get a syntax error.
22+
// This is called "C++11 uniform init" (using "{}" instead of "()" without "="... yikes!)
23+
// https://chromium.googlesource.com/chromium/src/+/master/styleguide/c++/c++-dos-and-donts.md
24+
25+
// Forward declaration of Sink and Source
26+
template<class T>
27+
class Sink;
28+
template<class T>
29+
class Source;
30+
31+
template<class T>
32+
class Sink
33+
{
34+
private:
35+
rtos::Mutex dataMutex;
36+
rtos::ConditionVariable dataAvailable;
37+
rtos::ConditionVariable slotAvailable;
38+
T latest;
39+
Sink *next;
40+
const int size;
41+
int first, last;
42+
bool full;
43+
T *queue;
44+
45+
public:
46+
Sink(int s) :
47+
dataAvailable(dataMutex),
48+
slotAvailable(dataMutex),
49+
size(s),
50+
queue((size > 0) ? new T[size] : nullptr),
51+
first(0), last(0), full(false)
52+
{};
53+
54+
~Sink() {
55+
if (queue != nullptr) { delete queue; }
56+
}
57+
58+
59+
//protected: TODO
60+
void connectTo(Sink &sink)
61+
{
62+
if (next == nullptr) {
63+
next = &sink;
64+
} else {
65+
next->connectTo(sink);
66+
}
67+
}
68+
69+
T read()
70+
{
71+
// Non-blocking shared variable
72+
if (size == -1) {
73+
dataMutex.lock();
74+
T res = latest;
75+
dataMutex.unlock();
76+
return res;
77+
}
78+
79+
// Blocking shared variable
80+
if (size == 0) {
81+
dataMutex.lock();
82+
while (!full) {
83+
dataAvailable.wait();
84+
}
85+
T res = latest;
86+
full = false;
87+
slotAvailable.notify_all();
88+
dataMutex.unlock();
89+
return res;
90+
}
91+
92+
// Blocking queue
93+
dataMutex.lock();
94+
while (first == last && !full) {
95+
dataAvailable.wait();
96+
}
97+
T res = queue[first++];
98+
first %= size;
99+
if (full) {
100+
full = false;
101+
slotAvailable.notify_one();
102+
}
103+
dataMutex.unlock();
104+
return res;
105+
}
106+
107+
//protected: TODO
108+
void inject(const T &value)
109+
{
110+
dataMutex.lock();
111+
112+
// Non-blocking shared variable
113+
if (size == -1) {
114+
latest = value;
115+
}
116+
117+
// Blocking shared variable
118+
else if (size == 0) {
119+
while (full) {
120+
slotAvailable.wait();
121+
}
122+
latest = value;
123+
full = true;
124+
dataAvailable.notify_one();
125+
slotAvailable.wait();
126+
}
127+
128+
// Blocking queue
129+
else {
130+
while (full) {
131+
slotAvailable.wait();
132+
}
133+
if (first == last) {
134+
dataAvailable.notify_one();
135+
}
136+
queue[last++] = value;
137+
last %= size;
138+
if (first == last) {
139+
full = true;
140+
}
141+
}
142+
dataMutex.unlock();
143+
144+
if (next) next->inject(value);
145+
}
146+
};
147+
148+
template<class T>
149+
class Source
150+
{
151+
public:
152+
Source() {};
153+
154+
void connectTo(Sink<T> &sink) {
155+
if (destination == nullptr) {
156+
destination = &sink;
157+
} else {
158+
destination->connectTo(sink);
159+
}
160+
}
161+
162+
void send(const T &value) {
163+
if (destination) destination->inject(value);
164+
}
165+
166+
private:
167+
Sink<T> *destination;
168+
};
169+
6170
template<class T, size_t QUEUE_SIZE = 16>
7171
class Shared // template definition
8172
{

0 commit comments

Comments
 (0)