消息中间件mq怎么实现的

文章资讯 2020-07-16 22:15:26

消息中间件mq怎么实现的

目录:
1.什么是消息中间件MQ
2.为什么要使用消息中间件
3.消息队列有什么优缺点?
4.常用的消息中间件有哪些?常用中间件之间的对比?
5.常用的中间件推荐?
6.使用中间件的架构是什么样的?
7.生产者消费者模式中,客户机和服务器谁是生产者,谁是消费者?
8.中间件有哪些常见问题?如何解决这些问题?
9.什么是幂等性?
10.幂等性的实现方式有哪些?如何实现幂等性?
11.常用中间件详细介绍:
12.RabbitMQ介绍
13.RabbitMQ的工作模式有哪几种?
14.如何确保消息正确地发送至RabbitMQ?如何确保消息接收方消费了消息?
15.如何保证RabbitMQ消息的可靠传输?
16.为什么不应该对所有的message都使用持久化机制?
17.如何保证高可用的RabbitMQ的集群?
18.如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?1.什么消息中间件MQ?面向消息的中间件,简称为消息中间件,是一类以消息为载体进行通信的中间件,利用高效可靠的消息机制来实现不同应用间大量的数据交换。按其通信模型的不同,消息中间件的通信模型有两类:消息队列和消息传递。通过这两种消息模型,不同应用之间的通信和网络的复杂性脱离,摆脱对不同通信协议的依赖,可以在复杂的网络环境中高可靠、高效率的实现安全的异步通信。消息中间件的非直接连接,支持多种通信规程,达到多个系统之间的数据的共享和同步。面向消息中间件是一类常用的中间件。
点击回到目录
2.为什么要使用消息中间件?1、系统解耦:
传统系统之间的耦合性太强,牵一发而动全身就是这个道理。redis的sentinel模式就是使用的是消息中间件的原理。再举一个例子,比如你要把自己的房子租给别人,那么你可以自己去找租客,找不到就会一直找。但是你(生产者)把房子(消息)交给房产中介(中间件),你就不需要再找寻租客了,中间件的作用类似房产中介,你把消息交给中间件,就可以去忙别的了,什么时候有租房的,就可以去中间件里找需要的消息。
2、异步调用
有些业务不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
3、流量削峰
流量削峰一般在秒杀活动中应用广泛,加入消息中间件,可以缓解短时间内的高并发请求。
4、日志处理
解决大量日志传输。
5、消息通讯
消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。
3.消息队列有什么优缺点?使用消息中间件的优点:
1、系统解耦
2、异步调用
3、流量削峰
4、日志处理
5、消息通讯
使用消息中间件的缺点:
1、系统可用性降低
2、因为引入了外部依赖中间件,那么中间件挂掉,整套系统也就挂掉了。
3、系统复杂度提高
4、一致性问题,生产者把消息提交给中间件,消费者取走一条数据,此时中间件将数据标记为已消费,但此时消费者宕机了,数据没有成功消费完,此时就产生了数据一致性问题。
点击回到目录
4.常用的消息中间件有哪些?常用中间件之间的对比?
ActiveMQ
RabbitMQ
RocketMQ
Kafka
ZeroMQ
单机吞吐量
比RabbitMQ低
2.6ws(消息做持久化)
11.6ws
17.3ws
29ws
开发语言
Java
Erlang
Java
ScalaJava
C
主要维护者
Aache
MozillaSring
Ababa
Aache
iMatix,创始人
成熟度
成熟
成熟
开源版本不够成熟
比较成熟
只有C、PHP等版本成熟
订阅形式
点对点(2)、广播(发布-订阅)
提供了4种:direct,toic,Headers和fanout。fanout就是广播模式
基于toicmessageTag以及按照消息类型、属性进行正则匹配的发布订阅模式
基于toic以及按照toic进行正则匹配的发布订阅模式
点对点(2)
持久化
支持少量堆积
支持少量堆积
支持大量堆积
支持大量堆积
不支持
顺序消息
不支持
不支持
支持
支持
不支持
性能稳定性


