redis.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package redisqueue
  2. import (
  3. "context"
  4. "fmt"
  5. "regexp"
  6. "strconv"
  7. "strings"
  8. "github.com/pkg/errors"
  9. "github.com/redis/go-redis/v9"
  10. )
// redisVersionRE captures everything that follows "redis_version:" up to the
// end of that line. redisPreflightChecks applies it to the output of the INFO
// command and trims whitespace from the captured value.
var redisVersionRE = regexp.MustCompile(`redis_version:(.+)`)
// RedisOptions is an alias for redis.Options so that users can use this type
// instead of having to import go-redis directly.
type RedisOptions = redis.Options
  15. // newRedisClient creates a new Redis client with the given options. If options
  16. // is nil, it will use default options.
  17. func newRedisClient(options *RedisOptions) *redis.Client {
  18. if options == nil {
  19. options = &RedisOptions{}
  20. }
  21. return redis.NewClient(options)
  22. }
  23. // redisPreflightChecks makes sure the Redis instance backing the *redis.Client
  24. // offers the functionality we need. Specifically, it also that it can connect
  25. // to the actual instance and that the instance supports Redis streams (i.e.
  26. // it's at least v5).
  27. func redisPreflightChecks(client redis.UniversalClient) error {
  28. info, err := client.Info(context.TODO(), "server").Result()
  29. if err != nil {
  30. return err
  31. }
  32. match := redisVersionRE.FindAllStringSubmatch(info, -1)
  33. if len(match) < 1 {
  34. return fmt.Errorf("could not extract redis version")
  35. }
  36. version := strings.TrimSpace(match[0][1])
  37. parts := strings.Split(version, ".")
  38. major, err := strconv.Atoi(parts[0])
  39. if err != nil {
  40. return err
  41. }
  42. if major < 5 {
  43. return fmt.Errorf("redis streams are not supported in version %q", version)
  44. }
  45. return nil
  46. }
  47. // incrementMessageID takes in a message Id (e.g. 1564886140363-0) and
  48. // increments the index section (e.g. 1564886140363-1). This is the next valid
  49. // Id value, and it can be used for paging through messages.
  50. func incrementMessageID(id string) (string, error) {
  51. parts := strings.Split(id, "-")
  52. index := parts[1]
  53. parsed, err := strconv.ParseInt(index, 10, 64)
  54. if err != nil {
  55. return "", errors.Wrapf(err, "error parsing message Id %q", id)
  56. }
  57. return fmt.Sprintf("%s-%d", parts[0], parsed+1), nil
  58. }