Netty: Data Communication and Heartbeat Detection
Published: 2019-06-07


Data Communication

Overview:

Approach 3, implemented with Netty's ReadTimeoutHandler

Server:

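import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;

public class Server {
    public static void main(String[] args) throws Exception {
        EventLoopGroup pGroup = new NioEventLoopGroup(); // accepts incoming connections
        EventLoopGroup cGroup = new NioEventLoopGroup(); // handles I/O on accepted channels

        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
         // enable logging
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel sc) throws Exception {
                 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                 // read timeout of 5 seconds
                 sc.pipeline().addLast(new ReadTimeoutHandler(5));
                 sc.pipeline().addLast(new ServerHandler());
             }
         });

        ChannelFuture cf = b.bind(8765).sync();
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
    }
}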

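The key addition is sc.pipeline().addLast(new ReadTimeoutHandler(5)); which raises a ReadTimeoutException when no inbound data has been read on the channel for 5 seconds. The handlers in this example respond to that exception by closing the channel.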
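The idea behind a read timeout can be sketched in plain Java without Netty. The class below is a hypothetical helper (ReadIdleTracker is not a Netty type, and this is not how Netty implements ReadTimeoutHandler internally); it only illustrates the rule "time out once no read has been seen for the configured period". The caller passes in the current time, which keeps the sketch deterministic.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of Netty: remembers when the last read happened. */
public class ReadIdleTracker {
    private final long timeoutNanos;
    private long lastReadNanos;

    public ReadIdleTracker(long timeout, TimeUnit unit, long nowNanos) {
        this.timeoutNanos = unit.toNanos(timeout);
        this.lastReadNanos = nowNanos;
    }

    /** Call whenever inbound data arrives; this resets the idle period. */
    public void onRead(long nowNanos) {
        lastReadNanos = nowNanos;
    }

    /** True once no read has been seen for at least the configured timeout. */
    public boolean isTimedOut(long nowNanos) {
        return nowNanos - lastReadNanos >= timeoutNanos;
    }
}
```

With a 5 second timeout, a peer that sends something every 4 seconds never trips the check, while a peer that goes quiet trips it 5 seconds after its last message. That matches the client in this article, which sends a request every 4 seconds and then stops.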

Client:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import java.util.concurrent.TimeUnit;

public class Client {
    // Lazy singleton via the holder idiom
    private static class SingletonHolder {
        static final Client instance = new Client();
    }

    public static Client getInstance() {
        return SingletonHolder.instance;
    }

    private EventLoopGroup group;
    private Bootstrap b;
    private ChannelFuture cf;

    private Client() {
        group = new NioEventLoopGroup();
        b = new Bootstrap();
        // Note: the second handler(...) call replaces the first,
        // so this LoggingHandler is not actually installed.
        b.group(group).channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO))
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        // Timeout handler: if server and client exchange no data for longer than
                        // the given time, the corresponding channel is closed, mainly to reduce
                        // resource usage on the server.
                        sc.pipeline().addLast(new ReadTimeoutHandler(5));
                        sc.pipeline().addLast(new ClientHandler());
                    }
                });
    }

    public void connect() {
        try {
            this.cf = b.connect("127.0.0.1", 8765).sync();
            System.out.println("Connected to the remote server; ready to exchange data..");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public ChannelFuture getChannelFuture() {
        // First use: no connection yet.
        if (this.cf == null) {
            this.connect();
        }
        // The channel was closed (for example by the read timeout): reconnect.
        if (!this.cf.channel().isActive()) {
            this.connect();
        }
        return this.cf;
    }

    public static void main(String[] args) throws Exception {
        final Client c = Client.getInstance();
        // c.connect();
        ChannelFuture cf = c.getChannelFuture();
        for (int i = 1; i <= 3; i++) {
            Request request = new Request();
            request.setId("" + i);
            request.setName("pro" + i);
            request.setRequestMessage("data " + i);
            cf.channel().writeAndFlush(request);
            TimeUnit.SECONDS.sleep(4);
        }

        // Blocks until the channel is closed after the read timeout.
        cf.channel().closeFuture().sync();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("Entering child thread...");
                    ChannelFuture cf = c.getChannelFuture();
                    System.out.println(cf.channel().isActive());
                    System.out.println(cf.channel().isOpen());
                    // send data again
                    Request request = new Request();
                    request.setId("" + 4);
                    request.setName("pro" + 4);
                    request.setRequestMessage("data " + 4);
                    cf.channel().writeAndFlush(request);
                    cf.channel().closeFuture().sync();
                    System.out.println("Child thread finished.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        System.out.println("Disconnected; main thread finished..");
    }
}

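The method to focus on is getChannelFuture. The this.cf == null check covers the very first connection, and the !this.cf.channel().isActive() check re-establishes the connection after a timeout has closed the channel.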
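The same "connect on first use, reconnect when inactive" pattern can be shown in isolation. This is a minimal sketch in plain Java under stated assumptions: ReconnectingHolder and FakeConnection are hypothetical names standing in for the Client and the Netty channel, and the connector is whatever code opens a new connection. Unlike the article's getChannelFuture, the sketch merges the two checks into one and marks the method synchronized so that two threads do not reconnect at the same time.

```java
import java.util.function.Supplier;

public class ReconnectingHolder {
    /** Hypothetical stand-in for a Netty Channel. */
    public static class FakeConnection {
        private boolean active = true;

        public boolean isActive() {
            return active;
        }

        public void close() {
            active = false;
        }
    }

    private final Supplier<FakeConnection> connector;
    private FakeConnection current;

    public ReconnectingHolder(Supplier<FakeConnection> connector) {
        this.connector = connector;
    }

    /** Returns a live connection, opening one on first use or after the old one closed. */
    public synchronized FakeConnection get() {
        if (current == null || !current.isActive()) {
            current = connector.get();
        }
        return current;
    }
}
```

Callers always go through get() instead of caching the connection themselves, which is why the child thread in the client above calls c.getChannelFuture() again rather than reusing the old ChannelFuture.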

The rest of the code:

// Imports used across the classes below; each public class goes in its own file.
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
import io.netty.util.ReferenceCountUtil;
import java.io.Serializable;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

// Note: overriding channelRead on ChannelHandlerAdapter is the Netty 5.0.0.Alpha API.
// On Netty 4.x, extend ChannelInboundHandlerAdapter instead.
public class ClientHandler extends ChannelHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            Response resp = (Response) msg;
            System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Also reached on a read timeout: close the channel.
        ctx.close();
    }
}

public class ServerHandler extends ChannelHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request request = (Request) msg;
        System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage());
        Response response = new Response();
        response.setId(request.getId());
        response.setName("response" + request.getId());
        response.setResponseMessage("response content " + request.getId());
        ctx.writeAndFlush(response); // .addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Also reached on a read timeout: close the channel.
        ctx.close();
    }
}

public final class MarshallingCodeCFactory {
    /**
     * Creates the JBoss Marshalling decoder, MarshallingDecoder.
     * @return MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        // Obtain a MarshallerFactory through the static method of the Marshalling utility class;
        // the "serial" argument selects the Java serialization factory.
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        // Create a MarshallingConfiguration and set the version to 5.
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        // Build the provider from marshallerFactory and configuration.
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        // Build Netty's MarshallingDecoder; the two arguments are the provider
        // and the maximum length of a single serialized message.
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
        return decoder;
    }

    /**
     * Creates the JBoss Marshalling encoder, MarshallingEncoder.
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        // Build Netty's MarshallingEncoder, which serializes POJOs that implement
        // Serializable into a byte array.
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}

public class Request implements Serializable {
    private static final long serialVersionUID = 1L;
    private String id;
    private String name;
    private String requestMessage;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getRequestMessage() {
        return requestMessage;
    }

    public void setRequestMessage(String requestMessage) {
        this.requestMessage = requestMessage;
    }
}

public class Response implements Serializable {
    private static final long serialVersionUID = 1L;
    private String id;
    private String name;
    private String responseMessage;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getResponseMessage() {
        return responseMessage;
    }

    public void setResponseMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }
}

Heartbeat Detection

Overview:

Code example:

// Imports used across the classes below; each public class goes in its own file.
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.ReferenceCountUtil;
import java.io.Serializable;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.hyperic.sigar.CpuPerc;
import org.hyperic.sigar.Mem;
import org.hyperic.sigar.Sigar;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

// Note: the generic type parameters were lost in the source listing. The ones shown
// here (<SocketChannel>, <String, String>, <String, Object>, <?>) are reconstructed.

public class Server {
    public static void main(String[] args) throws Exception {
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
                // enable logging
                .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        sc.pipeline().addLast(new ServerHeartBeatHandler());
                    }
                });
        ChannelFuture cf = b.bind(8765).sync();
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
    }
}

// Note: overriding channelRead on ChannelHandlerAdapter is the Netty 5.0.0.Alpha API.
// On Netty 4.x, extend ChannelInboundHandlerAdapter instead.
public class ServerHeartBeatHandler extends ChannelHandlerAdapter {
    /** key: ip, value: auth key */
    private static HashMap<String, String> AUTH_IP_MAP = new HashMap<String, String>();
    private static final String SUCCESS_KEY = "auth_success_key";

    static {
        AUTH_IP_MAP.put("192.168.1.200", "1234");
    }

    private boolean auth(ChannelHandlerContext ctx, Object msg) {
        // System.out.println(msg);
        String[] ret = ((String) msg).split(",");
        // Guard against a malformed message that has no key part.
        String auth = ret.length >= 2 ? AUTH_IP_MAP.get(ret[0]) : null;
        if (auth != null && auth.equals(ret[1])) {
            ctx.writeAndFlush(SUCCESS_KEY);
            return true;
        } else {
            ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);
            return false;
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof String) {
            // A String is the authentication message "ip,key".
            auth(ctx, msg);
        } else if (msg instanceof RequestInfo) {
            // A RequestInfo is a heartbeat carrying host statistics.
            RequestInfo info = (RequestInfo) msg;
            System.out.println("--------------------------------------------");
            System.out.println("Host ip: " + info.getIp());
            System.out.println("Host cpu status: ");
            HashMap<String, Object> cpu = info.getCpuPercMap();
            System.out.println("Combined usage: " + cpu.get("combined"));
            System.out.println("User usage: " + cpu.get("user"));
            System.out.println("System usage: " + cpu.get("sys"));
            System.out.println("Wait: " + cpu.get("wait"));
            System.out.println("Idle: " + cpu.get("idle"));

            System.out.println("Host memory status: ");
            HashMap<String, Object> memory = info.getMemoryMap();
            System.out.println("Total memory: " + memory.get("total"));
            System.out.println("Memory used: " + memory.get("used"));
            System.out.println("Memory free: " + memory.get("free"));
            System.out.println("--------------------------------------------");

            ctx.writeAndFlush("info received!");
        } else {
            ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);
        }
    }
}

public class Client {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                sc.pipeline().addLast(new ClienHeartBeattHandler());
            }
        });

        ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
        cf.channel().closeFuture().sync();
        group.shutdownGracefully();
    }
}

public class ClienHeartBeattHandler extends ChannelHandlerAdapter {
    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private ScheduledFuture<?> heartBeat;
    private InetAddress addr;
    private static final String SUCCESS_KEY = "auth_success_key";

    // Proactively send the authentication message to the server.
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        addr = InetAddress.getLocalHost();
        String ip = addr.getHostAddress();
        String key = "1234";
        // credential
        String auth = ip + "," + key;
        ctx.writeAndFlush(auth);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            if (msg instanceof String) {
                String ret = (String) msg;
                if (SUCCESS_KEY.equals(ret)) {
                    // Handshake succeeded: start sending heartbeats every 2 seconds.
                    this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 2,
                            TimeUnit.SECONDS);
                    System.out.println(msg);
                } else {
                    System.out.println(msg);
                }
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    // In the source listing this method sat inside HeartBeatTask, where Netty never
    // calls it; it is moved to the handler here so that it takes effect.
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        if (heartBeat != null) {
            heartBeat.cancel(true);
            heartBeat = null;
        }
        ctx.fireExceptionCaught(cause);
    }

    private class HeartBeatTask implements Runnable {
        private final ChannelHandlerContext ctx;

        public HeartBeatTask(final ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void run() {
            try {
                RequestInfo info = new RequestInfo();
                // ip
                info.setIp(addr.getHostAddress());
                Sigar sigar = new Sigar();
                // cpu percentages
                CpuPerc cpuPerc = sigar.getCpuPerc();
                HashMap<String, Object> cpuPercMap = new HashMap<String, Object>();
                cpuPercMap.put("combined", cpuPerc.getCombined());
                cpuPercMap.put("user", cpuPerc.getUser());
                cpuPercMap.put("sys", cpuPerc.getSys());
                cpuPercMap.put("wait", cpuPerc.getWait());
                cpuPercMap.put("idle", cpuPerc.getIdle());
                // memory
                Mem mem = sigar.getMem();
                HashMap<String, Object> memoryMap = new HashMap<String, Object>();
                memoryMap.put("total", mem.getTotal() / 1024L);
                memoryMap.put("used", mem.getUsed() / 1024L);
                memoryMap.put("free", mem.getFree() / 1024L);
                info.setCpuPercMap(cpuPercMap);
                info.setMemoryMap(memoryMap);
                ctx.writeAndFlush(info);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

public final class MarshallingCodeCFactory {
    /**
     * Creates the JBoss Marshalling decoder, MarshallingDecoder.
     *
     * @return MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        // Obtain a MarshallerFactory through the static method of the Marshalling utility class;
        // the "serial" argument selects the Java serialization factory.
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        // Create a MarshallingConfiguration and set the version to 5.
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        // Build the provider from marshallerFactory and configuration.
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        // Build Netty's MarshallingDecoder; the two arguments are the provider
        // and the maximum length of a single serialized message.
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
        return decoder;
    }

    /**
     * Creates the JBoss Marshalling encoder, MarshallingEncoder.
     *
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        // Build Netty's MarshallingEncoder, which serializes POJOs that implement
        // Serializable into a byte array.
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}

public class RequestInfo implements Serializable {
    private String ip;
    private HashMap<String, Object> cpuPercMap;
    private HashMap<String, Object> memoryMap;
    // .. other field

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public HashMap<String, Object> getCpuPercMap() {
        return cpuPercMap;
    }

    public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {
        this.cpuPercMap = cpuPercMap;
    }

    public HashMap<String, Object> getMemoryMap() {
        return memoryMap;
    }

    public void setMemoryMap(HashMap<String, Object> memoryMap) {
        this.memoryMap = memoryMap;
    }
}

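When the client first connects, it sends its credentials to the server for authentication. Once authentication succeeds, it sends heartbeat packets at a fixed interval.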
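The authentication step can be looked at on its own. The sketch below, in plain Java, mirrors the check that the server's auth method performs on the "ip,key" message, with one assumption added for safety: a message that does not split into exactly two parts is rejected instead of being indexed blindly. AuthCheck and isAuthorized are hypothetical names used only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class AuthCheck {
    /** Returns true only for a well-formed "ip,key" message whose key matches the table. */
    public static boolean isAuthorized(Map<String, String> table, String message) {
        if (message == null) {
            return false;
        }
        String[] parts = message.split(",");
        if (parts.length != 2) {
            // Malformed: missing key, or more fields than expected.
            return false;
        }
        String expected = table.get(parts[0]);
        return expected != null && expected.equals(parts[1]);
    }

    public static void main(String[] args) {
        Map<String, String> table = new HashMap<>();
        table.put("192.168.1.200", "1234");
        System.out.println(isAuthorized(table, "192.168.1.200,1234"));
        System.out.println(isAuthorized(table, "192.168.1.200,wrong"));
        System.out.println(isAuthorized(table, "10.0.0.1,1234"));
    }
}
```

Only the first call in main is accepted. Because the table is keyed by the IP that the client reports about itself, a client running on a host other than 192.168.1.200 fails authentication with the server code as written.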

Reposted from: https://www.cnblogs.com/lostyears/p/8482450.html