一般
较差
很好
集群方式
支持简单集群模式,比如’主-备’,对高级集群模式支持不好。
支持简单集群,'复制’模式,对高级集群模式支持不好。
常用多对’Master-Slave’模式,开源版本需手动切换Slave变成Master
天然的‘Leader-Slave’无状态集群,每台服务器既是Master也是Slave
不支持
管理界面
一般
较好
一般


点击回到目录
5.常用的中间件推荐?中小型公司,技术实力较为一般,技术挑战不是特别高,用RabbitMQ是不错的选择;
大型公司,基础架构研发实力较强,用RocketMQ是很好的选择。
如果是大数据领域的实时计算、日志采集等场景,用Kafka是业内标准的。
点击回到目录
6.使用中间件的架构是什么样的?客户端—中间件—服务器端。也可以叫做:
生产者—消息队列—消费者。
7.生产者消费者模式中,客户机和服务器谁是生产者,谁是消费者?客户机一般是产生数据的,所以是生产者;服务器是处理客户机产生的数据的,所以是消费者。
点击回到目录
8.中间件有哪些常见问题?如何解决这些问题?1、消息的顺序问题;
2、消息的重复问题。1、什么是消息的顺序性问题:
data1和data2是有顺序的,必须data1先执行,data2后执行;这两个数据被不同的消费者消费到了,可能data2先执行,data1后执行,这样原来的顺序就错乱了。
如何解决消息的顺序性问题(概括的说):
①:单线程消费来保证消息的顺序性;
②:保证生产者-中间件-消费者是一对一的关系;
③:对消息进行编号,消费者处理时根据编号判断顺序。
如何解决消息的顺序性问题(具体的说)?
①当中间件是RabbitMQ时:
在RabbitMQ里面创建多个队列,同一规则的数据(对唯一标识进行hash),有顺序的放入RabbitMQ的队列里面,消费者只取一个队列里面获取数据消费,这样执行的顺序是有序的。或者还是只有一个queue但是对应一个消费者,然后这个消费者内部用内存队列做排队,然后分发给底层不同的worker来处理。
②当中间件是Kafka时:
在消费端使用内存队列,队列里的数据使用hash进行分发,每个线程对应一个队列,这样可以保证数据的顺序。
③当中间件是RocketMQ时:
生产者中把orderId进行取模,把相同模的数据放到messagequeue里面,消费者消费同一个messagequeue,只要消费者这边有序消费,那么可以保证数据被顺序消费。
④当中间件是ActiveMQ时:
ActiveMQ里面有messageGrous属性,可以指定JMSXGrouID,消费者会消费指定的JMSXGrouID。即保证了顺序性,又解决负载均衡的问题。2、什么是消息的重复问题:
造成消息重复的根本原因是:网络不可达。中间件发送data1给消费者,但是由于网络延迟的原因,一直停留在网络中,延迟一段时间中间件没有收到消费者的确认,就会再次发送data1,第二次发送的data1经过其他的没有延迟的路由器到达了消费者,恰恰此时第一次发送的data1的网络通了,也到达了消费者,这就造成了数据的重复问题。
如何解决消息的重复问题?
①:消费端处理消息的业务逻辑保持幂等性。只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。
②:保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现。利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。
点击回到目录
9.什么是幂等性?幂等性是指,用户对同一操作(如在新建a.txt并写入helloworld),发起了一次请求还是多次请求,对服务器的影响是一致的,不会请求几次就新建几个a.txt文件并写入helloworld。
幂等性的适用领域:
比如第一次付款时实际支付成功,但是信息返回时网络中断导致系统误判,需要用户重新支付一次,但肯定不能让用户支付双倍的价钱啊;又比如第一次付款的确失败了,但第二次付款时发生意外,导致支付请求被重复发送等等。在一次支付的过程中,每个环节都有可能会发生问题,我们要如何规避这类问题引发的风险?幂等性是解决这类问题的方案之一,所以在电商,银行,互联网金融等对数据准确性要求很高的领域中,这一特性具有十分重要的地位。10.幂等性的实现方式有哪些?如何实现幂等性?1、MVCC方案:
多版本并发控制,该策略主要使用udatewicondition(更新带条件)来保证多次外部请求调用对系统的影响是一致的。在系统设计的过程中,合理的使用乐观锁,通过version或者udateTime(timestam)等其他条件,来做乐观锁的判断条件,这样保证更新操作即使在并发的情况下,也不会有太大的问题。
2、去重表:
在插入数据的时候,插入去重表,利用数据库的唯一索引特性,保证唯一的逻辑。这种方法适用于在业务中有唯一标的插入场景中,比如在支付场景中,如果一个订单只会支付一次,所以订单ID可以作为唯一标识。这时,我们就可以建一张去重表,并且把唯一标识作为唯一索引,在我们实现时,把创建支付单据和写入去去重表,放在一个事务中,如果重复创建,数据库会抛出唯一约束异常,操作就会回滚。
3、悲观锁:
selectforudate,整个执行过程中锁定该订单对应的记录。注意:悲观锁在DB读大于写的情况下尽量少用。
4、select+insert:
并发不高的后台系统,或者一些任务JOB,为了支持幂等,支持重复执行,简单的处理方法是,先查询下一些关键数据,判断是否已经执行过,在进行业务处理,就可以了。注意:核心高并发流程不要用这种方法。
5、状态机幂等:
在设计单据相关的业务,或者是任务相关的业务,肯定会涉及到状态机,就是业务单据上面有个状态,状态在不同的情况下会发生变更,一般情况下存在有限状态机,这时候,如果状态机已经处于下一个状态,这时候来了一个上一个状态的变更,理论上是不能够变更的,这样的话,保证了有限状态机的幂等。这种方法适合在有状态机流转的情况下,比如就会订单的创建和付款,订单的付款肯定是在之前,这时我们可以通过在设计状态字段时,使用int类型,并且通过值类型的大小来做幂等,比如订单的创建为0,付款成功为1。付款失败为2。
6、token机制:
防止页面重复提交。数据提交前要向服务器申请token,token放到redis或jvm内存,用户访问服务器需要提交token并让后台校验token,,token有效时间过期后需要删除token,用户再次访问时生成新的token返回。token特点:要申请,一次有效性,可以限流。
7、对外提供接口的ai:
如银联提供的付款接口:需要接入商户提交付款请求时附带:source来源,seq序列号。source+seq在数据库里面做唯一索引,防止多次付款,(并发时,只能处理一个请求)
8、全局唯一ID:
如果使用全局唯一ID,就是根据业务的操作和内容生成一个全局ID,在执行操作前先根据这个全局唯一ID是否存在,来判断这个操作是否已经执行。如果不存在则把全局ID,存储到存储系统中,比如数据库、redis等。如果存在则表示该方法已经执行。
点击回到目录
11.常用中间件详细介绍:1、RabbitMQ:
使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP,SMTP,STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了Broker架构,核心思想是生产者不会将消息直接发送给队列,消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Loadbalance)、数据持久化都有很好的支持。多用于进行企业级的ESB整合。
2、ActiveMQ:
Aache下的一个子项目。使用Java完全支持JMS1.1和J2EE1.4规范的JMSProvider实现,少量代码就可以高效地实现高级应用场景。可插拔的传输协议支持,比如:in-VM,TCP,SSL,NIO,UDP,mticast,JGrousandJXTAansorts。RabbitMQ、ZeroMQ、ActiveMQ均支持常用的多种语言客户端C++、Java、.Net,、Pyon、Ph、Ruby等。
3、Redis:
使用C语言开发的一个Key-Value的NoSQL数据库,开发维护很活跃,虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。
4、Kafka:
Aache下的一个子项目,使用scala实现的一个高性能分布式PubshSubscribe消息队列系统,具有以下特性:
①快速持久化:通过磁盘顺序读写与零拷贝机制,可以在O(1)的系统开销下进行消息持久化;
②高吞吐:在一台普通的服务器上既可以达到10Ws的吞吐速率;
③高堆积:支持toic下消费者较长时间离线,消息堆积量大;
④完全的分布式系统:Broker、Producer、Consumer都原生自动支持分布式,依赖zookeeer自动实现复杂均衡;
⑤支持Hadoo数据并行加载:对于像Hadoo的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。
5、ZeroMQ:
号称最快的消息队列系统,专门为高吞吐量低延迟的场景开发,在金融界的应用中经常使用,偏重于实时数据通信场景。ZMQ能够实现RabbitMQ不擅长的高级复杂的队列,但是开发人员需要自己组合多种技术框架,开发成本高。因此ZeroMQ具有一个独特的非中间件的模式,更像一个socketary,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序本身就是使用ZeroMQAPI完成逻辑服务的角色。但是ZeroMQ仅提供非持久性的队列,如果down机,数据将会丢失。如:Twitter的Storm中使用ZeroMQ作为数据流的传输。ZeroMQ套接字是与传输层无关的:ZeroMQ套接字对所有传输层协议定义了统一的API接口。默认支持进程内(inroc),进程间(IPC),多播,TCP协议,在不同的协议之间切换只要简单的改变连接字符串的前缀。可以在任何时候以最小的代价从进程间的本地通信切换到分布式下的TCP通信。ZeroMQ在背后处理连接建立,断开和重连逻辑。ZeroMQ具有以下几个特性:
①无锁的队列模型:对于跨线程间的交互(用户端和session)之间的数据交换通道ie,采用无锁的队列算法CAS;在ie的两端注册有异步事件,在读或者写消息到ie的时,会自动触发读写事件。
②批量处理的算法:对于批量的消息,进行了适应性的优化,可以批量的接收和发送消息。
③多核下的线程绑定,无须CPU切换:区别于传统的多线程并发模式,信号量或者临界区,zeroMQ充分利用多核的优势,每个核绑定运行一个工作者线程,避免多线程之间的CPU切换开销。
点击回到目录
12.RabbitMQ介绍:Broker:简单来说就是消息队列服务器实体;
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列;
Queue:消息队列载体,每个消息都会被投入到一个或多个队列;
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来;
RoutingKey:路由关键字,exchange根据这个关键字进行消息投递;
VHost:vhost可以理解为虚拟oker,即mini-RabbitMQserver。其内部均含有独立的queue、exchange和binding等,但最最重要的是,其拥有独立的权限系统,可以做到vhost范围的用户控制。当然,从RabbitMQ的全局角度,vhost可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的vhost中);
Producer:消息生产者,就是投递消息的程序;
Consumer:消息消费者,就是接受消息的程序;
Channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
由Exchange、Queue、RoutingKey三个才能决定一个从Exchange到Queue的唯一的线路。
13.RabbitMQ的工作模式有哪几种?1、simle模式(即最简单的收发模式)①消息产生消息,将消息放入队列
②消息的消费者(consumer)监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。
2、work工作模式(资源的竞争)①消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。C1C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize)保证一条消息只能被一个消费者使用)。
3、ubshsubscribe发布订阅(共享资源)①每个消费者监听自己的队列;
②生产者将消息发给oker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
4、routing路由模式①消息生产者将消息发送给交换机按照路由判断,路由是字符串(info)当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;
②根据业务功能定义路由字符串;
③从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中;
④业务场景:error通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误。
5、toic主题模式(路由模式的一种)①星号井号代表通配符;
②星号代表多个单词,井号代表一个单词;
③路由功能添加模糊匹配;
④消息产生者产生消息,把消息交给交换机;
⑤交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费(在我的理解看来就是routing查询的一种模糊匹配,就类似sql的模糊查询方式)
6、RPC模式
①客户端发送一个请求消息然后服务器回复一个响应消息。为了收到一个响应,我们需要发送一个‘回调’的请求的队列地址。
点击回到目录
14.如何确保消息正确地发送至RabbitMQ?如何确保消息接收方消费了消息?1、发送方确认模式:
①将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。
②一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一ID)。
③如果RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack(notacknowledged,未确认)消息。
④发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。
2、接收方确认机制
①消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。
②这里并没有用到超时机制,RabbitMQ仅通过Consumer的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长的时间来处理消息。保证数据的最终一致性;
③下面罗列几种特殊情况:
③-①如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重)
③-②如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息。
15.如何保证RabbitMQ消息的可靠传输?消息不可靠的情况可能是消息丢失,劫持等原因;
丢失又分为:生产者丢失消息、消息列表丢失消息、消费者丢失消息;
1、生产者丢失消息:从生产者弄丢数据这个角度来看,RabbitMQ提供ansaction和confirm模式来确保生产者不丢消息;
①ansaction机制就是说:发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降;
②confirm模式用的居多:一旦channel进入confirm模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后;RabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;
如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。
2、消息队列丢数据:消息持久化。
①处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,RabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
3、RabbitMQ如何持久化?
①队列持久化需要在声明队列时添加参数durable=True,这样在rabbitmq崩溃时也能保存队列
②仅仅使用durable=True,只能持久化队列,不能持久化消息
③消息持久化需要在消息生成时,添加参数roerties=ika.BasicProerties(devery_mode=2)
点击回到目录
16.为什么不应该对所有的message都使用持久化机制?1、首先,必然导致性能的下降,因为写磁盘比写RAM慢的多,message的吞吐量可能有10倍的差距。
2、其次,message的持久化机制用在RabbitMQ的内置cluster方案时会出现“坑爹”问题。矛盾点在于,若message设置了ersistent属性,但queue未设置durable属性,那么当该queue的ownernode出现异常后,在未重建该queue前,发往该queue的message将被blackholed;若message设置了ersistent属性,同时queue也设置了durable属性,那么当queue的ownernode异常且无法重启的情况下,则该queue无法在其他node上重建,只能等待其ownernode重启后,才能恢复该queue的使用,而在这段时间内发送给该queue的message将被blackholed。
3、所以,是否要对message进行持久化,需要综合考虑性能需要,以及可能遇到的问题。若想达到100,000条秒以上的消息吞吐量(单RabbitMQ服务器),则要么使用其他的方式来确保message的可靠devery,要么使用非常快速的存储系统以支持全持久化(例如使用SSD)。另外一种处理原则是:仅对关键消息作持久化处理(根据业务重要程度),且应该保证关键消息的量不会导致性能瓶颈。
17.如何保证高可用的RabbitMQ的集群?RabbitMQ是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以RabbitMQ为例子讲解第一种MQ的高可用性怎么实现。RabbitMQ有三种模式:单机模式、普通集群模式、镜像集群模式。
1、单机模式,就是Demo级别的;新手使用
2、普通集群模式,意思就是在多台机器上启动多个RabbitMQ实例,每个机器启动一个。你创建的queue,只会放在一个RabbitMQ实例上,但是每个实例都同步queue的元数据(元数据可以认为是queue的一些配置信息,通过元数据,可以找到queue所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从queue所在实例上拉取数据过来。这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个queue的读写操作。
3、镜像集群模式:这种模式,才是所谓的RabbitMQ的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的queue,无论元数据还是queue里的消息都会存在于多个实例上,就是说,每个RabbitMQ节点都有这个queue的一个完整镜像,包含queue的全部数据的意思。然后每次你写消息到queue的时候,都会自动把消息同步到多个实例的queue上。RabbitMQ有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建queue的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个queue的完整数据,别的consumer都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!RabbitMQ一个queue的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个queue的完整数据。
点击回到目录
18.如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?消息积压处理办法:临时紧急扩容:
先修复consumer的问题,确保其恢复消费速度,然后将现有cnosumer都停掉。
新建一个toic,artition是原来的10倍,临时建立好原先10倍的queue数量。
然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue。
接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据。这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据。
等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的consumer机器来消费消息。
MQ中消息失效:假设你用的是RabbitMQ,RabbtiMQ是可以设置过期时间的,也就是TTL。如果消息在queue中积压超过一定的时间就会被RabbitMQ给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在mq里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入mq里面去,把白天丢的数据给他补回来。也只能是这样了。假设1万个订单积压在mq里面,没有处理,其中1000个订单都丢了,你只能手动写程序把那1000个订单给查出来,手动发到mq里去再补一次。
mq消息队列块满了:如果消息积压在mq里,你很长时间都没有处理掉,此时导致mq都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。
点击回到目录后边的还没改格式,先写到这,明天接着做细化。