公司常用组件记录

那些你以为很高大尚的组件其实也都是从无到有迭代出来了,就算没有造轮子的时间,也要有造轮子的💕。

短链接服务

短链接服务主要是加密和缩短原来链接,同时也可以统计短链接访问的一些情况。
生成思路:
1、将原链接存入表,获得long类型的id,
优化点:如果mysql不可用时候,可直接在redis获取自增的id, 然后调用方法获取到短链唯一字符串,
防止两种方式生产的短链冲突,在redis生产的短链字符串加上一个保留字符串(Z),这个字符串
是调用Long.toString或者parseRadix都会带有的,等到mysql正常时候,再将缓存的数据回写
到mysql里面保证落地数据一致。

2、掉用方法Long.toString(shortLinkId, 36)获得短链唯一性字符串
优化点,如果是36进制,7位置长度的话 36^6=78亿左右数据,最多可以存这么多条数据,可以试用一下的61进制
private final static char[] digits = {'0','1', '2', '3', '4', '5', '6', '7', '8',
                     '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l',
                     'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y',
                     'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L',
                     'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y'};
    //求余数求商 10进制转61进制
    public static String parseRadix(long origin){
            char[] buf = new char[61];
            int charPos = 60;
            while (origin >= 61) {
                buf[charPos--] = digits[(int)((origin % 61))];
                origin = origin / 61;
            }
            buf[charPos] = digits[(int)(origin)];
            return new String(buf, charPos, (61 - charPos));
    }
3、根据shortLinkId更新原来表,将短链字符串回写表
4、将短链字符串和原链接的对应关系写入缓存系统,redis或者tail
查询思路:
1、根据短链字符串先查询缓存,没有命中缓存再查询表,然后再塞入缓存。

统一电话加密服务

背景

对系统存储的等敏感信息进行脱敏,防止从技术人员内部造成的敏感信息泄漏问题

系统目标

目标:
手机号支撑能力:3亿 -> 15亿 -> 30亿
按业务划分,各业务互相不影响(泳道)
SLA标准:
可用性:99.999%
TP95:1.3ms、TP99:2ms
业务QPS:6w

核心流程

短信服务

相关概念

短信一般都会有模板的概念,模板可以有申请、修改、审核相关流程,模板控制了短信整体格式,方便后期统计和维护。
模板都会有权重的概念,因为短信不是24小时发送的,如果大半夜给用户发短信就是SB了,所以需要设计权重。
短信内容字数限制为64字,超过64字会自动分成多条发送。
短信又分为功能短信和营销短信:
营销短信需要支持"退订回复TD",同时营销短信发送频次限制:单个手机每天至多接收一天营销短信

插入短信

当业务方新增一条短信记录时候,服务端需要一系列检验。
1、模板是否启用
2、短信里面的链接是否是短链接
3、短信内容经过诚信服务是否有敏感词语
4、手机号码是否是黑名单
5、如果是营销短信检验发送次数
6、生成分布式unionId(可以根据SnowFlake算法)

经历过以上6步骤,短信的对象算是构造完成了,但是短信模板有优先级,优先级高的肯定是要立即发送的,比如登录
获取验证码这种,那么这种的一般是先将短信放到队列,然后再多线程消费队列去,再调用短信提供方接口,接着再将记录插入
数据库数据。如果像订单购买成功这种优先级比较低的,可以先将数据放到db里面,或者先放到kafka里面,然后消费kafka再放到db里面
,然后再启用多个线程,去发送这些短信。

对于放入db或者放入kafka,可以根据权重,比如设计一个随机数,考虑到可用性,比如放入db时候,如果这时候db不可用,就优先放入kafka。
发消息的时候,需要考虑重复发送的问题,比如相同的手机号+内容,如果30秒内重复发送就丢弃发送请求

核心点

核心功能就是,队列+线程
缓存队列都是有大小的,如果队列满了需要怎么办,如果系统重启队列里面还有内容怎么办。
如果多线程查询db里面待发送的短信,每个线程都有可能查到重复的内容,对于重复的需要怎么处理,这些都是需要待考虑的问题。

crane定时任务

定时任务一般分两种,一种是去中心化,一种是中心化。
去中心化的话,每个客户端有能力执行任务,多个客户端只有master才有执行job的权力,可以通过ZK的选举
功能实现Leader功能
中心化的话就是,服务端会记录下任务情况,等到任务该执行的时候,再随机选择客户端去执行。
去中心化和中心化各有优缺点,
去中心化好处在于只是依赖ZK的选举,但是客户端必须要严格控制幂等,同时也要考虑如果一个任务被两台服务器同时执行的情况。
中心化好处就是不依赖选举,但是依赖服务端去下发任务,如果任务很多的话会导致服务端非常臃肿,同时需要考虑服务端高可用的问题
必须要支持分布式。

corn表达式记录

字段:
秒(Seconds) 分(Minutes)  小时(Hours) 日期(DayofMonth) 月份(Month) 星期(DayofWeek) 年(可选,留空)(Year)
字符意义
*:表示匹配该域的任意值,假如在Minutes域使用*, 即表示每分钟都会触发事件
?:只能用在DayofMonth和DayofWeek两个域。它也匹配域的任意值,但实际不会。用的比较少
-:表示范围。例如在Minutes域使用5-20,表示从5分到20分钟每分钟触发一次。用的也比较少
/:表示起始时间开始触发,然后每隔固定时间触发一次。
例如在Minutes域使用5/20,则意味着从5分钟开始,20分钟触发一次,而25,45等分别触发一次.
0 0/30 * * * ? ,则意味着每隔30分钟执行一次,从第0分钟开始,然后是30分钟 然后是60分钟
常用表达式例子:
每天中午12点触发 :0 0 12 * * ?
每天上午10:15触发  :0 15 10 * * ?
周一到周五每天上午10:15触发 :0 15 10 ? * MON-FRI
朝九晚五工作时间内每半小时 : 0 0/30 9-17 * * ?
朝九晚五工作时间内每半小时 : 0 0/30 * * * ?
 每一小时执行一次: 0 0 * * * ?     0 0 0/1 * * ?

docker-job

有时候我们的任务执行频率不是特别高,比如每个6小时执行一次,对于这种任务,我们可以利用docker进行部署执行。
当任务执行后docker机器销毁也不会占用机器

进程内job

对于执行频率比较高的,比如每隔5分钟执行一次就适合在进程内进行部署

RPC

一、RPC框架的价值

  RPC的全称是RemoteProcedureCall,即远程过程调用。

落地微服务的基础
解决了分布式系统之间相互通信的问题
屏蔽了服务与服务之间调用的技术细节,让我们像调用本地代码一样访问远程服务

二、RPC关键词

2.1 服务注册与发现
服务动态上线与下线

2.2 软负载均衡策略
替代F5,常见有轮询法、加权轮询法、随机法、最小连接数法

2.3 服务调用链跟踪
全链路日志跟踪监控

2.4 服务监控与统计
2.5 服务编排
2.6. 服务权限控制/黑白名单
核心服务需要申请才可以访问

2.7 服务依赖关系
服务下线或者强制升级,如何告知上下游

2.8 服务分组、版本、重试 、降级、限流、容错
分组:提供接口多实现功能
版本号:当一个接口的实现,出现不兼容升级时或者处理逻辑变化时候,可以用版本号过渡
重试:防止服务抖动
2.9 SPI机制
预留可扩展接口

2.10 服务泳道
支持一个服务注册到不同的环境,这样就可以避免多版本并行提测问题。

2.11 服务region
服务多区域机房的情况下,相同区域的client优先调用相同区域的server

三、递进手写RPC

最简单的rpc流程图

3.1 入门级别

