| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667 |
- package redisqueue
- import (
- "context"
- "fmt"
- "regexp"
- "strconv"
- "strings"
- "github.com/pkg/errors"
- "github.com/redis/go-redis/v9"
- )
- var redisVersionRE = regexp.MustCompile(`redis_version:(.+)`)
- // RedisOptions is an alias to redis.Options so that users can this instead of
- // having to import go-redis directly.
- type RedisOptions = redis.Options
- // newRedisClient creates a new Redis client with the given options. If options
- // is nil, it will use default options.
- func newRedisClient(options *RedisOptions) *redis.Client {
- if options == nil {
- options = &RedisOptions{}
- }
- return redis.NewClient(options)
- }
- // redisPreflightChecks makes sure the Redis instance backing the *redis.Client
- // offers the functionality we need. Specifically, it also that it can connect
- // to the actual instance and that the instance supports Redis streams (i.e.
- // it's at least v5).
- func redisPreflightChecks(client redis.UniversalClient) error {
- info, err := client.Info(context.TODO(), "server").Result()
- if err != nil {
- return err
- }
- match := redisVersionRE.FindAllStringSubmatch(info, -1)
- if len(match) < 1 {
- return fmt.Errorf("could not extract redis version")
- }
- version := strings.TrimSpace(match[0][1])
- parts := strings.Split(version, ".")
- major, err := strconv.Atoi(parts[0])
- if err != nil {
- return err
- }
- if major < 5 {
- return fmt.Errorf("redis streams are not supported in version %q", version)
- }
- return nil
- }
- // incrementMessageID takes in a message Id (e.g. 1564886140363-0) and
- // increments the index section (e.g. 1564886140363-1). This is the next valid
- // Id value, and it can be used for paging through messages.
- func incrementMessageID(id string) (string, error) {
- parts := strings.Split(id, "-")
- index := parts[1]
- parsed, err := strconv.ParseInt(index, 10, 64)
- if err != nil {
- return "", errors.Wrapf(err, "error parsing message Id %q", id)
- }
- return fmt.Sprintf("%s-%d", parts[0], parsed+1), nil
- }
|