|
16 | 16 | package discoverymanager
|
17 | 17 |
|
18 | 18 | import (
|
| 19 | + "context" |
| 20 | + "fmt" |
| 21 | + "sync" |
| 22 | + |
19 | 23 | "github.com/arduino/arduino-cli/arduino/discovery"
|
20 | 24 | "github.com/pkg/errors"
|
21 | 25 | )
|
@@ -64,117 +68,187 @@ func (dm *DiscoveryManager) Add(disc *discovery.PluggableDiscovery) error {
|
64 | 68 | return nil
|
65 | 69 | }
|
66 | 70 |
|
67 |
| -// RunAll the discoveries for this DiscoveryManager, |
68 |
| -// returns the first error it meets or nil |
69 |
| -func (dm *DiscoveryManager) RunAll() error { |
| 71 | +// parallelize runs function f concurrently for each discovery. |
| 72 | +// Returns a list of errors returned by each call of f. |
| 73 | +func (dm *DiscoveryManager) parallelize(f func(d *discovery.PluggableDiscovery) error) []error { |
| 74 | + var wg sync.WaitGroup |
| 75 | + ctx, cancel := context.WithCancel(context.Background()) |
| 76 | + errChan := make(chan error) |
70 | 77 | for _, d := range dm.discoveries {
|
| 78 | + wg.Add(1) |
| 79 | + go func(d *discovery.PluggableDiscovery) { |
| 80 | + defer wg.Done() |
| 81 | + if err := f(d); err != nil { |
| 82 | + errChan <- err |
| 83 | + } |
| 84 | + }(d) |
| 85 | + } |
| 86 | + |
| 87 | + // Wait in a goroutine to collect eventual errors running a discovery. |
| 88 | + // When all goroutines that are calling discoveries are done call cancel |
| 89 | + // to avoid blocking if there are no errors. |
| 90 | + go func() { |
| 91 | + wg.Wait() |
| 92 | + cancel() |
| 93 | + }() |
| 94 | + |
| 95 | + errs := []error{} |
| 96 | + for { |
| 97 | + select { |
| 98 | + case <-ctx.Done(): |
| 99 | + goto done |
| 100 | + case err := <-errChan: |
| 101 | + errs = append(errs, err) |
| 102 | + } |
| 103 | + } |
| 104 | +done: |
| 105 | + return errs |
| 106 | +} |
| 107 | + |
| 108 | +// RunAll the discoveries for this DiscoveryManager, |
| 109 | +// returns an error for each discovery failing to run |
| 110 | +func (dm *DiscoveryManager) RunAll() []error { |
| 111 | + return dm.parallelize(func(d *discovery.PluggableDiscovery) error { |
71 | 112 | if d.State() != discovery.Dead {
|
72 | 113 | // This discovery is already alive, nothing to do
|
73 |
| - continue |
| 114 | + return nil |
74 | 115 | }
|
75 | 116 |
|
76 | 117 | if err := d.Run(); err != nil {
|
77 |
| - return err |
| 118 | + return fmt.Errorf("discovery %s process not started: %w", d.GetID(), err) |
78 | 119 | }
|
79 |
| - } |
80 |
| - return nil |
| 120 | + return nil |
| 121 | + }) |
81 | 122 | }
|
82 | 123 |
|
83 | 124 | // StartAll the discoveries for this DiscoveryManager,
|
84 |
| -// returns the first error it meets or nil |
85 |
| -func (dm *DiscoveryManager) StartAll() error { |
86 |
| - for _, d := range dm.discoveries { |
| 125 | +// returns an error for each discovery failing to start |
| 126 | +func (dm *DiscoveryManager) StartAll() []error { |
| 127 | + return dm.parallelize(func(d *discovery.PluggableDiscovery) error { |
87 | 128 | state := d.State()
|
88 | 129 | if state != discovery.Idling || state == discovery.Running {
|
89 | 130 | // Already started
|
90 |
| - continue |
| 131 | + return nil |
91 | 132 | }
|
92 | 133 | if err := d.Start(); err != nil {
|
93 |
| - return err |
| 134 | + return fmt.Errorf("starting discovery %s: %w", d.GetID(), err) |
94 | 135 | }
|
95 |
| - } |
96 |
| - return nil |
| 136 | + return nil |
| 137 | + }) |
97 | 138 | }
|
98 | 139 |
|
99 | 140 | // StartSyncAll the discoveries for this DiscoveryManager,
|
100 |
| -// returns the first error it meets or nil |
| 141 | +// returns an error for each discovery failing to start syncing |
101 | 142 | func (dm *DiscoveryManager) StartSyncAll() (<-chan *discovery.Event, []error) {
|
102 |
| - errs := []error{} |
103 | 143 | if dm.globalEventCh == nil {
|
104 | 144 | dm.globalEventCh = make(chan *discovery.Event, 5)
|
105 | 145 | }
|
106 |
| - for _, d := range dm.discoveries { |
| 146 | + errs := dm.parallelize(func(d *discovery.PluggableDiscovery) error { |
107 | 147 | state := d.State()
|
108 | 148 | if state != discovery.Idling || state == discovery.Syncing {
|
109 | 149 | // Already syncing
|
110 |
| - continue |
| 150 | + return nil |
111 | 151 | }
|
112 | 152 |
|
113 | 153 | eventCh := d.EventChannel(5)
|
114 | 154 | if err := d.StartSync(); err != nil {
|
115 |
| - errs = append(errs, err) |
| 155 | + return fmt.Errorf("start syncing discovery %s: %w", d.GetID(), err) |
116 | 156 | }
|
117 | 157 | go func() {
|
118 | 158 | for ev := range eventCh {
|
119 | 159 | dm.globalEventCh <- ev
|
120 | 160 | }
|
121 | 161 | }()
|
122 |
| - } |
| 162 | + return nil |
| 163 | + }) |
123 | 164 | return dm.globalEventCh, errs
|
124 | 165 | }
|
125 | 166 |
|
126 | 167 | // StopAll the discoveries for this DiscoveryManager,
|
127 |
| -// returns the first error it meets or nil |
128 |
| -func (dm *DiscoveryManager) StopAll() error { |
129 |
| - for _, d := range dm.discoveries { |
| 168 | +// returns an error for each discovery failing to stop |
| 169 | +func (dm *DiscoveryManager) StopAll() []error { |
| 170 | + return dm.parallelize(func(d *discovery.PluggableDiscovery) error { |
130 | 171 | state := d.State()
|
131 | 172 | if state != discovery.Syncing && state != discovery.Running {
|
132 | 173 | // Not running nor syncing, nothing to stop
|
133 |
| - continue |
| 174 | + return nil |
134 | 175 | }
|
135 |
| - err := d.Stop() |
136 |
| - if err != nil { |
137 |
| - return err |
| 176 | + |
| 177 | + if err := d.Stop(); err != nil { |
| 178 | + return fmt.Errorf("stopping discovery %s: %w", d.GetID(), err) |
138 | 179 | }
|
139 |
| - } |
140 |
| - return nil |
| 180 | + return nil |
| 181 | + }) |
141 | 182 | }
|
142 | 183 |
|
143 | 184 | // QuitAll quits all the discoveries managed by this DiscoveryManager.
|
144 |
| -// Returns the first error it meets or nil |
145 |
| -func (dm *DiscoveryManager) QuitAll() error { |
146 |
| - for _, d := range dm.discoveries { |
| 185 | +// Returns an error for each discovery that fails quitting |
| 186 | +func (dm *DiscoveryManager) QuitAll() []error { |
| 187 | + errs := dm.parallelize(func(d *discovery.PluggableDiscovery) error { |
147 | 188 | if d.State() == discovery.Dead {
|
148 | 189 | // Stop! Stop! It's already dead!
|
149 |
| - continue |
| 190 | + return nil |
150 | 191 | }
|
151 |
| - err := d.Quit() |
152 |
| - if err != nil { |
153 |
| - return err |
| 192 | + |
| 193 | + if err := d.Quit(); err != nil { |
| 194 | + return fmt.Errorf("quitting discovery %s: %w", d.GetID(), err) |
154 | 195 | }
|
155 |
| - } |
156 |
| - if dm.globalEventCh != nil { |
| 196 | + return nil |
| 197 | + }) |
| 198 | + // Close the global channel only if there were no errors |
| 199 | + // quitting all alive discoveries |
| 200 | + if len(errs) == 0 && dm.globalEventCh != nil { |
157 | 201 | close(dm.globalEventCh)
|
158 | 202 | dm.globalEventCh = nil
|
159 | 203 | }
|
160 |
| - return nil |
| 204 | + return errs |
161 | 205 | }
|
162 | 206 |
|
163 | 207 | // List returns a list of available ports detected from all discoveries
|
164 |
| -func (dm *DiscoveryManager) List() []*discovery.Port { |
165 |
| - res := []*discovery.Port{} |
| 208 | +// and a list of errors for those discoveries that returned one. |
| 209 | +func (dm *DiscoveryManager) List() ([]*discovery.Port, []error) { |
| 210 | + var wg sync.WaitGroup |
| 211 | + // Use this struct to avoid the need of two separate |
| 212 | + // channels for ports and errors. |
| 213 | + type listMsg struct { |
| 214 | + Err error |
| 215 | + Port *discovery.Port |
| 216 | + } |
| 217 | + msgChan := make(chan listMsg) |
166 | 218 | for _, d := range dm.discoveries {
|
167 |
| - if d.State() != discovery.Running { |
168 |
| - // Discovery is not running, it won't return anything |
169 |
| - continue |
170 |
| - } |
171 |
| - l, err := d.List() |
172 |
| - if err != nil { |
173 |
| - continue |
| 219 | + wg.Add(1) |
| 220 | + go func(d *discovery.PluggableDiscovery) { |
| 221 | + defer wg.Done() |
| 222 | + if d.State() != discovery.Running { |
| 223 | + // Discovery is not running, it won't return anything |
| 224 | + return |
| 225 | + } |
| 226 | + ports, err := d.List() |
| 227 | + if err != nil { |
| 228 | + msgChan <- listMsg{Err: fmt.Errorf("listing ports from discovery %s: %w", d.GetID(), err)} |
| 229 | + } |
| 230 | + for _, p := range ports { |
| 231 | + msgChan <- listMsg{Port: p} |
| 232 | + } |
| 233 | + }(d) |
| 234 | + } |
| 235 | + |
| 236 | + go func() { |
| 237 | + // Close the channel only after all goroutines are done |
| 238 | + wg.Wait() |
| 239 | + close(msgChan) |
| 240 | + }() |
| 241 | + |
| 242 | + ports := []*discovery.Port{} |
| 243 | + errs := []error{} |
| 244 | + for msg := range msgChan { |
| 245 | + if msg.Err != nil { |
| 246 | + errs = append(errs, msg.Err) |
| 247 | + } else { |
| 248 | + ports = append(ports, msg.Port) |
174 | 249 | }
|
175 |
| - res = append(res, l...) |
176 | 250 | }
|
177 |
| - return res |
| 251 | + return ports, errs |
178 | 252 | }
|
179 | 253 |
|
180 | 254 | // ListSync return the current list of ports detected from all discoveries
|
|
0 commit comments