参考dubbo作者梁飞代码
public interface HelloService {
    String hello(String name);
}

public class HelloServiceImpl implements HelloService {
    @Override
    public String hello(String name) {
        return "Hello " + name;
    }
}

public class RpcProvider {
    public static void main(String[] args) throws Exception {
        HelloService service = new HelloServiceImpl();
        RpcFramework.export(service, 1234);
    }
}

public class RpcConsumer {
    public static void main(String[] args) throws Exception {
        HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", 1234);
        for (int i = 0; i < Integer.MAX_VALUE; i ++) {
            String hello = service.hello("World" + i);
            System.out.println(hello);
            Thread.sleep(1000);
        }
    }
}



public class RpcFramework {
    /**
     * 暴露服务
     *
     * @param service 服务实现
     * @param port    服务端口
     * @throws Exception
     */
    public static void export(final Object service, int port) throws Exception {
        if (service == null)
            throw new IllegalArgumentException("service instance == null");
        if (port <= 0 || port > 65535)
            throw new IllegalArgumentException("Invalid port " + port);
        System.out.println("Export service " + service.getClass().getName() + " on port " + port);
        ServerSocket server = new ServerSocket(port);
        for (; ; ) {
            try {
                final Socket socket = server.accept();
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            try {
                                ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                                try {
                                    String methodName = input.readUTF();
                                    Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                                    Object[] arguments = (Object[]) input.readObject();
                                    ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                                    try {
                                        Method method = service.getClass().getMethod(methodName, parameterTypes);
                                        Object result = method.invoke(service, arguments);
                                        output.writeObject(result);
                                    } catch (Throwable t) {
                                        output.writeObject(t);
                                    } finally {
                                        output.close();
                                    }
                                } finally {
                                    input.close();
                                }
                            } finally {
                                socket.close();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 引用服务
     *
     * @param <T>            接口泛型
     * @param interfaceClass 接口类型
     * @param host           服务器主机名
     * @param port           服务器端口
     * @return 远程服务
     * @throws Exception
     */
    @SuppressWarnings("unchecked")
    public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception {
        if (interfaceClass == null)
            throw new IllegalArgumentException("Interface class == null");
        if (!interfaceClass.isInterface())
            throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!");
        if (host == null || host.length() == 0)
            throw new IllegalArgumentException("Host == null!");
        if (port <= 0 || port > 65535)
            throw new IllegalArgumentException("Invalid port " + port);
        System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() {
            public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
                Socket socket = new Socket(host, port);
                try {
                    ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                    try {
                        output.writeUTF(method.getName());
                        output.writeObject(method.getParameterTypes());
                        output.writeObject(arguments);
                        ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                        try {
                            Object result = input.readObject();
                            if (result instanceof Throwable) {
                                throw (Throwable) result;
                            }
                            return result;
                        } finally {
                            input.close();
                        }
                    } finally {
                        output.close();
                    }
                } finally {
                    socket.close();
                }
            }
        });
    }
}
如果服务处理很慢,同时N个客户端调用,会有什么问题?可以做怎么改进。

3.2 异步调用

rpc请求模式有同步模式、无返回值模式、异步模式,异步模式设计比较巧妙

如果要实现异步,那么client端在调用的时候,返回值肯定是要是future类型,但如果架包里面的DTO对象返回类型定义成Future<xx>这种格式,

后期如果想改成同步调用,就又要改变DTO类型 ,rpc请求模式与DTO相互耦合肯定违背我们的设计。

3.2.1 设计思路:
client每次请求生成唯一requestId,server端返回的时候带上requestId与返回值

3.2.2 核心代码:
client设计:

public class ConsumerClient {
    public static <T> T getProxyClass(Class<T> interfaceClass, RpcRequest rpcRequest) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Socket socket = null;
                ObjectOutputStream oos = null;
                BufferedReader bufferedInputStream = null;
                try {
                    socket = new Socket("localhost", 8081);
                    oos = new ObjectOutputStream(socket.getOutputStream());
                    String requestId = UUID.randomUUID().toString();
                    rpcRequest.setRequestId(requestId);
                    rpcRequest.setMethodName(method.getName());
                    rpcRequest.setGetParameterTypes(method.getParameterTypes());
                    rpcRequest.setArgs(args);
                    oos.writeObject(rpcRequest);
                    oos.flush();
                    if (rpcRequest.isAnsyc()) {
                        //requestid 放到threadLocal
                        RpcContext.setInvokeMethodStyle(requestId);
                        Socket finalSocket = socket;
                        new Thread() {
                            @Override
                            public void run() {
                                try {
                                    ObjectInputStream ois = new ObjectInputStream(finalSocket.getInputStream());
                                    RpcResponse rpcResponse = (RpcResponse) ois.readObject();
                                    if (RpcContext.checkAnsyc(rpcResponse.getRequestId())) {
                                        RpcContext.putAnsyResult(rpcResponse.getRequestId(), rpcResponse);
                                    }
                                } catch (IOException e) {
                                    e.printStackTrace();
                                } catch (ClassNotFoundException e) {
                                    e.printStackTrace();
                                }
                            }
                        }.start();
                    } else {
                        bufferedInputStream = new BufferedReader(new InputStreamReader(socket.getInputStream(), "utf-8"));
                        return bufferedInputStream.readLine();
                    }
                    return null;
                } catch (Exception e) {
                    log.error("", e);
                } finally {
                }
                return null;
            }
        });
    }

    public static void main(String[] args) throws Exception, InterruptedException {
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setAnsyc(true);
        TestService testService = ConsumerClient.getProxyClass(TestService.class, rpcRequest);
        long beginTime = System.currentTimeMillis();
        testService.testMethod("gaogao1");
        Future future1 = RpcContext.getFuture();

        testService.testMethod("gaogao2");
        Future future2 = RpcContext.getFuture();

        log.info("------------get result-------");
        RpcResponse response1 = (RpcResponse) future1.get();
        System.out.println(response1.getResult());

        RpcResponse response2 = (RpcResponse) future2.get();
        long endTime = System.currentTimeMillis();

        log.info("cost:{}", endTime - beginTime);
        System.out.println(response2.getResult());
    }
}



contentx设计:

public class RpcContext {
    //存放requestId
    public static ThreadLocal<String> requestIdthreadLocal = new ThreadLocal<>();
    //调用方式
    public static Map<String, Boolean> invokeMethodStyle = new ConcurrentHashMap<>();
    //缓存future
    public static Map<String, RpcResultFuture> futureMap = new ConcurrentHashMap<>();

    /**
     * 返回值放到map里面
     * @param requestId
     * @param rpcResponse
     */
    public static void putAnsyResult(String requestId, RpcResponse rpcResponse) {
        RpcResultFuture future = futureMap.get(requestId);
        future.setResult(requestId, rpcResponse);
    }

    /**
     * 获取代理future
     *
     * @return
     */
    public static Future getFuture() throws Exception {
        if(requestIdthreadLocal.get()==null){
            throw new Exception("no ansyc method");
        }
        RpcResultFuture future = new RpcResultFuture();
        //存放到缓存,这样当返回值回来时候就可以用到
        futureMap.put(requestIdthreadLocal.get(), future);
        //防止随便get
        requestIdthreadLocal.remove();
        return future;
    }

    public static void setInvokeMethodStyle(String requestId) {
        invokeMethodStyle.put(requestId, true);
        requestIdthreadLocal.set(requestId);
    }

    public static boolean checkAnsyc(String requestId) {
        return invokeMethodStyle.get(requestId) == null ? false : true;
    }
}



future设计

