江苏省建设厅网站查询,深圳网站建设ejaket,中国机械加工网招聘信息,饶阳网站建设前言碎语
当系统流量负载比较高时#xff0c;业务日志的写入操作也要纳入系统性能考量之内#xff0c;如若处理不当#xff0c;将影响系统的正常业务操作#xff0c;之前写过一篇《spring boot通过MQ消费log4j2的日志》的博文#xff0c;采用了RabbitMQ消息中间件来存储抗…
前言碎语
当系统流量负载比较高时业务日志的写入操作也要纳入系统性能考量之内如若处理不当将影响系统的正常业务操作之前写过一篇《spring boot通过MQ消费log4j2的日志》的博文采用了RabbitMQ消息中间件来存储抗高并发下的日志因为引入了中间件操作使用起来可能没那么简便今天分享使用多线程消费阻塞队列的方式来处理我们的海量日志
waht阻塞队列
阻塞队列BlockingQueue是区别于普通队列多了两个附加操作的线程安全的队列。这两个附加的操作是在队列为空时获取元素的线程会等待队列变为非空。当队列满时存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景生产者是往队列里添加元素的线程消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器而消费者也只从容器里拿元素。
1.声明存储固定消息的队列
/*** Created by kl on 2017/3/20.* Content :销售操作日志队列*/
public class SalesLogQueue{//队列大小public static final int QUEUE_MAX_SIZE 1000;private static SalesLogQueue alarmMessageQueue new SalesLogQueue();//阻塞队列private BlockingQueueblockingQueue new LinkedBlockingQueue(QUEUE_MAX_SIZE);private SalesLogQueue(){}public static SalesLogQueue getInstance() {return alarmMessageQueue;}/*** 消息入队* param salesLog* return*/public boolean push(SalesLog salesLog) {return this.blockingQueue.add(salesLog);//队列满了就抛出异常不阻塞}/*** 消息出队* return*/public SalesLog poll() {SalesLog result null;try {result this.blockingQueue.take();} catch (InterruptedException e) {e.printStackTrace();}return result;}/*** 获取队列大小* return*/public int size() {return this.blockingQueue.size();}
}
ps:因为业务原因采用add的方式入队队列满了就抛异常不阻塞
2.消息入队
消息入队可以在任何需要保存日志的地方操作如aop统一拦截日志处理filter过滤请求日志处理或者耦合的业务日志记住不阻塞入队操作不然将影响正常的业务操作如下为filter统一处理请求日志
/*** Created by kl on 2017/3/20.* Content :访问请求拦截保存操作日志*/
public class SalesLogFilter implements Filter {private RoleResourceService resourceService;Overridepublic void init(FilterConfig filterConfig) throws ServletException {ServletContext context filterConfig.getServletContext();ApplicationContext ctx WebApplicationContextUtils.getWebApplicationContext(context);resourceService ctx.getBean(RoleResourceService.class);}Overridepublic void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {try {HttpServletRequest request (HttpServletRequest) servletRequest;String requestUrl request.getRequestURI();String requestTyperequest.getMethod();String ipAddress HttpClientUtil.getIpAddr(request);Map resourceresourceService.getResource();String contextresource.get(requestUrl);//动态url正则匹配if(StringUtil.isNull(context)){for(Map.Entry entry:resource.entrySet()){String resourceUrl entry.getKey();if(requestUrl.matches(resourceUrl)){contextentry.getValue();break;}}}SalesLog lognew SalesLog();log.setCreateDate(new Timestamp(System.currentTimeMillis()));log.setContext(context);log.setOperateUser(UserTokenUtil.currentUser.get().get(realname));log.setRequestIp(ipAddress);log.setRequestUrl(requestUrl);log.setRequestType(requestType);SalesLogQueue.getInstance().push(log);}catch (Exception e){e.printStackTrace();}filterChain.doFilter(servletRequest, servletResponse);}Overridepublic void destroy() {}
}
3.消息出队被消费
BlockingQueue是线程安全的所以可以放心的在多个线程中去处理队列中的消息如下代码声明了一个两个大小的固定线程池并添加了两个线程去处理队列中的消息 /*** Created by kl on 2017/3/20.* Content :启动消费操作日志队列的线程*/
Component
public class ConsumeSalesLogQueue {AutowiredSalesLogService salesLogService;PostConstructpublic void startrtThread() {ExecutorService e Executors.newFixedThreadPool(2);//两个大小的固定线程池e.submit(new PollSalesLog(salesLogService));e.submit(new PollSalesLog(salesLogService));}class PollSalesLog implements Runnable {SalesLogService salesLogService;public PollSalesLog(SalesLogService salesLogService) {this.salesLogService salesLogService;}Overridepublic void run() {while (true) {try {SalesLog salesLog SalesLogQueue.getInstance().poll();if(salesLog!null){salesLogService.saveSalesLog(salesLog);}} catch (Exception e) {e.printStackTrace();}}}}
}
参考博文如下对BlockingQueue队列更多了解可读一读如下的博文 http://blog.csdn.net/vernonzheng/article/details/8247564 http://www.infoq.com/cn/articles/java-blocking-queue http://wsmajunfeng.iteye.com/blog/1629354