RabbitMQ常用的的五种工作模式
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛
RabbitMQ官方地址:http://www.rabbitmq.com/
RabbitMQ提供了6种模式:
- 简单模式
- work模式
- Publish/Subscribe发布与订阅模式
- Routing路由模式
- Topics主题模式
- RPC远程调用模式(远程调用,不太算MQ;暂不作介绍)
相关概念介绍
AMQP 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计
RabbitMQ是AMQP协议的Erlang的实现
Hello World简单消息模式
在上图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
编写代码
生产者
package com.soberw.simple;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author soberw
* @Classname Publisher
* @Description 生产者:向消息中间件上发送消息数据的应用程序
* @Date 2022-05-31 15:19
*/
public class Publisher {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置mq服务器连接信息
//设置虚拟主机名称 默认是 /
factory.setVirtualHost("/");
//设置mq服务器连接地址 默认是 localhost
factory.setHost("192.168.6.200");
//设置连接用户名 默认为 guest
factory.setUsername("soberw");
//设置密码 默认为 guest
factory.setPassword("123456");
//设置连接端口 默认为 5672
factory.setPort(5672);
//2、创建连接
Connection connection = factory.newConnection();
//3、在连接上创建频道(信道),信道相当于一个逻辑上的连接。
// 为了提高系统性能,不是每次都连接或者关闭connection
// 而是每次都打开或者关闭信道
Channel channel = connection.createChannel();
//4、声明(创建)队列
/*
* queue 参数1:队列名称
* durable 参数2:是否定义持久化队列,当mq重启之后,是否还在
* exclusive 参数3:是否独占本次连接(是否为排他队列),这个队列是否只限于当前连接使用。如果连接关闭,那么队列自动被删除
* ① 是否独占,只能有一个消费者监听这个队列
* ② 当connection关闭时,是否删除队列
* autoDelete 参数4:是否在不使用的时候自动删除队列,当没有consumer时,自动删除。队列长时间没有使用,服务会自动删除队列
* arguments 参数5:队列其它参数,队列的一些属性。例如:队列超时时间、队列连接长度等...
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//5、发送信息
String message = "Hello,RabbitMQ!";
/*
* 参数1:交换机名称,如果没有指定则使用默认Default Exchange,简单模式下默认为“”即可
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:配置信息,消息属性信息
* 参数4:消息内容
*/
AMQP.BasicProperties props = new AMQP.BasicProperties();
//构建之后一定要赋值给一个新的对象,要不然无效
AMQP.BasicProperties build = props.builder().appId("app01").userId("soberw").messageId("msg01").build();
channel.basicPublish("", QUEUE_NAME, build, message.getBytes());
System.out.println("已经发送的信息:" + message);
//6、关闭资源
channel.close();
connection.close();
}
}
消费者
package com.soberw.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author soberw
* @Classname Consumer
* @Description 从消息中间件上获取消息,并处理消息的应用程序
* @Date 2022-05-31 16:18
*/
public class Consumer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接
ConnectionFactory factory = new ConnectionFactory();
//设置mq服务器连接信息
//设置虚拟主机名称 默认是 /
factory.setVirtualHost("/");
//设置mq服务器连接地址 默认是 localhost
factory.setHost("192.168.6.200");
//设置连接用户名 默认为 guest
factory.setUsername("soberw");
//设置密码 默认为 guest
factory.setPassword("123456");
//设置连接端口 默认为 5672
factory.setPort(5672);
//2、创建连接
Connection connection = factory.newConnection();
//3、在连接上创建频道(信道)
Channel channel = connection.createChannel();
//4、声明(创建)队列(可以省略)
// 原则上消费者是可以省略不写的,但是考虑到容错性的问题,建议写上
// 因为如果生产者还没有进入队列的时候,如果消费者先进入队列,则会报错
// 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//5、获取并处理消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 回调方法,当收到消息后,会自动执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息,交换机,路由key。。。
* @param properties 配置信息
* @param body 数据体
* @throws IOException io异常
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag = " + consumerTag);
System.out.println("DeliveryTag = " + envelope.getDeliveryTag()); //消息唯一标识。类似于ID
System.out.println("Exchange = " + envelope.getExchange()); //获取消息是从那个交换机过来的
System.out.println("RoutingKey = " + envelope.getRoutingKey()); //获取队列绑定交换机的路由 key
System.out.println("properties = " + properties);
System.out.println("body = " + new String(body));
//下面就可以进行一些操作,例如:将信息保存到数据库,给用户发消息,发短息,记录日志等
}
};
/*
* 消费者类似一个监听程序,主要是用来监听消息
* 参数:
* 1、queue: 队列名称
* 2、autoAck:是否自动确认,类似于发短息的时候,发送成功手机会收到一个确认信息
* 3、callback:回调对象
*/
channel.basicConsume(QUEUE_NAME, true, consumer);
//6、关闭资源
// channel.close();
// connection.close();
}
}
RabbitMQ执行流程总结
生产者发送消息
- 生产者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker;
- 声明队列并设置属性;如是否排它,是否持久化,是否自动删除;
- 将路由键(空字符串)与队列绑定起来;
- 发送消息至RabbitMQ Broker;
- 关闭信道;
- 关闭连接;
消费者接收消息
- 消费者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker
- 向Broker 请求消费相应队列中的消息,设置相应的回调函数;
- 等待Broker投递响应队列中的消息,消费者接收消息;
- 确认(ack,自动确认)接收到的消息;
- RabbitMQ从队列中删除相应已经被确认的消息;
- 关闭信道;
- 关闭连接;
抽取工具类
由于下面对每个模式的测试都大致要遵循此流程,只是涉及到具体的操作方法会有所不同,因此这里可以对建立连接进行一个抽取,抽取出来一个工具类
package com.soberw.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author soberw
* @Classname ConnectionUtil
* @Description 连接抽取出来的工具类
* @Date 2022-05-31 20:45
*/
public class ConnectionUtil {
public static Connection getConnection() throws IOException, TimeoutException {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("192.168.6.200");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/");
factory.setUsername("soberw");
factory.setPassword("123456");
// 通过工程获取连接
return factory.newConnection();
}
//测试一下
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = getConnection();
//connection = amqp://soberw@192.168.6.200:5672/
System.out.println("connection = " + connection);
}
}
Work queues工作队列模式
Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
工作队列模式实际上是一种竞争关系的模式,多个消费者之间是竞争关系,即一条消息如果被某个消费者消费了,那么其他的消费者就获取不到了。
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
编写代码
因为是多个消费者共同消费同一个生产者的消息,因此创建多一个消费者
生产者
package com.soberw.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.soberw.util.ConnectionUtil;
/**
* @author soberw
* @Classname Publisher
* @Description work消息模型:多个消费者共同消费一个队列的消息。目的:提高消息处理速度
* @Date 2022-05-31 20:57
*/
public class Publisher {
/**
* 模拟生产者生产速度快
*/
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
for (int i = 1; i <= 10; i++) {
String body = i+"hello rabbitmq~~~";
channel.basicPublish("",QUEUE_NAME,null,body.getBytes());
}
channel.close();
connection.close();
}
}
消费者1
package com.soberw.work;
import com.rabbitmq.client.*;
import com.soberw.util.ConnectionUtil;
import java.io.IOException;
/**
* @author soberw
* @Classname consumer1
* @Description
* @Date 2022-05-31 21:29
*/
public class Consumer1 {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body1:" + new String(body));
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消费者2
package com.soberw.work;
import com.rabbitmq.client.*;
import com.soberw.util.ConnectionUtil;
import java.io.IOException;
/**
* @author soberw
* @Classname Consumer2
* @Description
* @Date 2022-05-31 21:30
*/
public class Consumer2 {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body2:" + new String(body));
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
小结
- 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
- Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
订阅模式类型
订阅模式示例图
前面2个案例中,只有3个角色:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分
而在订阅模型中,多了一个exchange角色,而且过程略有变化:
- P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- C:消费者,消息的接受者,会一直等待消息到来。
- Queue:消息队列,接收消息、缓存消息。
Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
Publish/Subscribe发布与订阅模式
发布订阅模式:
- 每个消费者监听自己的队列。
- 生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
编写代码
生产者
需要创建交换机以进行消息转发:
package com.soberw.fanout;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.soberw.util.ConnectionUtil;
/**
* @author soberw
* @Classname Publisher
* @Description Publish/Subscribe发布与订阅模式的生产者应用程序
* @Date 2022-06-01 11:00
*/
public class Publisher {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
/*
*exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
*参数:
* 1. exchange:交换机名称
* 2. type:交换机类型
* DIRECT("direct"),:定向
* FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定的队列。
* TOPIC("topic"),通配符的方式,发送给符合通配符条件的队列
* HEADERS("headers");参数匹配
* 3. durable:是否持久化
* 4. autoDelete:自动删除
* 5. internal:内部使用。 一般false
* 6. arguments:参数
*/
String exchangeName = "test_fanout";
//5. 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
//6. 创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
//7. 绑定队列和交换机
/*
*queueBind(String queue, String exchange, String routingKey)
*参数:
* 1. queue:队列名称
* 2. exchange:交换机名称
* 3. routingKey:路由键,绑定规则
* 如果交换机的类型为fanout ,routingKey设置为""
*/
channel.queueBind(queue1Name, exchangeName, "");
channel.queueBind(queue2Name, exchangeName, "");
String body = "日志信息:张三调用了findAll方法...日志级别:info...";
//8. 发送消息
channel.basicPublish(exchangeName, "", null, body.getBytes());
//9. 释放资源
channel.close();
connection.close();
}
}
消费者1
package com.soberw.fanout;
import com.rabbitmq.client.*;
import com.soberw.util.ConnectionUtil;
import java.io.IOException;
/**
* @author soberw
* @Classname Consumer
* @Description Publish/Subscribe发布与订阅模式的消费者应用程序
* @Date 2022-06-01 11:00
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
System.out.println("将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue1Name, true, consumer);
}
}
消费者2
将队列名称改为2号队列,其他一样
测试发现,两个消费者都成功取到了消息,并且情空了各自的队列
小结
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到
发布订阅模式与工作队列模式的区别
- 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
- 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
- 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机 。
Routing路由模式
路由模式特点:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
- 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey。
- Exchange 不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key 完全一致,才会接收到消息
- P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
- C1:消费者,其所在队列指定了需要routing key 为 error 的消息
- C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
编写代码
在编码上与 Publish/Subscribe发布与订阅模式 的区别是交换机的类型为:Direct
,还有队列绑定交换机的时候需要指定routing key
。
生产者
package com.soberw.routing;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.soberw.util.ConnectionUtil;
/**
* @author soberw
* @Classname Publisher
* @Description Routing路由模式的生产者应用程序
* @Date 2022-06-01 12:55
*/
public class Publisher {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//定义交换机名字
String exchangeName = "test_direct";
// 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
// 创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
// 声明(创建)队列
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
// 队列绑定交换机
// 队列1绑定error
channel.queueBind(queue1Name, exchangeName, "error");
// 队列2绑定info warning
channel.queueBind(queue2Name, exchangeName, "info");
channel.queueBind(queue2Name, exchangeName, "warning");
String errorMessage = "日志信息:张三调用了delete方法.错误了,日志级别error...";
String warningMessage = "日志信息:张三调用了delete方法.错误了,日志级别warning...";
String infoMessage = "日志信息:张三调用了delete方法.错误了,日志级别info...";
// 发送消息
channel.basicPublish(exchangeName, "error", null, errorMessage.getBytes());
channel.basicPublish(exchangeName, "warning", null, warningMessage.getBytes());
channel.basicPublish(exchangeName, "info", null, infoMessage.getBytes());
channel.close();
connection.close();
}
}
消费者1
package com.soberw.routing;
import com.rabbitmq.client.*;
import com.soberw.util.ConnectionUtil;
import java.io.IOException;
/**
* @author soberw
* @Classname Consumer1
* @Description Routing路由模式的消费者应用程序
* @Date 2022-06-01 12:55
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
System.out.println("将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue1Name, true, consumer);
}
}
消费者2
消费者2从队列2中取消息
小结
Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。
Topics主题模式
Topic主题模式也叫通配符模式。Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
- #:匹配零个或多个词
- *:匹配不多不少恰好1个词
举例:
- item.#:能够匹配item.insert.abc 或者 item.insert
- item.*:只能匹配item.insert
- 红色Queue:绑定的是usa.# ,因此凡是以 usa.开头的routing key 都会被匹配到
- 黄色Queue:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配
编写代码
使用topic类型的Exchange,发送不同消息的routing key。
生产者
package com.soberw.topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.soberw.util.ConnectionUtil;
/**
* @author soberw
* @Classname Publisher
* @Description Topic通配符模式的生产者程序
* @Date 2022-06-01 14:51
*/
public class Publisher {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
// 绑定队列和交换机
/*
* 参数:
* 1. queue:队列名称
* 2. exchange:交换机名称
* 3. routingKey:路由键,绑定规则
* 如果交换机的类型为fanout ,routingKey设置为""
*/
// routing key 系统的名称.日志的级别。
//需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
channel.queueBind(queue1Name, exchangeName, "#.error"); //匹配零个或者多个以.error结束的词
channel.queueBind(queue1Name, exchangeName, "order.*"); //匹配一个以order.开头的词
channel.queueBind(queue2Name, exchangeName, "*.*"); //匹配两个以 . 分割的词
//定义不同的routing key
String key1 = "order.info.error"; //可被队列 1 匹配到
String key2 = "order.info"; //可被队列 1 2 匹配到
String key3 = "goods.info"; //可被队列 2 匹配到
String key4 = "goods.error"; //可被队列 1 2 匹配到
String body1 = "日志信息:张三通过" + key1 + "调用了方法...";
String body2 = "日志信息:张三通过" + key2 + "调用了方法...";
String body3 = "日志信息:张三通过" + key3 + "调用了方法...";
String body4 = "日志信息:张三通过" + key4 + "调用了方法...";
//发送消息
channel.basicPublish(exchangeName, key1, null, body1.getBytes());
channel.basicPublish(exchangeName, key2, null, body2.getBytes());
channel.basicPublish(exchangeName, key3, null, body3.getBytes());
channel.basicPublish(exchangeName, key4, null, body4.getBytes());
channel.close();
connection.close();
}
}
消费者1
package com.soberw.topic;
import com.rabbitmq.client.*;
import com.soberw.util.ConnectionUtil;
import java.io.IOException;
/**
* @author soberw
* @Classname Consumer1
* @Description Topic通配符模式的消费者程序
* @Date 2022-06-01 14:50
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1";
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
}
};
channel.basicConsume(queue1Name, true, consumer);
}
}
消费者2
同消费者1 一样,接收来自队列2 的消息。
小结
Topic主题模式可以实现 Publish/Subscribe发布与订阅模式 和 Routing路由模式 的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。
模式总结
RabbitMQ工作模式:
简单模式 HelloWorld
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
工作队列模式 Work Queue
一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
发布订阅模式 Publish/subscribe
需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
路由模式 Routing
需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
通配符模式 Topic
需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列