来源:开源中国 时间:2022-12-14 14:12:44
(资料图)
编辑切换为居中
添加图片注释,不超过 140 字(可选)
3、服务端连接管理器 (1)服务端增加连接管理器 MiiServerConnect,继承 ChannelInboundHandlerAdapter 并重写 channelActive 和 channelInactive 方法,监听 Channel 的活跃状态并进行相应处理,如下图所示:
添加图片注释,不超过 140 字(可选)
(2)发布连接监听事件,主要通过 Spring 的事件发布/监听机制来处理,增加连接监听器 ChannelConnectListener。
添加图片注释,不超过 140 字(可选)
4、对客户端心跳检测(1)增加心跳检测超时时间配置,如下图所示: 编辑切换为居中
添加图片注释,不超过 140 字(可选)
(2)服务端按配置的心跳检测超时时间检测客户端,超时则主动断开连接。
添加图片注释,不超过 140 字(可选)
5、源码解读(1)服务端连接管理器源码
package com.takeoff.iot.modbus.server.connect;

import com.takeoff.iot.modbus.common.entity.ChannelConnectData;
import com.takeoff.iot.modbus.common.enums.DeviceConnectEnum;
import com.takeoff.iot.modbus.common.utils.CacheUtils;
import com.takeoff.iot.modbus.common.utils.JudgeEmptyUtils;
import com.takeoff.iot.modbus.common.utils.SpringContextUtil;
import com.takeoff.iot.modbus.netty.channel.MiiChannel;
import com.takeoff.iot.modbus.netty.device.MiiDeviceChannel;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.ChannelHandler.Sharable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.util.ObjectUtils;

import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * Connection manager for client links.
 *
 * <p>Reacts to a channel becoming active or inactive: it counts the event per
 * channel name, publishes a {@link ChannelConnectData} event through the Spring
 * {@link ApplicationContext} (when one is available) and keeps the
 * name-to-channel binding in {@link CacheUtils} up to date.
 *
 * <p>The handler is {@link Sharable}, so one instance may sit in many pipelines
 * and be invoked from several event-loop threads; the counters are therefore
 * kept in concurrent maps.
 *
 * <p>Company: TF (Tengfei) open source<br>
 * Author: luorongxi
 */
@Slf4j
@Sharable
public class MiiServerConnect extends ChannelInboundHandlerAdapter {

    /** Spring context captured when this handler is constructed; may be empty. */
    private final ApplicationContext applicationContext = SpringContextUtil.applicationContext;

    /**
     * Number of successful connections per channel name.
     * NOTE(review): ConcurrentHashMap rejects null keys — assumes
     * {@code MiiChannel.name()} is never null here; confirm.
     */
    private final Map<String, Integer> onLineMap = new ConcurrentHashMap<>();

    /** Number of disconnections per channel name. */
    private final Map<String, Integer> breakOffMap = new ConcurrentHashMap<>();

    public MiiServerConnect() {
    }

    /**
     * Propagates the event, then records the connection, publishes an
     * {@code ON_LINE} event and caches the channel under its name.
     *
     * @param ctx context of the channel that became active
     * @throws Exception declared by the overridden handler method
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        ctx.fireChannelActive();
        if (JudgeEmptyUtils.isEmpty(channel.remoteAddress())) {
            return;
        }
        String address = remoteAddressOf(channel);
        MiiChannel miiChannel = new MiiDeviceChannel(channel);
        // Atomic "previous count + 1"; a name seen for the first time starts at 1.
        int onLine = onLineMap.merge(miiChannel.name(), 1, Integer::sum);
        publish(new ChannelConnectData(this, DeviceConnectEnum.ON_LINE.getKey(), address, onLine));
        // Cache the name -> channel binding whether or not a Spring context exists,
        // mirroring the unconditional removal performed in channelInactive.
        CacheUtils.put(miiChannel.name(), miiChannel);
    }

    /**
     * Propagates the event, then records the disconnection, publishes a
     * {@code BREAK_OFF} event, drops the cached binding and tears the channel down.
     *
     * @param ctx context of the channel that became inactive
     * @throws Exception declared by the overridden handler method
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
        Channel channel = ctx.channel();
        if (JudgeEmptyUtils.isEmpty(channel) || JudgeEmptyUtils.isEmpty(channel.remoteAddress())) {
            return;
        }
        String address = remoteAddressOf(channel);
        MiiChannel miiChannel = new MiiDeviceChannel(channel);
        int breakOff = breakOffMap.merge(miiChannel.name(), 1, Integer::sum);
        publish(new ChannelConnectData(this, DeviceConnectEnum.BREAK_OFF.getKey(), address, breakOff));
        // Remove the name -> channel binding from the cache.
        CacheUtils.remove(miiChannel.name());
        // Final clean-up once the connection is gone.
        ctx.pipeline().remove(ctx.handler());
        ctx.deregister();
        ctx.close();
    }

    /** Returns the remote address text with its first character dropped, as the original code did. */
    private static String remoteAddressOf(Channel channel) {
        return channel.remoteAddress().toString().substring(1);
    }

