vlambda博客
学习文章列表

玩转 Redis:简单消息队列

点击上方蓝色“ 火丁笔记 ”关注我们, 设个星标 ,每天学习全栈知识

使用go语言基于redis写了一个简单的消息队列

使用demo[2]

redis的 list 非常的灵活,可以从左边或者右边添加元素,当然也以从任意一头读取数据

img

添加数据和获取数据的操作也是非常简单的LPUSH 从左边插入数据RPUSH 大右边插入数据LPOP 从左边取出一个数据RPOP 从右边取出一个数据

127.0.0.1:6379> LPUSH list1 a
(integer) 1
127.0.0.1:6379> RPUSH list1 b
(integer) 2
127.0.0.1:6379> LPOP list1
"a"
127.0.0.1:6379> RPOP list1
"b"

或者使用 BLPOP BRPOP 来读取数据,不同之处是取数据时,如果没有数据会等待指定的时间, 如果这期间有数据写入,则会读取并返回,没有数据则会返回空 在一个窗口1读取

127.0.0.1:6379> BLPOP list1 10
1) "list1"
2) "a"

在另一个窗口2写入

127.0.0.1:6379> RPUSH list1 a b c
(integer) 3

再开一个窗口3读取,第二次读取时,list是空的,所以等待1秒后返回空。

127.0.0.1:6379> BRPOP list1 1
1) "list1"
2) "c"

127.0.0.1:6379> BRPOP list1 1
(nil)
(1.04s)

简单消息队列的实现

如果我们只从一边新增元素,向另一边取出元素,这就不是一个消息队列么。但我估计你会有一个疑问,在消费数据时,同一个消息会不会同时被多个consumer消费掉?玩转 Redis:简单消息队列

当然不会,因为redis是单线程的,在从list取数据时天然不会出现并发问题。但是这是一个简单的消息队列,消费不成功怎么处理还是需要我们自己写代码来实现的

下面我说一下使用list实现一个简单的消息队列的整体思路

comsumer的实现

consumer 主要做的就是从list里读取数据,使用LPOP或者BLPOP都可以, 这里做了一个开关 optionsUseBLopp如果为true时会使用BLPOP

type consumer struct {
 once            sync.Once
 redisCmd        redis.Cmdable
 ctx             context.Context
 topicName       string
 handler         Handler
 rateLimitPeriod time.Duration
 options         ConsumerOptions
 _               struct{}
}

type ConsumerOptions struct {
 RateLimitPeriod time.Duration
 UseBLPop        bool
}

看一下创建consumer的代码,最后面的opts参数是可选的配置

type Consumer = *consumer

func NewSimpleMQConsumer(ctx context.Context, redisCmd redis.Cmdable, topicName string, opts ...ConsumerOption) Consumer {
 consumer := &consumer{
  redisCmd:  redisCmd,
  ctx:       ctx,
  topicName: topicName,
 }
 for _, o := range opts {
  o(&consumer.options)
 }
 if consumer.options.RateLimitPeriod == 0 {
  consumer.options.RateLimitPeriod = time.Microsecond * 200
 }
 return consumer
}

读取数据后具体怎么进行处理调用者可以根据自己的业务逻辑进行相应处理 有一个小的interface调用者根据自己的逻辑去实现

type Handler interface {
 HandleMessage(msg *Message)
}

读取数据的逻辑使用一个gorouting实现

func (s *consumer) startGetMessage() {
 go func() {
  ticker := time.NewTicker(s.options.RateLimitPeriod)
  defer func() {
   log.Println("stop get message.")
   ticker.Stop()
  }()
  for {
   select {
   case <-s.ctx.Done():
    log.Printf("context Done msg: %#v \n", s.ctx.Err())
    return
   case <-ticker.C:
    var revBody []byte
    var err error
    if !s.options.UseBLPop {
     revBody, err = s.redisCmd.LPop(s.topicName).Bytes()
    } else {
     revs := s.redisCmd.BLPop(time.Second, s.topicName)
     err = revs.Err()
     revValues := revs.Val()
     if len(revValues) >= 2 {
      revBody = []byte(revValues[1])
     }
    }
    if err == redis.Nil {
     continue
    }
    if err != nil {
     log.Printf("LPOP error: %#v \n", err)
     continue
    }

    if len(revBody) == 0 {
     continue
    }
    msg := &Message{}
    json.Unmarshal(revBody, msg)
    if s.handler != nil {
     s.handler.HandleMessage(msg)
    }
   }
  }
 }()
}

Producer 的实现

Producer`还是很简单的就是把数据推送到 `reids
type Producer struct {
 redisCmd redis.Cmdable
 _        struct{}
}

func NewProducer(cmd redis.Cmdable) *Producer {
 return &Producer{redisCmd: cmd}
}

func (p *Producer) Publish(topicName string, body []byte) error {
 msg := NewMessage("", body)
 sendData, _ := json.Marshal(msg)
 return p.redisCmd.RPush(topicName, string(sendData)).Err()
}

原文链接:https://www.cnblogs.com/li-peng/p/12659222.html

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

参考资料

[1] [2]

使用demo: https://github.com/lpxxn/go-utils/blob/master/examples/redis_mq/simple_mq_demo1/main.go