前台注册时,给用户的邮箱异步发送邮件用于激活账户.服务器资源紧张不能引入MQ队列来实现,所以用JUC写了个消费者功能废话不多说直接上代码
/** * 描述: 消费者线程 * date: 2020/6/22 0022 **/ @Slf4j public class ConsumerTask implements Runnable { public void run() { doConsumer(); } private void doConsumer() { ConsumerTaskHolder.lock.lock(); try{ while (true){ while (ConsumerTaskHolder.queue.size() == 0){ log.info("队列中无数据等待数据中......"); ConsumerTaskHolder.notEmpty.await(); } //消费邮箱 String email = ConsumerTaskHolder.queue.poll(); log.info("收到邮箱:{},发送邮件......",email); // 处理逻辑 发送邮件..... some action } } catch (InterruptedException e) { e.printStackTrace(); } finally { ConsumerTaskHolder.lock.unlock(); } } }
/** * 描述: 提供者主要用于给队列推送数据 * date: 2020/6/22 0022 **/ public class ProviderTask { /** * 给队列推送数据 * @param email */ public static void pushQueue(String email){ ConsumerTaskHolder.lock.lock(); try { ConsumerTaskHolder.queue.add(email); //唤醒消费者线程处理任务啦 ConsumerTaskHolder.notEmpty.signalAll(); }finally { ConsumerTaskHolder.lock.unlock(); } } }
/** * 描述: 消费者相关资源 * date: 2020/6/22 0022 **/ public class ConsumerTaskHolder { public static Lock lock = new ReentrantLock(); public static Condition notEmpty = lock.newCondition(); /** * 存放数据的队列 */ public static ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue(); }
/** * 描述: 线程池启动加载 * date: 2020/6/22 0022 **/ @Slf4j @Component @Order(0) public class ConsumerThreadPool implements ApplicationRunner { /** * 线程池 */ public static final ExecutorService executorService = Executors.newFixedThreadPool(5); public void run(ApplicationArguments args) { log.info("加载线程池中........."); try { ConsumerTask consumerTask = new ConsumerTask(); executorService.submit(consumerTask); log.info("线程池启动完成......"); } catch (Exception e) { log.error("线程池启动报错了......."); e.printStackTrace(); } } /** * 项目销毁前执行 */ @PreDestroy public void destroy() { try { executorService.shutdown();//优雅的关闭 log.info("关闭线程池...."); // List runnables = executorService.shutdownNow();//直接关闭 // log.debug("未执行完的任务,"+runnables); } catch (Exception e) { log.error("线程池关闭失败...", e); e.printStackTrace(); } } }
@RequestMapping("/system") @Controller public class SystemController { @Autowired UserService userService; @GetMapping("/{path}") public String index(@PathVariable String path){ return path; } @ResponseBody @PostMapping("/register") public Map register(User user){ Map resultMap = new HashMap(); userService.register(user); return resultMap; } }
@Slf4j @Service public class UserServiceImpl implements UserService { public void register(User user) { String email = user.getEmail(); //给队列推送邮箱账号过去,队列处理发送邮件 ProviderTask.pushQueue(email); //some action log.info("保存用户信息到数据库"); } }
项目启动后消费者就会等待数据,生产者推送一个数据过去后唤醒消费者,消费者线程接收到唤醒信号去消费数据.反复如此,由于是Demo所以代码比较简单,实际业务中的问题自行完善即可.
项目地址