Skip to content

Commit c7dbdff

Browse files
committed
Enhanced handling of pluggable discoveries states
1 parent 4c14866 commit c7dbdff

File tree

3 files changed

+71
-41
lines changed

3 files changed

+71
-41
lines changed

arduino/discovery/discovery.go

Lines changed: 38 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package discovery
1717

1818
import (
1919
"encoding/json"
20+
"fmt"
2021
"io"
2122
"sync"
2223
"time"
@@ -29,6 +30,17 @@ import (
2930
"github.com/pkg/errors"
3031
)
3132

33+
// To work correctly a Pluggable Discovery must respect the state machine specifed on the documentation:
34+
// https://arduino.github.io/arduino-cli/latest/pluggable-discovery-specification/#state-machine
35+
// States a PluggableDiscovery can be in
36+
const (
37+
Alive int = iota
38+
Idling
39+
Running
40+
Syncing
41+
Dead
42+
)
43+
3244
// PluggableDiscovery is a tool that detects communication ports to interact
3345
// with the boards.
3446
type PluggableDiscovery struct {
@@ -40,8 +52,7 @@ type PluggableDiscovery struct {
4052
// All the following fields are guarded by statusMutex
4153
statusMutex sync.Mutex
4254
incomingMessagesError error
43-
alive bool
44-
eventsMode bool
55+
state int
4556
eventChan chan<- *Event
4657
cachedPorts map[string]*Port
4758
}
@@ -114,6 +125,7 @@ func New(id string, args ...string) (*PluggableDiscovery, error) {
114125
process: proc,
115126
incomingMessagesChan: messageChan,
116127
outgoingCommandsPipe: stdin,
128+
state: Dead,
117129
}
118130
go disc.jsonDecodeLoop(stdout, messageChan)
119131
return disc, nil
@@ -132,7 +144,7 @@ func (disc *PluggableDiscovery) jsonDecodeLoop(in io.Reader, outChan chan<- *dis
132144
decoder := json.NewDecoder(in)
133145
closeAndReportError := func(err error) {
134146
disc.statusMutex.Lock()
135-
disc.alive = false
147+
disc.state = Dead
136148
disc.incomingMessagesError = err
137149
disc.statusMutex.Unlock()
138150
close(outChan)
@@ -173,19 +185,11 @@ func (disc *PluggableDiscovery) jsonDecodeLoop(in io.Reader, outChan chan<- *dis
173185
}
174186
}
175187

176-
// IsAlive return true if the discovery process is running and so is able to receive commands
177-
// and produce events.
178-
func (disc *PluggableDiscovery) IsAlive() bool {
179-
disc.statusMutex.Lock()
180-
defer disc.statusMutex.Unlock()
181-
return disc.alive
182-
}
183-
184-
// IsEventMode return true if the discovery is in "events" mode
185-
func (disc *PluggableDiscovery) IsEventMode() bool {
188+
// State returns the current state of this PluggableDiscovery
189+
func (disc *PluggableDiscovery) State() int {
186190
disc.statusMutex.Lock()
187191
defer disc.statusMutex.Unlock()
188-
return disc.eventsMode
192+
return disc.state
189193
}
190194

191195
func (disc *PluggableDiscovery) waitMessage(timeout time.Duration) (*discoveryMessage, error) {
@@ -199,7 +203,7 @@ func (disc *PluggableDiscovery) waitMessage(timeout time.Duration) (*discoveryMe
199203
}
200204
return msg, nil
201205
case <-time.After(timeout):
202-
return nil, errors.New("timeout")
206+
return nil, fmt.Errorf("timeout waiting for message from %s", disc.id)
203207
}
204208
}
205209

@@ -223,7 +227,7 @@ func (disc *PluggableDiscovery) runProcess() error {
223227
}
224228
disc.statusMutex.Lock()
225229
defer disc.statusMutex.Unlock()
226-
disc.alive = true
230+
disc.state = Alive
227231
return nil
228232
}
229233

@@ -238,14 +242,17 @@ func (disc *PluggableDiscovery) Run() error {
238242
return err
239243
}
240244
if msg, err := disc.waitMessage(time.Second * 10); err != nil {
241-
return err
245+
return fmt.Errorf("calling HELLO: %w", err)
242246
} else if msg.EventType != "hello" {
243247
return errors.Errorf(tr("communication out of sync, expected 'hello', received '%s'"), msg.EventType)
244248
} else if msg.Message != "OK" || msg.Error {
245249
return errors.Errorf(tr("command failed: %s"), msg.Message)
246250
} else if msg.ProtocolVersion > 1 {
247251
return errors.Errorf(tr("protocol version not supported: requested 1, got %d"), msg.ProtocolVersion)
248252
}
253+
disc.statusMutex.Lock()
254+
defer disc.statusMutex.Unlock()
255+
disc.state = Idling
249256
return nil
250257
}
251258

@@ -256,12 +263,15 @@ func (disc *PluggableDiscovery) Start() error {
256263
return err
257264
}
258265
if msg, err := disc.waitMessage(time.Second * 10); err != nil {
259-
return err
266+
return fmt.Errorf("calling START: %w", err)
260267
} else if msg.EventType != "start" {
261268
return errors.Errorf(tr("communication out of sync, expected 'start', received '%s'"), msg.EventType)
262269
} else if msg.Message != "OK" || msg.Error {
263270
return errors.Errorf(tr("command failed: %s"), msg.Message)
264271
}
272+
disc.statusMutex.Lock()
273+
defer disc.statusMutex.Unlock()
274+
disc.state = Running
265275
return nil
266276
}
267277

