You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
消息队列的操作,就是写入和读取,所以首先可以想到的是把 list 作为一个消息队列。对于生产者,使用 lpush 写入数据到队列头部,消费者通过轮询的方式,每次循环使用 rpop 从队列尾部取出一条消息进行处理。通过 rpop 取出数据后,数据就不在 list 中存在了,所以不同消费者就只能取到不同的数据。另外消费这也可以通过 brpop 阻塞式地从 list 中读取消息,brpop 会在 list 中没有任何元素的时候阻塞连接,这种方式效率更高。
读取数据时的行锁,最容易想到的就是 select * from queue where id = 1 for update,该 SQL 会给 id 为 1 的行加上排他锁,当一个 worker 执行该 SQL 的时候,别的 worker 就无法读取这一行。但如果 select ... for updat 中的 where 条件没有索引,行锁就会升级为表锁。而我们最开始操作数据的时候,是不知道 id 的。所以没办法直接使用这条 SQL 来查询需要处理的数据,需要另辟蹊径。
其实也很简单,假设我们有 10 个 worker 来消费队列,那么每个 worker 可以先批量查询 100 条数据,然后随机选择其中一条,得到对应的 id。接下来再使用 select * from queue where id = ? for update 来给每个 worker 需要处理的一行数据加锁。流程如下:
select * from queue order by id asc, retry_times asc limit 100
随机选择一行数据,得到 id
读取该行数据并加锁 select * from queue where id = ? for update
消息队列本质上是一个存储介质,通常是链表结构,不同的进程或线程可以向消息队列中写入或读取消息。消息队列的使用场景有很多,比如异步处理任务、应用解耦、流量削锋等等。通常我们使用消息队列,都是直接使用 MNS、RocketMQ、Kafka 等产品。但某些场景下这些产品也难以满足,或者使用起来成本比较高,比如:批量创建大量(比如 1 万条)消息,并且要么都写入,要么不写入。这时候就需要考虑一些别的方案了。总的来说该场景主要需要满足以下几个条件:
接下来就以上述的场景为例,分别分析不同消息队列所面临的问题,以及如何使用 MySQL 来实现一个支持批量事务消息队列。
基于 MNS / RocketMQ
MNS 是阿里云提供的一个分布式消息服务,RocketMQ 是阿里云基于 Apache RocketMQ 构建的分布式消息中间件。
MNS 可以使用 SendMessage 接口向队列发送一条消息,也可以使用 BatchSendMessage 批量创建消息,但该接口一次最多发送 16 条消息。而 RocketMQ 不支持批量创建消息。
所以对于一般的消息队列产品,只能通过多次调用发送单条/(有限的)多条创建消息的接口,来实现批量创建大量消息。但多次调用接口,就很难保证这些调用的事务性,很难保证这一批消息要么都成功写入消息队列,要么都不写入。
综上,MNS、RocketMQ 等消息队列,比较难实现事务性地批量创建消息。
基于 Redis Lists
Redis 是一个经常用来做消息队列的数据库。Redis 的 lists 是一个链表,基于 lists 可以很方便实现一个轻量级的消息队列。
lpush/rpop 或 lpush/brpop
消息队列的操作,就是写入和读取,所以首先可以想到的是把 list 作为一个消息队列。对于生产者,使用 lpush 写入数据到队列头部,消费者通过轮询的方式,每次循环使用 rpop 从队列尾部取出一条消息进行处理。通过 rpop 取出数据后,数据就不在 list 中存在了,所以不同消费者就只能取到不同的数据。另外消费这也可以通过 brpop 阻塞式地从 list 中读取消息,brpop 会在 list 中没有任何元素的时候阻塞连接,这种方式效率更高。
从 2.4 开始,lpush 支持传入多个 elements:
LPUSH key element [element ...]
,这样我们可以很方便使用lpush
批量向队列中写入消息。但
lpush/rpop
这种方案实现的消息队列是不可靠的。例如,当消费这通过 rpop 取出消息后,出现了网络问题或者消费者端崩溃了, 那么这个消息就丢失了。所以一般不能简单使用 lpush/rpop 来实现消息队列。rpoplpush 或 brpoplpush
那么如何使消息队列可靠呢?可以通过 rpoplpush (或 brpoplpush)来实现。
rpoplpush source destination
可以从source
列表中取出一个元素,并将该元素写入到destination
列表,这两个操作是一个原子操作。比如我们定义队列的 list 为
queue
,正在处理的队列为processing
,消费者可以通过rpoplpush queue processing
取出消息进行处理,处理成功后,再使用lrem processing
将消息从processing
中删除。这样当消费过程中出现异常,消息就会留存在processing
中。然后我们可以通过另一个 worker ,监听processing
并将超时的消息取出来,再放回到queue
。但问题就在于监听processing
的 worker 需要客户端实现,为了计算消息的超时时间,可能还需要别的介质来如 hash 表来存储消息的取出时间,复杂度又上升了。除了使用
processing
表,还有一种方案就是使用循环队列,即rpoplpush queue queue
,source 和 destination 是同一个队列。每次从队列尾部取出待消费的消息,同时将消息放在队列头部,消费完成后,使用lrem queue
将消息删除。但也存在一个问题,当队列中消息数量小于 worker 数量时,不同 worker 就很可能读取到同一个消息,造成消息被重复消费。当然,如果 worker 能够容忍消息被重复处理,这种方式是可行的。总的来说,基于 Redis Lists,可以实现消息的批量创建,并保证创建消息的事务性,但难以实现消息不被重复消费。
基于 MySQL 的批量事务消息队列
基于 Redis Lists 的消息队列之所以难以避免消息被重复消费,主要是异常消息重试导致的。因为数据一旦从 Redis Lists 中取出来,就只能重新 lpush 回去,难以保证取出消息和消费消息的事务(消息一定是成功消费后再删除)。
那么如何基于 MySQL 实现一个不重复消费的队列呢?
批量事务消息
MySQL 的消息队列的数据写入和 Redis 类似,只是 MySQL 是使用表来存储消息。比如新建一张
queue
表:对于 MySQL 消息队列,批量写入消息很容易,直接通过
insert
批量写入数据即可:insert into queue(id,...) values(...)(...)...
。并且 SQL 语句也可以放在事务中,很容易实现批量消息的事务。队列消费者的实现,就是实现一个定时任务,定时从
queue
表中查询出消息进行处理,处理完成之后再删掉该行数据:select * from queue order by id asc, retry_times asc limit 1
delete from queue where id
retry_times
:update queue set retry_times = retry_times + 1 where id
这样就能保证消息一定是成功被消费,如果消费失败,则消息依旧留存在
queue
表中,等待下一次被消费。MySQL 队列的最大问题是,当有多个进程都在从
queue
中读取消息进行消费时,很容易读取到重复的行,进而导致重复消费。如何解决这个问题呢?基于乐观锁的消息消费
最容易想到的就是锁,并且是行锁;如果是表锁,则 SQL 的执行效率就非常低了。主要就是通过锁实现:当一行数据被一个 worker 读取后,就不能被其他 worker 读取。
读取数据时的行锁,最容易想到的就是
select * from queue where id = 1 for update
,该 SQL 会给id
为1
的行加上排他锁,当一个 worker 执行该 SQL 的时候,别的 worker 就无法读取这一行。但如果select ... for updat
中的where
条件没有索引,行锁就会升级为表锁。而我们最开始操作数据的时候,是不知道 id 的。所以没办法直接使用这条 SQL 来查询需要处理的数据,需要另辟蹊径。其实也很简单,假设我们有 10 个 worker 来消费队列,那么每个 worker 可以先批量查询 100 条数据,然后随机选择其中一条,得到对应的
id
。接下来再使用select * from queue where id = ? for update
来给每个 worker 需要处理的一行数据加锁。流程如下:select * from queue order by id asc, retry_times asc limit 100
select * from queue where id = ? for update
这样不同 worker 取得同一行数据的概率只有 1/10,由于排他锁的存在,不同 worker 无法同时读取到这一行数据。
排他锁虽然实现了一行数据只能被一个 worker 读取,但一旦不同 worker 随机选择了同一个 id,则后执行的 SQL 就会一直阻塞,直到排他锁被释放。如果抢到锁的 worker 处理速度很慢,则其他 worker 的 SQL 就会阻塞很长时间。显然这种方案还不够好。
那么有没有更好的方案?排他锁的主要问题是,该行锁会阻塞其他 SQL,所以使用一种不会阻塞的行锁就可以了。
首先给
queue
表增加一个字段:lock_id
:然后不同 worker 还是和之前方案一样,批量读取数据,随机选择一行,然后将该行数据的
lock_id
更新为当前进程的{hostname}-{pid}
,接下来再根据lock_id
去查询一行数据,这样就能保证不同 worker 读取的是不同的数据了。如果 A、B 两个 worker 随机选择了同一个 id,那么造成的影响就是,A 进程的lock_id
可能被 B 进程覆盖,导致 A 进程最后查询不到可以消费的数据,造成 A 进程的浪费。在保证准确性的情况下,这种浪费也是允许的。详细流程如下:select * from queue where lock_id = '' order by id asc, retry_times asc limit 100
update queue set lock_id = {hostname}-{pid}, updated_at = now()
select queue where lock_id = {hostname}-{pid}
delete from queue where id
update queue set lock_id = '', retry_times = retry_times + 1, updated_at = now()
释放锁失败后的重试
通过
lock_id
实现基于乐观锁消费,就能保证一行数据一定只被一个 worker 消费。现在还面临一个问题就是,如果消费失败,需要释放锁,如果释放锁失败怎么处理?这个时候就可以加一个兜底方案。在消费消息最后,查询出有
lock_id
并且超时的数据,这些数据只在两种情况会产生:前两种情况,可以直接重新释放锁,即将
lock_id
设置为空字符串,等待下一次轮询去处理。“移从列表中移除消息失败”,准确来说是:消息消费成功了,但没有从队列中移除。解决这个问题,可以把 ”删除消息“ 和 ”消费消息“ 做成一个事务,就能保证消息一定是消费成功才被删除。
这样就实现了一个可靠的消息队列。
总结
针对批量创建事务消息的场景,将 RocketMQ、Redis Lists 和 MySQL 消息队列做个简单的对比,如下:
总的来说,MNS、RocketMQ 等产品,由于不支持批量写入数据到队列,所以难以满足批量创建事务消息的需求,实现起来成本比较高。而使用 Redis Lists 可以很方便实现一个批量写入数据的消息队列,但难以保证消息只被消费一次。使用 MySQL 实现一个消息队列,批量写入可以通过
insert
一次写入多条消息,并通过乐观锁的方式保证一条消息只被消费一次。当然,技术没有好与不好,只有适合与不适合,上述几种消息队列都有各自适合的场景,最终还是要根据实际需求进行选择。The text was updated successfully, but these errors were encountered: