java中有界阻塞队列有很多,但是用Map实现的默认是没有的,本文则提供了一个用Map实现的有界队列。
可能有人会问为啥要用map做有界队列呢,用map做队列的好处在哪?显而易见,map最厉害之处就是可以根据key快速定位队列元素,而不用通过循环队列元素的方法,从而显著的提高了程序的效率。
笔者最近做的一个java应用有这样一个场景:我的服务消费mq队列然后将mq中的消息分发给很多下游应用,这个过程是异步的,然后为了记录响应的超时时间,在投递给下游应用之前把请求记录到一个阻塞队列中,然后当请求的响应回来后通过特定的key遍历此队列获取到对应的请求,然后比较时间分析出是否超时,然后这个队列设定了一个固定大小当此队列大小达到最大值后,其put方法就会阻塞住MQ的消费线程,从而实现了限流的目的。
结果压测(MQ蓄峰100万数据量)运行中发现,cpu使用率高达300%,后来通过线程栈分析出是对LinkedBlockingQueue频繁遍历导致的cpu过高。后来基于我这个场景果断自己写了一个用Map实现的有界阻塞队列,cpu使用率直接降到了100多。
废话不说了,上代码。
package com.test.queue; import com.google.common.collect.Maps; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.ReentrantLock; /** * 阻塞的有界队列 */ public class MapBlockingQueue<K, E> extends LinkedBlockingQueue { final ReentrantLock lock = new ReentrantLock(); private final AbstractQueuedSynchronizer.ConditionObject notFull = (AbstractQueuedSynchronizer.ConditionObject) lock.newCondition(); private final Map<K, E> map = Maps.newConcurrentMap(); private int capacity; public MapBlockingQueue() { } public MapBlockingQueue(int capacity) { this.capacity = capacity; } public void put(K k, E e) throws InterruptedException { lock.lock(); try { if (map.size() > capacity) { notFull.await(); } map.put(k, e); } finally { lock.unlock(); } } public E get(K k) { return map.get(k); } }
Buy Gonorrhea Treatment Online Usa Cicqsk https://bestadalafil.com/ – cialis generic buy Cialis Buy Generic Propranolol Online Abhmzg Rybztw https://bestadalafil.com/ – buy generic cialis online cheap