接着上面的服务端继续讲解客户端的开发
1,客户端的核心代码实现
public class RpcClientLoader extends RpcServerBase { //单例模式的设计 private volatile static RpcClientLoader rpcClientLoader; //执行的线程池操作 private static ListeningExecutorService threadPoolExecuter = MoreExecutors.listeningDecorator(RpcExecuter.getExecuter(10, 50, -1)); private EventLoopGroup work = new NioEventLoopGroup(PROCESS); //发送消息的句柄 private PushMessageHandler pushMessageHandler; //单例操作涉及到数据安全采用锁机制 private ReentrantLock lock = new ReentrantLock(); private Condition connect = lock.newCondition(); private Condition push = lock.newCondition(); /** * 单例构造函数 */ private RpcClientLoader() { } /** * 单例提供对外的访问变量 * @return */ public static RpcClientLoader getInstance() { return RpcClientLoaderHelp.rpcClientLoader; } /** * 内部类构造安全的单例模式 */ private static class RpcClientLoaderHelp { private static RpcClientLoader rpcClientLoader = new RpcClientLoader(); } /** * 加载对应的ip和协议 * @param inetAddress */ public synchronized void init(String inetAddress) { String[] addrs = inetAddress.split(FinalStaticUtil.DELIMITER); if (addrs.length == 3) { //设置当前的通信协议 setRpcSerializeProtocol(ProtocolSelecter.getProtocol(addrs[0])); String host = addrs[1]; int port = Integer.valueOf(addrs[2]); //netty链接的时候需要该对象 InetSocketAddress serviceAddr = new InetSocketAddress(host, port); //采用google提供的guava框架中的线程池操作类 //PushInitTask类是主要设计netty客户端连接和相关netty句柄的操作实现类 ListenableFuturefuture = threadPoolExecuter.submit(new PushInitTask(work, serviceAddr, rpcSerializeProtocol)); //对连接结果进行控制,guava框架中的内容 Futures.addCallback(future, new FutureCallback () { //连接成功后的处理 //成功的核心处理就是把发送消息的句柄传递回来,供以后客户端发送消息使用 @Override public void onSuccess(Boolean result) { try { lock.lock(); if (null == pushMessageHandler) { push.await(); } if (result && null != pushMessageHandler) { connect.signalAll(); } } catch (Exception e) { } finally { lock.unlock(); } } //连接失败后的处理 @Override public void onFailure(Throwable t) { // TODO } }, threadPoolExecuter); } } /** * 在netty的执行过程中设置相应的服务器状态 * @param channelState */ public void setChannelState(ChannelState channelState) { state = channelState; } /** * */ public synchronized void close() { pushMessageHandler.close(); threadPoolExecuter.shutdown(); work.shutdownGracefully(); //设置服务器状态 state = ChannelState.CLOSED; } /** * 对外提供netty发送消息的句柄的访问,保证数据安全,并且保证该句柄必须存在 * @return * @throws InterruptedException */ public PushMessageHandler getPushMessageHandler() throws InterruptedException { try { lock.lock(); //如果发送消息的句柄还没有设置则还没有连接上服务端 if (null == pushMessageHandler) { connect.await(); } return pushMessageHandler; } finally { lock.unlock(); } } /** * 设计netty的客户端消息发送句柄,该方法主要提供给netty链接成功后的操作 * @param pushMessageHandler */ public void setPushMessageHandler(PushMessageHandler pushMessageHandler) { try { lock.lock(); this.pushMessageHandler = pushMessageHandler; //该方法是在client连接到server后的操作,所以是连接成功,通知可以发送消息了 push.signalAll(); } finally { lock.unlock(); } }}
2,netty客户端的实现
public class PushInitTask implements Callable{ private EventLoopGroup eventLoopGroup; private InetSocketAddress address; private RpcSerializeProtocol protocol; public PushInitTask (EventLoopGroup eventLoopGroup, InetSocketAddress address, RpcSerializeProtocol protocol) { this.eventLoopGroup = eventLoopGroup; this.address = address; this.protocol = protocol; } /** * 下面是netty的标准实现 * 采用长连接的机制 * @return * @throws Exception */ @Override public Boolean call() throws Exception { //执行对netty的客户端操作 Bootstrap bootstrap = new Bootstrap(); //下面的参数需要在后期的升级开发中进行配置的功能升级 bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000); //设置通道流,根据序列化的机制采用不同的管道流 bootstrap.handler(new PushChannelHander(protocol)); //链接服务端 ChannelFuture future = null; try { future = bootstrap.connect(address); } catch (Exception e) { return false; } //设置服务器为正常状态 RpcClientLoader.getInstance().setChannelState(ChannelState.ALIVE); //将发送消息的句柄设置到相应的类中,给系统提示可以进行消息发送 future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { //连接成功后的消息发送句柄 RpcClientLoader.getInstance().setPushMessageHandler( channelFuture.channel().pipeline().get(PushMessageHandler.class)); } }); return true; }}
3,netty客户端的句柄通道,客户端发送消息及接受反馈的操作主要体现在下面的配置
public class PushChannelHander extends ChannelInitializer{ private RpcSerializeProtocol protocol; public PushChannelHander (RpcSerializeProtocol protocol) { this.protocol = protocol; } @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //获得netty的通道,并对该通道设置相应的句柄 ChannelPipeline pipeline = socketChannel.pipeline(); //设置句柄的具体实现方法 PushPipelineFrame.initPipeline(pipeline, protocol); }}
public class PushPipelineFrame { /** * 设置相应的编码解码操作 * * @param pipeline * @param protocol */ public static void initPipeline(ChannelPipeline pipeline, RpcSerializeProtocol protocol) { switch (protocol) { case HESSIONSERIALIZE: HessionMessageCodec codec = new HessionMessageCodec(); pipeline.addLast(new HessionMessageDecoder(codec)); pipeline.addLast(new HessionMessageEncoder(codec)); pipeline.addLast(new PushMessageHandler()); break; case KRYOSERIALIZE: break; } }}
4,消息的实际操作
public class PushMessageHandler extends ChannelInboundHandlerAdapter { private ConcurrentHashMapcallbacks = new ConcurrentHashMap (); //执行的数据通道,需要由通道进行数据的传输 private volatile Channel channel; @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { super.channelRegistered(ctx); this.channel = ctx.channel(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); } /** * 服务器端返回的数据结果 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { RpcResponse rpcResponse = (RpcResponse) msg; String requestId = rpcResponse.getRequestId(); if (callbacks.containsKey(requestId)) { PushCallback callback = callbacks.get(requestId); //将结果放置在客户端的回调函数中 callback.doEnd(rpcResponse); callbacks.remove(requestId); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //super.exceptionCaught(ctx, cause); ctx.close(); } /** * 客户端执行数据发送的真是操作,可以通过netty来实现 * @param rpcRequest * @return */ public PushCallback pushMessage(RpcRequest rpcRequest) { PushCallback callback = new PushCallback(rpcRequest); callbacks.put(rpcRequest.getRequestId(), callback); //数据写入通道并刷新到服务端 channel.writeAndFlush(rpcRequest); return callback; } /** * 客户端关闭通道的处理 * 仅仅在客户端程序停止时调用 */ public void close() { channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); }}
根据netty的特点,客户端的整体流程如下:
1,用户根据服务器的特点启动辅助类,保证服务是可以正常运行,并且调用netty的创建
2,用户线程创建Bootstrap,通过api设置netty客户端的相关参数
3,创建客户端的Reactor的线程组,该线程组主要处理客户端连接,io读写的NioEventLoopGroup,可以指定线程的数量,默认是cpu内核的2倍
4,通过Bootstrap的channelFactory和用户指定的channel类型,用作客户端处理的NioSocketChannel,区别于服务端的NioServerSocketChannel
5,创建ChannelHandlerPipeline,用户调度和执行网络事件,就是我们增加的handler功能
6,异步发起tcp连接,判断是否连接成功,如果注册成功,则通过继续用户数据的接受和消息的发送,如果没有连接成功,则等待链接结果
7,注册对应的网络监听状态到多路复用器,并且轮询个channel,处理连接的结果
8,连接成功,设置future结果,发送连接成功事件,出发channelpipeline执行
9,有channelpipeline调度执行系统和用户的channelhandler,执行自定义的业务逻辑