    /** Publishes the event through the Spring context when both are non-empty. */
    private void publish(ChannelConnectData event) {
        if (!JudgeEmptyUtils.isEmpty(event) && !JudgeEmptyUtils.isEmpty(applicationContext)) {
            applicationContext.publishEvent(event);
        }
    }
}
// Article section (2): source code of the event published by the connection listener.
package com.takeoff.iot.modbus.common.entity;

import com.takeoff.iot.modbus.common.enums.DeviceConnectEnum;
import com.takeoff.iot.modbus.common.utils.JudgeEmptyUtils;
import org.springframework.context.ApplicationEvent;
import lombok.Getter;

/**
 * Application event describing a device connection state change.
 *
 * <p>All three fields stay {@code null} when the event is created with an empty
 * device address.
 */
@Getter
public class ChannelConnectData extends ApplicationEvent {

    private static final long serialVersionUID = 2111432846029949421L;

    /** Address of the device the event refers to. */
    private String deviceAddress = null;

    /** Connection state key; resolved to a display name via {@link DeviceConnectEnum#getName}. */
    private Integer deviceConnect = null;

    /** Message built from the address, the state name and the running count. */
    private String connectMsg = null;

    /**
     * Creates the event and, for a non-empty address, fills in its fields.
     *
     * @param source        object on which the event initially occurred
     * @param deviceConnect connection state key
     * @param deviceAddress device address; when empty, no field is populated
     * @param count         running count included in the message
     */
    public ChannelConnectData(Object source, Integer deviceConnect, String deviceAddress, int count) {
        super(source);
        if (JudgeEmptyUtils.isEmpty(deviceAddress)) {
            return;
        }
        this.deviceConnect = deviceConnect;
        this.deviceAddress = deviceAddress;
        this.connectMsg = "设备:"+ deviceAddress + DeviceConnectEnum.getName(deviceConnect) + ",累计:"+ count + "次";
    }
}
// Article section (3): connection listener source code.
package com.takeoff.iot.modbus.test.listener;

import com.takeoff.iot.modbus.common.entity.ChannelConnectData;
import com.takeoff.iot.modbus.common.utils.JudgeEmptyUtils;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

/**
 * Spring component that logs {@link ChannelConnectData} events.
 */
@Slf4j
@Component
public class ChannelConnectListener {

