那些你以为很高大尚的组件其实也都是从无到有迭代出来了,就算没有造轮子的时间,也要有造轮子的💕。
短链接服务
短链接服务主要是加密和缩短原来链接,同时也可以统计短链接访问的一些情况。
生成思路:
1、将原链接存入表,获得long类型的id,
优化点:如果mysql不可用时候,可直接在redis获取自增的id, 然后调用方法获取到短链唯一字符串,
防止两种方式生产的短链冲突,在redis生产的短链字符串加上一个保留字符串(Z),这个字符串
是调用Long.toString或者parseRadix都会带有的,等到mysql正常时候,再将缓存的数据回写
到mysql里面保证落地数据一致。
2、掉用方法Long.toString(shortLinkId, 36)获得短链唯一性字符串
优化点,如果是36进制,7位置长度的话 36^6=78亿左右数据,最多可以存这么多条数据,可以试用一下的61进制
private final static char[] digits = {'0','1', '2', '3', '4', '5', '6', '7', '8',
'9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l',
'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y',
'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L',
'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y'};
//求余数求商 10进制转61进制
public static String parseRadix(long origin){
char[] buf = new char[61];
int charPos = 60;
while (origin >= 61) {
buf[charPos--] = digits[(int)((origin % 61))];
origin = origin / 61;
}
buf[charPos] = digits[(int)(origin)];
return new String(buf, charPos, (61 - charPos));
}
3、根据shortLinkId更新原来表,将短链字符串回写表
4、将短链字符串和原链接的对应关系写入缓存系统,redis或者tail
查询思路:
1、根据短链字符串先查询缓存,没有命中缓存再查询表,然后再塞入缓存。
统一电话加密服务
背景
对系统存储的等敏感信息进行脱敏,防止从技术人员内部造成的敏感信息泄漏问题
系统目标
目标:
手机号支撑能力:3亿 -> 15亿 -> 30亿
按业务划分,各业务互相不影响(泳道)
SLA标准:
可用性:99.999%
TP95:1.3ms、TP99:2ms
业务QPS:6w
核心流程
短信服务
相关概念
短信一般都会有模板的概念,模板可以有申请、修改、审核相关流程,模板控制了短信整体格式,方便后期统计和维护。
模板都会有权重的概念,因为短信不是24小时发送的,如果大半夜给用户发短信就是SB了,所以需要设计权重。
短信内容字数限制为64字,超过64字会自动分成多条发送。
短信又分为功能短信和营销短信:
营销短信需要支持"退订回复TD",同时营销短信发送频次限制:单个手机每天至多接收一天营销短信
插入短信
当业务方新增一条短信记录时候,服务端需要一系列检验。
1、模板是否启用
2、短信里面的链接是否是短链接
3、短信内容经过诚信服务是否有敏感词语
4、手机号码是否是黑名单
5、如果是营销短信检验发送次数
6、生成分布式unionId(可以根据SnowFlake算法)
经历过以上6步骤,短信的对象算是构造完成了,但是短信模板有优先级,优先级高的肯定是要立即发送的,比如登录
获取验证码这种,那么这种的一般是先将短信放到队列,然后再多线程消费队列去,再调用短信提供方接口,接着再将记录插入
数据库数据。如果像订单购买成功这种优先级比较低的,可以先将数据放到db里面,或者先放到kafka里面,然后消费kafka再放到db里面
,然后再启用多个线程,去发送这些短信。
对于放入db或者放入kafka,可以根据权重,比如设计一个随机数,考虑到可用性,比如放入db时候,如果这时候db不可用,就优先放入kafka。
发消息的时候,需要考虑重复发送的问题,比如相同的手机号+内容,如果30秒内重复发送就丢弃发送请求
核心点
核心功能就是,队列+线程
缓存队列都是有大小的,如果队列满了需要怎么办,如果系统重启队列里面还有内容怎么办。
如果多线程查询db里面待发送的短信,每个线程都有可能查到重复的内容,对于重复的需要怎么处理,这些都是需要待考虑的问题。
crane定时任务
定时任务一般分两种,一种是去中心化,一种是中心化。
去中心化的话,每个客户端有能力执行任务,多个客户端只有master才有执行job的权力,可以通过ZK的选举
功能实现Leader功能
中心化的话就是,服务端会记录下任务情况,等到任务该执行的时候,再随机选择客户端去执行。
去中心化和中心化各有优缺点,
去中心化好处在于只是依赖ZK的选举,但是客户端必须要严格控制幂等,同时也要考虑如果一个任务被两台服务器同时执行的情况。
中心化好处就是不依赖选举,但是依赖服务端去下发任务,如果任务很多的话会导致服务端非常臃肿,同时需要考虑服务端高可用的问题
必须要支持分布式。
corn表达式记录
字段:
秒(Seconds) 分(Minutes) 小时(Hours) 日期(DayofMonth) 月份(Month) 星期(DayofWeek) 年(可选,留空)(Year)
字符意义
*:表示匹配该域的任意值,假如在Minutes域使用*, 即表示每分钟都会触发事件
?:只能用在DayofMonth和DayofWeek两个域。它也匹配域的任意值,但实际不会。用的比较少
-:表示范围。例如在Minutes域使用5-20,表示从5分到20分钟每分钟触发一次。用的也比较少
/:表示起始时间开始触发,然后每隔固定时间触发一次。
例如在Minutes域使用5/20,则意味着从5分钟开始,20分钟触发一次,而25,45等分别触发一次.
0 0/30 * * * ? ,则意味着每隔30分钟执行一次,从第0分钟开始,然后是30分钟 然后是60分钟
常用表达式例子:
每天中午12点触发 :0 0 12 * * ?
每天上午10:15触发 :0 15 10 * * ?
周一到周五每天上午10:15触发 :0 15 10 ? * MON-FRI
朝九晚五工作时间内每半小时 : 0 0/30 9-17 * * ?
朝九晚五工作时间内每半小时 : 0 0/30 * * * ?
每一小时执行一次: 0 0 * * * ? 0 0 0/1 * * ?
docker-job
有时候我们的任务执行频率不是特别高,比如每个6小时执行一次,对于这种任务,我们可以利用docker进行部署执行。
当任务执行后docker机器销毁也不会占用机器
进程内job
对于执行频率比较高的,比如每隔5分钟执行一次就适合在进程内进行部署
RPC
一、RPC框架的价值
RPC的全称是RemoteProcedureCall,即远程过程调用。
落地微服务的基础
解决了分布式系统之间相互通信的问题
屏蔽了服务与服务之间调用的技术细节,让我们像调用本地代码一样访问远程服务
二、RPC关键词
2.1 服务注册与发现
服务动态上线与下线
2.2 软负载均衡策略
替代F5,常见有轮询法、加权轮询法、随机法、最小连接数法
2.3 服务调用链跟踪
全链路日志跟踪监控
2.4 服务监控与统计
2.5 服务编排
2.6. 服务权限控制/黑白名单
核心服务需要申请才可以访问
2.7 服务依赖关系
服务下线或者强制升级,如何告知上下游
2.8 服务分组、版本、重试 、降级、限流、容错
分组:提供接口多实现功能
版本号:当一个接口的实现,出现不兼容升级时或者处理逻辑变化时候,可以用版本号过渡
重试:防止服务抖动
2.9 SPI机制
预留可扩展接口
2.10 服务泳道
支持一个服务注册到不同的环境,这样就可以避免多版本并行提测问题。
2.11 服务region
服务多区域机房的情况下,相同区域的client优先调用相同区域的server
三、递进手写RPC
最简单的rpc流程图
3.1 入门级别
参考dubbo作者梁飞代码
public interface HelloService {
String hello(String name);
}
public class HelloServiceImpl implements HelloService {
@Override
public String hello(String name) {
return "Hello " + name;
}
}
public class RpcProvider {
public static void main(String[] args) throws Exception {
HelloService service = new HelloServiceImpl();
RpcFramework.export(service, 1234);
}
}
public class RpcConsumer {
public static void main(String[] args) throws Exception {
HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", 1234);
for (int i = 0; i < Integer.MAX_VALUE; i ++) {
String hello = service.hello("World" + i);
System.out.println(hello);
Thread.sleep(1000);
}
}
}
public class RpcFramework {
/**
* 暴露服务
*
* @param service 服务实现
* @param port 服务端口
* @throws Exception
*/
public static void export(final Object service, int port) throws Exception {
if (service == null)
throw new IllegalArgumentException("service instance == null");
if (port <= 0 || port > 65535)
throw new IllegalArgumentException("Invalid port " + port);
System.out.println("Export service " + service.getClass().getName() + " on port " + port);
ServerSocket server = new ServerSocket(port);
for (; ; ) {
try {
final Socket socket = server.accept();
new Thread(new Runnable() {
@Override
public void run() {
try {
try {
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
try {
String methodName = input.readUTF();
Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
Object[] arguments = (Object[]) input.readObject();
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
try {
Method method = service.getClass().getMethod(methodName, parameterTypes);
Object result = method.invoke(service, arguments);
output.writeObject(result);
} catch (Throwable t) {
output.writeObject(t);
} finally {
output.close();
}
} finally {
input.close();
}
} finally {
socket.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 引用服务
*
* @param <T> 接口泛型
* @param interfaceClass 接口类型
* @param host 服务器主机名
* @param port 服务器端口
* @return 远程服务
* @throws Exception
*/
@SuppressWarnings("unchecked")
public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception {
if (interfaceClass == null)
throw new IllegalArgumentException("Interface class == null");
if (!interfaceClass.isInterface())
throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!");
if (host == null || host.length() == 0)
throw new IllegalArgumentException("Host == null!");
if (port <= 0 || port > 65535)
throw new IllegalArgumentException("Invalid port " + port);
System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() {
public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
Socket socket = new Socket(host, port);
try {
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
try {
output.writeUTF(method.getName());
output.writeObject(method.getParameterTypes());
output.writeObject(arguments);
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
try {
Object result = input.readObject();
if (result instanceof Throwable) {
throw (Throwable) result;
}
return result;
} finally {
input.close();
}
} finally {
output.close();
}
} finally {
socket.close();
}
}
});
}
}
如果服务处理很慢,同时N个客户端调用,会有什么问题?可以做怎么改进。
3.2 异步调用
rpc请求模式有同步模式、无返回值模式、异步模式,异步模式设计比较巧妙
如果要实现异步,那么client端在调用的时候,返回值肯定是要是future类型,但如果架包里面的DTO对象返回类型定义成Future<xx>这种格式,
后期如果想改成同步调用,就又要改变DTO类型 ,rpc请求模式与DTO相互耦合肯定违背我们的设计。
3.2.1 设计思路:
client每次请求生成唯一requestId,server端返回的时候带上requestId与返回值
3.2.2 核心代码:
client设计:
public class ConsumerClient {
public static <T> T getProxyClass(Class<T> interfaceClass, RpcRequest rpcRequest) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Socket socket = null;
ObjectOutputStream oos = null;
BufferedReader bufferedInputStream = null;
try {
socket = new Socket("localhost", 8081);
oos = new ObjectOutputStream(socket.getOutputStream());
String requestId = UUID.randomUUID().toString();
rpcRequest.setRequestId(requestId);
rpcRequest.setMethodName(method.getName());
rpcRequest.setGetParameterTypes(method.getParameterTypes());
rpcRequest.setArgs(args);
oos.writeObject(rpcRequest);
oos.flush();
if (rpcRequest.isAnsyc()) {
//requestid 放到threadLocal
RpcContext.setInvokeMethodStyle(requestId);
Socket finalSocket = socket;
new Thread() {
@Override
public void run() {
try {
ObjectInputStream ois = new ObjectInputStream(finalSocket.getInputStream());
RpcResponse rpcResponse = (RpcResponse) ois.readObject();
if (RpcContext.checkAnsyc(rpcResponse.getRequestId())) {
RpcContext.putAnsyResult(rpcResponse.getRequestId(), rpcResponse);
}
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}.start();
} else {
bufferedInputStream = new BufferedReader(new InputStreamReader(socket.getInputStream(), "utf-8"));
return bufferedInputStream.readLine();
}
return null;
} catch (Exception e) {
log.error("", e);
} finally {
}
return null;
}
});
}
public static void main(String[] args) throws Exception, InterruptedException {
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setAnsyc(true);
TestService testService = ConsumerClient.getProxyClass(TestService.class, rpcRequest);
long beginTime = System.currentTimeMillis();
testService.testMethod("gaogao1");
Future future1 = RpcContext.getFuture();
testService.testMethod("gaogao2");
Future future2 = RpcContext.getFuture();
log.info("------------get result-------");
RpcResponse response1 = (RpcResponse) future1.get();
System.out.println(response1.getResult());
RpcResponse response2 = (RpcResponse) future2.get();
long endTime = System.currentTimeMillis();
log.info("cost:{}", endTime - beginTime);
System.out.println(response2.getResult());
}
}
contentx设计:
public class RpcContext {
//存放requestId
public static ThreadLocal<String> requestIdthreadLocal = new ThreadLocal<>();
//调用方式
public static Map<String, Boolean> invokeMethodStyle = new ConcurrentHashMap<>();
//缓存future
public static Map<String, RpcResultFuture> futureMap = new ConcurrentHashMap<>();
/**
* 返回值放到map里面
* @param requestId
* @param rpcResponse
*/
public static void putAnsyResult(String requestId, RpcResponse rpcResponse) {
RpcResultFuture future = futureMap.get(requestId);
future.setResult(requestId, rpcResponse);
}
/**
* 获取代理future
*
* @return
*/
public static Future getFuture() throws Exception {
if(requestIdthreadLocal.get()==null){
throw new Exception("no ansyc method");
}
RpcResultFuture future = new RpcResultFuture();
//存放到缓存,这样当返回值回来时候就可以用到
futureMap.put(requestIdthreadLocal.get(), future);
//防止随便get
requestIdthreadLocal.remove();
return future;
}
public static void setInvokeMethodStyle(String requestId) {
invokeMethodStyle.put(requestId, true);
requestIdthreadLocal.set(requestId);
}
public static boolean checkAnsyc(String requestId) {
return invokeMethodStyle.get(requestId) == null ? false : true;
}
}
future设计
public class RpcResultFuture implements Future {
private String requestId;
private CountDownLatch countDownLatch = new CountDownLatch(1);
private Object object = null;
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return false;
}
@Override
public Object get() throws InterruptedException, ExecutionException {
countDownLatch.await();
//clear缓存
RpcContext.futureMap.remove(requestId);
return object;
}
@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
countDownLatch.await(timeout, unit);
return object;
}
public void setResult(String requestId, RpcResponse rpcResponse) {
this.requestId = requestId;
object = rpcResponse;
countDownLatch.countDown();
}
}
3.3 基于netty封装
bio版本qps非常低,同时还会有粘包拆包问题
3.3.1 增加功能
自定义传输协议
单次请求体由以下两部分组成:
1、数据byte[]总长度+4个占位符长度
2、数据data的byte[]
自定义序列化
提高性能
增加服务请求超时时间
server服务注解注册
3.3.2 核心代码
client:
@Slf4j
public class ClientEncoder extends MessageToByteEncoder {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
try {
// byte[] responseByte = KryoUtil.serialiaztion(msg);
byte[] responseByte = Tool.serialize(msg);
int totalLength = 4 + responseByte.length;
out.writeInt(totalLength);
out.writeBytes(responseByte);
} catch (Exception e) {
log.error("", e);
}
}
}
public class ClientDecoder extends ByteToMessageDecoder {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int readableBytes = in.readableBytes();
//最小可读长度要大于4
if (readableBytes < 4) {
return;
}
//标记当前指针位置
in.markReaderIndex();
int dataLength = in.readInt();
if (dataLength < 0) {
ctx.close();
}
//如果数据可读长度小于发送来读数据包总长度,当然不能继续读
if (readableBytes < dataLength) {
//恢复读指针到原来的位置
in.resetReaderIndex();
return;
}
byte[] bytes = new byte[dataLength - 4];
in.readBytes(bytes);
//反序列请求
out.add(Tool.deserialize(bytes, RpcResponse.class));
}
}
public class RpcClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
RpcResponse rpcResponse = (RpcResponse) msg;
ClientProvider.putRpcResponse(rpcResponse);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
public class RpcInvocationHandler implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcInvokeFutureResult rpcInvokeFutureResult = null;
RpcResponse response = null;
try {
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setRequestId(UUID.randomUUID().toString());
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameterTypes(method.getParameterTypes());
rpcRequest.setParameters(args);
//获取future返回值
rpcInvokeFutureResult = ClientProvider.sendAndGetRpcInvokeFutureResult(rpcRequest, rpcRequest.getRequestId());
response = rpcInvokeFutureResult.get(rpcRequest.getTimeOut(), TimeUnit.SECONDS);
} catch (Exception e) {
log.error("", e);
}
if (response != null) {
return response.getResponse();
}
return null;
}
}
public class ClientProvider {
private static Map<String, RpcInvokeFutureResult> rpcResultMap = new ConcurrentHashMap<>();
private static ChannelFuture channelFuture = null;
static {
init();
}
private static void init() {
startNettyClient();
}
private static void startNettyClient() {
Bootstrap clentBootstrap = new Bootstrap();
NioEventLoopGroup group = new NioEventLoopGroup();
clentBootstrap.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
/**
/**自定义协议解决粘包 begin**/
ch.pipeline().addLast(new ClientEncoder());
ch.pipeline().addLast(new ClientDecoder());
/**自定义协议解决粘包 end**/
ch.pipeline().addLast(new RpcClientHandler());
}
}).option(ChannelOption.SO_KEEPALIVE, true);
try {
channelFuture = clentBootstrap.connect("127.0.0.1", 8000).sync();
} catch (InterruptedException e) {
log.error("init netty client error", e);
}
}
public static ChannelFuture getChannelFuture() {
return channelFuture;
}
/**
* 发送消息和返回future结果
*
* @param
* @param requestId
* @return
*/
public static RpcInvokeFutureResult sendAndGetRpcInvokeFutureResult(RpcRequest rpcRequest, String requestId) {
RpcInvokeFutureResult rpcInvokeFutureResult = new RpcInvokeFutureResult(requestId,rpcResultMap);
getChannelFuture().channel().writeAndFlush(rpcRequest);
ClientProvider.rpcResultMap.put(requestId, rpcInvokeFutureResult);
return rpcInvokeFutureResult;
}
public static void putRpcResponse(RpcResponse response) {
RpcInvokeFutureResult rpcInvokeFutureResult = rpcResultMap.get(response.getRequestId());
if (rpcInvokeFutureResult != null) {
rpcInvokeFutureResult.put(response);
}
}
}
server:
public class RpcServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
RpcRequest rpcRequest = (RpcRequest) msg;
RpcResponse response = new RpcResponse();
response.setRequestId(rpcRequest.getRequestId());
String requestClassName = rpcRequest.getClassName();
Object object = ServerProvider.cacheSericeMap.get(requestClassName);
try {
Object result = object.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes()).invoke(object, rpcRequest.getParameters());
response.setResponse(result);
} catch (Throwable e) {
response.setExcetion(e.getMessage());
log.error("", e);
}
ctx.writeAndFlush(response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
public class ServerProvider {
public static Map<String, Object> cacheSericeMap = new HashMap<>();
private static AtomicBoolean initStatus = new AtomicBoolean(false);
public static void init() {
if (initStatus.compareAndSet(false, true)) {
scannAndCacheService();
startNettyServer();
} else {
throw new IllegalStateException("can not repeat init");
}
}
private static void startNettyServer() {
ServerBootstrap serverBootstrap = new ServerBootstrap();
NioEventLoopGroup boosGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
serverBootstrap.group(boosGroup, workGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
/**
/**自定义协议解决粘包 begin**/
ch.pipeline().addLast(new ServerEncoder());
ch.pipeline().addLast(new ServerDecoder());
/**自定义协议解决粘包 end**/
ch.pipeline().addLast(new RpcServerHandler());
}
}).bind(8000);
}
public static void main(String[] args) {
init();
}
/**
* 扫描指定目录文件
*/
private static void scannAndCacheService() {
try {
log.info("scann rpc class begin");
ResourcePatternResolver rp = new PathMatchingResourcePatternResolver();
Resource[] resources = rp.getResources("classpath:com/ggj/java/rpc/demo/netty/first/server/service/imp/*.class");
if (resources == null || resources.length == 0) {
throw new IllegalArgumentException("scann package error");
}
for (Resource resource : resources) {
String className = resource.getFile().getPath().split("classes\\/")[1].replaceAll("\\/", ".").replaceAll(".class", "");
Class<?> clazz = Thread.currentThread().getContextClassLoader().loadClass(className);
if (clazz.getAnnotation(RpcService.class) != null) {
Object object = clazz.newInstance();
cacheSericeMap.putIfAbsent(object.getClass().getInterfaces()[0].getName(), object);
}
}
log.info("scann rpc class end");
} catch (Exception e) {
log.error("", e);
}
}
}
3.4 基于zk实现注册中心
服务的注册与发现是rpc框架必备的一个功能
3.4.1 难点思考
zk上的节点树结构如何设计
服务端的信息都写到zk里面,那么zk的结构如何设计?
--ggjRPC(name space)
----server
-------com.ggj.java.rpc.demo.netty.first.server.service.AppleService (192.168.8.103:4081,192.168.8.103:4082)
-------com.ggj.java.rpc.demo.netty.first.server.service.OrangeService (192.168.8.103:4081,192.168.8.103:4082)
----app
-------oderService (appid)
----weight
server是用来记录服务对应的机器ip,多个ip以逗号分割,例如(192.168.8.103:4081,192.168.8.103:4082),
app 是用来记录提供服务的名称(appid)
weight 是用来记录提供服务机器的权重(负载均衡用到)
如果换成下面的方式会有什么问题?
--ggjRPC(name space)
----server
-------orderService(192.168.8.103:4081,192.168.8.103:4082)
-------userService(192.168.8.103:4083,192.168.8.103:4084)
----app
-------usezkService
----weight
服务注册与发现
场景一:
假设某个服务目前只部署在某一台机器serverA(192.168.1.1)上 , client请求时候可以通过zk直接获取到serverA的ip,然后执行后续操作
,如果这时候服务又部署了一台机器serverB(192.168.1.2),client如何感知到新上线的节点serverB。
场景二:
假设某个服务,目前只部署在serverA(192.168.1.1)和serverb(192.168.1.2)上 , client请求时候可以通过zk直接获取到serverA和serverA的ip,然后执行后续操作
,如果这时候下线了serverB(192.168.1.2),client如何感知到server b下线,同时切走请求流量到服务A上
client端如何高性能的发现服务
如果每次请求都要从zk里面拉去最新的节点,接口qps肯定会下降。
className -> ipList
ip-> 多个链接 (删除机器时候 客户端可以自动发现并且删除链接)
ip->classNameSet(增加机器时候 客户端可以自动发现并且添加链接)
3.4.2 核心代码
zk注册中心:
public class RegisterManager {
private static Lock lock = new ReentrantLock();
private static RegisterManager registerManager = new RegisterManager();
public Register getRegister() {
return register;
}
private Register register;
private RegisterManager() {
}
/**
* 服务端调用
*
* @return
*/
public static RegisterManager getInstance() {
registerManager.init(null);
return registerManager;
}
/**
* 客户端调用
*
* @return
*/
public static RegisterManager getClientInstance() {
registerManager.init(new ZookeeperDataWatcher());
return registerManager;
}
/**
* 初始化,需要加锁防止初始化多次
*
* @param zookeeperDataWatcher
*/
private void init(ZookeeperDataWatcher zookeeperDataWatcher) {
if (register == null) {
lock.lock();
try {
if (register == null) {
register = new ZKRegister();
register.initListener(zookeeperDataWatcher);
}
} catch (Exception e) {
log.error("init error", e);
} finally {
lock.unlock();
}
}
}
}
public class ZKRegister implements Register {
private AtomicBoolean initRegister = new AtomicBoolean();
private CuratorClient zkClient;
public ZKRegister() {
zkClient = new CuratorClient();
}
@Override
public String getName() {
return null;
}
@Override
public String getServiceAddress(String serviceName) throws Exception {
return zkClient.getServiceAddress(serviceName);
}
@Override
public void registerService(String serviceName, String serviceAddress) throws Exception {
ZKPathConfig zkPathConfig = new ZKPathConfig(serviceName, serviceAddress);
if (initRegister.compareAndSet(false, true)) {
initRegisterInfo(zkPathConfig);
}
zkClient.registerService(zkPathConfig, serviceAddress);
}
/**
* 注册
* 机器信息
* 注册权重
*
* @param zkPathConfig
* @throws Exception
*/
private void initRegisterInfo(ZKPathConfig zkPathConfig) throws Exception {
zkClient.registerServerApp(zkPathConfig);
zkClient.registerWeight(zkPathConfig);
}
@Override
public void unregisterService(String serviceName, String serviceAddress) throws Exception {
ZKPathConfig zkPathConfig = new ZKPathConfig(serviceName, serviceName);
zkClient.unregisterService(zkPathConfig, serviceAddress);
}
@Override
public boolean unregisterServerApp(String serviceAddress) {
return false;
}
@Override
public void initListener(ZookeeperDataWatcher zookeeperDataWatcher) {
if (zookeeperDataWatcher != null) {
zkClient.initListener(zookeeperDataWatcher);
}
}
}
public class CuratorClient {
private static CuratorFramework client = null;
public CuratorClient() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.newClient(Constants.DEFAULT_CONNECT_URL, retryPolicy);
client = CuratorFrameworkFactory.builder().namespace(Constants.NAME_SPACE).retryPolicy(retryPolicy).connectString(Constants.DEFAULT_CONNECT_URL).build();
client.start();
}
private void setStringValue() {
}
/**
* 并发注册会有问题
*
* @param zkPathConfig
* @param serviceAddress
* @throws Exception
*/
public void registerService(ZKPathConfig zkPathConfig, String serviceAddress) throws Exception {
try {
if (CuratorUtil.checkExists(client, zkPathConfig.getServicePath())) {
String resulst = CuratorUtil.getStringData(client, zkPathConfig.getServicePath());
if (StringUtils.isEmpty(resulst)) {
CuratorUtil.update(client, zkPathConfig.getServicePath(), serviceAddress);
} else {
String[] adressArray = resulst.split("\\,");
List<String> adressList = new ArrayList();
for (String registeredAdress : adressArray) {
//已注册就不需要重复注册
if (!registeredAdress.equals(serviceAddress)) {
adressList.add(registeredAdress.trim());
}
}
adressList.add(serviceAddress);
CuratorUtil.update(client, zkPathConfig.getServicePath(), org.apache.commons.lang.StringUtils.join(adressList, ","));
}
} else {
CuratorUtil.create(client, zkPathConfig.getServicePath(), serviceAddress);
}
} catch (Exception e) {
throw e;
}
}
/**
* 注册应用信息
*
* @param zkPathConfig
* @throws Exception
*/
public void registerServerApp(ZKPathConfig zkPathConfig) throws Exception {
CuratorUtil.create(client, zkPathConfig.getAppPath(), zkPathConfig.getServiceAddress(), true);
}
/**
* 注册权重
*
* @param zkPathConfig
* @throws Exception
*/
public void registerWeight(ZKPathConfig zkPathConfig) throws Exception {
CuratorUtil.create(client, zkPathConfig.getWeightPath(), String.valueOf(Constants.DEFAULT_ONLINE_WEIGHT), true);
}
/**
* 下线服务机器
*
* @param zkPathConfig
* @param serviceAddress
* @throws Exception
*/
public void unregisterService(ZKPathConfig zkPathConfig, String serviceAddress) throws Exception {
if (CuratorUtil.checkExists(client, zkPathConfig.getServicePath())) {
String resulst = CuratorUtil.getStringData(client, zkPathConfig.getServicePath());
if (!StringUtils.isEmpty(resulst)) {
String[] adressArray = resulst.split("\\,");
List<String> adressList = new ArrayList();
for (String adress : adressArray) {
adressList.add(adress.trim());
}
//remove已经注册过的
if (adressList.contains(serviceAddress)) {
adressList.remove(serviceAddress);
}
CuratorUtil.update(client, zkPathConfig.getServicePath(), org.apache.commons.lang.StringUtils.join(adressList, ","));
}
}
}
/**
* 获取服务端提供地址
* @param serviceName
* @return
* @throws Exception
*/
public String getServiceAddress(String serviceName) throws Exception {
ZKPathConfig zkPathConfig=new ZKPathConfig(serviceName,null);
if (CuratorUtil.checkExists(client, zkPathConfig.getServicePath())) {
return CuratorUtil.getStringDataAndWatch(client, zkPathConfig.getServicePath());
}
throw new Exception(serviceName+":has no server");
}
public void initListener(ZookeeperDataWatcher zookeeperDataWatcher) {
client.getCuratorListenable().addListener(zookeeperDataWatcher);
}
}
/**
* 监听zk节点,下线时候变动
*
* @author gaoguangjin
*/
@Slf4j
public class ZookeeperDataWatcher implements CuratorListener {
/**
* className -> ipList
* ip-> 多个链接 (删除机器时候 客户端可以自动发现并且删除链接)
* ip->classNameSet(增加机器时候 客户端可以自动发现并且添加链接)
* @param client
* @param event
* @throws Exception
*/
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
if (event.getType() == CuratorEventType.WATCHED && event.getPath() != null) {
String path = event.getPath();
String className = path.split("\\/")[2];
String iplist = CuratorUtil.getStringDataAndWatch(client, path);
if (StringUtils.isEmpty(iplist)) {
return;
}
String[] iparrays = iplist.split("\\,");
List<String> nowIpList = new ArrayList<>();
for (String iparray : iparrays) {
nowIpList.add(iparray);
}
List<String> oldIpList = ClientProvider.getClientConnectMap().get(className).getIpList();
//以前这个class还没创建过请求
if (CollectionUtils.isEmpty(oldIpList)) {
return;
}
//判断扩容还是缩容
if (nowIpList.size() > oldIpList.size()) {
log.info("扩容,nowIplist={},oldIplist={}", nowIpList, oldIpList);
nowIpList.removeAll(oldIpList);
for (String nowIp : nowIpList) {
ClientProvider.initClient(nowIp);
ClientProvider.getClientConnectMap().get(className).getIpList().add(nowIp);
}
} else {
log.info("锁容,nowIplist={},oldIplist={}", nowIpList, oldIpList);
List<String> tempList = new ArrayList();
tempList.addAll(oldIpList);
tempList.removeAll(nowIpList);
for (String oldIp : tempList) {
ClientProvider.getClientConnectMap().get(className).getIpList().remove(oldIp);
List<ChannelFuture> futureList = ClientProvider.getChannelFutureConnectMap().get(oldIp);
for (ChannelFuture channelFuture : futureList) {
channelFuture.channel().closeFuture();
}
ClientProvider.getChannelFutureConnectMap().remove(oldIp);
}
}
}
}
}
client
public class RpcInvocationHandler implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcInvokeFutureResult rpcInvokeFutureResult = null;
RpcResponse response = null;
RpcRequest rpcRequest = new RpcRequest();
try {
rpcRequest.setRequestId(UUID.randomUUID().toString());
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameterTypes(method.getParameterTypes());
rpcRequest.setParameters(args);
//获取future返回值
rpcInvokeFutureResult = ClientProvider.sendAndGetRpcInvokeFutureResult(rpcRequest, rpcRequest.getRequestId());
response = rpcInvokeFutureResult.get(rpcRequest.getTimeOut(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.error("", e);
throw e;
}
//超时
if(response==null){
throw new Exception("rpc "+method.getDeclaringClass().getName()+" timeout");
}
return response.getResponse();
}
}
/**
* @author gaoguangjin
*/
@Slf4j
public class ClientProvider {
@Getter
private static Map<String, RpcInvokeFutureResult> rpcResultMap = new ConcurrentHashMap<>();
@Getter
//className -> ipList
private static Map<String, ChannelFutureHolder> clientConnectMap = new ConcurrentHashMap();
@Getter
// ip-> 多个链接
private static Map<String, List<ChannelFuture>> channelFutureConnectMap = new ConcurrentHashMap();
private static Lock lock = new ReentrantLock();
public static ChannelFuture getNettyClient(String ipStrs, String className) throws Exception {
lock.lock();
try {
if (clientConnectMap.containsKey(className)) {
return clientConnectMap.get(className).selectClient();
}
String[] ipArray = ipStrs.split("\\,");
List<String> ipList = new ArrayList<>();
for (String iparray : ipArray) {
initClient(iparray);
ipList.add(iparray);
}
clientConnectMap.put(className, new ChannelFutureHolder(ipList));
} catch (Exception e) {
log.error("init netty client error", e);
throw e;
} finally {
lock.unlock();
}
return clientConnectMap.get(className).selectClient();
}
public static List<ChannelFuture> initClient(String ip) throws InterruptedException {
lock.lock();
List<ChannelFuture> futureList = new ArrayList<>();
try {
String[] ipArray = ip.split("\\:");
if (channelFutureConnectMap.containsKey(ip)) {
return channelFutureConnectMap.get(ip);
}
for (int i = 0; i < Constants.CLIENT_INIT_CONNECTION_SIZE; i++) {
Bootstrap clentBootstrap = new Bootstrap();
NioEventLoopGroup group = new NioEventLoopGroup();
clentBootstrap.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ClientEncoder());
ch.pipeline().addLast(new ClientDecoder());
ch.pipeline().addLast(new RpcClientHandler());
}
}).option(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture channelFuture = clentBootstrap.connect(ipArray[0], Integer.parseInt(ipArray[1])).sync();
futureList.add(channelFuture);
}
channelFutureConnectMap.put(ip, futureList);
} catch (Exception e) {
log.error("init netty client error", e);
throw e;
} finally {
lock.unlock();
}
return futureList;
}
public static Map<String, ChannelFutureHolder> getClientConnectMap() {
return clientConnectMap;
}
/**
* 1、缓存map 是否存在连接
* 如果存在 判断是否active
* 如果不active 就切换到其他到
*
* @param rpcRequest
* @return
* @throws Exception
*/
public static ChannelFuture getChannelFuture(RpcRequest rpcRequest) throws Exception {
ChannelFuture channelFuture = null;
String className = rpcRequest.getClassName();
if (clientConnectMap.containsKey(className)) {
channelFuture = clientConnectMap.get(className).selectClient();
if (channelFuture != null) {
return channelFuture;
}
}
String iplist = getConnectIp(className);
if (StringUtils.isEmpty(iplist)) {
throw new Exception(className + " has no availiable server");
}
channelFuture = getNettyClient(iplist, className);
return channelFuture;
}
/**
* 获取服务端ip
*
* @param className
* @return
* @throws Exception
*/
private static String getConnectIp(String className) throws Exception {
return RegisterManager.getClientInstance().getRegister().getServiceAddress(className);
}
/**
* 发送消息和返回future结果
*
* @param
* @param requestId
* @return
*/
public static RpcInvokeFutureResult sendAndGetRpcInvokeFutureResult(RpcRequest rpcRequest, String requestId) throws Exception {
RpcInvokeFutureResult rpcInvokeFutureResult = new RpcInvokeFutureResult(requestId, rpcResultMap);
Channel channel = getChannelFuture(rpcRequest).channel();
ChannelFuture channelFuture = channel.writeAndFlush(rpcRequest);
//监听resposne返回结果,如果服务端下线会收到通知
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture rfuture) throws Exception {
if (!rfuture.isSuccess()) {
log.info("rpc rsponse unsuccessful");
rpcInvokeFutureResult.setCause(rfuture.cause());
}
}
});
ClientProvider.rpcResultMap.put(requestId, rpcInvokeFutureResult);
return rpcInvokeFutureResult;
}
public static void putRpcResponse(RpcResponse response) {
RpcInvokeFutureResult rpcInvokeFutureResult = rpcResultMap.get(response.getRequestId());
if (rpcInvokeFutureResult != null) {
rpcInvokeFutureResult.put(response);
}
}
@Data
public static class ChannelFutureHolder {
private AtomicInteger atomicInteger = new AtomicInteger(1);
private String ip;
@Getter
List<String> ipList;
public ChannelFutureHolder(List<String> ipList) {
this.ipList = ipList;
}
/**
* 随机选服务端ip
* 随机选服务端的连接
*
* @return
*/
public ChannelFuture selectClient() {
String randomIp = ipList.get(new Random().nextInt(ipList.size()));
//System.out.println("invoke ip="+randomIp);
List<ChannelFuture> channelFutureList = channelFutureConnectMap.get(randomIp);
if (CollectionUtils.isEmpty(channelFutureList)) {
return null;
}
return channelFutureList.get(atomicInteger.getAndIncrement() % Constants.CLIENT_INIT_CONNECTION_SIZE);
}
}
}
server端:
添加了 shutdownhook
/**
* 测试时候需要设置 -Dport=4081
*
* @author gaoguangjin
*/
@Slf4j
public class ServerProvider {
public static Map<String, Object> cacheServiceMap = new HashMap<>();
private static RegisterManager registerManager = RegisterManager.getInstance();
private static AtomicBoolean initStatus = new AtomicBoolean(false);
public static void init() {
System.setProperty(Constants.APP_NAME_KEY, "usezkService");
if (initStatus.compareAndSet(false, true)) {
try {
scannAndCacheService();
startNettyServer();
registerServerService();
addShutDownHook();
} catch (Exception e) {
log.error("init server error", e);
throw new RuntimeException();
}
} else {
throw new IllegalStateException("can not repeat init");
}
}
private static void addShutDownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
shutDownServer();
} catch (Exception e) {
log.error("停机hook失败", e);
}
}
});
}
/**
* 下线服务
*
* @throws Exception
*/
private static void shutDownServer() throws Exception {
log.info("excute shutdownserver");
for (String serviceName : cacheServiceMap.keySet()) {
registerManager.getRegister().unregisterService(serviceName, Constants.SERVICE_ADRESS);
}
}
private static void registerServerService() throws Exception {
for (String className : cacheServiceMap.keySet()) {
registerManager.getRegister().registerService(className, Constants.SERVICE_ADRESS);
}
}
private static void startNettyServer() {
ServerBootstrap serverBootstrap = new ServerBootstrap();
NioEventLoopGroup boosGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
serverBootstrap.group(boosGroup, workGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
/**
/**自定义协议解决粘包 begin**/
ch.pipeline().addLast(new ServerEncoder());
ch.pipeline().addLast(new ServerDecoder());
/**自定义协议解决粘包 end**/
ch.pipeline().addLast(new RpcServerHandler());
}
}).bind(Constants.HOST_ADRESS, Constants.DEFAULT_SERVER_PORT);
}
public static void main(String[] args) {
init();
}
/**
* 扫描指定目录文件
*/
private static void scannAndCacheService() {
try {
log.info("scann rpc class begin");
ResourcePatternResolver rp = new PathMatchingResourcePatternResolver();
Resource[] resources = rp.getResources("classpath:com/ggj/java/rpc/demo/netty/usezk/server/service/imp/*.class");
if (resources == null || resources.length == 0) {
throw new IllegalArgumentException("scann package error");
}
for (Resource resource : resources) {
String className = resource.getFile().getPath().split("classes\\/")[1].replaceAll("\\/", ".").replaceAll(".class", "");
Class<?> clazz = Thread.currentThread().getContextClassLoader().loadClass(className);
if (clazz.getAnnotation(RpcService.class) != null) {
Object object = clazz.newInstance();
cacheServiceMap.putIfAbsent(object.getClass().getInterfaces()[0].getName(), object);
}
}
log.info("scann rpc class end");
} catch (Exception e) {
log.error("", e);
}
}
}
3.5 基于spring实现
略
四、点评pigeon框架介绍
项目地址:https://github.com/dianping/pigeon
主要特色:
1、除了支持spring schema等配置方式,也支持代码annotation方式发布服务、引用远程服务,并提供原生api接口的用法。(❤️)
2、支持http协议,方便非java应用调用pigeon的服务。(❤️❤️)
3、序列化方式除了hessian,还支持thrift等。(❤️❤️)
4、提供了服务器单机控制台pigeon-console,包含单机服务测试工具。(❤️❤️❤️❤️❤️)
5、创新的客户端路由策略,提供服务预热功能,解决线上流量大的service重启时大量超时的问题。(❤️❤️❤️❤️)
6、记录每个请求的对象大小、返回对象大小等监控信息。(❤️❤️)
7、服务端可对方法设置单独的线程池进行服务隔离,可配置客户端应用的最大并发数进行限流。(❤️❤️❤️❤️)
4.1 核心功能介绍
4.1.1 客户端路由策略
集群策略:
//失败立即抛出异常
public static final String CLUSTER_FAILFAST = "failfast";
//失败自动切换,重试其他服务器
public static final String CLUSTER_FAILOVER = "failover";
//失败忽略异常,返回null
public static final String CLUSTER_FAILSAFE = "failsafe";
//并行调用多个服务,返回调用最快的节点数据
public static final String CLUSTER_FORKING = "forking";
源码分析:
FailfastCluster
/**
* 快速失败
*/
public class FailfastCluster implements Cluster {
private ClientManager clientManager = ClientManager.getInstance();
private static final Logger logger = LoggerLoader.getLogger(FailfastCluster.class);
@Override
public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
throws Throwable {
InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
InvocationRequest request = InvokerUtils.createRemoteCallRequest(invocationContext, invokerConfig);
//是否超时重试
boolean timeoutRetry = invokerConfig.isTimeoutRetry();
if (!timeoutRetry) {
//按负载均衡策略获取服务端连接实例
Client remoteClient = clientManager.getClient(invokerConfig, request, null);
invocationContext.setClient(remoteClient);
try {
return handler.handle(invocationContext);
} catch (NetworkException e) {
//网络异常重试
remoteClient = clientManager.getClient(invokerConfig, request, null);
invocationContext.setClient(remoteClient);
return handler.handle(invocationContext);
}
} else {
//重试次数
int retry = invokerConfig.getRetries(invocationContext.getMethodName());
RequestTimeoutException lastError = null;
int maxInvokeTimes = retry + 1;
int invokeTimes = 0;
for (int index = 0; index < maxInvokeTimes; index++) {
Client clientSelected = null;
try {
clientSelected = clientManager.getClient(invokerConfig, request, null);
} catch (ServiceUnavailableException e) {
if (invokeTimes > 0) {
logger.error("Invoke method[" + invocationContext.getMethodName() + "] on service["
+ invokerConfig.getUrl() + "] failed with " + invokeTimes + " times");
throw lastError;
} else {
throw e;
}
}
try {
invokeTimes++;
invocationContext.setClient(clientSelected);
InvocationResponse response = null;
try {
response = handler.handle(invocationContext);
} catch (NetworkException e) {
clientSelected = clientManager.getClient(invokerConfig, request, null);
invocationContext.setClient(clientSelected);
response = handler.handle(invocationContext);
logger.info("Retry while network exception:" + e.getMessage());
}
if (lastError != null) {
logger.warn("Retry method[" + invocationContext.getMethodName() + "] on service["
+ invokerConfig.getUrl() + "] succeed after " + invokeTimes
+ " times, last failed error: " + lastError.getMessage(), lastError);
}
return response;
} catch (RequestTimeoutException e) {
lastError = e;
}
}
if (lastError != null) {
throw lastError;
} else {
throw new RemoteInvocationException(
"Invoke method[" + invocationContext.getMethodName() + "] on service[" + invokerConfig.getUrl()
+ "] failed with " + invokeTimes + " times, last error: "
+ (lastError != null ? lastError.getMessage() : ""),
lastError != null && lastError.getCause() != null ? lastError.getCause() : lastError);
}
}
}
@Override
public String getName() {
return Constants.CLUSTER_FAILFAST;
}
}
FailoverCluster:
/**
* 失败自动切换,重试其他服务器
*/
public class FailoverCluster implements Cluster {
private ClientManager clientManager = ClientManager.getInstance();
private static final Logger logger = LoggerLoader.getLogger(FailoverCluster.class);
@Override
public InvocationResponse invoke(final ServiceInvocationHandler handler, final InvokerContext invocationContext)
throws Throwable {
InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
List<Client> selectedClients = new ArrayList<Client>();
Throwable lastError = null;
int retry = invokerConfig.getRetries(invocationContext.getMethodName());
//最大重试次数
int maxInvokeTimes = retry + 1;
//是否允许超时重试
boolean timeoutRetry = invokerConfig.isTimeoutRetry();
int invokeTimes = 0;
for (int index = 0; index < maxInvokeTimes; index++) {
InvocationRequest request = InvokerUtils.createRemoteCallRequest(invocationContext, invokerConfig);
Client clientSelected = null;
try {
clientSelected = clientManager.getClient(invokerConfig, request, selectedClients);
} catch (ServiceUnavailableException e) {
if (invokeTimes > 0) {
logger.error("Invoke method[" + invocationContext.getMethodName() + "] on service["
+ invokerConfig.getUrl() + "] failed with " + invokeTimes + " times");
throw lastError;
} else {
throw e;
}
}
//记录使用过的集群client
selectedClients.add(clientSelected);
try {
//记录调用次数
invokeTimes++;
invocationContext.setClient(clientSelected);
InvocationResponse response = handler.handle(invocationContext);
if (lastError != null) {
logger.warn(
"Retry method[" + invocationContext.getMethodName() + "] on service["
+ invokerConfig.getUrl() + "] succeed after " + invokeTimes
+ " times, last failed error: " + lastError.getMessage(), lastError);
}
return response;
} catch (Throwable e) {
//超时异常
lastError = e;
if (e instanceof RequestTimeoutException) {
//如果不允许重试 直接抛出异常
if (!timeoutRetry) {
throw e;
}
}
}
}
if (lastError != null) {
logger.error("Invoke method[" + invocationContext.getMethodName() + "] on service["
+ invokerConfig.getUrl() + "] failed with " + invokeTimes + " times");
throw lastError;
} else {
throw new RemoteInvocationException("Invoke method[" + invocationContext.getMethodName() + "] on service["
+ invokerConfig.getUrl() + "] failed with " + invokeTimes + " times, last error: "
+ (lastError != null ? lastError.getMessage() : ""), lastError != null
&& lastError.getCause() != null ? lastError.getCause() : lastError);
}
}
@Override
public String getName() {
return Constants.CLUSTER_FAILOVER;
}
}
FailsafeCluster
/**
* 安全失败策略-失败忽略异常
*/
public class FailsafeCluster implements Cluster {
private ClientManager clientManager = ClientManager.getInstance();
private static final Logger logger = LoggerLoader.getLogger(FailsafeCluster.class);
private static final InvocationResponse NO_RETURN_RESPONSE = InvokerUtils.createNoReturnResponse();
@Override
public InvocationResponse invoke(final ServiceInvocationHandler handler, final InvokerContext invocationContext)
throws Throwable {
InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
InvocationRequest request = InvokerUtils.createRemoteCallRequest(invocationContext, invokerConfig);
try {
Client remoteClient = clientManager.getClient(invokerConfig, request, null);
invocationContext.setClient(remoteClient);
return handler.handle(invocationContext);
} catch (Throwable t) {
logger.error("", t);
//返回空对象
return NO_RETURN_RESPONSE;
}
}
@Override
public String getName() {
return Constants.CLUSTER_FAILSAFE;
}
}
ForkingCluster:
/**
* 并行调用多个服务,一个成功立即返回
*/
public class ForkingCluster implements Cluster {
private ClientManager clientManager = ClientManager.getInstance();
private static final Logger logger = LoggerLoader.getLogger(ForkingCluster.class);
private final ExecutorService executor = Executors
.newCachedThreadPool(new NamedThreadFactory("Pigeon-Client-Fork-Processor", true));
//节点数
private static final String KEY_FORKING_SIZE = "pigeon.invoker.forking.size";
private Random r = new Random();
public ForkingCluster() {
ConfigManagerLoader.getConfigManager().getIntValue(KEY_FORKING_SIZE, 0);
}
private List<Client> randomList(List<Client> clients) {
List<Client> randomClients = clients;
int size = ConfigManagerLoader.getConfigManager().getIntValue(KEY_FORKING_SIZE, 0);
//3
int len = clients.size();
if (size > 0 && size < len) {
randomClients = new ArrayList<Client>(size);
//生成一个0到len-1的整型随机数
int startIndex = (int) (r.nextDouble() * len);
//从随机数开始,赠序选取数组中的连接
for (int i = startIndex; i < startIndex + size; i++) {
//如果超过了数组的长度,则从头取起,目的是为了随机
int idx = i < len ? i : (i - len);
randomClients.add(clients.get(idx));
}
}
//如果forksize大于客户端size 调用所有的
return randomClients;
}
@Override
public InvocationResponse invoke(final ServiceInvocationHandler handler, final InvokerContext invocationContext)
throws Throwable {
final InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
InvocationRequest request = InvokerUtils.createRemoteCallRequest(invocationContext, invokerConfig);
final List<Client> clients = clientManager.getAvailableClients(invokerConfig, request);
final AtomicInteger count = new AtomicInteger();
//阻塞队列缓存结果
final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
//获取随机的client
final List<Client> selectedClients = randomList(clients);
for (final Client client : selectedClients) {
executor.execute(new Runnable() {
public void run() {
InvokerContext ctxt = new DefaultInvokerContext(invokerConfig, invocationContext.getMethodName(),
invocationContext.getParameterTypes(), invocationContext.getArguments());
ctxt.setClient(client);
ctxt.setRequest(null);
ctxt.setRequest(InvokerUtils.createRemoteCallRequest(ctxt, invokerConfig));
try {
InvocationResponse resp = handler.handle(ctxt);
//向堵塞队列塞入结果
ref.offer(resp);
} catch (Throwable e) {
int value = count.incrementAndGet();
if (value >= selectedClients.size()) {
ref.offer(e);
}
}
}
});
}
Object ret = null;
//配置超时时间,获取超时时间内堵塞请求
if (request.getTimeout() > 0) {
ret = ref.poll(request.getTimeout(), TimeUnit.MILLISECONDS);
} else {
//堵塞等待直到有请求完成
ret = ref.take();
}
if (ret instanceof Throwable) {
throw (Throwable) ret;
} else if ((ret instanceof FutureResponse)
&& CallMethod.isFuture(invokerConfig.getCallType())) {
//如果是future响应则设置相关future
FutureFactory.setFuture(((FutureResponse) ret).getServiceFuture());
} else if (ret == null) {
throw InvocationUtils.newTimeoutException("timeout while waiting forking response:" + request);
}
return (InvocationResponse) ret;
}
@Override
public String getName() {
return Constants.CLUSTER_FORKING;
}
}
4.1.2 服务线程池隔离
线程池配置支持服务(单个类)级别与方法级别配置
如果服务有线程池配置,会在服务注册的时候就将相关的线程池创建好,线程池规则如果不配置都是默认共享线程池。
线程池有动态和非动态的区分,动态的支持配置中心修改,然后监听变动refresh线程池配置
public class NettyServerHandler extends SimpleChannelUpstreamHandler {
private static final Logger log = LoggerLoader.getLogger(NettyServerHandler.class);
private NettyServer server;
public NettyServerHandler(NettyServer server) {
this.server = server;
}
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (log.isDebugEnabled()) {
if (e instanceof ChannelStateEvent && ((ChannelStateEvent) e).getState() != ChannelState.INTEREST_OPS) {
log.debug(e.toString());
}
}
super.handleUpstream(ctx, e);
}
/**
* 服务器端接受到消息开始处理
*
* @see org.jboss.netty.channel.SimpleChannelUpstreamHandler#messageReceived(org.jboss.netty.channel.ChannelHandlerContext,
* org.jboss.netty.channel.MessageEvent)
*/
@SuppressWarnings("unchecked")
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent message) {
CodecEvent codecEvent = (CodecEvent) (message.getMessage());
if (!codecEvent.isValid() || codecEvent.getInvocation() == null) {
return;
}
InvocationRequest request = (InvocationRequest) codecEvent.getInvocation();
ProviderContext invocationContext = new DefaultProviderContext(request, new NettyServerChannel(ctx.getChannel()));
try {
//异步调用
this.server.processRequest(request, invocationContext);
} catch (Throwable e) {
String msg = "process request failed:" + request;
// 心跳消息只返回正常的, 异常不返回
if (request.getCallType() == Constants.CALLTYPE_REPLY
&& request.getMessageType() != Constants.MESSAGE_TYPE_HEART) {
ctx.getChannel().write(ProviderUtils.createFailResponse(request, e));
}
log.error(msg, e);
}
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
}
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
this.server.getChannelGroup().add(e.getChannel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
log.info(e.getCause().getMessage(), e.getCause());
ctx.getChannel().close();
}
}
/**
* server-并行处理器
*
* @param request
* @param providerContext
* @return
*/
public Future<InvocationResponse> doProcessRequest(final InvocationRequest request,
final ProviderContext providerContext) {
requestContextMap.put(request, providerContext);
startMonitorData(request, providerContext);
Callable<InvocationResponse> requestExecutor = new Callable<InvocationResponse>() {
@Override
public InvocationResponse call() throws Exception {
providerContext.getTimeline().add(new TimePoint(TimePhase.T));
try {
//根据链路请求
ServiceInvocationHandler invocationHandler = ProviderProcessHandlerFactory
.selectInvocationHandler(providerContext.getRequest().getMessageType());
if (invocationHandler != null) {
providerContext.setThread(Thread.currentThread());
return invocationHandler.handle(providerContext);
}
} catch (Throwable t) {
logger.error("Process request failed with invocation handler, you should never be here.", t);
} finally {
requestContextMap.remove(request);
}
return null;
}
};
//选择线程池
final ThreadPool pool = selectThreadPool(request);
try {
checkRequest(pool, request);
providerContext.getTimeline().add(new TimePoint(TimePhase.T));
//线程池去 执行
return pool.submit(requestExecutor);
} catch (RejectedExecutionException e) {
requestContextMap.remove(request);
endMonitorData(request, providerContext);
throw new RejectedException(getProcessorStatistics(pool), e);
}
}
private ThreadPool selectThreadPool(final InvocationRequest request) {
ThreadPool pool = null;
String serviceKey = request.getServiceName();
String methodKey = serviceKey + "#" + request.getMethodName();
// spring poolConfig
pool = getConfigThreadPool(request);
// 方法级别配置
if (pool == null && !CollectionUtils.isEmpty(methodThreadPools)) {
pool = methodThreadPools.get(methodKey);
}
// 服务级别配置
if (pool == null && !CollectionUtils.isEmpty(serviceThreadPools)) {
pool = serviceThreadPools.get(serviceKey);
}
// 开启动态线程池将获取动态线程池
if (pool == null && poolConfigSwitchable && !CollectionUtils.isEmpty(apiPoolNameMapping)) {
PoolConfig poolConfig = null;
String poolName = apiPoolNameMapping.get(methodKey);
if (StringUtils.isNotBlank(poolName)) { // 方法级别
poolConfig = poolConfigs.get(poolName);
if (poolConfig != null) {
pool = DynamicThreadPoolFactory.getThreadPool(poolConfig);
}
} else { //服务级别
poolName = apiPoolNameMapping.get(serviceKey);
if (StringUtils.isNotBlank(poolName)) {
poolConfig = poolConfigs.get(poolName);
if (poolConfig != null) {
pool = DynamicThreadPoolFactory.getThreadPool(poolConfig);
}
}
}
}
// 默认方式
if (pool == null) {
// 针对慢请求,为其选择相应的线程池
if (enableSlowPool && requestTimeoutListener.isSlowRequest(request)) {
pool = slowRequestProcessThreadPool;
} else {
//共享线程池
pool = sharedRequestProcessThreadPool;
}
}
return pool;
}
五、后续想介绍的
1、点评pigeon与阿里dubbo RPC框架介绍与核心源码分享
2、RCP框架为了提高qps的一些优化