diff --git a/arduino/monitor/monitor.go b/arduino/monitor/monitor.go index 76eeb232dd0..4e53435ebc7 100644 --- a/arduino/monitor/monitor.go +++ b/arduino/monitor/monitor.go @@ -103,24 +103,23 @@ func (mon *PluggableMonitor) String() string { return mon.id } -func (mon *PluggableMonitor) jsonDecodeLoop(in io.Reader, outChan chan<- *monitorMessage) { +func jsonDecodeLoop(in io.Reader, outChan chan<- *monitorMessage, log *logrus.Entry, lastError *error) { decoder := json.NewDecoder(in) for { var msg monitorMessage if err := decoder.Decode(&msg); err != nil { - mon.incomingMessagesError = err + *lastError = err close(outChan) - mon.log.Errorf("stopped decode loop: %s", err) + log.Errorf("stopped decode loop: %s", err) return } - mon.log. - WithField("event_type", msg.EventType). + log.WithField("event_type", msg.EventType). WithField("message", msg.Message). WithField("error", msg.Error). Infof("received message") if msg.EventType == "port_closed" { - mon.log.Infof("monitor port has been closed externally") + log.Infof("monitor port has been closed externally") } else { outChan <- &msg } @@ -128,13 +127,15 @@ func (mon *PluggableMonitor) jsonDecodeLoop(in io.Reader, outChan chan<- *monito } func (mon *PluggableMonitor) waitMessage(timeout time.Duration, expectedEvt string) (*monitorMessage, error) { + mon.log.WithField("expected", expectedEvt).Debugf("waiting for event") var msg *monitorMessage select { - case msg = <-mon.incomingMessagesChan: - if msg == nil { + case m, ok := <-mon.incomingMessagesChan: + if !ok { // channel has been closed return nil, mon.incomingMessagesError } + msg = m case <-time.After(timeout): return nil, fmt.Errorf(tr("timeout waiting for message")) } @@ -192,22 +193,21 @@ func (mon *PluggableMonitor) runProcess() error { messageChan := make(chan *monitorMessage) mon.incomingMessagesChan = messageChan - go mon.jsonDecodeLoop(stdout, messageChan) + go jsonDecodeLoop(stdout, messageChan, mon.log, &mon.incomingMessagesError) mon.log.Infof("Monitor process started successfully!") return nil } -func (mon *PluggableMonitor) killProcess() error { +func (mon *PluggableMonitor) killProcess() { mon.log.Infof("Killing monitor process") if err := mon.process.Kill(); err != nil { - return err + mon.log.WithError(err).Error("Sent kill signal") } if err := mon.process.Wait(); err != nil { - return err + mon.log.WithError(err).Error("Waiting for process end") } - mon.log.Infof("Monitor process killed successfully!") - return nil + mon.log.Infof("Monitor process killed") } // Run starts the monitor executable process and sends the HELLO command to the monitor to agree on the @@ -220,15 +220,10 @@ func (mon *PluggableMonitor) Run() (err error) { defer func() { // If the monitor process is started successfully but the HELLO handshake - // fails the monitor is an unusable state, we kill the process to avoid + // fails the monitor is in an unusable state, we kill the process to avoid // further issues down the line. - if err == nil { - return - } - if killErr := mon.killProcess(); killErr != nil { - // Log failure to kill the process, ideally that should never happen - // but it's best to know it if it does - mon.log.Errorf("Killing monitor after unsuccessful start: %s", killErr) + if err != nil { + mon.killProcess() } }() @@ -297,20 +292,19 @@ func (mon *PluggableMonitor) Close() error { if err := mon.sendCommand("CLOSE\n"); err != nil { return err } - _, err := mon.waitMessage(time.Second*10, "close") + _, err := mon.waitMessage(time.Millisecond*250, "close") return err } // Quit terminates the monitor. No more commands can be accepted by the monitor. func (mon *PluggableMonitor) Quit() error { + defer mon.killProcess() // ensure that killProcess is called in any case... + if err := mon.sendCommand("QUIT\n"); err != nil { return err } - if _, err := mon.waitMessage(time.Second*10, "quit"); err != nil { + if _, err := mon.waitMessage(time.Millisecond*250, "quit"); err != nil { return err } - if err := mon.killProcess(); err != nil { - mon.log.WithError(err).Info("error killing monitor process") - } return nil } diff --git a/commands/daemon/daemon.go b/commands/daemon/daemon.go index 6748e48c284..9c442070523 100644 --- a/commands/daemon/daemon.go +++ b/commands/daemon/daemon.go @@ -508,9 +508,11 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer if err != nil { return err } - ctx, cancel := context.WithCancel(stream.Context()) + go func() { - defer cancel() + // close port on gRPC call EOF or errors + defer portProxy.Close() + for { msg, err := stream.Recv() if errors.Is(err, io.EOF) { @@ -541,23 +543,20 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer } } }() - go func() { - defer cancel() - buff := make([]byte, 4096) - for { - n, err := portProxy.Read(buff) - if errors.Is(err, io.EOF) { - return - } - if err != nil { - stream.Send(&rpc.MonitorResponse{Error: err.Error()}) - return - } - if err := stream.Send(&rpc.MonitorResponse{RxData: buff[:n]}); err != nil { - return - } + + buff := make([]byte, 4096) + for { + n, err := portProxy.Read(buff) + if errors.Is(err, io.EOF) { + break } - }() - <-ctx.Done() + if err != nil { + stream.Send(&rpc.MonitorResponse{Error: err.Error()}) + break + } + if err := stream.Send(&rpc.MonitorResponse{RxData: buff[:n]}); err != nil { + break + } + } return nil } diff --git a/commands/daemon/term_example/main.go b/commands/daemon/term_example/main.go index 4b9227c5a75..0326e052871 100644 --- a/commands/daemon/term_example/main.go +++ b/commands/daemon/term_example/main.go @@ -74,52 +74,54 @@ func main() { } fmt.Println("Detected port:", port.Label, port.ProtocolLabel) + connectToPort(cli, instance, port) + time.Sleep(5 * time.Second) + connectToPort(cli, instance, port) + time.Sleep(5 * time.Second) +} + +func connectToPort(cli commands.ArduinoCoreServiceClient, instance *commands.Instance, port *commands.Port) { // Connect to the port monitor - fmt.Println("Connecting to monitor") + fmt.Println("Connecting to port", port) + ctx, cancel := context.WithCancel(context.Background()) - if respStream, err := cli.Monitor(ctx); err != nil { - log.Fatal("Monitor:", err) - } else { - if err := respStream.Send(&commands.MonitorRequest{ - Instance: instance, - Port: port, - }); err != nil { - log.Fatal("Monitor send-config:", err) - } - time.Sleep(1 * time.Second) - - go func() { - for { - if resp, err := respStream.Recv(); err != nil { - fmt.Println(" RECV:", err) - break - } else { - fmt.Println(" RECV:", resp) - } - } - }() + monitorClient, err := cli.Monitor(ctx) + if err != nil { + log.Fatal("Error opening Monitor:", err) + } + if err := monitorClient.Send(&commands.MonitorRequest{ + Instance: instance, + Port: port, + }); err != nil { + log.Fatal("Error sending Monitor config:", err) + } - hello := &commands.MonitorRequest{ - TxData: []byte("HELLO!"), - } - fmt.Println("Send:", hello) - if err := respStream.Send(hello); err != nil { - log.Fatal("Monitor send HELLO:", err) + go func() { + for { + resp, err := monitorClient.Recv() + if err != nil { + fmt.Println(" RECV-ERR:", err) + break + } + fmt.Println(" RECV:", resp) } + }() - fmt.Println("Send:", hello) - if err := respStream.Send(hello); err != nil { - log.Fatal("Monitor send HELLO:", err) - } + hello := &commands.MonitorRequest{ + TxData: []byte("HELLO!"), + } + fmt.Println("Send:", hello) + if err := monitorClient.Send(hello); err != nil { + log.Fatal("Monitor send HELLO:", err) + } - time.Sleep(5 * time.Second) + time.Sleep(15 * time.Second) - fmt.Println("Closing Monitor") - if err := respStream.CloseSend(); err != nil { - log.Fatal("Monitor close send:", err) - } - time.Sleep(5 * time.Second) + fmt.Println("Closing Monitor") + if err := monitorClient.CloseSend(); err != nil { + log.Fatal("Monitor close send:", err) } + <-monitorClient.Context().Done() + cancel() - time.Sleep(5 * time.Second) } diff --git a/commands/monitor/monitor.go b/commands/monitor/monitor.go index 3d2995a6b59..6690621ffab 100644 --- a/commands/monitor/monitor.go +++ b/commands/monitor/monitor.go @@ -86,10 +86,11 @@ func Monitor(ctx context.Context, req *rpc.MonitorRequest) (*PortProxy, *pluggab m.Quit() return nil, nil, &arduino.FailedMonitorError{Cause: err} } - - for _, setting := range req.GetPortConfiguration().Settings { - if err := m.Configure(setting.SettingId, setting.Value); err != nil { - logrus.Errorf("Could not set configuration %s=%s: %s", setting.SettingId, setting.Value, err) + if portConfig := req.GetPortConfiguration(); portConfig != nil { + for _, setting := range portConfig.Settings { + if err := m.Configure(setting.SettingId, setting.Value); err != nil { + logrus.Errorf("Could not set configuration %s=%s: %s", setting.SettingId, setting.Value, err) + } } }