博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java高并发设计(十四)-- netty通信之客户端
阅读量:5924 次
发布时间:2019-06-19

本文共 9204 字,大约阅读时间需要 30 分钟。

hot3.png

接着上面的服务端继续讲解客户端的开发

1,客户端的核心代码实现

public class RpcClientLoader extends RpcServerBase {    //单例模式的设计    private volatile static RpcClientLoader rpcClientLoader;    //执行的线程池操作    private static ListeningExecutorService threadPoolExecuter = MoreExecutors.listeningDecorator(RpcExecuter.getExecuter(10, 50, -1));    private EventLoopGroup work = new NioEventLoopGroup(PROCESS);    //发送消息的句柄    private PushMessageHandler pushMessageHandler;    //单例操作涉及到数据安全采用锁机制    private ReentrantLock lock = new ReentrantLock();    private Condition connect = lock.newCondition();    private Condition push = lock.newCondition();    /**     * 单例构造函数     */    private RpcClientLoader() {    }    /**     * 单例提供对外的访问变量     * @return     */    public static RpcClientLoader getInstance() {        return RpcClientLoaderHelp.rpcClientLoader;    }    /**     * 内部类构造安全的单例模式     */    private static class RpcClientLoaderHelp {        private static RpcClientLoader rpcClientLoader = new RpcClientLoader();    }    /**     * 加载对应的ip和协议     * @param inetAddress     */    public synchronized void init(String inetAddress) {        String[] addrs = inetAddress.split(FinalStaticUtil.DELIMITER);        if (addrs.length == 3) {            //设置当前的通信协议            setRpcSerializeProtocol(ProtocolSelecter.getProtocol(addrs[0]));            String host = addrs[1];            int port = Integer.valueOf(addrs[2]);            //netty链接的时候需要该对象            InetSocketAddress serviceAddr = new InetSocketAddress(host, port);            //采用google提供的guava框架中的线程池操作类            //PushInitTask类是主要设计netty客户端连接和相关netty句柄的操作实现类            ListenableFuture
future = threadPoolExecuter.submit(new PushInitTask(work, serviceAddr, rpcSerializeProtocol)); //对连接结果进行控制,guava框架中的内容 Futures.addCallback(future, new FutureCallback
() { //连接成功后的处理 //成功的核心处理就是把发送消息的句柄传递回来,供以后客户端发送消息使用 @Override public void onSuccess(Boolean result) { try { lock.lock(); if (null == pushMessageHandler) { push.await(); } if (result && null != pushMessageHandler) { connect.signalAll(); } } catch (Exception e) { } finally { lock.unlock(); } } //连接失败后的处理 @Override public void onFailure(Throwable t) { // TODO } }, threadPoolExecuter); } } /** * 在netty的执行过程中设置相应的服务器状态 * @param channelState */ public void setChannelState(ChannelState channelState) { state = channelState; } /** * */ public synchronized void close() { pushMessageHandler.close(); threadPoolExecuter.shutdown(); work.shutdownGracefully(); //设置服务器状态 state = ChannelState.CLOSED; } /** * 对外提供netty发送消息的句柄的访问,保证数据安全,并且保证该句柄必须存在 * @return * @throws InterruptedException */ public PushMessageHandler getPushMessageHandler() throws InterruptedException { try { lock.lock(); //如果发送消息的句柄还没有设置则还没有连接上服务端 if (null == pushMessageHandler) { connect.await(); } return pushMessageHandler; } finally { lock.unlock(); } } /** * 设计netty的客户端消息发送句柄,该方法主要提供给netty链接成功后的操作 * @param pushMessageHandler */ public void setPushMessageHandler(PushMessageHandler pushMessageHandler) { try { lock.lock(); this.pushMessageHandler = pushMessageHandler; //该方法是在client连接到server后的操作,所以是连接成功,通知可以发送消息了 push.signalAll(); } finally { lock.unlock(); } }}

2,netty客户端的实现

public class PushInitTask implements Callable
{ private EventLoopGroup eventLoopGroup; private InetSocketAddress address; private RpcSerializeProtocol protocol; public PushInitTask (EventLoopGroup eventLoopGroup, InetSocketAddress address, RpcSerializeProtocol protocol) { this.eventLoopGroup = eventLoopGroup; this.address = address; this.protocol = protocol; } /** * 下面是netty的标准实现 * 采用长连接的机制 * @return * @throws Exception */ @Override public Boolean call() throws Exception { //执行对netty的客户端操作 Bootstrap bootstrap = new Bootstrap(); //下面的参数需要在后期的升级开发中进行配置的功能升级 bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000); //设置通道流,根据序列化的机制采用不同的管道流 bootstrap.handler(new PushChannelHander(protocol)); //链接服务端 ChannelFuture future = null; try { future = bootstrap.connect(address); } catch (Exception e) { return false; } //设置服务器为正常状态 RpcClientLoader.getInstance().setChannelState(ChannelState.ALIVE); //将发送消息的句柄设置到相应的类中,给系统提示可以进行消息发送 future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { //连接成功后的消息发送句柄 RpcClientLoader.getInstance().setPushMessageHandler( channelFuture.channel().pipeline().get(PushMessageHandler.class)); } }); return true; }}

3,netty客户端的句柄通道,客户端发送消息及接受反馈的操作主要体现在下面的配置

public class PushChannelHander extends ChannelInitializer
{ private RpcSerializeProtocol protocol; public PushChannelHander (RpcSerializeProtocol protocol) { this.protocol = protocol; } @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //获得netty的通道,并对该通道设置相应的句柄 ChannelPipeline pipeline = socketChannel.pipeline(); //设置句柄的具体实现方法 PushPipelineFrame.initPipeline(pipeline, protocol); }}
public class PushPipelineFrame {    /**     * 设置相应的编码解码操作     *     * @param pipeline     * @param protocol     */    public static void initPipeline(ChannelPipeline pipeline, RpcSerializeProtocol protocol) {        switch (protocol) {            case HESSIONSERIALIZE:                HessionMessageCodec codec = new HessionMessageCodec();                pipeline.addLast(new HessionMessageDecoder(codec));                pipeline.addLast(new HessionMessageEncoder(codec));                pipeline.addLast(new PushMessageHandler());                break;            case KRYOSERIALIZE:                break;        }    }}

4,消息的实际操作

public class PushMessageHandler extends ChannelInboundHandlerAdapter {    private ConcurrentHashMap
callbacks = new ConcurrentHashMap
(); //执行的数据通道,需要由通道进行数据的传输 private volatile Channel channel; @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { super.channelRegistered(ctx); this.channel = ctx.channel(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); } /** * 服务器端返回的数据结果 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { RpcResponse rpcResponse = (RpcResponse) msg; String requestId = rpcResponse.getRequestId(); if (callbacks.containsKey(requestId)) { PushCallback callback = callbacks.get(requestId); //将结果放置在客户端的回调函数中 callback.doEnd(rpcResponse); callbacks.remove(requestId); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //super.exceptionCaught(ctx, cause); ctx.close(); } /** * 客户端执行数据发送的真是操作,可以通过netty来实现 * @param rpcRequest * @return */ public PushCallback pushMessage(RpcRequest rpcRequest) { PushCallback callback = new PushCallback(rpcRequest); callbacks.put(rpcRequest.getRequestId(), callback); //数据写入通道并刷新到服务端 channel.writeAndFlush(rpcRequest); return callback; } /** * 客户端关闭通道的处理 * 仅仅在客户端程序停止时调用 */ public void close() { channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); }}

根据netty的特点,客户端的整体流程如下:

1,用户根据服务器的特点启动辅助类,保证服务是可以正常运行,并且调用netty的创建

2,用户线程创建Bootstrap,通过api设置netty客户端的相关参数

3,创建客户端的Reactor的线程组,该线程组主要处理客户端连接,io读写的NioEventLoopGroup,可以指定线程的数量,默认是cpu内核的2倍

4,通过Bootstrap的channelFactory和用户指定的channel类型,用作客户端处理的NioSocketChannel,区别于服务端的NioServerSocketChannel

5,创建ChannelHandlerPipeline,用户调度和执行网络事件,就是我们增加的handler功能

6,异步发起tcp连接,判断是否连接成功,如果注册成功,则通过继续用户数据的接受和消息的发送,如果没有连接成功,则等待链接结果

7,注册对应的网络监听状态到多路复用器,并且轮询个channel,处理连接的结果

8,连接成功,设置future结果,发送连接成功事件,出发channelpipeline执行

9,有channelpipeline调度执行系统和用户的channelhandler,执行自定义的业务逻辑

转载于:https://my.oschina.net/wangshuaixin/blog/864227

你可能感兴趣的文章
引入间接隔离变化(三)
查看>>
统一沟通-技巧-4-让国内域名提供商“提供”SRV记录
查看>>
cocos2d-x 3.0事件机制及用户输入
查看>>
比亚迪速锐F3专用夏季座套 夏天坐垫 四季坐套
查看>>
C++ 数字转换为string类型
查看>>
程序员全国不同地区,微信(面试 招聘)群。
查看>>
【干货】界面控件DevExtreme视频教程大汇总!
查看>>
闭包 !if(){}.call()
查看>>
python MySQLdb安装和使用
查看>>
Java小细节
查看>>
poj - 1860 Currency Exchange
查看>>
chgrp命令
查看>>
Java集合框架GS Collections具体解释
查看>>
洛谷 P2486 BZOJ 2243 [SDOI2011]染色
查看>>
linux 笔记本的温度提示
查看>>
(转)DOTA新版地图6.78发布:大幅改动 增两位新英雄
查看>>
数值积分中的辛普森方法及其误差估计
查看>>
Web service (一) 原理和项目开发实战
查看>>
跑带宽度多少合适_跑步机选购跑带要多宽,你的身体早就告诉你了
查看>>
[J2MEQ&A]WTK初始化WMAClient报错XXX has no IP address的解释
查看>>