RabbitMQ学习记录

RabbitMQ中间件

MQ相关概念

什么是MQ

MQ(message queue),本质是队列,FIFO先入先出,只不过队列中存放的是message。还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。

为什么要用MQ

流量消峰

在正常情况下,如果订单系统最多只能处理一万条数据。但是在高峰的时候,如果有两万条数据,订单系统处理不了,可能导致下单失败。如果使用消息队列作为缓冲,部分订单先放到队列里,用户虽然等待时间长,但是能够正常下单使用。

应用解耦

在大型项目中包含多个系统,部分系统是耦合的,这个数据处理完之后交由另一个系统处理,如果另一个系统发生故障,就会导致数据异常。如果采用消息队列模式,就算系统发生故障,等待系统恢复后也可以数据正常,提升系统的可用性。

异步处理

有些服务之间调用时异步的,例如A调用B,B需要花费很长时间执行,但是A需要B什么时候可以执行完。以前一般有两种方式,A过一段时间去调用B的查询API查询,或者A提供一个callback api,B执行完之后调用API通知A服务。这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,A调用B服务后,只需要监听B处理完成的消息,当B处理完之后,会发送消息给MQ,MQ会将此消息转发给A服务。这样A服务既不用循环调用B的查询API,也不用提供callback api。同样B服务也不用做这些操作。A服务还能及时的得到异步处理成功的消息。

RabbitMQ介绍

2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

优点

由于erlang语言的高并发特性,性能较好;吞吐量到万级,MQ功能比较完备、健壮、稳定、易用、跨平台、支持多种语言,如Python、Ruby、.NET、Java、C、PHP、JMS、ActionScript、XMPP、STOMP等,支持AJAX文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃高;更新频率相当高。https://www.rabbitmq.com/new.html

缺点

商业版需要收费,学习成本较高

MQ的选择

Kafka

Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能,肯定是首选Kafka了。

RocketMQ

天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务消峰,在大量交易涌入时,后端可能无法及时处理的情况。RocketMQ在稳定性上可能更是值得信赖,这些业务场景在阿里双11已经经历了多次考验,如果你的业务有上述并发场景,建议选择RocketMQ。

RabbitMQ

结合erlang语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便,如果你的数据量没有那么大,中小型公司选择功能比较完备的鄂RabbitMQ。

RabbitMQ

概念

RabbitMQ是一个消息中间件:它接收、存储并转发消息。

四大核心概念

  • 生产者:产生数据发送消息的程序是生产者
  • 交换机:交换机是RabbitMQ非常重要的一个部件,一方面它接收来自生产者的消息,另一方面他将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多队列,亦或者是把消息丢弃,这个得有交换机类型决定
  • 队列:队列是RabbitMQ内部使用的一种数据结果,尽管消息流经RabbitMQ和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质是一个大的消息缓存区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
  • 消费者:消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者、消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又可以是消费者。

RabbitMQ核心部部分

Hello World!

The simplest thing that does something

Work queues

Distributing tasks among workers (the competing consumers pattern)

Publish/Subscribe

Sending messages to many consumers at once

Routing

Receiving messages selectively

Topics

Receiving messages based on a pattern (topics)

RPC

Request/reply pattern example

Publsher Confirms

Reliable publishing with publisher confirms

各个名词介绍

Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker。

Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似与网络中的namesapce概念,当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。

Connection:publisher/consumer和broker之间的TCP连接。

Channel:如果每次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销是巨大的,效率也较低。Channel是在Connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个Thread创建单独的Channel通信,AMQP method包含了Channel id帮助客户端和message broker识别Channel,所以Channel之间是完全隔离的。Channel作为轻量级Connection极大减少了操作系统建立TCP Connection的开销。

Exchange:message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct

(point-to-point),topic(pulish-subscribe)和fanout(multicast)

Queue:消息最终被送到这里等待consumer取走

Binding:exchange和queue之间的虚拟连接,binding中可以包含routing key,Binding信息被保存到exchange中的查询表中,用于message的分发依据

安装指南

https://www.rabbitmq.com/download.html

安装

  • 安装Rabbit之前首先安装erlang语言。

  • 查看当前系统命令uname -a

  • 依次安装文件

    1
    2
    3
    rpm -ivh erlang-21.3-1.el7.x86_64.rpm
    yum install socat -y
    rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

常用命令

  • 添加开机启动RabbitMQ服务

    1
    chkconfig rabbitmq-server on
  • 启动服务

    1
    /sbin/service rabbitmq-server start
  • 查看服务状态

    1
    /sbin/service rabbitmq-server status
  • 停止服务

    1
    /sbin/service rabbitmq-server stop
  • 开启web管理插件

    1
    rabbitmq-plugins enable rabbitmq_managerment

    用默认账号密码(guest/guest)访问地址http://ip:port

  • 关闭防火墙

    1
    systemctl stop firewalld

添加新账户

创建账号
1
rabbitmq add_user admin 123
设置用户角色
1
rabbitmqctl set_user_tags admin administrator
设置用户权限
1
2
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

用户user_admin 具有/vhost1这个virtual host中所有资源的配置、写、读权限

当前用户和角色
1
rabbitmqctl list_users

再次利用admin账户登录

Hello World

依赖

1
2
3
4
5
6
7
8
9
10
11
12
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.11.0</version>
</dependency>
</dependencies>

消息生产者

消息消费者