什么是Bus (AMQP RibbitMQ、Kafka)
Spring Cloud Bus links nodes of a distributed system with a lightweight message broker. This can then be used to broadcast state changes (e.g. configuration changes) or other management instructions. AMQP and Kafka broker implementations are included with the project. Alternatively, any Spring Cloud Stream binder found on the classpath will work out of the box as a transport. —摘自官网
# 0.翻译
- https://spring.io/projects/spring-cloud-bus
- springcloudbus使用轻量级消息代理将分布式系统的节点连接起来。然后,可以使用它来广播状态更改(例如配置更改)或其他管理指令。AMQP和Kafka broker(中间件)实现包含在项目中。或者,在类路径上找到的任何springcloudstream绑定器都可以作为传输使用。
- 通俗定义: bus称之为springcloud中消息总线,主要用来在微服务系统中实现远端配置更新时通过广播形式通知所有客户端刷新配置信息,避免手动重启服务的工作
实现配置刷新原理
搭建RabbitMQ服务
# 0.下载rabbitmq安装包 [][可以直接使用docker安装更方便]
- 官方安装包下载:https://www.rabbitmq.com/install-rpm.html#downloads
[注意:][这里安装包只能用于centos7.x系统]
# 1.将rabbitmq安装包上传到linux系统中
erlang-22.0.7-1.el7.x86_64.rpm
rabbitmq-server-3.7.18-1.el7.noarch.rpm
# 2.安装Erlang依赖包
rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm
# 3.安装RabbitMQ安装包(需要联网)
yum install -y rabbitmq-server-3.7.18-1.el7.noarch.rpm
注意:默认安装完成后配置文件模板在:/usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example目录中,需要
将配置文件复制到/etc/rabbitmq/目录中,并修改名称为rabbitmq.config
# 4.复制配置文件
cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
# 5.查看配置文件位置
ls /etc/rabbitmq/rabbitmq.config
# 6.修改配置文件(参见下图:)
vim /etc/rabbitmq/rabbitmq.config
将上图中配置文件中红色部分去掉%%
,以及最后的,
逗号 修改为下图:
# 7.执行如下命令,启动rabbitmq中的插件管理
rabbitmq-plugins enable rabbitmq_management
出现如下说明:
Enabling plugins on node rabbit@localhost:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@localhost...
The following plugins have been enabled:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
set 3 plugins.
Offline change; changes will take effect at broker restart.
# 8.启动RabbitMQ的服务
systemctl start rabbitmq-server
systemctl restart rabbitmq-server
systemctl stop rabbitmq-server
# 9.查看服务状态(见下图:)
systemctl status rabbitmq-server
● rabbitmq-server.service - RabbitMQ broker
Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
Active: active (running) since 三 2019-09-25 22:26:35 CST; 7s ago
Main PID: 2904 (beam.smp)
Status: "Initialized"
CGroup: /system.slice/rabbitmq-server.service
├─2904 /usr/lib64/erlang/erts-10.4.4/bin/beam.smp -W w -A 64 -MBas ageffcbf -MHas ageffcbf -
MBlmbcs...
├─3220 erl_child_setup 32768
├─3243 inet_gethost 4
└─3244 inet_gethost 4
.........
# 10.启动出现如下错误:
- 4月 21 10:10:50 bogon systemd[1]: Starting RabbitMQ broker...
- 4月 21 10:11:11 bogon rabbitmq-server[1772]: ERROR: epmd error for host bogon: address (cannot connect to host/port)
- 4月 21 10:11:11 bogon systemd[1]: rabbitmq-server.service: main process exited, code=exited, status=1/FAILURE
`解决方案`:
1. 修改主机名 vim /etc/hostname 修改为自己注解名 rabbimq 2.修改完必须重启
3. vim /etc/hosts 在文件中添加: 127.0.0.1 自己主机名(rabbitmq)
# 10.关闭防火墙服务
systemctl disable firewalld
Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.
Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.
systemctl stop firewalld
# 11.访问web管理界面
http://10.15.0.8:15672/
# 12.登录管理界面
username: guest
password: guest
# 13.MQ服务搭建成功
实现自动配置刷新
# 1.在所有项目中引入bus依赖
<!--引入bus依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
# 2.配置统一配置中心连接到mq
rabbitmq:
host: 121.4.116.186 #连接主机
port: 5672 #连接mq端口
username: guest #连接mq用户名
password: guest #连接mq密码
virtual-host: / #mq虚拟路径
重点一:config-server和config-client的配置都得加上
management:
endpoints:
web:
exposure:
include: "*" #开启所有web端点暴露
重点二: 否则会出现即使刷新消息从配置服务器广播到其他微服务,服务调用时也刷新不了,这是上一节手动刷新配置时配置的注解,自动配置刷新也需要这个注解,课程里没提这2点重要性.
# 3.远端配置中加入连接mq配置
# 4.启动统一配置中心服务
- 正常启动
# 5.启动客户端服务
- 加入bus组件之后客户端启动报错
- 原因springcloud中默认链接不到远程服务器不会报错,但是在使用bus消息总线时必须开启连接远程服务失败报错
spring.cloud.config.fail-fast=true
# 6.修改远程配置后在配置中心服务通过执行post接口刷新配置
- curl -X POST http://localhost:7878/actuator/bus-refresh
# 7.通过上述配置就实现了配置统一刷新
指定服务刷新配置
# 1.说明
- 默认情况下使用curl -X POST http://localhost:7878/actuator/bus-refresh这种方式刷新配置是全部广播形式,也就是所有的微服务都能接收到刷新配置通知,但有时我们修改的仅仅是某个服务的配置,这个时候对于其他服务的通知是多余的,因此就需要指定服务进行通知
# 2.指定服务刷新配置实现
- 指定端口刷新某个具体服务: curl -X POST http://localhost:7878/actuator/bus-refresh/configclient:9090
- 指定服务id刷新服务集群节点: curl -X POST http://localhost:7878/actuator/bus-refresh/configclient
[注意:][configclient代表刷新服务的唯一标识]
集成webhook实现自动刷新
# 1.配置webhooks
- 说明: git仓库提供一种特有机制: 这种机制就是一个监听机制 监听就是仓库提交事件 ... 触发对应事件执行
- javascript: 事件 事件源 html标签 事件: 触发特定动作click ... 事件处理程序:函数
- 添加webhooks
- 在webhooks中添加刷新配置接口
- 内网穿透的网站: https://natapp.cn/
# 2.解决400错误问题
- 在配置中心服务端加入过滤器进行解决(springcloud中一个坑)
package com.xxx.filters;
import org.springframework.stereotype.Component;
import javax.servlet.*;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletRequestWrapper;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
/**
* @author blijojo
*/
@Component
public class UrlFilter implements Filter {
public static final String BUS_REFRESH = "/bus-refresh";
public static String readAsChars(HttpServletRequest request) {
BufferedReader br = null;
StringBuilder sb = new StringBuilder();
try {
br = request.getReader();
String str;
while ((str = br.readLine()) != null) {
sb.append(str);
}
br.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (null != br) {
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return sb.toString();
}
@Override
public void init(FilterConfig filterConfig) {
}
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
HttpServletRequest httpServletRequest = (HttpServletRequest) request;
String url = httpServletRequest.getRequestURI();
//只过滤/actuator/bus-refresh请求
if (!url.endsWith(BUS_REFRESH)) {
chain.doFilter(request, response);
return;
}
//获取原始的body
String body = readAsChars(httpServletRequest);
System.out.println("original body: " + body);
//使用HttpServletRequest包装原始请求达到修改post请求中body内容的目的
CustomerRequestWrapper requestWrapper = new CustomerRequestWrapper(httpServletRequest);
chain.doFilter(requestWrapper, response);
}
@Override
public void destroy() {
}
private static class CustomerRequestWrapper extends HttpServletRequestWrapper {
public CustomerRequestWrapper(HttpServletRequest request) {
super(request);
}
@Override
public ServletInputStream getInputStream() {
byte[] bytes = new byte[0];
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
return new ServletInputStream() {
@Override
public boolean isFinished() {
return byteArrayInputStream.read() == -1;
}
@Override
public boolean isReady() {
return false;
}
@Override
public void setReadListener(ReadListener readListener) {
}
@Override
public int read() {
return byteArrayInputStream.read();
}
};
}
}
}