@@ -273,19 +283,20 @@ func (disc *PluggableDiscovery) Stop() error {
273283
return err
274284
}
275285
if msg, err := disc.waitMessage(time.Second * 10); err != nil {
276-
return err
286+
return fmt.Errorf("calling STOP: %w", err)
277287
} else if msg.EventType != "stop" {
278288
return errors.Errorf(tr("communication out of sync, expected 'stop', received '%s'"), msg.EventType)
279289
} else if msg.Message != "OK" || msg.Error {
280290
return errors.Errorf(tr("command failed: %s"), msg.Message)
281291
}
282292
disc.statusMutex.Lock()
283293
defer disc.statusMutex.Unlock()
294+
// TODO: Should we clear cached ports here?
284295
if disc.eventChan != nil {
285296
close(disc.eventChan)
286297
disc.eventChan = nil
287298
}
288-
disc.eventsMode = false
299+
disc.state = Idling
289300
return nil
290301
}
291302

@@ -295,7 +306,7 @@ func (disc *PluggableDiscovery) Quit() error {
295306
return err
296307
}
297308
if msg, err := disc.waitMessage(time.Second * 10); err != nil {
298-
return err
309+
return fmt.Errorf("calling QUIT: %w", err)
299310
} else if msg.EventType != "quit" {
300311
return errors.Errorf(tr("communication out of sync, expected 'quit', received '%s'"), msg.EventType)
301312
} else if msg.Message != "OK" || msg.Error {
@@ -307,7 +318,7 @@ func (disc *PluggableDiscovery) Quit() error {
307318
close(disc.eventChan)
308319
disc.eventChan = nil
309320
}
310-
disc.alive = false
321+
disc.state = Dead
311322
return nil
312323
}
313324

