vlambda博客
学习文章列表

Redis上的分布式Java队列

在本文中,我们将使用Redisson Java框架讨论六种不同类型的基于Redis的分布式队列。

在Redis中使用队列

Redis是一个功能强大的工具,支持从字符串和列表到映射和流的许多不同类型的数据结构。开发人员将Redis用作数据库,缓存和消息代理。

像任何消息代理一样,Redis需要以正确的顺序发送消息。可以根据消息的年龄或某些其他预定义的优先级等级发送消息。

为了存储这些未决消息,Redis开发人员需要队列数据结构。Redisson是使用Redis和Java进行分布式编程的框架,它提供了许多分布式数据结构(包括队列)的实现。

Redisson通过提供Java API使Redis开发更加容易。Redisson不需要开发人员学习Redis命令,而是包含所有众所周知的Java接口,例如Queue和BlockingQueue。Redisson还处理Redis中繁琐的幕后工作,例如连接管理,故障转移处理和数据序列化。


基于Redis的分布式Java队列

Redisson提供了Java中基本队列数据结构的多个基于Redis的实现,每种实现都有不同的功能。这使您可以选择最适合您目的的队列类型。

下面,我们将使用Redisson Java框架讨论六种不同类型的基于Redis的分布式队列。

队列

RQueueRedisson中对象实现java.util.Queue接口。队列用于应从最旧的元素开始处理(也称为“先进先出”或FIFO)的情况。

与普通Java一样,RQueue可以使用peek()方法检查的第一个元素,或使用方法检查并删除它poll()

RQueue<SomeObject> queue = redisson.getQueue("anyQueue"); queue.add(new SomeObject());      SomeObject obj = queue.peek();  SomeObject someObj = queue.poll();

阻塞队列

RBlockingQueueRedisson中对象实现java.util.BlockingQueue接口。

BlockingQueues是阻塞线程的队列,这些线程试图从空队列中进行轮询,或者试图在已满的队列中插入元素。该线程将被阻塞,直到另一个线程将一个元素插入空队列中,或从完整队列中轮询第一个元素为止。

下面的示例代码演示的正确实例化和使用RBlockingQueue特别是,您可以poll()使用指定参数方法来调用该方法,这些参数指定线程将等待元素变为可用状态的时间:

RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");queue.offer(new SomeObject());   SomeObject obj = queue.peek();   SomeObject someObj = queue.poll();     SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

在对Redis的服务器故障转移或重新连接的时候,poll()pollFromAny()pollLastAndOfferFirstTo(),和take()Java方法会自动重新订阅。

BoundedBlockingQueue

RBoundedBlockingQueueRedisson中对象实现了有界阻塞队列结构。有界阻塞队列是容量已受到限制(即有限)的阻塞队列。

下面的代码演示了如何实例化和使用RBoundedBlockingQueueRedisson。trySetCapacity()方法用于尝试设置阻塞队列的容量。trySetCapacity()返回布尔值“ true”或“ false”,取决于是否成功设置了容量或是否已经设置了容量:

RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue"); queue.trySetCapacity(2);queue.offer(new SomeObject(1));  queue.offer(new SomeObject(2)); // 将被阻塞,直到队列中的可用空间可用queue.put(new SomeObject());     SomeObject obj = queue.peek();SomeObject someObj = queue.poll();SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

延迟队列

RDelayedQueueRedisson中对象允许您在Redis中实现延迟队列。当使用诸如指数补偿的策略将消息传递给消费者时,这可能很有用。在每次尝试发送邮件失败之后,重试之间的时间将成倍增加。

在与元素一起指定的延迟之后,延迟队列中的每个元素都将被传送到目标队列。此目标队列可以是实现RQueue接口的任何队列,例如RBlockingQueueRBoundedBlockingQueue

RQueue<String> destinationQueue = redisson.getQueue("anyQueue");   RDelayedQueue<String> delayedQueue = getDelayedQueue(destinationQueue);     // 在10秒内将对象移到destinationQueuedelayedQueue.offer("msg1"10, TimeUnit.SECONDS);     // 在1分钟内将对象移至destinationQueuedelayedQueue.offer("msg2"1, TimeUnit.MINUTES);

在不再需要队列之后,通过使用destroy()方法销毁延迟的队列是一个好主意。但是,如果要关闭Redisson,则没有必要。

PriorityQueue

RPriorityQueueRedisson中对象实现java.util.Queue接口。优先级队列是不是按元素的使用期限而是按照与每个元素相关联的优先级排序的队列。

如下面的示例代码所示,RPriorityQueue使用Comparator对队列中的元素进行排序:

RPriorityQueue<Integer> queue = redisson.getPriorityQueue("anyQueue");    queue.trySetComparator(new MyComparator()); // 设置对象比较器     queue.add(3);      queue.add(1);        queue.add(2);       queue.removeAsync(0);queue.addAsync(5);     queue.poll();

PriorityBlockingQueue

Redisson中RPriorityBlockingQueue对象结合了的功能像一样使用来对队列中的元素进行排序。RPriorityQueueRBlockingQueueRPriorityQueueRPriorityBlockingQueueComparator

RPriorityBlockingQueue<Integer> queue = redisson.getPriorityBlockingQueue("anyQueue");    queue.trySetComparator(new MyComparator()); // //设置对象比较器   queue.add(3);      queue.add(1);     queue.add(2);     queue.removeAsync(0);    queue.addAsync(5);       queue.take();

在故障转移或重新连接到Redis服务器的过程poll()pollLastAndOfferFirstTo(),,和take()Java方法会自动重新预订。


 
   
   
 

感觉不错的话,麻烦动动小手关注一下小编啦


更多精彩请点击

1.