Skip to content

Fixed some weird behaviour in gRPC Monitor command #1706

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 21 additions & 27 deletions arduino/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,38 +103,39 @@ func (mon *PluggableMonitor) String() string {
return mon.id
}

func (mon *PluggableMonitor) jsonDecodeLoop(in io.Reader, outChan chan<- *monitorMessage) {
func jsonDecodeLoop(in io.Reader, outChan chan<- *monitorMessage, log *logrus.Entry, lastError *error) {
decoder := json.NewDecoder(in)

for {
var msg monitorMessage
if err := decoder.Decode(&msg); err != nil {
mon.incomingMessagesError = err
*lastError = err
close(outChan)
mon.log.Errorf("stopped decode loop: %s", err)
log.Errorf("stopped decode loop: %s", err)
return
}
mon.log.
WithField("event_type", msg.EventType).
log.WithField("event_type", msg.EventType).
WithField("message", msg.Message).
WithField("error", msg.Error).
Infof("received message")
if msg.EventType == "port_closed" {
mon.log.Infof("monitor port has been closed externally")
log.Infof("monitor port has been closed externally")
} else {
outChan <- &msg
}
}
}

func (mon *PluggableMonitor) waitMessage(timeout time.Duration, expectedEvt string) (*monitorMessage, error) {
mon.log.WithField("expected", expectedEvt).Debugf("waiting for event")
var msg *monitorMessage
select {
case msg = <-mon.incomingMessagesChan:
if msg == nil {
case m, ok := <-mon.incomingMessagesChan:
if !ok {
// channel has been closed
return nil, mon.incomingMessagesError
}
msg = m
case <-time.After(timeout):
return nil, fmt.Errorf(tr("timeout waiting for message"))
}
Expand Down Expand Up @@ -192,22 +193,21 @@ func (mon *PluggableMonitor) runProcess() error {

messageChan := make(chan *monitorMessage)
mon.incomingMessagesChan = messageChan
go mon.jsonDecodeLoop(stdout, messageChan)
go jsonDecodeLoop(stdout, messageChan, mon.log, &mon.incomingMessagesError)

mon.log.Infof("Monitor process started successfully!")
return nil
}

func (mon *PluggableMonitor) killProcess() error {
func (mon *PluggableMonitor) killProcess() {
mon.log.Infof("Killing monitor process")
if err := mon.process.Kill(); err != nil {
return err
mon.log.WithError(err).Error("Sent kill signal")
}
if err := mon.process.Wait(); err != nil {
return err
mon.log.WithError(err).Error("Waiting for process end")
}
mon.log.Infof("Monitor process killed successfully!")
return nil
mon.log.Infof("Monitor process killed")
}

// Run starts the monitor executable process and sends the HELLO command to the monitor to agree on the
Expand All @@ -220,15 +220,10 @@ func (mon *PluggableMonitor) Run() (err error) {

defer func() {
// If the monitor process is started successfully but the HELLO handshake
// fails the monitor is an unusable state, we kill the process to avoid
// fails the monitor is in an unusable state, we kill the process to avoid
// further issues down the line.
if err == nil {
return
}
if killErr := mon.killProcess(); killErr != nil {
// Log failure to kill the process, ideally that should never happen
// but it's best to know it if it does
mon.log.Errorf("Killing monitor after unsuccessful start: %s", killErr)
if err != nil {
mon.killProcess()
}
}()

Expand Down Expand Up @@ -297,20 +292,19 @@ func (mon *PluggableMonitor) Close() error {
if err := mon.sendCommand("CLOSE\n"); err != nil {
return err
}
_, err := mon.waitMessage(time.Second*10, "close")
_, err := mon.waitMessage(time.Millisecond*250, "close")
return err
}

// Quit terminates the monitor. No more commands can be accepted by the monitor.
func (mon *PluggableMonitor) Quit() error {
defer mon.killProcess() // ensure that killProcess is called in any case...

if err := mon.sendCommand("QUIT\n"); err != nil {
return err
}
if _, err := mon.waitMessage(time.Second*10, "quit"); err != nil {
if _, err := mon.waitMessage(time.Millisecond*250, "quit"); err != nil {
return err
}
if err := mon.killProcess(); err != nil {
mon.log.WithError(err).Info("error killing monitor process")
}
return nil
}
37 changes: 18 additions & 19 deletions commands/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,9 +508,11 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
if err != nil {
return err
}
ctx, cancel := context.WithCancel(stream.Context())

go func() {
defer cancel()
// close port on gRPC call EOF or errors
defer portProxy.Close()

for {
msg, err := stream.Recv()
if errors.Is(err, io.EOF) {
Expand Down Expand Up @@ -541,23 +543,20 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
}
}
}()
go func() {
defer cancel()
buff := make([]byte, 4096)
for {
n, err := portProxy.Read(buff)
if errors.Is(err, io.EOF) {
return
}
if err != nil {
stream.Send(&rpc.MonitorResponse{Error: err.Error()})
return
}
if err := stream.Send(&rpc.MonitorResponse{RxData: buff[:n]}); err != nil {
return
}

buff := make([]byte, 4096)
for {
n, err := portProxy.Read(buff)
if errors.Is(err, io.EOF) {
break
}
}()
<-ctx.Done()
if err != nil {
stream.Send(&rpc.MonitorResponse{Error: err.Error()})
break
}
if err := stream.Send(&rpc.MonitorResponse{RxData: buff[:n]}); err != nil {
break
}
}
return nil
}
80 changes: 41 additions & 39 deletions commands/daemon/term_example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,52 +74,54 @@ func main() {
}
fmt.Println("Detected port:", port.Label, port.ProtocolLabel)

connectToPort(cli, instance, port)
time.Sleep(5 * time.Second)
connectToPort(cli, instance, port)
time.Sleep(5 * time.Second)
}

func connectToPort(cli commands.ArduinoCoreServiceClient, instance *commands.Instance, port *commands.Port) {
// Connect to the port monitor
fmt.Println("Connecting to monitor")
fmt.Println("Connecting to port", port)

ctx, cancel := context.WithCancel(context.Background())
if respStream, err := cli.Monitor(ctx); err != nil {
log.Fatal("Monitor:", err)
} else {
if err := respStream.Send(&commands.MonitorRequest{
Instance: instance,
Port: port,
}); err != nil {
log.Fatal("Monitor send-config:", err)
}
time.Sleep(1 * time.Second)

go func() {
for {
if resp, err := respStream.Recv(); err != nil {
fmt.Println(" RECV:", err)
break
} else {
fmt.Println(" RECV:", resp)
}
}
}()
monitorClient, err := cli.Monitor(ctx)
if err != nil {
log.Fatal("Error opening Monitor:", err)
}
if err := monitorClient.Send(&commands.MonitorRequest{
Instance: instance,
Port: port,
}); err != nil {
log.Fatal("Error sending Monitor config:", err)
}

hello := &commands.MonitorRequest{
TxData: []byte("HELLO!"),
}
fmt.Println("Send:", hello)
if err := respStream.Send(hello); err != nil {
log.Fatal("Monitor send HELLO:", err)
go func() {
for {
resp, err := monitorClient.Recv()
if err != nil {
fmt.Println(" RECV-ERR:", err)
break
}
fmt.Println(" RECV:", resp)
}
}()

fmt.Println("Send:", hello)
if err := respStream.Send(hello); err != nil {
log.Fatal("Monitor send HELLO:", err)
}
hello := &commands.MonitorRequest{
TxData: []byte("HELLO!"),
}
fmt.Println("Send:", hello)
if err := monitorClient.Send(hello); err != nil {
log.Fatal("Monitor send HELLO:", err)
}

time.Sleep(5 * time.Second)
time.Sleep(15 * time.Second)

fmt.Println("Closing Monitor")
if err := respStream.CloseSend(); err != nil {
log.Fatal("Monitor close send:", err)
}
time.Sleep(5 * time.Second)
fmt.Println("Closing Monitor")
if err := monitorClient.CloseSend(); err != nil {
log.Fatal("Monitor close send:", err)
}
<-monitorClient.Context().Done()

cancel()
time.Sleep(5 * time.Second)
}
9 changes: 5 additions & 4 deletions commands/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,11 @@ func Monitor(ctx context.Context, req *rpc.MonitorRequest) (*PortProxy, *pluggab
m.Quit()
return nil, nil, &arduino.FailedMonitorError{Cause: err}
}

for _, setting := range req.GetPortConfiguration().Settings {
if err := m.Configure(setting.SettingId, setting.Value); err != nil {
logrus.Errorf("Could not set configuration %s=%s: %s", setting.SettingId, setting.Value, err)
if portConfig := req.GetPortConfiguration(); portConfig != nil {
for _, setting := range portConfig.Settings {
if err := m.Configure(setting.SettingId, setting.Value); err != nil {
logrus.Errorf("Could not set configuration %s=%s: %s", setting.SettingId, setting.Value, err)
}
}
}

Expand Down