public class RpcResultFuture implements Future {
    private String requestId;
    private CountDownLatch countDownLatch = new CountDownLatch(1);
    private Object object = null;

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return false;
    }

    @Override
    public Object get() throws InterruptedException, ExecutionException {
        countDownLatch.await();
        //clear缓存
        RpcContext.futureMap.remove(requestId);
        return object;
    }

    @Override
    public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        countDownLatch.await(timeout, unit);
        return object;
    }

    public void setResult(String requestId, RpcResponse rpcResponse) {
        this.requestId = requestId;
        object = rpcResponse;
        countDownLatch.countDown();
    }
}

3.3 基于netty封装
bio版本qps非常低,同时还会有粘包拆包问题

3.3.1 增加功能
自定义传输协议
           单次请求体由以下两部分组成:

      1、数据byte[]总长度+4个占位符长度
      2、数据data的byte[]
自定义序列化
  提高性能
增加服务请求超时时间
server服务注解注册


3.3.2 核心代码
client:

@Slf4j
public class ClientEncoder extends MessageToByteEncoder {
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        try {
//          byte[] responseByte = KryoUtil.serialiaztion(msg);
            byte[] responseByte = Tool.serialize(msg);
            int totalLength = 4 + responseByte.length;
            out.writeInt(totalLength);
            out.writeBytes(responseByte);
        } catch (Exception e) {
            log.error("", e);
        }
    }
}

public class ClientDecoder extends ByteToMessageDecoder {
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int readableBytes = in.readableBytes();
        //最小可读长度要大于4
        if (readableBytes < 4) {
            return;
        }
        //标记当前指针位置
        in.markReaderIndex();
        int dataLength = in.readInt();
        if (dataLength < 0) {
            ctx.close();
        }
        //如果数据可读长度小于发送来读数据包总长度,当然不能继续读
        if (readableBytes < dataLength) {
            //恢复读指针到原来的位置
            in.resetReaderIndex();
            return;
        }
        byte[] bytes = new byte[dataLength - 4];
        in.readBytes(bytes);
        //反序列请求
        out.add(Tool.deserialize(bytes, RpcResponse.class));
    }
}

public class RpcClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcResponse rpcResponse = (RpcResponse) msg;
        ClientProvider.putRpcResponse(rpcResponse);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

public class RpcInvocationHandler implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcInvokeFutureResult rpcInvokeFutureResult = null;
        RpcResponse response = null;
        try {
            RpcRequest rpcRequest = new RpcRequest();
            rpcRequest.setRequestId(UUID.randomUUID().toString());
            rpcRequest.setClassName(method.getDeclaringClass().getName());
            rpcRequest.setMethodName(method.getName());
            rpcRequest.setParameterTypes(method.getParameterTypes());
            rpcRequest.setParameters(args);
            //获取future返回值
            rpcInvokeFutureResult = ClientProvider.sendAndGetRpcInvokeFutureResult(rpcRequest, rpcRequest.getRequestId());
            response = rpcInvokeFutureResult.get(rpcRequest.getTimeOut(), TimeUnit.SECONDS);
        } catch (Exception e) {
            log.error("", e);
        }
        if (response != null) {
            return response.getResponse();
        }
        return null;
    }
}
public class ClientProvider {
    private static Map<String, RpcInvokeFutureResult> rpcResultMap = new ConcurrentHashMap<>();
    private static ChannelFuture channelFuture = null;
    static {
        init();
    }

    private static void init() {
        startNettyClient();
    }

    private static void startNettyClient() {
        Bootstrap clentBootstrap = new Bootstrap();
        NioEventLoopGroup group = new NioEventLoopGroup();
        clentBootstrap.group(group).channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        /**
                         /**自定义协议解决粘包 begin**/
                        ch.pipeline().addLast(new ClientEncoder());
                        ch.pipeline().addLast(new ClientDecoder());
                        /**自定义协议解决粘包 end**/
                        ch.pipeline().addLast(new RpcClientHandler());
                    }
                }).option(ChannelOption.SO_KEEPALIVE, true);
        try {
            channelFuture = clentBootstrap.connect("127.0.0.1", 8000).sync();
        } catch (InterruptedException e) {
            log.error("init netty client error", e);
        }
    }

    public static ChannelFuture getChannelFuture() {
        return channelFuture;
    }

    /**
     * 发送消息和返回future结果
     *
     * @param
     * @param requestId
     * @return
     */
    public static RpcInvokeFutureResult sendAndGetRpcInvokeFutureResult(RpcRequest rpcRequest, String requestId) {
        RpcInvokeFutureResult rpcInvokeFutureResult = new RpcInvokeFutureResult(requestId,rpcResultMap);
        getChannelFuture().channel().writeAndFlush(rpcRequest);
        ClientProvider.rpcResultMap.put(requestId, rpcInvokeFutureResult);
        return rpcInvokeFutureResult;
    }


    public static void putRpcResponse(RpcResponse response) {
        RpcInvokeFutureResult rpcInvokeFutureResult = rpcResultMap.get(response.getRequestId());
        if (rpcInvokeFutureResult != null) {
            rpcInvokeFutureResult.put(response);
        }
    }
}



server:

public class RpcServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcRequest rpcRequest = (RpcRequest) msg;
        RpcResponse response = new RpcResponse();
        response.setRequestId(rpcRequest.getRequestId());
        String requestClassName = rpcRequest.getClassName();
        Object object = ServerProvider.cacheSericeMap.get(requestClassName);
        try {
            Object result = object.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes()).invoke(object, rpcRequest.getParameters());
            response.setResponse(result);
        } catch (Throwable e) {
            response.setExcetion(e.getMessage());
            log.error("", e);
        }
        ctx.writeAndFlush(response);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

public class ServerProvider {
    public static Map<String, Object> cacheSericeMap = new HashMap<>();
    private static AtomicBoolean initStatus = new AtomicBoolean(false);

    public static void init() {
        if (initStatus.compareAndSet(false, true)) {
            scannAndCacheService();
            startNettyServer();
        } else {
            throw new IllegalStateException("can not repeat init");
        }
    }

    private static void startNettyServer() {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        NioEventLoopGroup boosGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        serverBootstrap.group(boosGroup, workGroup).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        /**
                         /**自定义协议解决粘包 begin**/
                        ch.pipeline().addLast(new ServerEncoder());
                        ch.pipeline().addLast(new ServerDecoder());
                        /**自定义协议解决粘包 end**/
                        ch.pipeline().addLast(new RpcServerHandler());
                    }
                }).bind(8000);
    }

    public static void main(String[] args) {
        init();
    }

    /**
     * 扫描指定目录文件
     */
    private static void scannAndCacheService() {
        try {
            log.info("scann rpc class begin");
            ResourcePatternResolver rp = new PathMatchingResourcePatternResolver();
            Resource[] resources = rp.getResources("classpath:com/ggj/java/rpc/demo/netty/first/server/service/imp/*.class");
            if (resources == null || resources.length == 0) {
                throw new IllegalArgumentException("scann package error");
            }
            for (Resource resource : resources) {
                String className = resource.getFile().getPath().split("classes\\/")[1].replaceAll("\\/", ".").replaceAll(".class", "");
                Class<?> clazz = Thread.currentThread().getContextClassLoader().loadClass(className);
                if (clazz.getAnnotation(RpcService.class) != null) {
                    Object object = clazz.newInstance();
                    cacheSericeMap.putIfAbsent(object.getClass().getInterfaces()[0].getName(), object);
                }
            }
            log.info("scann rpc class end");
        } catch (Exception e) {
            log.error("", e);
        }
    }
}

3.4 基于zk实现注册中心

  服务的注册与发现是rpc框架必备的一个功能

3.4.1 难点思考
zk上的节点树结构如何设计
           服务端的信息都写到zk里面,那么zk的结构如何设计?

