From 03ed5a80b93e17bf9ee92d4dbd39a8fc9c3ae0a1 Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Tue, 5 Apr 2022 00:08:35 +0200 Subject: [PATCH 1/7] Made monitor.jsonDecodeLoop more thread resilient --- arduino/monitor/monitor.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/arduino/monitor/monitor.go b/arduino/monitor/monitor.go index 76eeb232dd0..14a9d236ada 100644 --- a/arduino/monitor/monitor.go +++ b/arduino/monitor/monitor.go @@ -103,24 +103,23 @@ func (mon *PluggableMonitor) String() string { return mon.id } -func (mon *PluggableMonitor) jsonDecodeLoop(in io.Reader, outChan chan<- *monitorMessage) { +func jsonDecodeLoop(in io.Reader, outChan chan<- *monitorMessage, log *logrus.Entry, lastError *error) { decoder := json.NewDecoder(in) for { var msg monitorMessage if err := decoder.Decode(&msg); err != nil { - mon.incomingMessagesError = err + *lastError = err close(outChan) - mon.log.Errorf("stopped decode loop: %s", err) + log.Errorf("stopped decode loop: %s", err) return } - mon.log. - WithField("event_type", msg.EventType). + log.WithField("event_type", msg.EventType). WithField("message", msg.Message). WithField("error", msg.Error). Infof("received message") if msg.EventType == "port_closed" { - mon.log.Infof("monitor port has been closed externally") + log.Infof("monitor port has been closed externally") } else { outChan <- &msg } @@ -192,7 +191,7 @@ func (mon *PluggableMonitor) runProcess() error { messageChan := make(chan *monitorMessage) mon.incomingMessagesChan = messageChan - go mon.jsonDecodeLoop(stdout, messageChan) + go jsonDecodeLoop(stdout, messageChan, mon.log, &mon.incomingMessagesError) mon.log.Infof("Monitor process started successfully!") return nil From 21ab292aeb07e4745349f782968684ed1af46497 Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Tue, 5 Apr 2022 00:09:18 +0200 Subject: [PATCH 2/7] Use idiomatic way to check for channel-closed event --- arduino/monitor/monitor.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/arduino/monitor/monitor.go b/arduino/monitor/monitor.go index 14a9d236ada..3d013729caf 100644 --- a/arduino/monitor/monitor.go +++ b/arduino/monitor/monitor.go @@ -127,13 +127,15 @@ func jsonDecodeLoop(in io.Reader, outChan chan<- *monitorMessage, log *logrus.En } func (mon *PluggableMonitor) waitMessage(timeout time.Duration, expectedEvt string) (*monitorMessage, error) { + mon.log.WithField("expected", expectedEvt).Debugf("waiting for event") var msg *monitorMessage select { - case msg = <-mon.incomingMessagesChan: - if msg == nil { + case m, ok := <-mon.incomingMessagesChan: + if !ok { // channel has been closed return nil, mon.incomingMessagesError } + msg = m case <-time.After(timeout): return nil, fmt.Errorf(tr("timeout waiting for message")) } From aeb3cff50d31d4a01f8499a3dadb68778785df0a Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Tue, 5 Apr 2022 00:12:19 +0200 Subject: [PATCH 3/7] Ensure that monitor processes are killed and collected correctly --- arduino/monitor/monitor.go | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/arduino/monitor/monitor.go b/arduino/monitor/monitor.go index 3d013729caf..e78ce8bd4f4 100644 --- a/arduino/monitor/monitor.go +++ b/arduino/monitor/monitor.go @@ -199,16 +199,15 @@ func (mon *PluggableMonitor) runProcess() error { return nil } -func (mon *PluggableMonitor) killProcess() error { +func (mon *PluggableMonitor) killProcess() { mon.log.Infof("Killing monitor process") if err := mon.process.Kill(); err != nil { - return err + mon.log.WithError(err).Error("Sent kill signal") } if err := mon.process.Wait(); err != nil { - return err + mon.log.WithError(err).Error("Waiting for process end") } - mon.log.Infof("Monitor process killed successfully!") - return nil + mon.log.Infof("Monitor process killed") } // Run starts the monitor executable process and sends the HELLO command to the monitor to agree on the @@ -221,15 +220,10 @@ func (mon *PluggableMonitor) Run() (err error) { defer func() { // If the monitor process is started successfully but the HELLO handshake - // fails the monitor is an unusable state, we kill the process to avoid + // fails the monitor is in an unusable state, we kill the process to avoid // further issues down the line. - if err == nil { - return - } - if killErr := mon.killProcess(); killErr != nil { - // Log failure to kill the process, ideally that should never happen - // but it's best to know it if it does - mon.log.Errorf("Killing monitor after unsuccessful start: %s", killErr) + if err != nil { + mon.killProcess() } }() @@ -304,14 +298,13 @@ func (mon *PluggableMonitor) Close() error { // Quit terminates the monitor. No more commands can be accepted by the monitor. func (mon *PluggableMonitor) Quit() error { + defer mon.killProcess() // ensure that killProcess is called in any case... + if err := mon.sendCommand("QUIT\n"); err != nil { return err } if _, err := mon.waitMessage(time.Second*10, "quit"); err != nil { return err } - if err := mon.killProcess(); err != nil { - mon.log.WithError(err).Info("error killing monitor process") - } return nil } From b8c5fab0636b9d5ad34b4b4107d2dbea82b50d14 Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Tue, 5 Apr 2022 00:13:20 +0200 Subject: [PATCH 4/7] Simplified (and fixed...) monitor stream handling There is no need to spawn two goroutines, one is enough. Also the extra context is not needed anymore. The monitor port can be closed when the goroutine with the grpc recv loop ends. --- commands/daemon/daemon.go | 37 ++++++++++++++++++------------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/commands/daemon/daemon.go b/commands/daemon/daemon.go index 6748e48c284..9c442070523 100644 --- a/commands/daemon/daemon.go +++ b/commands/daemon/daemon.go @@ -508,9 +508,11 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer if err != nil { return err } - ctx, cancel := context.WithCancel(stream.Context()) + go func() { - defer cancel() + // close port on gRPC call EOF or errors + defer portProxy.Close() + for { msg, err := stream.Recv() if errors.Is(err, io.EOF) { @@ -541,23 +543,20 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer } } }() - go func() { - defer cancel() - buff := make([]byte, 4096) - for { - n, err := portProxy.Read(buff) - if errors.Is(err, io.EOF) { - return - } - if err != nil { - stream.Send(&rpc.MonitorResponse{Error: err.Error()}) - return - } - if err := stream.Send(&rpc.MonitorResponse{RxData: buff[:n]}); err != nil { - return - } + + buff := make([]byte, 4096) + for { + n, err := portProxy.Read(buff) + if errors.Is(err, io.EOF) { + break } - }() - <-ctx.Done() + if err != nil { + stream.Send(&rpc.MonitorResponse{Error: err.Error()}) + break + } + if err := stream.Send(&rpc.MonitorResponse{RxData: buff[:n]}); err != nil { + break + } + } return nil } From c8b380c55e0f3f27a3e826a7d6eb21a6e71187b5 Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Tue, 5 Apr 2022 00:16:02 +0200 Subject: [PATCH 5/7] Fix cli crash if no configuration is provided in the Monitor gRPC call --- commands/monitor/monitor.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/commands/monitor/monitor.go b/commands/monitor/monitor.go index 3d2995a6b59..6690621ffab 100644 --- a/commands/monitor/monitor.go +++ b/commands/monitor/monitor.go @@ -86,10 +86,11 @@ func Monitor(ctx context.Context, req *rpc.MonitorRequest) (*PortProxy, *pluggab m.Quit() return nil, nil, &arduino.FailedMonitorError{Cause: err} } - - for _, setting := range req.GetPortConfiguration().Settings { - if err := m.Configure(setting.SettingId, setting.Value); err != nil { - logrus.Errorf("Could not set configuration %s=%s: %s", setting.SettingId, setting.Value, err) + if portConfig := req.GetPortConfiguration(); portConfig != nil { + for _, setting := range portConfig.Settings { + if err := m.Configure(setting.SettingId, setting.Value); err != nil { + logrus.Errorf("Could not set configuration %s=%s: %s", setting.SettingId, setting.Value, err) + } } } From 0bdec812ef81edd0a43df843bcbee29044bd422b Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Tue, 5 Apr 2022 00:17:18 +0200 Subject: [PATCH 6/7] Extended term_example gRPC API test coverage Now it cycles on the same port two times. --- commands/daemon/term_example/main.go | 80 ++++++++++++++-------------- 1 file changed, 41 insertions(+), 39 deletions(-) diff --git a/commands/daemon/term_example/main.go b/commands/daemon/term_example/main.go index 4b9227c5a75..0326e052871 100644 --- a/commands/daemon/term_example/main.go +++ b/commands/daemon/term_example/main.go @@ -74,52 +74,54 @@ func main() { } fmt.Println("Detected port:", port.Label, port.ProtocolLabel) + connectToPort(cli, instance, port) + time.Sleep(5 * time.Second) + connectToPort(cli, instance, port) + time.Sleep(5 * time.Second) +} + +func connectToPort(cli commands.ArduinoCoreServiceClient, instance *commands.Instance, port *commands.Port) { // Connect to the port monitor - fmt.Println("Connecting to monitor") + fmt.Println("Connecting to port", port) + ctx, cancel := context.WithCancel(context.Background()) - if respStream, err := cli.Monitor(ctx); err != nil { - log.Fatal("Monitor:", err) - } else { - if err := respStream.Send(&commands.MonitorRequest{ - Instance: instance, - Port: port, - }); err != nil { - log.Fatal("Monitor send-config:", err) - } - time.Sleep(1 * time.Second) - - go func() { - for { - if resp, err := respStream.Recv(); err != nil { - fmt.Println(" RECV:", err) - break - } else { - fmt.Println(" RECV:", resp) - } - } - }() + monitorClient, err := cli.Monitor(ctx) + if err != nil { + log.Fatal("Error opening Monitor:", err) + } + if err := monitorClient.Send(&commands.MonitorRequest{ + Instance: instance, + Port: port, + }); err != nil { + log.Fatal("Error sending Monitor config:", err) + } - hello := &commands.MonitorRequest{ - TxData: []byte("HELLO!"), - } - fmt.Println("Send:", hello) - if err := respStream.Send(hello); err != nil { - log.Fatal("Monitor send HELLO:", err) + go func() { + for { + resp, err := monitorClient.Recv() + if err != nil { + fmt.Println(" RECV-ERR:", err) + break + } + fmt.Println(" RECV:", resp) } + }() - fmt.Println("Send:", hello) - if err := respStream.Send(hello); err != nil { - log.Fatal("Monitor send HELLO:", err) - } + hello := &commands.MonitorRequest{ + TxData: []byte("HELLO!"), + } + fmt.Println("Send:", hello) + if err := monitorClient.Send(hello); err != nil { + log.Fatal("Monitor send HELLO:", err) + } - time.Sleep(5 * time.Second) + time.Sleep(15 * time.Second) - fmt.Println("Closing Monitor") - if err := respStream.CloseSend(); err != nil { - log.Fatal("Monitor close send:", err) - } - time.Sleep(5 * time.Second) + fmt.Println("Closing Monitor") + if err := monitorClient.CloseSend(); err != nil { + log.Fatal("Monitor close send:", err) } + <-monitorClient.Context().Done() + cancel() - time.Sleep(5 * time.Second) } From 3d86e8f02d68404590fd1ec2c2ffba731aedefe7 Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Wed, 6 Apr 2022 15:25:15 +0200 Subject: [PATCH 7/7] Reduce timeout for monitor close/quit commands to 250ms --- arduino/monitor/monitor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arduino/monitor/monitor.go b/arduino/monitor/monitor.go index e78ce8bd4f4..4e53435ebc7 100644 --- a/arduino/monitor/monitor.go +++ b/arduino/monitor/monitor.go @@ -292,7 +292,7 @@ func (mon *PluggableMonitor) Close() error { if err := mon.sendCommand("CLOSE\n"); err != nil { return err } - _, err := mon.waitMessage(time.Second*10, "close") + _, err := mon.waitMessage(time.Millisecond*250, "close") return err } @@ -303,7 +303,7 @@ func (mon *PluggableMonitor) Quit() error { if err := mon.sendCommand("QUIT\n"); err != nil { return err } - if _, err := mon.waitMessage(time.Second*10, "quit"); err != nil { + if _, err := mon.waitMessage(time.Millisecond*250, "quit"); err != nil { return err } return nil