Open
Description
When XGroupCreateMkStream is called in blocking mode (Block = 0), call does not get interrupted by cancelling context.
Expected Behavior
Blocking function interrupts when context is cancelled
Current Behavior
Function continues to block after context cancellation
Possible Solution
Unsure yet
Steps to Reproduce
package main
import (
"context"
"fmt"
"sync"
"time"
"github.com/go-redis/redis/v9"
"github.com/google/uuid"
)
func main() {
rdb := redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: []string{"localhost:6379"},
Password: "", // no password set
DB: 0, // use default DB
})
defer rdb.Close()
ctx, cancelFn := context.WithCancel(context.Background())
go func() {
for idx := 0; idx < 5; idx++ {
fmt.Printf("Waiting %v...\n", idx)
time.Sleep(time.Second)
}
cancelFn()
fmt.Printf("Cancelled context and now expect blocking XGroupCreateMkStream to be interrupted...\n")
}()
name := "blag"
streamName := name
groupName := name + "-blah"
_, err := rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Result()
fmt.Printf("%v\n", err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
objs, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: uuid.NewString(),
Streams: []string{streamName, ">"},
Count: 100,
Block: 0,
}).Result()
fmt.Printf("%v, %v\n", err, objs)
}()
wg.Wait()
fmt.Printf("Done.\n")
}
Context (Environment)
I have two goroutines concurrently performing XREADGROUP and XADD in blocking mode. XADD is triggered by external events and is not guaranteed to add items to the stream at any particular cadence or pattern. Shutting down reading goroutine is not possible due to the blocking call that does not get interrupted by context concellation.
Detailed Description
Blocking calls should interrupt when context is cancelled and connection closed.
Possible Implementation
N/A
Metadata
Metadata
Assignees
Labels
No labels