--ggjRPC(name space)

----server

-------com.ggj.java.rpc.demo.netty.first.server.service.AppleService   (192.168.8.103:4081,192.168.8.103:4082)

-------com.ggj.java.rpc.demo.netty.first.server.service.OrangeService  (192.168.8.103:4081,192.168.8.103:4082)

----app

-------oderService (appid)

----weight

server是用来记录服务对应的机器ip,多个ip以逗号分割,例如(192.168.8.103:4081,192.168.8.103:4082),
app 是用来记录提供服务的名称(appid)
weight 是用来记录提供服务机器的权重(负载均衡用到)
          如果换成下面的方式会有什么问题?

--ggjRPC(name space)
----server
-------orderService(192.168.8.103:4081,192.168.8.103:4082)
-------userService(192.168.8.103:4083,192.168.8.103:4084)
----app
-------usezkService
----weight


服务注册与发现
           场景一:

                       假设某个服务目前只部署在某一台机器serverA(192.168.1.1)上 , client请求时候可以通过zk直接获取到serverA的ip,然后执行后续操作

            ,如果这时候服务又部署了一台机器serverB(192.168.1.2),client如何感知到新上线的节点serverB。

           场景二:

                       假设某个服务,目前只部署在serverA(192.168.1.1)和serverb(192.168.1.2)上 , client请求时候可以通过zk直接获取到serverA和serverA的ip,然后执行后续操作

           ,如果这时候下线了serverB(192.168.1.2),client如何感知到server b下线,同时切走请求流量到服务A上

client端如何高性能的发现服务 
            如果每次请求都要从zk里面拉去最新的节点,接口qps肯定会下降。

              className -> ipList
               ip-> 多个链接 (删除机器时候 客户端可以自动发现并且删除链接)
               ip->classNameSet(增加机器时候 客户端可以自动发现并且添加链接)

3.4.2 核心代码
zk注册中心:

public class RegisterManager {
    private static Lock lock = new ReentrantLock();
    private static RegisterManager registerManager = new RegisterManager();
    public Register getRegister() {
        return register;
    }
    private Register register;
    private RegisterManager() {
    }

    /**
     * 服务端调用
     *
     * @return
     */
    public static RegisterManager getInstance() {
        registerManager.init(null);
        return registerManager;
    }

    /**
     * 客户端调用
     *
     * @return
     */
    public static RegisterManager getClientInstance() {
        registerManager.init(new ZookeeperDataWatcher());
        return registerManager;
    }

    /**
     * 初始化,需要加锁防止初始化多次
     *
     * @param zookeeperDataWatcher
     */
    private void init(ZookeeperDataWatcher zookeeperDataWatcher) {
        if (register == null) {
            lock.lock();
            try {
                if (register == null) {
                    register = new ZKRegister();
                    register.initListener(zookeeperDataWatcher);
                }
            } catch (Exception e) {
                log.error("init error", e);
            } finally {
                lock.unlock();
            }
        }
    }
}

public class ZKRegister implements Register {

    private AtomicBoolean initRegister = new AtomicBoolean();
    private CuratorClient zkClient;

    public ZKRegister() {
        zkClient = new CuratorClient();
    }

    @Override
    public String getName() {
        return null;
    }

    @Override
    public String getServiceAddress(String serviceName) throws Exception {
        return zkClient.getServiceAddress(serviceName);
    }

    @Override
    public void registerService(String serviceName, String serviceAddress) throws Exception {
        ZKPathConfig zkPathConfig = new ZKPathConfig(serviceName, serviceAddress);
        if (initRegister.compareAndSet(false, true)) {
            initRegisterInfo(zkPathConfig);
        }
        zkClient.registerService(zkPathConfig, serviceAddress);
    }

    /**
     * 注册
     * 机器信息
     * 注册权重
     *
     * @param zkPathConfig
     * @throws Exception
     */
    private void initRegisterInfo(ZKPathConfig zkPathConfig) throws Exception {
        zkClient.registerServerApp(zkPathConfig);
        zkClient.registerWeight(zkPathConfig);
    }

    @Override
    public void unregisterService(String serviceName, String serviceAddress) throws Exception {
        ZKPathConfig zkPathConfig = new ZKPathConfig(serviceName, serviceName);
        zkClient.unregisterService(zkPathConfig, serviceAddress);
    }

    @Override
    public boolean unregisterServerApp(String serviceAddress) {
        return false;
    }

    @Override
    public void initListener(ZookeeperDataWatcher zookeeperDataWatcher) {
        if (zookeeperDataWatcher != null) {
            zkClient.initListener(zookeeperDataWatcher);
        }
    }
}

public class CuratorClient {

    private static CuratorFramework client = null;

    public CuratorClient() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(Constants.DEFAULT_CONNECT_URL, retryPolicy);
        client = CuratorFrameworkFactory.builder().namespace(Constants.NAME_SPACE).retryPolicy(retryPolicy).connectString(Constants.DEFAULT_CONNECT_URL).build();
        client.start();
    }

    private void setStringValue() {
    }

    /**
     * 并发注册会有问题
     *
     * @param zkPathConfig
     * @param serviceAddress
     * @throws Exception
     */
    public void registerService(ZKPathConfig zkPathConfig, String serviceAddress) throws Exception {
        try {
            if (CuratorUtil.checkExists(client, zkPathConfig.getServicePath())) {
                String resulst = CuratorUtil.getStringData(client, zkPathConfig.getServicePath());
                if (StringUtils.isEmpty(resulst)) {
                    CuratorUtil.update(client, zkPathConfig.getServicePath(), serviceAddress);
                } else {
                    String[] adressArray = resulst.split("\\,");
                    List<String> adressList = new ArrayList();
                    for (String registeredAdress : adressArray) {
                        //已注册就不需要重复注册
                        if (!registeredAdress.equals(serviceAddress)) {
                            adressList.add(registeredAdress.trim());
                        }
                    }
                    adressList.add(serviceAddress);
                    CuratorUtil.update(client, zkPathConfig.getServicePath(), org.apache.commons.lang.StringUtils.join(adressList, ","));
                }
            } else {
                CuratorUtil.create(client, zkPathConfig.getServicePath(), serviceAddress);
            }
        } catch (Exception e) {
            throw e;
        }
    }

    /**
     * 注册应用信息
     *
     * @param zkPathConfig
     * @throws Exception
     */
    public void registerServerApp(ZKPathConfig zkPathConfig) throws Exception {
        CuratorUtil.create(client, zkPathConfig.getAppPath(), zkPathConfig.getServiceAddress(), true);

    }

    /**
     * 注册权重
     *
     * @param zkPathConfig
     * @throws Exception
     */
    public void registerWeight(ZKPathConfig zkPathConfig) throws Exception {
        CuratorUtil.create(client, zkPathConfig.getWeightPath(), String.valueOf(Constants.DEFAULT_ONLINE_WEIGHT), true);
    }

    /**
     * 下线服务机器
     *
     * @param zkPathConfig
     * @param serviceAddress
     * @throws Exception
     */
    public void unregisterService(ZKPathConfig zkPathConfig, String serviceAddress) throws Exception {
        if (CuratorUtil.checkExists(client, zkPathConfig.getServicePath())) {
            String resulst = CuratorUtil.getStringData(client, zkPathConfig.getServicePath());
            if (!StringUtils.isEmpty(resulst)) {
                String[] adressArray = resulst.split("\\,");
                List<String> adressList = new ArrayList();
                for (String adress : adressArray) {
                    adressList.add(adress.trim());
                }
                //remove已经注册过的
                if (adressList.contains(serviceAddress)) {
                    adressList.remove(serviceAddress);
                }
                CuratorUtil.update(client, zkPathConfig.getServicePath(), org.apache.commons.lang.StringUtils.join(adressList, ","));
            }
        }
    }

    /**
     * 获取服务端提供地址
     * @param serviceName
     * @return
     * @throws Exception
     */
    public String getServiceAddress(String serviceName) throws Exception {
        ZKPathConfig zkPathConfig=new ZKPathConfig(serviceName,null);
        if (CuratorUtil.checkExists(client, zkPathConfig.getServicePath())) {
            return CuratorUtil.getStringDataAndWatch(client, zkPathConfig.getServicePath());
        }
        throw new Exception(serviceName+":has no server");
    }

    public void initListener(ZookeeperDataWatcher zookeeperDataWatcher) {
        client.getCuratorListenable().addListener(zookeeperDataWatcher);
    }
}


