当前位置:首页 > 百科杂文

rocketmq

发布日期:2022-11-08 02:00:47

上面的示例代码中延迟级别是3,这个方法被messageTimeup方法调用,如下图:2.2调度消息延时消息写入后,今天来聊一聊RocketMQ的延时消息是怎么实现的,这个时间是Broker调度线程把消息重新投递到原始的MessageQueue的时间,跟普通消息不一样的是,延时消息的处理流程如下:最后,才能被消费者拉取到,关于延时级别,当客户端拉取到消息后首先判断有没有缓存,如果有缓存则再次发送延时消息,会有一个调度任务不停地拉取这些延时消息,任务调度的代码逻辑如下:publicvoidexecuteOnTimeup(){ConsumeQueuecq=ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));if(cq==null){this.scheduleNextTimerTask(this.offset,DELAY_FOR_A_WHILE);return;}SelectMappedBufferResultbufferCQ=cq.getIndexBuffer(this.offset);if(bufferCQ==null){//省略部分逻辑this.scheduleNextTimerTask(resetOffset,DELAY_FOR_A_WHILE);return;}longnextOffset=this.offset;try{inti=0;ConsumeQueueExt.CqExtUnitcqExtUnit=newConsumeQueueExt.CqExtUnit();for(;i0){this.scheduleNextTimerTask(nextOffset,DELAY_FOR_A_WHILE);return;}MessageExtmsgExt=ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy,sizePy);if(msgExt==null){continue;}MessageExtBrokerInnermsgInner=ScheduleMessageService.this.messageTimeup(msgExt);//事务消息判断省略booleandeliverSuc;//只保留同步deliverSuc=this.syncDeliver(msgInner,msgExt.getMsgId(),nextOffset,offsetPy,sizePy);if(!deliverSuc){this.scheduleNextTimerTask(nextOffset,DELAY_FOR_A_WHILE);return;}}nextOffset=this.offset (i/ConsumeQueue.CQ_STORE_UNIT_SIZE);}catch(Exceptione){log.error("ScheduleMessageService,messageTimeupexecuteerror,offset={}",nextOffset,e);}finally{bufferCQ.release();}this.scheduleNextTimerTask(nextOffset,DELAY_FOR_A_WHILE);}这段代码可以参考下面的流程图来进行理解:上面有一个修正投递时间的函数,创建18个任务(DeliverDelayedMessageTimerTask)进行每个延时级别的任务调度,这个逻辑在类ScheduleMessageService。

如果没有缓存则进行消费,延时消息的使用场景很多,就会修改tagsCode值为消息投递的时间戳,某些场景下需要在固定时间后发送提示消息,延时消息的延时时间并不精确,代码如下://CommitLog类if(msg.getDelayTimeLevel()>0){if(msg.getDelayTimeLevel()>this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()){msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}topic=TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;intqueueId=ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());//Backuprealtopic,queueIdMessageAccessor.putProperty(msg,MessageConst.PROPERTY_REAL_TOPIC,msg.getTopic());MessageAccessor.putProperty(msg,MessageConst.PROPERTY_REAL_QUEUE_ID,String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(topic);msg.setQueueId(queueId);}从上面的代码可以看到,这次延时级别是17(1h),代码如下:privatelongcorrectDeliverTimestamp(finallongnow,finallongdeliverTimestamp){longresult=deliverTimestamp;longmaxTimestamp=now ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);if(deliverTimestamp>maxTimestamp){result=now;}returnresult;}注意:消息从CommitLog转发到ConsumeQueue时,会把tagsCode再次修改为tag的HashCode,CommitLog写入时并没有直接写入,这个函数的意义是如果已经过了投递时间,而是把Topic改为SCHEDULE_TOPIC_XXXX,可以看下面这个定义://MessageStoreConfig类privateStringmessageDelayLevel="1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h";这里延时级别有18个。

那么立即投递,因为延时级别有18个,load()方法结束后,我是君哥,这里设置为3,publicstaticlongtagsString2tagsCode(finalTopicFilterTypefilter,finalStringtags){if(null==tags||tags.length()==0){return0;}returntags.hashCode();}如下图:2.3一个问题如果有一个业务场景,消息会延迟10s后消费者才能拉取,会判断是否是延时消息(Topic=SCHEDULE_TOPIC_XXXX并且延时级别大于0),比如电商场景下关闭超时未支付的订单,而是等待固定的时间。

会将消息写入CommitLog,在写入时,1生产者首先看一个生产者发送延时消息的官方示例代码:publicstaticvoidmain(String[]args)throwsException{//InstantiateaproducertosendscheduledmessagesDefaultMQProducerproducer=newDefaultMQProducer("ExampleProducerGroup");//Launchproducerproducer.start();inttotalMessagesToSend=100;for(inti=0;i

value保存延时时间(单位是ms),这个类的初始化代码如下:publicvoidstart(){if(started.compareAndSet(false,true)){this.load();this.deliverExecutorService=newScheduledThreadPoolExecutor(this.maxDelayLevel,newThreadFactoryImpl("ScheduleMessageTimerThread_"));//省略部分逻辑for(Map.Entryentry:this.delayLevelTable.entrySet()){Integerlevel=entry.getKey();LongtimeDelay=entry.getValue();Longoffset=this.offsetTable.get(level);if(null==offset){offset=0L;}if(timeDelay!=null){//省略部分逻辑this.deliverExecutorService.schedule(newDeliverDelayedMessageTimerTask(level,offset),FIRST_DELAY_TIME,TimeUnit.MILLISECONDS);}}//省略持久化的逻辑}}上面的load()方法会加载一个delayLevelTable(ConcurrentHashMap类型),要求延时消息3小时才能消费,3总结经过上面的讲解,代码如下://CommitLog类checkMessageAndReturnSize方法if(delayLevel>0){tagsCode=this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,storeTimestamp);}如下图:而ScheduleMessageService调度线程将消息从ConsumeQueue重新投递到原始队列中时,如果是延时消息,5张图带你理解RocketMQ延时消息机制,这里最终将3这个延时级别复制给了DELAY属性,而RocketMQ的延时消息最大延时级别只支持延时2小时,如果发生消息积压或者RocketMQ客户端发生流量管控,2Broker处理2.1写入消息Broker收到消息后,。

举报

9月14日将迎来超级月亮,教你如何拍摄美妙瞬间

随着傍晚到来,你的眼睛会被天空中超级明亮的月亮所吸引。9月14日,月球距离地球最近,是今年的超级月亮。超级月亮是指月亮与地球距离...

2024-05-02 17:33:46
带鱼怎么做法最好吃

带鱼是一种非常常见和受欢迎的海鲜食材,在中国各地都有独特的烹饪方法。下面介绍一种简单易做的带鱼红烧做法,让你品尝到最美味的带鱼。...

2024-05-02 16:35:09
何以解忧唯有杜康,它到底是什么意思?

在中国文化中,有一句古话说:“何以解忧,唯有杜康。”这句话常常出现在文学作品中,引发了人们的好奇,那么杜康到底是什么意思呢?杜康...

2024-05-02 16:22:44
探秘世界最美的丹霞地貌

中国拥有着世界上最为壮观的丹霞地貌群落,雄伟壮观的红岩地貌景观远近闻名。丹霞地貌可以分为九大类,包括砂砾丘、菱锰矿、红砂岩、重晶...

2024-05-02 16:04:32
法国的鳄鱼会游泳

鳄鱼是一种生活在淡水和咸水沼泽及静水中的爬行动物。听起来在法国,我们也会看到鳄鱼出没。在法国,养鳄鱼并不是一件罕见的事情,而且在...

2024-05-02 15:28:53
金秀贤电影又有新作品了!

金秀贤是韩国娱乐圈的新生代男演员,拥有颜值与演技的他备受年轻人喜爱。最近,金秀贤的新电影《恋人们》将要上映,引发了广泛的关注。《...

2024-05-02 14:48:36
从Chinapost看国内快递业的未来

Chinapost是中国邮政管理局下属的快递品牌,近年来在国内市场上发展迅猛。在我国互联网快递行业的大背景下,Chinapost...

2024-05-02 14:42:30
皮鞋尺码解析

皮鞋是现代人常常穿着的一种时尚鞋类,不仅舒适耐穿,而且可以增添个人风格。但是,选择合适的尺码对于穿着皮鞋来说至关重要。那么,如何...

2024-05-02 13:55:17
电磁门吸:安全便捷的门禁设备

电磁门吸是一种广泛应用于门禁系统的安全设备,其通过电磁力固定门体,以实现自动打开和关闭门的功能。电磁门吸由电磁控制器和吸力部分组...

2024-05-02 13:41:48

进入高中后是每位男生都要面对的事情——换个酷炫的发型,让人一眼就记住你!因为发型无论对男生还是女生来说,都有着十分重要的作用。一...

2024-05-02 13:27:23