Installation
Remember to run the installation with administrator (root) privileges.
    [root@hadoop202 software]
    [root@hadoop202 software]
    [root@hadoop202 software]
    [root@hadoop202 software]
    [xiamu@hadoop202 software]$ rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
Starting the service
    [root@hadoop202 software]# systemctl start rabbitmq-server
    [root@hadoop202 software]# systemctl status rabbitmq-server
    [root@hadoop202 software]# systemctl stop rabbitmq-server
Installing the web management plugin
    [root@hadoop202 software]# systemctl stop rabbitmq-server
    [root@hadoop202 software]# rabbitmq-plugins enable rabbitmq_management
    [root@hadoop202 software]# systemctl start rabbitmq-server
    [root@hadoop202 software]# systemctl status rabbitmq-server
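Once the web management plugin is installed, open the following address:
http://hadoop202:15672/
Remember to turn off the firewall, or at least allow port 15672 through it.
The first login attempt fails: by default the guest account may only connect from localhost, so an account has to be added before logging in from another machine.
username: guest
password: guest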
Adding an account and granting permissions
    Create an account
    [root@hadoop202 software]# rabbitmqctl add_user admin 123
    Adding user "admin" ...

    Set the user's role
    [root@hadoop202 software]# rabbitmqctl set_user_tags admin administrator
    Setting tags for user "admin" to [administrator] ...

    Set the user's permissions
    set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
    [root@hadoop202 software]# rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
    Setting permissions for user "admin" in vhost "/" ...
    The user admin now has configure, write, and read permissions on all resources in the "/" virtual host

    List the current users and their roles
    [root@hadoop202 software]# rabbitmqctl list_users
    Listing users ...
    user    tags
    admin   [administrator]
    guest   [administrator]
Logging in again
Username: admin
Password: 123
Setting up the Java development environment
Adding the dependencies
    <dependencies>
        <!-- RabbitMQ Java client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <!-- Utilities for working with files and streams -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>
Creating the producer
com.atguigu.one
Producer code
    package com.atguigu.one;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;

    public class Producer {
        // Name of the queue
        public static final String QUEUE_NAME = "hello";

        public static void main(String[] args) throws IOException, TimeoutException {
            // Configure the connection to the RabbitMQ broker
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("hadoop202");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("123");

            // Open a connection, then a channel on it
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();

            // Declare the queue: not durable, not exclusive, not auto-deleted, no extra arguments
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            String message = "hello world";

            // Publish to the default exchange (""), using the queue name as the routing key
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println("Message sent");
        }
    }
Creating the consumer
    package com.atguigu.one;

    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;

    public class Consumer {
        // Name of the queue
        public static final String QUEUE_NAME = "hello";

        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("hadoop202");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("123");
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();

            // Called for every message delivered to this consumer
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
            };

            // Called when consumption is cancelled
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println("Message consumption was interrupted");
            };

            // Consume from the queue with automatic acknowledgment (autoAck = true)
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
        }
    }
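How work queues operate
The main idea behind work queues (also called task queues) is to avoid running a resource-intensive task immediately and having to wait for it to finish. Instead, the task is scheduled to run later: we wrap it in a message and send it to a queue. A worker process running in the background pops the task and eventually executes the job. When several workers are running, the tasks are shared among them.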
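The sharing of tasks among workers can be pictured without a broker. The sketch below is only an illustration, assuming RabbitMQ's default behaviour of handing messages to consumers in turn (round-robin); the class RoundRobinDispatch and its method are hypothetical names, not part of the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Broker-free sketch: hands queued tasks to workers in turn (round-robin). */
final class RoundRobinDispatch {

    /** Returns one task list per worker; task i goes to worker i % workerCount. */
    static List<List<String>> dispatch(List<String> tasks, int workerCount) {
        List<List<String>> perWorker = new ArrayList<>();
        for (int w = 0; w < workerCount; w++) {
            perWorker.add(new ArrayList<>());
        }
        for (int i = 0; i < tasks.size(); i++) {
            perWorker.get(i % workerCount).add(tasks.get(i));
        }
        return perWorker;
    }

    public static void main(String[] args) {
        List<List<String>> result = dispatch(Arrays.asList("AA", "BB", "CC", "DD", "EE"), 3);
        for (int w = 0; w < result.size(); w++) {
            System.out.println("worker " + (w + 1) + " -> " + result.get(w));
        }
    }
}
```

With five tasks and three workers, the first two workers receive two tasks each and the third receives one, regardless of how long each task takes.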
Extracting a connection factory utility class
    package com.atguigu.utils;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class RabbitMqUtils {
        // Opens a connection to the broker and returns a channel on it
        public static Channel getChannel() throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("hadoop202");
            factory.setUsername("admin");
            factory.setPassword("123");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            return channel;
        }
    }
Starting multiple workers
    package com.atguigu.two;

    import com.atguigu.utils.RabbitMqUtils;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;

    public class Worker01 {

        // Name of the queue
        public static final String QUEUE_NAME = "hello";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();

            // Called for every message delivered to this worker
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("Received message: " + new String(message.getBody()));
            };

            // Called when consumption is cancelled
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println(consumerTag + ": consumer cancelled, cancel callback invoked");
            };

            System.out.println("C3 waiting for messages...");
            // Consume with automatic acknowledgment (autoAck = true)
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
        }
    }
Then run three instances of the worker.
Work queue: producer code
    package com.atguigu.two;

    import com.atguigu.utils.RabbitMqUtils;
    import com.rabbitmq.client.Channel;

    import java.util.Scanner;

    public class Task01 {
        // Name of the queue
        public static final String QUEUE_NAME = "hello";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // Read messages from the console and publish each one
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String message = scanner.next();
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println("Message sent: " + message);
            }
        }
    }
Test result:
Message acknowledgment: producer
    package com.atguigu.three;

    import com.atguigu.utils.RabbitMqUtils;
    import com.rabbitmq.client.Channel;

    import java.nio.charset.StandardCharsets;
    import java.util.Scanner;

    public class Task2 {

        // Name of the queue
        public static final String TASK_QUEUE_NAME = "ack_queue";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();

            channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);

            // Read messages from the console and publish each one
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String message = scanner.next();
                channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println("Producer sent message: " + message);
            }
        }
    }
Message acknowledgment: consumers
    package com.atguigu.three;

    import com.atguigu.utils.RabbitMqUtils;
    import com.atguigu.utils.SleepUtils;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;

    public class Work03 {
        public static final String TASK_QUEUE_NAME = "ack_queue";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            System.out.println("C1 waiting for messages (short processing time)");

            DeliverCallback deliverCallback = (consumerTag, message) -> {
                // Simulate one second of work
                SleepUtils.sleep(1);
                System.out.println("Received message: " + new String(message.getBody(), "UTF-8"));

                // Acknowledge manually: this delivery tag only (multiple = false)
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            };

            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("Consumer cancelled, cancel callback invoked");
            };

            // Manual acknowledgment: autoAck is switched off
            boolean autoAck = false;
            channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
        }
    }
    package com.atguigu.three;

    import com.atguigu.utils.RabbitMqUtils;
    import com.atguigu.utils.SleepUtils;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;

    public class Work04 {
        public static final String TASK_QUEUE_NAME = "ack_queue";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            System.out.println("C2 waiting for messages (long processing time)");

            DeliverCallback deliverCallback = (consumerTag, message) -> {
                // Simulate thirty seconds of work
                SleepUtils.sleep(30);
                System.out.println("Received message: " + new String(message.getBody(), "UTF-8"));

                // Acknowledge manually: this delivery tag only (multiple = false)
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            };

            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("Consumer cancelled, cancel callback invoked");
            };

            // Manual acknowledgment: autoAck is switched off
            boolean autoAck = false;
            channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
        }
    }
    package com.atguigu.utils;

    public class SleepUtils {
        // Sleeps for the given number of seconds
        public static void sleep(int second) {
            try {
                Thread.sleep(1000 * second);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
Test result:
When Work04 is killed partway through processing, Work03 successfully handles the message that Work04 did not finish.
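The redelivery seen in this test follows from manual acknowledgment: a message that was delivered but never acked goes back to the queue when its consumer disappears. The sketch below mimics that bookkeeping in memory. It is a simplified model rather than the broker's implementation, AckQueueSketch and its methods are hypothetical names, and putting requeued messages back at the head of the queue is an assumption made for the illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory sketch of manual acks: unacked messages are requeued when a consumer dies. */
final class AckQueueSketch {
    private final Deque<String> ready = new ArrayDeque<>();
    // delivery tag -> {consumer, message} for deliveries that are not acknowledged yet
    private final Map<Long, String[]> unacked = new LinkedHashMap<>();
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    /** Delivers the next ready message to the consumer; returns its delivery tag, or -1 if none. */
    long deliver(String consumer) {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, new String[] {consumer, message});
        return tag;
    }

    /** The consumer finished its work: the message is dropped for good. */
    void ack(long tag) {
        unacked.remove(tag);
    }

    /** The consumer went away: everything it had not acked returns to the queue. */
    void consumerDied(String consumer) {
        List<String> requeued = new ArrayList<>();
        Iterator<Map.Entry<Long, String[]>> it = unacked.entrySet().iterator();
        while (it.hasNext()) {
            String[] delivery = it.next().getValue();
            if (delivery[0].equals(consumer)) {
                requeued.add(delivery[1]);
                it.remove();
            }
        }
        // Put the messages back at the head, keeping their original order
        for (int i = requeued.size() - 1; i >= 0; i--) {
            ready.addFirst(requeued.get(i));
        }
    }

    int readyCount() { return ready.size(); }
    int unackedCount() { return unacked.size(); }
    String peekReady() { return ready.peekFirst(); }

    public static void main(String[] args) {
        AckQueueSketch queue = new AckQueueSketch();
        queue.publish("AA");
        queue.publish("BB");
        long first = queue.deliver("Work03");
        queue.deliver("Work04");      // Work04 takes BB but never acks it
        queue.ack(first);
        queue.consumerDied("Work04"); // BB returns to the queue
        System.out.println("ready again: " + queue.peekReady());
    }
}
```

With autoAck set to true there would be no unacked entry to requeue, which is why the message would be lost in that mode.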
Queue durability
    // Before: the queue is declared as non-durable
    channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);

    // After: the queue is declared as durable
    boolean durable = true;
    channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
If the queue has already been declared as non-durable, changing durable to true produces the following error:
    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'ack_queue' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
Simply delete the queue and run the program again.
A D in the Features column shows that the queue is now durable.
Message persistence
    // Before: the message is not marked as persistent
    channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
    // After: the message is marked as persistent
    channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
Unfair dispatch
Set this on every consumer:
    // Allow at most one unacknowledged message per consumer at a time
    int prefetchCount = 1;
    channel.basicQos(prefetchCount);
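To see why a prefetch count of 1 favours the faster consumer, the following sketch simulates the dispatch without a broker. It assumes every message is already waiting in the queue and that a worker receives its next message only once it has acknowledged the previous one. UnfairDispatchSketch is a hypothetical name, and the processing times of 1 and 30 seconds echo the two workers used earlier.

```java
import java.util.Arrays;
import java.util.PriorityQueue;

/** Simulates dispatch with prefetch = 1: a message goes to whichever worker is free first. */
final class UnfairDispatchSketch {

    /** Returns how many of messageCount messages each worker ends up handling. */
    static int[] simulate(int messageCount, int[] secondsPerMessage) {
        int[] handled = new int[secondsPerMessage.length];
        // Each entry is {timeWhenFree, workerIndex}; the earliest-free worker comes first
        PriorityQueue<long[]> freeAt = new PriorityQueue<>((a, b) ->
                a[0] != b[0] ? Long.compare(a[0], b[0]) : Long.compare(a[1], b[1]));
        for (int w = 0; w < secondsPerMessage.length; w++) {
            freeAt.add(new long[] {0, w});
        }
        for (int m = 0; m < messageCount; m++) {
            long[] next = freeAt.poll();
            int worker = (int) next[1];
            handled[worker]++;
            // The worker is busy until it has processed and acked this message
            freeAt.add(new long[] {next[0] + secondsPerMessage[worker], worker});
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(simulate(10, new int[] {1, 30})));
    }
}
```

In this model the fast worker handles nine of ten messages while the slow one is still busy with its first, whereas a plain round-robin split would give each of them five.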
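Prefetch count
Consumer 1 setting
    // Consumer 1 may hold up to two unacknowledged messages
    int prefetchCount = 2;
    channel.basicQos(prefetchCount);
Consumer 2 setting
    // Consumer 2 may hold up to five unacknowledged messages
    int prefetchCount = 5;
    channel.basicQos(prefetchCount);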
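The prefetch count caps how many unacknowledged messages can sit with a consumer at once. As a rough model, assume the broker offers messages to the consumers in turn, skips any consumer that is already at its limit, and that nobody acknowledges anything in the meantime. PrefetchSketch and fill are hypothetical names used only for this illustration.

```java
import java.util.Arrays;

/** Sketch of prefetch limits: messages are handed out in turn, skipping consumers that are full. */
final class PrefetchSketch {

    /** Returns how many unacked messages each consumer holds; nothing is acked meanwhile. */
    static int[] fill(int messageCount, int[] prefetch) {
        int[] inFlight = new int[prefetch.length];
        int next = 0; // consumer to try first for the next message
        for (int m = 0; m < messageCount; m++) {
            int tried = 0;
            while (tried < prefetch.length && inFlight[next] >= prefetch[next]) {
                next = (next + 1) % prefetch.length;
                tried++;
            }
            if (tried == prefetch.length) {
                break; // every consumer is at its limit; the rest stay in the queue
            }
            inFlight[next]++;
            next = (next + 1) % prefetch.length;
        }
        return inFlight;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(fill(7, new int[] {2, 5})));
    }
}
```

With seven messages and limits of 2 and 5, the first consumer ends up holding two messages and the second five; any further message waits in the queue until someone acknowledges.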
Publisher confirms
Enabling confirms on the producer's channel
    // Enable publisher confirms on this channel
    channel.confirmSelect();
There are three kinds of publisher confirms
Individual confirms
    // 1. Individual confirms
    // publicMessageIndividually(); // Published 1000 individually confirmed messages in 616 ms

    public static void publicMessageIndividually() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // Use a random queue name
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, true, false, false, null);

        // Enable publisher confirms
        channel.confirmSelect();

        long begin = System.currentTimeMillis();

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());

            // Block until the broker confirms this single message
            boolean flag = channel.waitForConfirms();
            if (flag) {
                System.out.println("Message sent successfully");
            }
        }

        long end = System.currentTimeMillis();
        System.out.println("Published " + MESSAGE_COUNT + " individually confirmed messages in " + (end - begin) + " ms");
    }
Batch confirms
    // 2. Batch confirms
    // publishMessageBatch(); // Published 1000 batch-confirmed messages in 93 ms

    public static void publishMessageBatch() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, true, false, false, null);

        // Enable publisher confirms
        channel.confirmSelect();

        long begin = System.currentTimeMillis();

        // Number of messages to publish before waiting for confirms
        int batchSize = 100;

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
            // Wait once a full batch has been published
            if ((i + 1) % batchSize == 0) {
                channel.waitForConfirms();
            }
        }
        // Confirm any messages left over from an incomplete final batch
        channel.waitForConfirms();

        long end = System.currentTimeMillis();
        System.out.println("Published " + MESSAGE_COUNT + " batch-confirmed messages in " + (end - begin) + " ms");
    }
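Where the wait is placed inside the loop decides how many messages are still unconfirmed when the loop ends. The sketch below only does the arithmetic, with no broker involved: it counts the messages published after the last wait point for a given condition. BatchBoundarySketch and leftUnconfirmed are hypothetical names.

```java
import java.util.function.IntPredicate;

/** Counts the messages that are still unconfirmed after a batch-confirm publishing loop. */
final class BatchBoundarySketch {

    /** waitAfter decides, for each message index i, whether a wait follows that publish. */
    static int leftUnconfirmed(int messageCount, IntPredicate waitAfter) {
        int outstanding = 0;
        for (int i = 0; i < messageCount; i++) {
            outstanding++;         // message i is published
            if (waitAfter.test(i)) {
                outstanding = 0;   // a wait confirms everything published so far
            }
        }
        return outstanding;
    }

    public static void main(String[] args) {
        int batchSize = 100;
        System.out.println(leftUnconfirmed(1000, i -> i % batchSize == 0));
        System.out.println(leftUnconfirmed(1000, i -> (i + 1) % batchSize == 0));
    }
}
```

Waiting when i % batchSize == 0 leaves the last 99 of 1000 messages unconfirmed, while waiting when (i + 1) % batchSize == 0 leaves none. When the message count is not a multiple of the batch size, a final wait after the loop is still needed in either case.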
Asynchronous confirms
    // 3. Asynchronous confirms
    publishMessageAsync();
    // Published 1000 asynchronously confirmed messages in 47 ms
    // Published 1000 asynchronously confirmed messages in 62 ms

    public static void publishMessageAsync() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, true, false, false, null);

        // Enable publisher confirms
        channel.confirmSelect();

        // Thread-safe sorted map from sequence number to message body,
        // shared by the publishing thread and the confirm callbacks
        ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

        // Called when the broker confirms one or more messages
        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
            if (multiple) {
                // Remove every message up to and including deliveryTag
                ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
                confirmed.clear();
            } else {
                outstandingConfirms.remove(deliveryTag);
            }
            System.out.println("Confirmed message: " + deliveryTag);
        };

        // Called when the broker reports a message as not confirmed
        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
            String message = outstandingConfirms.get(deliveryTag);
            System.out.println("Unconfirmed message: " + message + ", unconfirmed message tag: " + deliveryTag);
        };

        // Register the listeners before publishing
        channel.addConfirmListener(ackCallback, nackCallback);

        long begin = System.currentTimeMillis();

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = "message " + i;
            // Record the message under its sequence number before publishing it
            outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
            channel.basicPublish("", queueName, null, message.getBytes());
        }

        long end = System.currentTimeMillis();
        System.out.println("Published " + MESSAGE_COUNT + " asynchronously confirmed messages in " + (end - begin) + " ms");
    }
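The bookkeeping behind asynchronous confirms can be tried without a broker. The sketch below keeps outstanding messages in a ConcurrentSkipListMap keyed by sequence number and removes them when an ack arrives. A multiple ack covers every sequence number up to and including the tag, hence the inclusive headMap. ConfirmTrackerSketch is a hypothetical helper, and its counter merely stands in for channel.getNextPublishSeqNo().

```java
import java.util.concurrent.ConcurrentSkipListMap;

/** Tracks published-but-unconfirmed messages the way an async confirm listener would. */
final class ConfirmTrackerSketch {
    private final ConcurrentSkipListMap<Long, String> outstanding = new ConcurrentSkipListMap<>();
    private long nextSeqNo = 1; // stand-in for channel.getNextPublishSeqNo()

    /** Records the message under the sequence number it is about to be published with. */
    long publish(String message) {
        long seqNo = nextSeqNo++;
        outstanding.put(seqNo, message);
        return seqNo;
    }

    /** Mirrors an ack callback: multiple = true confirms everything up to and including the tag. */
    void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
    }

    int outstandingCount() { return outstanding.size(); }
    boolean isOutstanding(long seqNo) { return outstanding.containsKey(seqNo); }

    public static void main(String[] args) {
        ConfirmTrackerSketch tracker = new ConfirmTrackerSketch();
        for (int i = 0; i < 5; i++) {
            tracker.publish("message " + i);
        }
        tracker.handleAck(3, true);  // confirms sequence numbers 1, 2 and 3
        tracker.handleAck(5, false); // confirms sequence number 5 only
        System.out.println("still outstanding: " + tracker.outstandingCount());
    }
}
```

Whatever remains in the map after the acks have arrived is the set of messages that may need to be published again.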
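Summary
Publishing messages individually:
Waits synchronously for each confirm. Simple, but throughput is very limited.
Publishing messages in batches:
Waits synchronously for a whole batch of confirms. Simple, with reasonable throughput, but when something goes wrong it is hard to tell which message caused the problem.
Asynchronous handling:
Best performance and resource usage, with good control when errors occur, but slightly harder to implement.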
Exchanges
Simple mode, work mode
Publish/subscribe mode
Bindings
Create a queue
Create an exchange
Bind the exchange to the queue
fanout exchange
    package com.atguigu.five;

    import com.atguigu.utils.RabbitMqUtils;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;

    public class ReceiveLogs01 {
        // Name of the exchange
        public static final String EXCHANGE_NAME = "logs";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            // Declare a fanout exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            // Declare a temporary queue with a server-generated name
            String queueName = channel.queueDeclare().getQueue();

            // Bind the queue to the exchange; a fanout exchange ignores the binding key
            channel.queueBind(queueName, EXCHANGE_NAME, "");
            System.out.println("Waiting for messages; received messages are printed to the screen...");

            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("ReceiveLogs01 received message: " + new String(message.getBody(), "UTF-8"));
            };

            CancelCallback cancelCallback = (consumerTag) -> {
            };

            channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
        }
    }
    package com.atguigu.five;

    import com.atguigu.utils.RabbitMqUtils;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;

    public class ReceiveLogs02 {
        // Name of the exchange
        public static final String EXCHANGE_NAME = "logs";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            // Declare a fanout exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            // Declare a temporary queue with a server-generated name
            String queueName = channel.queueDeclare().getQueue();

            // Bind the queue to the exchange; a fanout exchange ignores the binding key
            channel.queueBind(queueName, EXCHANGE_NAME, "");
            System.out.println("Waiting for messages; received messages are printed to the screen...");

            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("ReceiveLogs02 received message: " + new String(message.getBody(), "UTF-8"));
            };

            CancelCallback cancelCallback = (consumerTag) -> {
            };

            channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
        }
    }
    package com.atguigu.five;

    import com.atguigu.utils.RabbitMqUtils;
    import com.rabbitmq.client.Channel;

    import java.util.Scanner;

    public class EmitLog {
        // Name of the exchange
        public static final String EXCHANGE_NAME = "logs";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            // Read messages from the console and publish each one to the exchange
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String message = scanner.next();
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                System.out.println("Producer sent message: " + message);
            }
        }
    }
The result: thanks to the exchange, every bound queue receives each message.
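What a fanout exchange does can be reduced to a few lines: it copies each message to every bound queue and ignores the routing key. The class below is an in-memory illustration only. FanoutSketch and its methods are hypothetical names, not RabbitMQ client API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory sketch of a fanout exchange: every bound queue gets a copy of each message. */
final class FanoutSketch {
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to the exchange; no binding key is needed for fanout. */
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Copies the message to every bound queue, whatever the routing key says. */
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch logs = new FanoutSketch();
        logs.bind("queue-of-ReceiveLogs01");
        logs.bind("queue-of-ReceiveLogs02");
        logs.publish("", "hello");
        System.out.println(logs.messagesIn("queue-of-ReceiveLogs01"));
        System.out.println(logs.messagesIn("queue-of-ReceiveLogs02"));
    }
}
```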
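direct exchange
    import com.atguigu.utils.RabbitMqUtils;
    import com.rabbitmq.client.Channel;

    import java.util.Scanner;

    public class DirectLogs {
        // Name of the exchange
        public static final String EXCHANGE_NAME = "direct_logs";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            // Declare the exchange so that publishing works even if no consumer has created it yet
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            // Read messages from the console and publish each one with the routing key "warning"
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String message = scanner.next();
                channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes("UTF-8"));
                System.out.println("Producer sent message: " + message);
            }
        }
    }
The messages are published with the routing key warning, so only consumers whose queue is bound with the key warning receive them.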
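Routing on a direct exchange amounts to an exact lookup of the routing key among the binding keys. A minimal in-memory sketch follows. DirectSketch is a hypothetical class, and the queue names console and disk and the keys info and error are made up for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory sketch of direct routing: a message reaches queues whose binding key equals its routing key. */
final class DirectSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    /** Returns the queues that receive a message published with the given routing key. */
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectSketch exchange = new DirectSketch();
        exchange.bind("console", "info");
        exchange.bind("console", "warning");
        exchange.bind("disk", "error");
        System.out.println("warning -> " + exchange.route("warning"));
        System.out.println("error   -> " + exchange.route("error"));
        System.out.println("debug   -> " + exchange.route("debug"));
    }
}
```

A routing key that no queue is bound with, such as debug here, reaches nobody.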
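topic exchange
The routing key of a message sent to a topic exchange must be a list of words separated by dots.
* (star) can substitute for exactly one word
# (hash) can substitute for zero or more words
C1 (queue Q1) has one binding key: *.orange.*
C2 (queue Q2) has two binding keys: *.*.rabbit and lazy.#
quick.orange.rabbit matches Q1 and Q2
lazy.orange.elephant matches Q1 and Q2
quick.orange.fox matches Q1
lazy.brown.fox matches Q2
lazy.pink.rabbit matches Q2
quick.brown.fox matches no queue
quick.orange.male.rabbit matches no queue
lazy.orange.male.rabbit matches Q2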
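The matching rules above can be checked with a small word-by-word matcher. This is a sketch of the rules as stated (a star stands for exactly one word, a hash for zero or more words), not the broker's own implementation, and TopicMatcher is a hypothetical name.

```java
/** Matches a dot-separated routing key against a topic binding key. */
final class TopicMatcher {

    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length; // the pattern is used up: match only if the key is too
        }
        if (pattern[p].equals("#")) {
            // '#' absorbs zero words (move past it) or one more word (stay on it)
            return matches(pattern, p + 1, words, w)
                    || (w < words.length && matches(pattern, p, words, w + 1));
        }
        if (w == words.length) {
            return false; // the pattern still expects a word, but the key has none left
        }
        return (pattern[p].equals("*") || pattern[p].equals(words[w]))
                && matches(pattern, p + 1, words, w + 1);
    }

    public static void main(String[] args) {
        String[] keys = {"quick.orange.rabbit", "lazy.brown.fox", "quick.orange.male.rabbit"};
        for (String key : keys) {
            boolean q1 = matches("*.orange.*", key);
            boolean q2 = matches("*.*.rabbit", key) || matches("lazy.#", key);
            System.out.println(key + " -> Q1: " + q1 + ", Q2: " + q2);
        }
    }
}
```

A key such as lazy.pink.rabbit satisfies both of Q2's bindings, yet the message is delivered to Q2 only once.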
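Dead letter queues
    // Arguments that attach a dead-letter exchange to the normal queue
    Map<String, Object> arguments = new HashMap<>();

    // Exchange that dead letters are forwarded to
    arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
    // Routing key used when forwarding dead letters
    arguments.put("x-dead-letter-routing-key", "lisi");

    channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);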
    DeliverCallback deliverCallback = (consumerTag, message) -> {
        String msg = new String(message.getBody(), "UTF-8");
        if (msg.equals("info5")) {
            System.out.println("Consumer01 received message: " + msg + " (rejected by C1)");
            // Reject without requeueing (requeue = false) so the message is dead-lettered
            channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
        } else {
            System.out.println("Consumer01 received message: " + msg);
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        }
    };

    CancelCallback cancelCallback = (consumer) -> {
    };

    // Manual acknowledgment is required for the rejection to take effect
    channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);
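A message becomes a dead letter in any one of three cases: it is rejected (without being requeued), its TTL expires, or the queue reaches its maximum length.
When C1 rejects a message, the dead-letter consumer C2 that was configured for this purpose receives and processes it.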
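The three routes to the dead-letter queue can be modelled in memory. The sketch below is a simplification under stated assumptions: on overflow the oldest message is the one dead-lettered (RabbitMQ's default drop-head behaviour), and TTL expiry is reduced to a single expireAll call. DeadLetterSketch and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

/** In-memory sketch of the three ways a message ends up as a dead letter. */
final class DeadLetterSketch {
    private final int maxLength;
    private final Deque<String> normalQueue = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>();

    DeadLetterSketch(int maxLength) {
        this.maxLength = maxLength;
    }

    /** Case 1, queue at maximum length: the oldest message is dead-lettered on overflow. */
    void publish(String message) {
        normalQueue.addLast(message);
        if (normalQueue.size() > maxLength) {
            deadLetters.add(normalQueue.pollFirst());
        }
    }

    /** Case 2, rejection: a message the consumer refuses (no requeue) is dead-lettered. */
    void consume(Predicate<String> accept) {
        String message = normalQueue.pollFirst();
        if (message != null && !accept.test(message)) {
            deadLetters.add(message);
        }
    }

    /** Case 3, TTL expiry: everything still waiting in the queue is dead-lettered. */
    void expireAll() {
        while (!normalQueue.isEmpty()) {
            deadLetters.add(normalQueue.pollFirst());
        }
    }

    List<String> deadLetters() {
        return deadLetters;
    }

    public static void main(String[] args) {
        DeadLetterSketch queue = new DeadLetterSketch(6);
        for (int i = 1; i <= 7; i++) {
            queue.publish("info" + i);
        }
        for (int i = 0; i < 4; i++) {
            queue.consume(msg -> !msg.equals("info5")); // C1 rejects info5
        }
        queue.expireAll();
        System.out.println("dead letters: " + queue.deadLetters());
    }
}
```

In the real setup the dead letters are not kept in a list but forwarded to the exchange named in x-dead-letter-exchange, from where C2 consumes them.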
Delayed queues