HSF是一个RPC的中间件,微服务之间的远程调用,是现在各类微服务之间通信的主要采用的一个中间件。既然是远程调用,其中与其相关的序列化、网络通信知识必不可少。
前置知识
1、对象序列化。
java里把对象转换成字节码叫做对象的序列化。把字节码恢复陈对象叫做对象的反序列化。
为什么要这么做?主要为了方便对象持久化(对象的字节序列一直存在硬盘的一个文件里,随时可以整出来)+网络传输对象。
如何序列化?实现Serializable接口
public class person implements Serializable {
// 序列化uid,用来保证版本一致性。
private static final long serialVersionUID = -5809782578272943999L;
// ...
}
public class Test{
private static void SerializaPerson() throws FileNotFoundException, IOException {
Person person = new Person();
// set...
ObjectOutputStream oo = new ObjectOutputStream(new FileOuputStream(new File("D://xxx.txt")));
oo.writeObject(person);
}
private static Person DeserializePerson() throws Exception, IOException {
ObjectInputStream ois = new ObjectInputStream(new FileInputStream(new File("D://xxx.txt")));
Person person = (Person) ois.readObject();
return person;
Java序列化和Hessian序列化的区别?Java序列化会把所有的信息,包含类的继承关系都序列化了,所以效率较低,而且字节流比较大。Hessian序列化着重于数据,附带简单的类型信息方法,比如Integer a = 1,简化为I 1这样的流。Hessian把所有的对象属性当成一个Map来序列化,包含了基本的类型描述和数据内容。如果遇到重复的对象,还会插入一个引用块来避免多次序列化和反序列,所以相当于提高了效率和节省了空间。
Hessian2序列化(Dubbo的默认序列化器),参考https://www.cnblogs.com/yougewe/articles/10491667.html
HSF在1.0的时候只支持Java和Hessian的序列化方式,2.0的时候增加了Hessian2, json, kyro的方式。
其中,kyro的序列化方式效率最高但适配最差,而java原生序列化方式效率最低但适配最好;hession在性能上处于kyro以及java原生之间,效果还是不错的;
- Netty
Netty是一个先进的NIO的解决方案,完善了java中的NIO并对其进行了优化。
这有一个极简的采用Netty搭建的一个RPC服务的案例:https://www.v2ex.com/amp/t/447130
HSF简单说明:
上面是一个更加清晰的图,描述了各个角色都能做什么。
对Provider,可以声明和发布服务。
对Consumer,主要是获取服务,在代理这里,不同的代理方式会产生差异。
对配置中心,保存了很多的三元组,一方面接受Provider的注册服务,另一方面对消费者连接和心跳推送服务。
除了上述的三个实体之外,还有另外一些参与的实体:
- 持久化配置,主要基于Diamond,存储HSF中使用到的治理规则,包括路由规则、归组规则、权重规则等,从而根据规则对调用过程的选址逻辑进行干预。
- 元数据存储中心,用于存储HSF服务对应的方法列表以及参数结构等信息,该部分不会对HSF服务的调用过程产生影响。
- HSFOPS,在线的进行上述的各类管理,而且可以在线进行测试。
HSF发布流程:
一共三个阶段:1. 首先Spring容器初始化,这时候主要监听上下文的事件, 2. HSFSpringProviderBean初始化,这里主要是元数据的初始化和存储,3. 发布服务,这里主要调用RPC协议发布服务,然后ConfigServer注册服务发布元数据,最后存储元数据。
- Spring容器初始化时发生了什么?
1)调用了setApplicationContext方法。
HSFSpringProviderBean实现了ApplicationContextAware接口,意味着Spring容器初始化后,HSFSpringProviderBean中的setApplicationContext方法将被调用,同时获取Spring容器ApplicationContext。
/* HSFSpringProviderBean.java */
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
...
if (applicationContext instanceof AbstractApplicationContext){
...
Method method = AbstractApplicationContext.class.getDeclareMethod("addListener",
new Class<?>[] {ApplicationListener.class});
method.setAccessible(true);
// 注册HSFApplicationListener
method.invoke(applicationContext, new Object[] {new HSFApplicationListener() });
isInSpringCOntainer = true;
}
}
2)监听Spring事件,触发onApplicationEvent
/* HSFSpringProviderBean.java */
private final class HSFApplicationListener implements ApplicationListener {
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof ContextRefreshedEvent) {
// 如果是刷新事件,发布服务,更新服务状态
HSFSpringProviderBean.this.providerBean.publish();
setAppInitedStatus();
} else if (event instanceof ContextClosedEvent) {
// 如果是关闭事件,关闭HSF服务器
if (AppInfoUtils.appRunning.compareAndSet(true, false)) {
providerBean.shutdownHSFServer();
}
}
}
}
实例化HSFApiProviderBean,并且设置各个元数据:
/* HSFSpringProviderBean.java */
public final class HSFSpringProviderBean implements InitializingBean, ApplicationContextAware {
...
private final HSFApiProviderBean providerBean = new HSFApiProviderBean();
...
}
其中构造函数里包含服务元数据ServiceMetadata的相关属性,包括版本号、组别、超时时间等
/* HSFApiProviderBean.java */
private final ServiceMetadata metadata = new ServiceMetadata(true);
...
public HSFApiProviderBean(){
metadata.setVersion("1.0.0");
metadata.setGroup("HSF");
// 默认客户端调用超时时间
metadata.addProperty(TRConstraints.TIMEOUT_TYPE_KEY, "3000");
// 默认客户端连接空闲超时时间
metadata.addProperty(TRConstraints.IDLE_TIMEOUT_KEY, "10");
// 序列化类型,默认Hessian
metadata.addProperty(TRConstraints.SERIALIZE_TYPE_KEY, HSFContraints.HESSIAN_SERIALIZE);
metadata.addProperty(HSFContraints.RPC_VERSION, "2.0");
metadata.addProperty(HSFContraints.PREFER_SERIALIZIER, HSFServiceContainer.getInstance(ConfigurationService.class).getPreferedSerializer());
}
HSFSpringProviderBean实现了initialingBean,意味着Bean被初始化时,Spring设置完Bean所有属性后,将触发afterPropertiesSet方法
/* HSFSpringProviderBean.java */
public final class HSFSpringProviderBean implements InitializingBean, ApplicationContextAware {
...
public void afterPropertiesSet() throws Exception {
init();
}
}
init里在做什么:
/* HSFSpringProviderBean.java */
public void init() throws Exception {
// 避免多次初始化
if (!providerBean.getInited().compareAndSet(false, true)) {return;}
// 初始化HSF日志
LoggerInit.initHSFLog();
// 初始化AppName
AppInfoUtils.initAppNames(providerBean.getMetadata());
// 初始化Spas
SpasInit.initSpas();
// 检查配置,初始化服务名称
providerBean.checkConfig();
// 发布服务,如果providerBean不在Spring容器中,则发布服务
publishIfNotInSpringContainer();
}
private void publishIfNotInSpringContainer(){
if(!isInSpringContainer){
LOGGER.warn("[SpringProviderBean]不是在Spring容器中创建,不推荐使用");
providerBean.publish();
}
}
- 发布服务
HSF服务发布入口:HSFApiProviderBean.publish()
/* HSFApiProviderBean.java */
public void publish(){
if (!isPublished.compareAndSet(false, true)) {return;}
...
// 获取ProcessService实例,将服务元数据作为参数,调用publish
HSFServiceContainer.getInstance(ProcessService.class).publish(metadata);
}
根据metadata获取RPC协议,调用响应的RPC服务完成服务发布:
/* ProcessComponent.java */
@Override
public void publish(ServiceMetadata metadata) throws HSFException {
...
// 根据相应RPC服务实现完成服务发布
rpcProtocolService.registerProvider(rpcProtocolType, metadata);
...
hookService.prePublish(metadata);
...
if (!metadata.isDelayedPublish()){
// 向ConfigService注册服务,发布元数据
metadataService.publish(metadata);
}
...
hookService.afterPublish(metadata);
// 存储元数据
metadataInfoStoreService.store(metadata);
}
根据相应RPC服务实现完成服务发布:
/* RPCprotocolTemplateComponent.java */
@Override
public void registerProvider(String protocol, ServiceMetadata metadata) throws HSFException {
// 调用RemotingRPCProtocolComponent.registerProvider()
rpcService.registerProvider(metadata);
}
调用RemotingRPCProtocolComponent.registerProvider():
/* RemotingRPCProtocolComponent.java */
public void registerProvider(ServiceMetadata metadata) throws HSFException {
// 启动HSF Server
if (isProviderStarted.compareAndSet(false, true)){
providerServer.startHSFServer();
// server.start(configService.getHSFServerPort())
...
}
// 分配线程池
int corePoolSize = metadata.getCorePoolSize();
int maxPoolSize = metadata.getMaxPoolSize();
if (corePoolSize > 0 && maxPoolSize > 0 && maxPoolSize >= corePoolSize) {
providerServer.allocThreadPool(metadata.getUniqueName(), corePoolSize, maxPoolSize);
}
// 注册对象到HSF Server
providerServer.addMetadata(metadata.getUniqueName(), metadata, metadata.getTarget());
}
最后NettyService.start(),本地开始监听端口,处理请求:
/* NettyServer.java */
public void start(int listenPort) throws Exception {
ServerBootstrap bootstrap = new ServerBootstrap();
...
long tryBind = 3;
while (tryBind > 0) {
ChannelFuture cf = bootstrap.bind(new InetSocketAddress(bindHost, listenPort));
cf.await();
if (cf.isSuccess()) {
LOGGER.warn("Server started, listen at: " + listenPort);
retunr;
} else {
tryBind --;
}
}
}
服务发布后,服务端如何处理远程调用:
发布服务后,Server(HSF Provider)已经启动,并监听相应的端口等待Client(HSF Consumer)连接。
当新请求来的时候,会出发NettyServerHandler的messageReceived()方法:
/* NettyServerHandler.java */
public void messageReceived(final ChannelHandlerContext ctx, MessageEvent e) throws Exception {
...
handleRequest(ctx, message);
}
private void handleRequest(final ChannelHandlerContext ctx, final Object message) {
final Connection connect = new NettyConnection(ctx.getChannel());
if (message instanceof List) {
List<BaseRequest> messages = (List<BaseRequest>) message;
for (Object messageObject : messages) {
// 处理单一请求
processOneRequest(messageObject, connection);
}
} else {
processOneRequest(message, connection);
}
}
private void processOneRequest(final Object message, final Connection connection){
BaseRequest request = (BaseRequest) message;
ServerHandler<BaseRequest> serverhandler = (ServerHandler<BaseRequest>) request.getServerHandler();
if (serverhandler.getExecutor(request) == null) {
BaseResponse responseWrapper = serverhandle.handleRequest(request, connection);
connection.writeResponseToChannel(responseWrapper);
} else {
// 启动线程,处理请求
serverhandler.getExecutor(request).execute(new HandlerRunnable(connection, request, serverhandler));
}
RemotingRuntimeInfoHolder.getInstance().increaseCountProcessRequests();
}
private static class HandlerRunnable implements Runnable {
...
public void run() {
...
// 调用RPCServerHandler.handleRequest根据相应RPC协议处理请求
BaseResponse responseWrapper = serverHandler.handleRequest(message, connection);
...
connection.writeResponseToChannel(responseWrapper);
}
}
调用RPCServerHandler.handleRequest根据相应RPC协议处理请求:
/* ProviderProcessor.java */
public void handleRequest(final HSFRequest hsfRequest, final ServerOutput outputStream) {
final String serviceUniqueName = hsfRequest.getTargetServiceUniqueName();
final String methodName = hsfRequest.getMethodName();
final Connection connection = outputStream.getConnection();
final String clientIp = connection.getPeerIp();
...
ProviderServiceModel psm = ApplicationModel.instance().getProvidedServiceModel(serviceUniqueName);
HSFResponse hsfResponse = null;
if (hsfResponse == null) {
// 序列化协议
final byte protocol = hsfRequest.getSerializeType();
if (TRConstraints.TOP_SERIALIZE == protocol){
hsfResponse = handleTopRequest(hsfRequest, serviceUniqueName, methodName, outputStream);
} else {
hsfResponse = handleRequest0(hsfRequest, outputStream, null, null);
}
ThreadLocalUtil.remove();
if (hsfResponse != null) {
handleEagleEyeResponseSend(hsfResponse);
int responseSize = outputStream.writeHSFResponse(hsfResponse);
logService.responseSize(responseSize);
}
}
}
private HSFResponse handleRequest0(final HSFRequest hsfRequest, final ServerOutput output,
final Object[] originalArgs, final String[] originalArgTypes) {
final String serviceUniqueName = hsfRequest.getTargetServiceUniqueName();
final ProviderServiceModel serviceModel = applicationModel.getProvidedServiceModel(serviceUniqueName);
final HSFResponse hsfResponse = new HSFResponse();
final String remoteHost = output.getConnection().getPeerIp();
...
if (null == serviceModel){
hsfResponse.setErrorMsg("[HSF-Provider] 未找到业务服务,服务名称:" + serviceUniqueName);
return hsfResponse;
}
...
ClassLoader tcl = Thread.currentThread().getContextClassLoader();
ProviderMethodModel methodModel = null;
// 获取方法名
String methodName = hsfRequest.getMethodName();
...
if (needInvoke) {
// 获得方法参数
Object[] methodArgs = hsfRequest.getMethodArgs();
// 调用业务应用时,切换为应用Class Loader
Thread.currentThread().setContextClassLoader(serviceModel.getMetadata().getServicePojoClassLoader());
// 通过动态代理得到结果
Object appResp = workerMethod.invoke(serviceModel.getServiceInstance(), methodArgs);
...
hsfResponse.setAppResponse(appResp);
retobj = appResp;
}
}
返回结果为:
/* NettyConnection.java */
public void writeResponseToChannel(final BaseResponse response) {
if (response != null) {
ChannelFuture wf = channel.writeAndFlush(response);
wf.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()){
LOGGER.error();
}
if(!channel.isActive()) {
channel.close();
}
}
});
}
}