Skip to content

Added monitor rate-limiting functionality #1221

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions arduino/monitors/null.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// This file is part of arduino-cli.
//
// Copyright 2021 ARDUINO SA (http://www.arduino.cc/)
//
// This software is released under the GNU General Public License version 3,
// which covers the main part of arduino-cli.
// The terms of this license can be found at:
// https://www.gnu.org/licenses/gpl-3.0.en.html
//
// You can be released from the requirements of the above licenses by purchasing
// a commercial license. Buying such a license is mandatory if you want to
// modify or otherwise use the software for commercial activities involving the
// Arduino software without disclosing the source code of your own applications.
// To purchase a commercial license, send an email to license@arduino.cc.

package monitors

import (
"log"
"time"
)

// NullMonitor outputs zeros at a constant rate and discards anything sent
type NullMonitor struct {
started time.Time
sent int
bps float64
}

// OpenNullMonitor creates a monitor that outputs the same character at a fixed
// rate.
func OpenNullMonitor(bytesPerSecondRate float64) *NullMonitor {
log.Printf("Started streaming at %f\n", bytesPerSecondRate)
return &NullMonitor{
started: time.Now(),
bps: bytesPerSecondRate,
}
}

// Close the connection
func (mon *NullMonitor) Close() error {
return nil
}

// Read bytes from the port
func (mon *NullMonitor) Read(bytes []byte) (int, error) {
for {
elapsed := time.Now().Sub(mon.started).Seconds()
n := int(elapsed*mon.bps) - mon.sent
if n == 0 {
// Delay until the next char...
time.Sleep(time.Millisecond)
continue
}
if len(bytes) < n {
n = len(bytes)
}
mon.sent += n
for i := 0; i < n; i++ {
bytes[i] = 0
}
return n, nil
}
}

// Write bytes to the port
func (mon *NullMonitor) Write(bytes []byte) (int, error) {
// Discard all chars
return len(bytes), nil
}
98 changes: 78 additions & 20 deletions commands/daemon/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
package daemon

import (
"fmt"
"errors"
"io"
"sync/atomic"

"github.com/arduino/arduino-cli/arduino/monitors"
rpc "github.com/arduino/arduino-cli/rpc/monitor"
Expand All @@ -39,7 +40,7 @@ func (s *MonitorService) StreamingOpen(stream rpc.Monitor_StreamingOpenServer) e
// ensure it's a config message and not data
config := msg.GetMonitorConfig()
if config == nil {
return fmt.Errorf("first message must contain monitor configuration, not data")
return errors.New("first message must contain monitor configuration, not data")
}

// select which type of monitor we need
Expand All @@ -61,13 +62,34 @@ func (s *MonitorService) StreamingOpen(stream rpc.Monitor_StreamingOpenServer) e
if mon, err = monitors.OpenSerialMonitor(config.GetTarget(), int(baudRate)); err != nil {
return err
}

case rpc.MonitorConfig_NULL:
if addCfg, ok := config.GetAdditionalConfig().AsMap()["OutputRate"]; !ok {
mon = monitors.OpenNullMonitor(100.0) // 100 bytes per second as default
} else if outputRate, ok := addCfg.(float64); !ok {
return errors.New("OutputRate in Null monitor must be a float64")
} else {
// get the Monitor instance
mon = monitors.OpenNullMonitor(outputRate)
}
}

// we'll use these channels to communicate with the goroutines
// handling the stream and the target respectively
streamClosed := make(chan error)
targetClosed := make(chan error)

// set rate limiting window
bufferSize := int(config.GetRecvRateLimitBuffer())
rateLimitEnabled := (bufferSize > 0)
if !rateLimitEnabled {
bufferSize = 1024
}
buffer := make([]byte, bufferSize)
bufferUsed := 0

var writeSlots int32

// now we can read the other messages and re-route to the monitor...
go func() {
for {
Expand All @@ -84,6 +106,11 @@ func (s *MonitorService) StreamingOpen(stream rpc.Monitor_StreamingOpenServer) e
break
}

if rateLimitEnabled {
// Increase rate limiter write slots
atomic.AddInt32(&writeSlots, msg.GetRecvAcknowledge())
}

if _, err := mon.Write(msg.GetData()); err != nil {
// error writing to target
targetClosed <- err
Expand All @@ -94,27 +121,58 @@ func (s *MonitorService) StreamingOpen(stream rpc.Monitor_StreamingOpenServer) e

// ...and read from the monitor and forward to the output stream
go func() {
buf := make([]byte, 8)
dropBuffer := make([]byte, 10240)
dropped := 0
for {
n, err := mon.Read(buf)
if err != nil {
// error reading from target
targetClosed <- err
break
if bufferUsed < bufferSize {
if n, err := mon.Read(buffer[bufferUsed:]); err != nil {
// error reading from target
targetClosed <- err
break
} else if n == 0 {
// target was closed
targetClosed <- nil
break
} else {
bufferUsed += n
}
} else {
// FIXME: a very rare condition but still...
// we may be waiting here while, in the meantime, a transmit slot is
// freed: in this case the (filled) buffer will stay in the server
// until the following Read exits (-> the next char arrives from the
// monitor).

if n, err := mon.Read(dropBuffer); err != nil {
// error reading from target
targetClosed <- err
break
} else if n == 0 {
// target was closed
targetClosed <- nil
break
} else {
dropped += n
}
}

if n == 0 {
// target was closed
targetClosed <- nil
break
}

if err = stream.Send(&rpc.StreamingOpenResp{
Data: buf[:n],
}); err != nil {
// error sending to stream
streamClosed <- err
break
slots := atomic.LoadInt32(&writeSlots)
if !rateLimitEnabled || slots > 0 {
if err = stream.Send(&rpc.StreamingOpenResp{
Data: buffer[:bufferUsed],
Dropped: int32(dropped),
}); err != nil {
// error sending to stream
streamClosed <- err
break
}
bufferUsed = 0
dropped = 0

// Rate limit, filling all the available window
if rateLimitEnabled {
slots = atomic.AddInt32(&writeSlots, -1)
}
}
}
}()
Expand Down
11 changes: 11 additions & 0 deletions commands/daemon/term_example/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module github.com/arduino/arduino-cli/term_example

go 1.16

replace github.com/arduino/arduino-cli => ../../..

require (
github.com/arduino/arduino-cli v0.0.0-20200109150215-ffa84fdaab21
google.golang.org/grpc v1.27.0
google.golang.org/protobuf v1.25.0
)
Loading