/**
 * 监听zk节点,下线时候变动
 *
 * @author gaoguangjin
 */
@Slf4j
public class ZookeeperDataWatcher implements CuratorListener {
    /**
     *  className -> ipList
     *  ip-> 多个链接 (删除机器时候 客户端可以自动发现并且删除链接)
     *  ip->classNameSet(增加机器时候 客户端可以自动发现并且添加链接)
     * @param client
     * @param event
     * @throws Exception
     */
    @Override
    public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
        if (event.getType() == CuratorEventType.WATCHED && event.getPath() != null) {
            String path = event.getPath();
            String className = path.split("\\/")[2];
            String iplist = CuratorUtil.getStringDataAndWatch(client, path);
            if (StringUtils.isEmpty(iplist)) {
                return;
            }
            String[] iparrays = iplist.split("\\,");
            List<String> nowIpList = new ArrayList<>();
            for (String iparray : iparrays) {
                nowIpList.add(iparray);
            }

            List<String> oldIpList = ClientProvider.getClientConnectMap().get(className).getIpList();
            //以前这个class还没创建过请求
            if (CollectionUtils.isEmpty(oldIpList)) {
                return;
            }

            //判断扩容还是缩容
            if (nowIpList.size() > oldIpList.size()) {
                log.info("扩容,nowIplist={},oldIplist={}", nowIpList, oldIpList);
                nowIpList.removeAll(oldIpList);
                for (String nowIp : nowIpList) {
                    ClientProvider.initClient(nowIp);
                    ClientProvider.getClientConnectMap().get(className).getIpList().add(nowIp);
                }
            } else {
                log.info("锁容,nowIplist={},oldIplist={}", nowIpList, oldIpList);
                List<String> tempList = new ArrayList();
                tempList.addAll(oldIpList);
                tempList.removeAll(nowIpList);
                for (String oldIp : tempList) {
                    ClientProvider.getClientConnectMap().get(className).getIpList().remove(oldIp);
                    List<ChannelFuture> futureList = ClientProvider.getChannelFutureConnectMap().get(oldIp);
                    for (ChannelFuture channelFuture : futureList) {
                        channelFuture.channel().closeFuture();
                    }
                    ClientProvider.getChannelFutureConnectMap().remove(oldIp);
                }
            }
        }
    }
}





client

public class RpcInvocationHandler implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcInvokeFutureResult rpcInvokeFutureResult = null;
        RpcResponse response = null;
        RpcRequest rpcRequest = new RpcRequest();
        try {
            rpcRequest.setRequestId(UUID.randomUUID().toString());
            rpcRequest.setClassName(method.getDeclaringClass().getName());
            rpcRequest.setMethodName(method.getName());
            rpcRequest.setParameterTypes(method.getParameterTypes());
            rpcRequest.setParameters(args);
            //获取future返回值
            rpcInvokeFutureResult = ClientProvider.sendAndGetRpcInvokeFutureResult(rpcRequest, rpcRequest.getRequestId());
            response = rpcInvokeFutureResult.get(rpcRequest.getTimeOut(), TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            log.error("", e);
            throw e;
        }
        //超时
        if(response==null){
            throw new Exception("rpc "+method.getDeclaringClass().getName()+" timeout");
        }
        return response.getResponse();
    }
}

/**
 * @author gaoguangjin
 */
@Slf4j
public class ClientProvider {
    @Getter
    private static Map<String, RpcInvokeFutureResult> rpcResultMap = new ConcurrentHashMap<>();
    @Getter
    //className -> ipList
    private static Map<String, ChannelFutureHolder> clientConnectMap = new ConcurrentHashMap();
    @Getter
    // ip-> 多个链接
    private static Map<String, List<ChannelFuture>> channelFutureConnectMap = new ConcurrentHashMap();
    private static Lock lock = new ReentrantLock();

    public static ChannelFuture getNettyClient(String ipStrs, String className) throws Exception {
        lock.lock();
        try {
            if (clientConnectMap.containsKey(className)) {
                return clientConnectMap.get(className).selectClient();
            }
            String[] ipArray = ipStrs.split("\\,");
            List<String> ipList = new ArrayList<>();
            for (String iparray : ipArray) {
                initClient(iparray);
                ipList.add(iparray);
            }
            clientConnectMap.put(className, new ChannelFutureHolder(ipList));
        } catch (Exception e) {
            log.error("init netty client error", e);
            throw e;
        } finally {
            lock.unlock();
        }
        return clientConnectMap.get(className).selectClient();
    }

    public static List<ChannelFuture> initClient(String ip) throws InterruptedException {
        lock.lock();
        List<ChannelFuture> futureList = new ArrayList<>();
        try {
            String[] ipArray = ip.split("\\:");
            if (channelFutureConnectMap.containsKey(ip)) {
                return channelFutureConnectMap.get(ip);
            }
            for (int i = 0; i < Constants.CLIENT_INIT_CONNECTION_SIZE; i++) {
                Bootstrap clentBootstrap = new Bootstrap();
                NioEventLoopGroup group = new NioEventLoopGroup();
                clentBootstrap.group(group).channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new ClientEncoder());
                                ch.pipeline().addLast(new ClientDecoder());
                                ch.pipeline().addLast(new RpcClientHandler());
                            }
                        }).option(ChannelOption.SO_KEEPALIVE, true);
                ChannelFuture channelFuture = clentBootstrap.connect(ipArray[0], Integer.parseInt(ipArray[1])).sync();
                futureList.add(channelFuture);
            }
            channelFutureConnectMap.put(ip, futureList);
        } catch (Exception e) {
            log.error("init netty client error", e);
            throw e;
        } finally {
            lock.unlock();
        }
        return futureList;
    }


    public static Map<String, ChannelFutureHolder> getClientConnectMap() {
        return clientConnectMap;
    }

    /**
     * 1、缓存map 是否存在连接
     * 如果存在 判断是否active
     * 如果不active 就切换到其他到
     *
     * @param rpcRequest
     * @return
     * @throws Exception
     */
    public static ChannelFuture getChannelFuture(RpcRequest rpcRequest) throws Exception {
        ChannelFuture channelFuture = null;
        String className = rpcRequest.getClassName();

        if (clientConnectMap.containsKey(className)) {
            channelFuture = clientConnectMap.get(className).selectClient();
            if (channelFuture != null) {
                return channelFuture;
            }
        }
        String iplist = getConnectIp(className);
        if (StringUtils.isEmpty(iplist)) {
            throw new Exception(className + " has no availiable server");
        }
        channelFuture = getNettyClient(iplist, className);
        return channelFuture;
    }

    /**
     * 获取服务端ip
     *
     * @param className
     * @return
     * @throws Exception
     */
    private static String getConnectIp(String className) throws Exception {
        return RegisterManager.getClientInstance().getRegister().getServiceAddress(className);
    }

    /**
     * 发送消息和返回future结果
     *
     * @param
     * @param requestId
     * @return
     */
    public static RpcInvokeFutureResult sendAndGetRpcInvokeFutureResult(RpcRequest rpcRequest, String requestId) throws Exception {
        RpcInvokeFutureResult rpcInvokeFutureResult = new RpcInvokeFutureResult(requestId, rpcResultMap);
        Channel channel = getChannelFuture(rpcRequest).channel();
        ChannelFuture channelFuture = channel.writeAndFlush(rpcRequest);
        //监听resposne返回结果,如果服务端下线会收到通知
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture rfuture) throws Exception {
                if (!rfuture.isSuccess()) {
                    log.info("rpc rsponse unsuccessful");
                    rpcInvokeFutureResult.setCause(rfuture.cause());
                }
            }
        });
        ClientProvider.rpcResultMap.put(requestId, rpcInvokeFutureResult);
        return rpcInvokeFutureResult;
    }


    public static void putRpcResponse(RpcResponse response) {
        RpcInvokeFutureResult rpcInvokeFutureResult = rpcResultMap.get(response.getRequestId());
        if (rpcInvokeFutureResult != null) {
            rpcInvokeFutureResult.put(response);
        }
    }

    @Data
    public static class ChannelFutureHolder {

        private AtomicInteger atomicInteger = new AtomicInteger(1);
        private String ip;
        @Getter
        List<String> ipList;

        public ChannelFutureHolder(List<String> ipList) {
            this.ipList = ipList;
        }

        /**
         * 随机选服务端ip
         * 随机选服务端的连接
         *
         * @return
         */
        public ChannelFuture selectClient() {
            String randomIp = ipList.get(new Random().nextInt(ipList.size()));
            //System.out.println("invoke ip="+randomIp);
            List<ChannelFuture> channelFutureList = channelFutureConnectMap.get(randomIp);
            if (CollectionUtils.isEmpty(channelFutureList)) {
                return null;
            }
            return channelFutureList.get(atomicInteger.getAndIncrement() % Constants.CLIENT_INIT_CONNECTION_SIZE);
        }
    }
}