    /**
     * Logs the state code and message of an event that carries a connection state.
     *
     * <p>The original condition lacked the negation, so it logged only events
     * whose state was empty and skipped every populated one.
     *
     * @param data the published connection event
     */
    @EventListener
    public void handleReceiveDataEvent(ChannelConnectData data) {
        if (!JudgeEmptyUtils.isEmpty(data.getDeviceConnect())) {
            log.info("设备连接状态码:"+data.getDeviceConnect()+" ---> "+data.getConnectMsg());
        }
    }
}
(4)服务端心跳检测超时时间源码
package com.takeoff.iot.modbus.server;import java.util.List;import java.util.concurrent.TimeUnit;import com.takeoff.iot.modbus.common.utils.CacheUtils;import com.takeoff.iot.modbus.netty.device.MiiDeviceChannel;import com.takeoff.iot.modbus.netty.device.MiiDeviceGroup;import com.takeoff.iot.modbus.netty.device.MiiControlCentre;import com.takeoff.iot.modbus.common.bytes.factory.MiiDataFactory;import com.takeoff.iot.modbus.common.data.MiiHeartBeatData;import com.takeoff.iot.modbus.common.message.MiiMessage;import com.takeoff.iot.modbus.netty.channel.MiiChannel;import com.takeoff.iot.modbus.netty.channel.MiiChannelGroup;import com.takeoff.iot.modbus.netty.data.factory.MiiServerDataFactory;import com.takeoff.iot.modbus.netty.handle.*;import com.takeoff.iot.modbus.netty.listener.MiiListener;import com.takeoff.iot.modbus.server.connect.MiiServerConnect;import com.takeoff.iot.modbus.server.message.sender.MiiServerMessageSender;import com.takeoff.iot.modbus.server.message.sender.ServerMessageSender;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;import io.netty.handler.timeout.IdleStateEvent;import io.netty.handler.timeout.IdleStateHandler;import lombok.Getter;import lombok.extern.slf4j.Slf4j;/*** 类功能说明:设备通讯服务端(4)更详细的内容请查看“腾飞开源”物联网通讯协议 iot-modbus V3.2.8版本,gitee地址:https://gitee.com/takeoff/iot-modbus
* 公司名称:TF(腾飞)开源
* 作者:luorongxi
*/@Slf4jpublic class MiiServer extends ChannelInitializerimplements MiiControlCentre { private static int IDLE_TIMEOUT = 60000; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private ChannelFuture future; private int port,nThread; @Getter private MiiChannelGroup groups; private MiiServerConnect connect; private ServerMessageSender sender; private MiiListenerHandler handler; private MiiDataFactory dataFactory; /*** 创建指定服务端口,默认线程数的服务端* @param port 服务端口*/public MiiServer(int port){ this(port, 0, IDLE_TIMEOUT); } /*** 创建指定服务端口,指定线程数的服务端* @param port 服务端口* @param nThread 执行线程池线程数* @param heartBeatTime 心跳检测超时时间(单位:毫秒)*/public MiiServer(int port, int nThread, int heartBeatTime){ this.port = port; this.nThread = nThread; this.IDLE_TIMEOUT = heartBeatTime; this.groups = new MiiChannelGroup(); this.connect = new MiiServerConnect(); this.sender = new MiiServerMessageSender(); this.handler = new MiiListenerHandler(this.groups); this.handler.addListener(MiiMessage.HEARTBEAT, new MiiListener() { @Override public void receive(MiiChannel channel, MiiMessage message) { //通讯通道绑定设备IP groups.get(channel.name()).name(message.deviceGroup()); log.info("Netty通讯已绑定设备IP:"+ message.deviceGroup()); } }); this.dataFactory = new MiiServerDataFactory(); } /*** 启动服务*/public void start(){ bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup(nThread); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(this); future = b.bind(port); } /*** 停止服务*/public void stop(){ future.channel().closeFuture(); workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } /*** 根据名称/地址找已连接设备组* 名称/地址不存在或者未连接时返回null值* @param name 名称/地址* @return 设备组*/public MiiChannel group(String name) { return get(name); } /*** 列出所有已连接设备组清单* @return 所有已连接身边组清单*/public List groups() { return groups.list(); } public ServerMessageSender sender(){ return sender; } /*** 
添加接收指定指令的消息监听器* @param command 指令类型 {@link MiiMessage}* @param listener 消息监听器* @return 上一个消息监听器,如果没有返回null*/public MiiListener addListener(int command, MiiListener listener){ return handler.addListener(command, listener); } /*** 移除接收指定指令的消息监听器* @param command 指令类型 {@link MiiMessage}* @return 移除消息监听器,如果没有返回null*/public MiiListener removeListener(int command){ return handler.removeListener(command); } @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); MiiDeviceGroup group = new MiiDeviceChannel(ch); add(group); //服务端心跳检测超时时间,超时则主动断开链接 p.addLast(new IdleStateHandler(0, 0, IDLE_TIMEOUT, TimeUnit.MILLISECONDS)); p.addLast(new ChannelInboundHandlerAdapter(){ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent){ ctx.disconnect(); } else { super.userEventTriggered(ctx, evt); } } }); p.addLast(new MiiMessageEncoder()); p.addLast(new MiiBasedFrameDecoder()); p.addLast(new MiiMessageDecoder(dataFactory)); p.addLast(connect); p.addLast(handler); p.addLast(new MiiExceptionHandler()); } @Override public boolean add(MiiChannel channel) { return groups.add(channel); } @Override public MiiChannel remove(String name) { return groups.remove(name); } @Override public MiiChannel get(String name) { return groups.get(name); }}