diff --git a/fluent/fluent.go b/fluent/fluent.go index 7216991..22f7657 100644 --- a/fluent/fluent.go +++ b/fluent/fluent.go @@ -49,6 +49,7 @@ var randomGenerator = rand.Uint64 type Config struct { FluentPort int `json:"fluent_port"` FluentHost string `json:"fluent_host"` + FluentHosts []string `json:"fluent_hosts"` FluentNetwork string `json:"fluent_network"` FluentSocketPath string `json:"fluent_socket_path"` Timeout time.Duration `json:"timeout"` @@ -116,8 +117,10 @@ type Fluent struct { // time at which the most recent connection to fluentd-address was established. latestReconnectTime time.Time - muconn sync.RWMutex - conn net.Conn + muconn sync.RWMutex + conn net.Conn + conns []net.Conn + currConnId int } type dialer interface { @@ -139,6 +142,7 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) { config.FluentNetwork = defaultNetwork } if config.FluentHost == "" { + fmt.Fprintf(os.Stderr, "fluent#New: FluentHost is now deprecated, please use FluentHosts instead") config.FluentHost = defaultHost } if config.FluentPort == 0 { @@ -181,15 +185,17 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) { pending: make(chan *msgToSend, config.BufferLimit), pendingMutex: sync.RWMutex{}, muconn: sync.RWMutex{}, + currConnId: 0, } f.wg.Add(1) go f.run(ctx) } else { f = &Fluent{ - Config: config, - dialer: d, - muconn: sync.RWMutex{}, + Config: config, + dialer: d, + muconn: sync.RWMutex{}, + currConnId: 0, } err = f.connect(context.Background()) } @@ -430,6 +436,13 @@ func (f *Fluent) close() { f.conn.Close() f.conn = nil } + + if len(f.conns) > 0 { + for i, conn := range f.conns { + conn.Close() + f.conns[i] = nil + } + } } // connect establishes a new connection using the specified transport. Caller should @@ -437,9 +450,24 @@ func (f *Fluent) close() { func (f *Fluent) connect(ctx context.Context) (err error) { switch f.Config.FluentNetwork { case "tcp": - f.conn, err = f.dialer.DialContext(ctx, - f.Config.FluentNetwork, - f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort)) + if len(f.Config.FluentHosts) > 0 { + for _, host := range f.Config.FluentHosts { + conn, err := f.dialer.DialContext(ctx, + f.Config.FluentNetwork, + host+":"+strconv.Itoa(f.Config.FluentPort)) + if err != nil { + return err + } + + f.conns = append(f.conns, conn) + } + } else { + // If FluentHosts is not set, use FluentHost and f.conn + f.conn, err = f.dialer.DialContext(ctx, + f.Config.FluentNetwork, + f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort)) + f.conns = append(f.conns, f.conn) + } case "tls": tlsConfig := &tls.Config{InsecureSkipVerify: f.Config.TlsInsecureSkipVerify} f.conn, err = tls.DialWithDialer( @@ -569,6 +597,7 @@ func (f *Fluent) writeWithRetry(ctx context.Context, msg *msgToSend) error { // muconn.RUnlock in deferred calls to ensure the mutex is unlocked even in // the case of panic recovering. func (f *Fluent) write(ctx context.Context, msg *msgToSend) (bool, error) { + f.selectConnection() closer := func() { f.muconn.Lock() defer f.muconn.Unlock() @@ -635,3 +664,17 @@ func (f *Fluent) write(ctx context.Context, msg *msgToSend) (bool, error) { return false, nil } + +// selectConnection selects the next available connection using round-robin. +func (f *Fluent) selectConnection() net.Conn { + f.muconn.RLock() + defer f.muconn.RUnlock() + + var currConnId = (f.currConnId + 1) % len(f.conns) + if f.conns[currConnId] != nil { + f.currConnId = currConnId + f.conn = f.conns[f.currConnId] + } + + return f.conn +}