server端:

添加了 shutdownhook

/**
 * 测试时候需要设置 -Dport=4081
 *
 * @author gaoguangjin
 */
@Slf4j
public class ServerProvider {
    public static Map<String, Object> cacheServiceMap = new HashMap<>();
    private static RegisterManager registerManager = RegisterManager.getInstance();
    private static AtomicBoolean initStatus = new AtomicBoolean(false);

    public static void init() {
        System.setProperty(Constants.APP_NAME_KEY, "usezkService");
        if (initStatus.compareAndSet(false, true)) {
            try {
                scannAndCacheService();
                startNettyServer();
                registerServerService();
                addShutDownHook();
            } catch (Exception e) {
                log.error("init server error", e);
                throw new RuntimeException();
            }
        } else {
            throw new IllegalStateException("can not repeat init");
        }
    }

    private static void addShutDownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    shutDownServer();
                } catch (Exception e) {
                    log.error("停机hook失败", e);
                }
            }
        });
    }

    /**
     * 下线服务
     *
     * @throws Exception
     */
    private static void shutDownServer() throws Exception {
        log.info("excute shutdownserver");
        for (String serviceName : cacheServiceMap.keySet()) {
            registerManager.getRegister().unregisterService(serviceName, Constants.SERVICE_ADRESS);
        }
    }

    private static void registerServerService() throws Exception {
        for (String className : cacheServiceMap.keySet()) {
            registerManager.getRegister().registerService(className, Constants.SERVICE_ADRESS);
        }
    }

    private static void startNettyServer() {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        NioEventLoopGroup boosGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        serverBootstrap.group(boosGroup, workGroup).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        /**
                         /**自定义协议解决粘包 begin**/
                        ch.pipeline().addLast(new ServerEncoder());
                        ch.pipeline().addLast(new ServerDecoder());
                        /**自定义协议解决粘包 end**/
                        ch.pipeline().addLast(new RpcServerHandler());
                    }
                }).bind(Constants.HOST_ADRESS, Constants.DEFAULT_SERVER_PORT);
    }

    public static void main(String[] args) {
        init();
    }

    /**
     * 扫描指定目录文件
     */
    private static void scannAndCacheService() {
        try {
            log.info("scann rpc class begin");
            ResourcePatternResolver rp = new PathMatchingResourcePatternResolver();
            Resource[] resources = rp.getResources("classpath:com/ggj/java/rpc/demo/netty/usezk/server/service/imp/*.class");
            if (resources == null || resources.length == 0) {
                throw new IllegalArgumentException("scann package error");
            }
            for (Resource resource : resources) {
                String className = resource.getFile().getPath().split("classes\\/")[1].replaceAll("\\/", ".").replaceAll(".class", "");
                Class<?> clazz = Thread.currentThread().getContextClassLoader().loadClass(className);
                if (clazz.getAnnotation(RpcService.class) != null) {
                    Object object = clazz.newInstance();
                    cacheServiceMap.putIfAbsent(object.getClass().getInterfaces()[0].getName(), object);
                }
            }
            log.info("scann rpc class end");
        } catch (Exception e) {
            log.error("", e);
        }
    }
}


3.5 基于spring实现
 略

四、点评pigeon框架介绍
项目地址:https://github.com/dianping/pigeon

主要特色:

1、除了支持spring schema等配置方式,也支持代码annotation方式发布服务、引用远程服务,并提供原生api接口的用法。(❤️)

2、支持http协议,方便非java应用调用pigeon的服务。(❤️❤️)

3、序列化方式除了hessian,还支持thrift等。(❤️❤️)

4、提供了服务器单机控制台pigeon-console,包含单机服务测试工具。(❤️❤️❤️❤️❤️)

5、创新的客户端路由策略,提供服务预热功能,解决线上流量大的service重启时大量超时的问题。(❤️❤️❤️❤️)

6、记录每个请求的对象大小、返回对象大小等监控信息。(❤️❤️)

7、服务端可对方法设置单独的线程池进行服务隔离,可配置客户端应用的最大并发数进行限流。(❤️❤️❤️❤️)




4.1 核心功能介绍
4.1.1 客户端路由策略
  集群策略:

//失败立即抛出异常
public static final String CLUSTER_FAILFAST = "failfast";
//失败自动切换,重试其他服务器
public static final String CLUSTER_FAILOVER = "failover";
//失败忽略异常,返回null
public static final String CLUSTER_FAILSAFE = "failsafe";
//并行调用多个服务,返回调用最快的节点数据
public static final String CLUSTER_FORKING = "forking";

源码分析:
FailfastCluster
/**
 * 快速失败
 */
public class FailfastCluster implements Cluster {

    private ClientManager clientManager = ClientManager.getInstance();

    private static final Logger logger = LoggerLoader.getLogger(FailfastCluster.class);

