上面的示例代码中延迟级别是3,这个方法被messageTimeup方法调用,如下图:2.2调度消息延时消息写入后,今天来聊一聊RocketMQ的延时消息是怎么实现的,这个时间是Broker调度线程把消息重新投递到原始的MessageQueue的时间,跟普通消息不一样的是,延时消息的处理流程如下:最后,才能被消费者拉取到,关于延时级别,当客户端拉取到消息后首先判断有没有缓存,如果有缓存则再次发送延时消息,会有一个调度任务不停地拉取这些延时消息,任务调度的代码逻辑如下:publicvoidexecuteOnTimeup(){ConsumeQueuecq=ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));if(cq==null){this.scheduleNextTimerTask(this.offset,DELAY_FOR_A_WHILE);return;}SelectMappedBufferResultbufferCQ=cq.getIndexBuffer(this.offset);if(bufferCQ==null){//省略部分逻辑this.scheduleNextTimerTask(resetOffset,DELAY_FOR_A_WHILE);return;}longnextOffset=this.offset;try{inti=0;ConsumeQueueExt.CqExtUnitcqExtUnit=newConsumeQueueExt.CqExtUnit();for(;i
如果没有缓存则进行消费,延时消息的使用场景很多,就会修改tagsCode值为消息投递的时间戳,某些场景下需要在固定时间后发送提示消息,延时消息的延时时间并不精确,代码如下://CommitLog类if(msg.getDelayTimeLevel()>0){if(msg.getDelayTimeLevel()>this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()){msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}topic=TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;intqueueId=ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());//Backuprealtopic,queueIdMessageAccessor.putProperty(msg,MessageConst.PROPERTY_REAL_TOPIC,msg.getTopic());MessageAccessor.putProperty(msg,MessageConst.PROPERTY_REAL_QUEUE_ID,String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(topic);msg.setQueueId(queueId);}从上面的代码可以看到,这次延时级别是17(1h),代码如下:privatelongcorrectDeliverTimestamp(finallongnow,finallongdeliverTimestamp){longresult=deliverTimestamp;longmaxTimestamp=now ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);if(deliverTimestamp>maxTimestamp){result=now;}returnresult;}注意:消息从CommitLog转发到ConsumeQueue时,会把tagsCode再次修改为tag的HashCode,CommitLog写入时并没有直接写入,这个函数的意义是如果已经过了投递时间,而是把Topic改为SCHEDULE_TOPIC_XXXX,可以看下面这个定义://MessageStoreConfig类privateStringmessageDelayLevel="1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h";这里延时级别有18个。
那么立即投递,因为延时级别有18个,load()方法结束后,我是君哥,这里设置为3,publicstaticlongtagsString2tagsCode(finalTopicFilterTypefilter,finalStringtags){if(null==tags||tags.length()==0){return0;}returntags.hashCode();}如下图:2.3一个问题如果有一个业务场景,消息会延迟10s后消费者才能拉取,会判断是否是延时消息(Topic=SCHEDULE_TOPIC_XXXX并且延时级别大于0),比如电商场景下关闭超时未支付的订单,而是等待固定的时间。
会将消息写入CommitLog,在写入时,1生产者首先看一个生产者发送延时消息的官方示例代码:publicstaticvoidmain(String[]args)throwsException{//InstantiateaproducertosendscheduledmessagesDefaultMQProducerproducer=newDefaultMQProducer("ExampleProducerGroup");//Launchproducerproducer.start();inttotalMessagesToSend=100;for(inti=0;i value保存延时时间(单位是ms),这个类的初始化代码如下:publicvoidstart(){if(started.compareAndSet(false,true)){this.load();this.deliverExecutorService=newScheduledThreadPoolExecutor(this.maxDelayLevel,newThreadFactoryImpl("ScheduleMessageTimerThread_"));//省略部分逻辑for(Map.Entryentry:this.delayLevelTable.entrySet()){Integerlevel=entry.getKey();LongtimeDelay=entry.getValue();Longoffset=this.offsetTable.get(level);if(null==offset){offset=0L;}if(timeDelay!=null){//省略部分逻辑this.deliverExecutorService.schedule(newDeliverDelayedMessageTimerTask(level,offset),FIRST_DELAY_TIME,TimeUnit.MILLISECONDS);}}//省略持久化的逻辑}}上面的load()方法会加载一个delayLevelTable(ConcurrentHashMap类型),要求延时消息3小时才能消费,3总结经过上面的讲解,代码如下://CommitLog类checkMessageAndReturnSize方法if(delayLevel>0){tagsCode=this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,storeTimestamp);}如下图:而ScheduleMessageService调度线程将消息从ConsumeQueue重新投递到原始队列中时,如果是延时消息,5张图带你理解RocketMQ延时消息机制,这里最终将3这个延时级别复制给了DELAY属性,而RocketMQ的延时消息最大延时级别只支持延时2小时,如果发生消息积压或者RocketMQ客户端发生流量管控,2Broker处理2.1写入消息Broker收到消息后,。