@@ -318,7 +329,7 @@ func (disc *PluggableDiscovery) List() ([]*Port, error) {
318329
return nil, err
319330
}
320331
if msg, err := disc.waitMessage(time.Second * 10); err != nil {
321-
return nil, err
332+
return nil, fmt.Errorf("calling LIST: %w", err)
322333
} else if msg.EventType != "list" {
323334
return nil, errors.Errorf(tr("communication out of sync, expected 'list', received '%s'"), msg.EventType)
324335
} else if msg.Error {
@@ -349,25 +360,21 @@ func (disc *PluggableDiscovery) EventChannel(size int) <-chan *Event {
349360
// After calling StartSync an initial burst of "add" events may be generated to
350361
// report all the ports available at the moment of the start.
351362
func (disc *PluggableDiscovery) StartSync() error {
352-
disc.statusMutex.Lock()
353-
defer disc.statusMutex.Unlock()
354-
355-
if disc.eventsMode {
356-
return errors.New(tr("already in events mode"))
357-
}
358363
if err := disc.sendCommand("START_SYNC\n"); err != nil {
359364
return err
360365
}
361366

362367
if msg, err := disc.waitMessage(time.Second * 10); err != nil {
363-
return err
368+
return fmt.Errorf("calling START_SYNC: %w", err)
364369
} else if msg.EventType != "start_sync" {
365370
return errors.Errorf(tr("communication out of sync, expected 'start_sync', received '%s'"), msg.EventType)
366371
} else if msg.Message != "OK" || msg.Error {
367372
return errors.Errorf(tr("command failed: %s"), msg.Message)
368373
}
369374

370-
disc.eventsMode = true
375+
disc.statusMutex.Lock()
376+
defer disc.statusMutex.Unlock()
377+
disc.state = Syncing
371378
disc.cachedPorts = map[string]*Port{}
372379
return nil
373380
}

arduino/discovery/discovery_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,11 @@ func TestDiscoveryStdioHandling(t *testing.T) {
6464
require.NotNil(t, msg)
6565
require.Equal(t, "ev2", msg.EventType)
6666

67-
require.True(t, disc.IsAlive())
67+
require.Equal(t, disc.State(), Alive)
6868

6969
err = disc.outgoingCommandsPipe.(io.ReadCloser).Close()
7070
require.NoError(t, err)
7171
time.Sleep(time.Millisecond * 100)
7272

73-
require.False(t, disc.IsAlive())
73+
require.Equal(t, disc.State(), Dead)
7474
}

arduino/discovery/discoverymanager/discoverymanager.go

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ func (dm *DiscoveryManager) Add(disc *discovery.PluggableDiscovery) error {
6868
// returns the first error it meets or nil
6969
func (dm *DiscoveryManager) RunAll() error {
7070
for _, d := range dm.discoveries {
71-
if d.IsAlive() {
72-
// This discovery is already running, nothing to do
71+
if d.State() != discovery.Dead {
72+
// This discovery is already alive, nothing to do
7373
continue
7474
}
7575

@@ -84,6 +84,11 @@ func (dm *DiscoveryManager) RunAll() error {
8484
// returns the first error it meets or nil
8585
func (dm *DiscoveryManager) StartAll() error {
8686
for _, d := range dm.discoveries {
87+
state := d.State()
88+
if state != discovery.Idling || state == discovery.Running {
89+
// Already started
90+
continue
91+
}
8792
if err := d.Start(); err != nil {
8893
return err
8994
}
@@ -99,8 +104,9 @@ func (dm *DiscoveryManager) StartSyncAll() (<-chan *discovery.Event, []error) {
99104
dm.globalEventCh = make(chan *discovery.Event, 5)
100105
}
101106
for _, d := range dm.discoveries {
102-
if d.IsEventMode() {
103-
// Already started, nothing to do
107+
state := d.State()
108+
if state != discovery.Idling || state == discovery.Syncing {
109+
// Already syncing
104110
continue
105111
}
106112

@@ -121,6 +127,11 @@ func (dm *DiscoveryManager) StartSyncAll() (<-chan *discovery.Event, []error) {
121127
// returns the first error it meets or nil
122128
func (dm *DiscoveryManager) StopAll() error {
123129
for _, d := range dm.discoveries {
130+
state := d.State()
131+
if state != discovery.Syncing && state != discovery.Running {
132+
// Not running nor syncing, nothing to stop
133+
continue
134+
}
124135
err := d.Stop()
125136
if err != nil {
126137
return err
@@ -133,6 +144,10 @@ func (dm *DiscoveryManager) StopAll() error {
133144
// Returns the first error it meets or nil
134145
func (dm *DiscoveryManager) QuitAll() error {
135146
for _, d := range dm.discoveries {
147+
if d.State() == discovery.Dead {
148+
// Stop! Stop! It's already dead!
149+
continue
150+
}
136151
err := d.Quit()
137152
if err != nil {
138153
return err
@@ -148,8 +163,12 @@ func (dm *DiscoveryManager) QuitAll() error {
148163
// List returns a list of available ports detected from all discoveries
149164
func (dm *DiscoveryManager) List() []*discovery.Port {
150165
res := []*discovery.Port{}
151-
for _, disc := range dm.discoveries {
152-
l, err := disc.List()
166+
for _, d := range dm.discoveries {
167+
if d.State() != discovery.Running {
168+
// Discovery is not running, it won't return anything
169+
continue
170+
}
171+
l, err := d.List()
153172
if err != nil {
154173
continue
155174
}
@@ -161,8 +180,12 @@ func (dm *DiscoveryManager) List() []*discovery.Port {
161180
// ListSync return the current list of ports detected from all discoveries
162181
func (dm *DiscoveryManager) ListSync() []*discovery.Port {
163182
res := []*discovery.Port{}
164-
for _, disc := range dm.discoveries {
165-
res = append(res, disc.ListSync()...)
183+
for _, d := range dm.discoveries {
184+
if d.State() != discovery.Syncing {
185+
// Discovery is not syncing
186+
continue
187+
}
188+
res = append(res, d.ListSync()...)
166189
}
167190
return res
168191
}

0 commit comments

Comments
 (0)