    @Override
    public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
            throws Throwable {
        InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
        InvocationRequest request = InvokerUtils.createRemoteCallRequest(invocationContext, invokerConfig);
        //是否超时重试
        boolean timeoutRetry = invokerConfig.isTimeoutRetry();
        if (!timeoutRetry) {
            //按负载均衡策略获取服务端连接实例
            Client remoteClient = clientManager.getClient(invokerConfig, request, null);
            invocationContext.setClient(remoteClient);
            try {
                return handler.handle(invocationContext);
            } catch (NetworkException e) {
                //网络异常重试
                remoteClient = clientManager.getClient(invokerConfig, request, null);
                invocationContext.setClient(remoteClient);
                return handler.handle(invocationContext);
            }
        } else {
            //重试次数
            int retry = invokerConfig.getRetries(invocationContext.getMethodName());
            RequestTimeoutException lastError = null;
            int maxInvokeTimes = retry + 1;
            int invokeTimes = 0;
            for (int index = 0; index < maxInvokeTimes; index++) {
                Client clientSelected = null;
                try {
                    clientSelected = clientManager.getClient(invokerConfig, request, null);
                } catch (ServiceUnavailableException e) {
                    if (invokeTimes > 0) {
                        logger.error("Invoke method[" + invocationContext.getMethodName() + "] on service["
                                + invokerConfig.getUrl() + "] failed with " + invokeTimes + " times");
                        throw lastError;
                    } else {
                        throw e;
                    }
                }
                try {
                    invokeTimes++;
                    invocationContext.setClient(clientSelected);
                    InvocationResponse response = null;
                    try {
                        response = handler.handle(invocationContext);
                    } catch (NetworkException e) {
                        clientSelected = clientManager.getClient(invokerConfig, request, null);
                        invocationContext.setClient(clientSelected);
                        response = handler.handle(invocationContext);
                        logger.info("Retry while network exception:" + e.getMessage());
                    }

                    if (lastError != null) {
                        logger.warn("Retry method[" + invocationContext.getMethodName() + "] on service["
                                + invokerConfig.getUrl() + "] succeed after " + invokeTimes
                                + " times, last failed error: " + lastError.getMessage(), lastError);
                    }
                    return response;
                } catch (RequestTimeoutException e) {
                    lastError = e;
                }
            }
            if (lastError != null) {
                throw lastError;
            } else {
                throw new RemoteInvocationException(
                        "Invoke method[" + invocationContext.getMethodName() + "] on service[" + invokerConfig.getUrl()
                                + "] failed with " + invokeTimes + " times, last error: "
                                + (lastError != null ? lastError.getMessage() : ""),
                        lastError != null && lastError.getCause() != null ? lastError.getCause() : lastError);
            }
        }
    }

    @Override
    public String getName() {
        return Constants.CLUSTER_FAILFAST;
    }

}

FailoverCluster:

/**
 * 失败自动切换,重试其他服务器
 */
public class FailoverCluster implements Cluster {

    private ClientManager clientManager = ClientManager.getInstance();

    private static final Logger logger = LoggerLoader.getLogger(FailoverCluster.class);

    @Override
    public InvocationResponse invoke(final ServiceInvocationHandler handler, final InvokerContext invocationContext)
            throws Throwable {
        InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
        List<Client> selectedClients = new ArrayList<Client>();
        Throwable lastError = null;
        int retry = invokerConfig.getRetries(invocationContext.getMethodName());
        //最大重试次数
        int maxInvokeTimes = retry + 1;
        //是否允许超时重试
        boolean timeoutRetry = invokerConfig.isTimeoutRetry();
        int invokeTimes = 0;
        for (int index = 0; index < maxInvokeTimes; index++) {
            InvocationRequest request = InvokerUtils.createRemoteCallRequest(invocationContext, invokerConfig);
            Client clientSelected = null;
            try {
                clientSelected = clientManager.getClient(invokerConfig, request, selectedClients);
            } catch (ServiceUnavailableException e) {
                if (invokeTimes > 0) {
                    logger.error("Invoke method[" + invocationContext.getMethodName() + "] on service["
                            + invokerConfig.getUrl() + "] failed with " + invokeTimes + " times");
                    throw lastError;
                } else {
                    throw e;
                }
            }
            //记录使用过的集群client
            selectedClients.add(clientSelected);
            try {
                //记录调用次数
                invokeTimes++;
                invocationContext.setClient(clientSelected);
                InvocationResponse response = handler.handle(invocationContext);
                if (lastError != null) {
                    logger.warn(
                            "Retry method[" + invocationContext.getMethodName() + "] on service["
                                    + invokerConfig.getUrl() + "] succeed after " + invokeTimes
                                    + " times, last failed error: " + lastError.getMessage(), lastError);
                }
                return response;
            } catch (Throwable e) {
                //超时异常
                lastError = e;
                if (e instanceof RequestTimeoutException) {
                    //如果不允许重试 直接抛出异常
                    if (!timeoutRetry) {
                        throw e;
                    }
                }
            }
        }
        if (lastError != null) {
            logger.error("Invoke method[" + invocationContext.getMethodName() + "] on service["
                    + invokerConfig.getUrl() + "] failed with " + invokeTimes + " times");
            throw lastError;
        } else {
            throw new RemoteInvocationException("Invoke method[" + invocationContext.getMethodName() + "] on service["
                    + invokerConfig.getUrl() + "] failed with " + invokeTimes + " times, last error: "
                    + (lastError != null ? lastError.getMessage() : ""), lastError != null
                    && lastError.getCause() != null ? lastError.getCause() : lastError);
        }
    }

    @Override
    public String getName() {
        return Constants.CLUSTER_FAILOVER;
    }

}


FailsafeCluster
/**
 * 安全失败策略-失败忽略异常
 */
public class FailsafeCluster implements Cluster {

    private ClientManager clientManager = ClientManager.getInstance();

    private static final Logger logger = LoggerLoader.getLogger(FailsafeCluster.class);

    private static final InvocationResponse NO_RETURN_RESPONSE = InvokerUtils.createNoReturnResponse();

    @Override
    public InvocationResponse invoke(final ServiceInvocationHandler handler, final InvokerContext invocationContext)
            throws Throwable {
        InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
        InvocationRequest request = InvokerUtils.createRemoteCallRequest(invocationContext, invokerConfig);
        try {
            Client remoteClient = clientManager.getClient(invokerConfig, request, null);
            invocationContext.setClient(remoteClient);
            return handler.handle(invocationContext);
        } catch (Throwable t) {
            logger.error("", t);
            //返回空对象
            return NO_RETURN_RESPONSE;
        }
    }

    @Override
    public String getName() {
        return Constants.CLUSTER_FAILSAFE;
    }

}

ForkingCluster:

/**
 * 并行调用多个服务,一个成功立即返回
 */
public class ForkingCluster implements Cluster {

    private ClientManager clientManager = ClientManager.getInstance();
    private static final Logger logger = LoggerLoader.getLogger(ForkingCluster.class);
    private final ExecutorService executor = Executors
            .newCachedThreadPool(new NamedThreadFactory("Pigeon-Client-Fork-Processor", true));
    //节点数
    private static final String KEY_FORKING_SIZE = "pigeon.invoker.forking.size";
    private Random r = new Random();

    public ForkingCluster() {
        ConfigManagerLoader.getConfigManager().getIntValue(KEY_FORKING_SIZE, 0);
    }

    private List<Client> randomList(List<Client> clients) {
        List<Client> randomClients = clients;
        int size = ConfigManagerLoader.getConfigManager().getIntValue(KEY_FORKING_SIZE, 0);
        //3
        int len = clients.size();
        if (size > 0 && size < len) {
            randomClients = new ArrayList<Client>(size);
            //生成一个0到len-1的整型随机数
            int startIndex = (int) (r.nextDouble() * len);
            //从随机数开始,赠序选取数组中的连接
            for (int i = startIndex; i < startIndex + size; i++) {
                //如果超过了数组的长度,则从头取起,目的是为了随机
                int idx = i < len ? i : (i - len);
                randomClients.add(clients.get(idx));
            }
        }
        //如果forksize大于客户端size 调用所有的
        return randomClients;
    }

    @Override
    public InvocationResponse invoke(final ServiceInvocationHandler handler, final InvokerContext invocationContext)
            throws Throwable {
        final InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
        InvocationRequest request = InvokerUtils.createRemoteCallRequest(invocationContext, invokerConfig);
        final List<Client> clients = clientManager.getAvailableClients(invokerConfig, request);
        final AtomicInteger count = new AtomicInteger();
        //阻塞队列缓存结果
        final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
        //获取随机的client
        final List<Client> selectedClients = randomList(clients);
        for (final Client client : selectedClients) {
            executor.execute(new Runnable() {
                public void run() {
                    InvokerContext ctxt = new DefaultInvokerContext(invokerConfig, invocationContext.getMethodName(),
                            invocationContext.getParameterTypes(), invocationContext.getArguments());
                    ctxt.setClient(client);
                    ctxt.setRequest(null);
                    ctxt.setRequest(InvokerUtils.createRemoteCallRequest(ctxt, invokerConfig));
                    try {
                        InvocationResponse resp = handler.handle(ctxt);
                        //向堵塞队列塞入结果
                        ref.offer(resp);
                    } catch (Throwable e) {
                        int value = count.incrementAndGet();
                        if (value >= selectedClients.size()) {
                            ref.offer(e);
                        }
                    }
                }
            });
        }
        Object ret = null;
        //配置超时时间,获取超时时间内堵塞请求
        if (request.getTimeout() > 0) {
            ret = ref.poll(request.getTimeout(), TimeUnit.MILLISECONDS);
        } else {
            //堵塞等待直到有请求完成
            ret = ref.take();
        }
        if (ret instanceof Throwable) {
            throw (Throwable) ret;
        } else if ((ret instanceof FutureResponse)
                && CallMethod.isFuture(invokerConfig.getCallType())) {
            //如果是future响应则设置相关future
            FutureFactory.setFuture(((FutureResponse) ret).getServiceFuture());
        } else if (ret == null) {
            throw InvocationUtils.newTimeoutException("timeout while waiting forking response:" + request);
        }
        return (InvocationResponse) ret;
    }

    @Override
    public String getName() {
        return Constants.CLUSTER_FORKING;
    }

}


4.1.2 服务线程池隔离
线程池配置支持服务(单个类)级别与方法级别配置

如果服务有线程池配置,会在服务注册的时候就将相关的线程池创建好,线程池规则如果不配置都是默认共享线程池。

线程池有动态和非动态的区分,动态的支持配置中心修改,然后监听变动refresh线程池配置

public class NettyServerHandler extends SimpleChannelUpstreamHandler {
    private static final Logger log = LoggerLoader.getLogger(NettyServerHandler.class);
    private NettyServer server;
    public NettyServerHandler(NettyServer server) {
        this.server = server;
    }
    @Override
    public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
        if (log.isDebugEnabled()) {
            if (e instanceof ChannelStateEvent && ((ChannelStateEvent) e).getState() != ChannelState.INTEREST_OPS) {
                log.debug(e.toString());
            }
        }
        super.handleUpstream(ctx, e);
    }

    /**
     * 服务器端接受到消息开始处理
     *
     * @see org.jboss.netty.channel.SimpleChannelUpstreamHandler#messageReceived(org.jboss.netty.channel.ChannelHandlerContext,
     * org.jboss.netty.channel.MessageEvent)
     */
    @SuppressWarnings("unchecked")
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent message) {
        CodecEvent codecEvent = (CodecEvent) (message.getMessage());

        if (!codecEvent.isValid() || codecEvent.getInvocation() == null) {
            return;
        }
        InvocationRequest request = (InvocationRequest) codecEvent.getInvocation();
        ProviderContext invocationContext = new DefaultProviderContext(request, new NettyServerChannel(ctx.getChannel()));
        try {
            //异步调用
            this.server.processRequest(request, invocationContext);
        } catch (Throwable e) {
            String msg = "process request failed:" + request;
            // 心跳消息只返回正常的, 异常不返回
            if (request.getCallType() == Constants.CALLTYPE_REPLY
                    && request.getMessageType() != Constants.MESSAGE_TYPE_HEART) {
                ctx.getChannel().write(ProviderUtils.createFailResponse(request, e));
            }
            log.error(msg, e);
        }
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
    }

    @Override
    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
        this.server.getChannelGroup().add(e.getChannel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        log.info(e.getCause().getMessage(), e.getCause());
        ctx.getChannel().close();
    }

}

    /**
     * server-并行处理器
     *
     * @param request
     * @param providerContext
     * @return
     */
    public Future<InvocationResponse> doProcessRequest(final InvocationRequest request,
                                                       final ProviderContext providerContext) {
        requestContextMap.put(request, providerContext);
        startMonitorData(request, providerContext);
        Callable<InvocationResponse> requestExecutor = new Callable<InvocationResponse>() {
            @Override
            public InvocationResponse call() throws Exception {
                providerContext.getTimeline().add(new TimePoint(TimePhase.T));
                try {
                    //根据链路请求
                    ServiceInvocationHandler invocationHandler = ProviderProcessHandlerFactory
                            .selectInvocationHandler(providerContext.getRequest().getMessageType());
                    if (invocationHandler != null) {
                        providerContext.setThread(Thread.currentThread());
                        return invocationHandler.handle(providerContext);
                    }
                } catch (Throwable t) {
                    logger.error("Process request failed with invocation handler, you should never be here.", t);
                } finally {
                    requestContextMap.remove(request);
                }
                return null;
            }
        };
        //选择线程池
        final ThreadPool pool = selectThreadPool(request);

        try {
            checkRequest(pool, request);
            providerContext.getTimeline().add(new TimePoint(TimePhase.T));
            //线程池去 执行
            return pool.submit(requestExecutor);
        } catch (RejectedExecutionException e) {
            requestContextMap.remove(request);
            endMonitorData(request, providerContext);
            throw new RejectedException(getProcessorStatistics(pool), e);
        }

    }



private ThreadPool selectThreadPool(final InvocationRequest request) {
        ThreadPool pool = null;
        String serviceKey = request.getServiceName();
        String methodKey = serviceKey + "#" + request.getMethodName();

        // spring poolConfig
        pool = getConfigThreadPool(request);

        // 方法级别配置
        if (pool == null && !CollectionUtils.isEmpty(methodThreadPools)) {
            pool = methodThreadPools.get(methodKey);
        }
        // 服务级别配置
        if (pool == null && !CollectionUtils.isEmpty(serviceThreadPools)) {
            pool = serviceThreadPools.get(serviceKey);
        }

        // 开启动态线程池将获取动态线程池
        if (pool == null && poolConfigSwitchable && !CollectionUtils.isEmpty(apiPoolNameMapping)) {
            PoolConfig poolConfig = null;
            String poolName = apiPoolNameMapping.get(methodKey);
            if (StringUtils.isNotBlank(poolName)) { // 方法级别
                poolConfig = poolConfigs.get(poolName);
                if (poolConfig != null) {
                    pool = DynamicThreadPoolFactory.getThreadPool(poolConfig);
                }
            } else { //服务级别
                poolName = apiPoolNameMapping.get(serviceKey);
                if (StringUtils.isNotBlank(poolName)) {
                    poolConfig = poolConfigs.get(poolName);
                    if (poolConfig != null) {
                        pool = DynamicThreadPoolFactory.getThreadPool(poolConfig);
                    }
                }
            }
        }

        // 默认方式
        if (pool == null) {
            // 针对慢请求,为其选择相应的线程池
            if (enableSlowPool && requestTimeoutListener.isSlowRequest(request)) {
                pool = slowRequestProcessThreadPool;
            } else {
                //共享线程池
                pool = sharedRequestProcessThreadPool;
            }
        }
        return pool;
    }

五、后续想介绍的
1、点评pigeon与阿里dubbo RPC框架介绍与核心源码分享

2、RCP框架为了提高qps的一些优化