欢迎来到友道创新学院!
咨询电话:010-59519886
电路设计热门培训内容之基于Netty的物联网应用


物联网是将无处不在(Ubiquitous)的末端设备(Devices)和设施(Facilities),包括具备“内在智能”的传感器、移动终端、工业系统、楼控系统、家庭智能设施、视频监控系统等、和“外在使能”(Enabled)的,如贴上RFID的各种资产(Assets)、携带无线终端的个人与车辆等等“智能化物件或动物”或“智能尘埃”(Mote),通过各种无线和/或有线的长距离和/或短距离通讯网络。这次我们要说的是智慧农业的一个项目。


本项目是基于局域网和即将到来的5G为信息载体,以终端节点(EndNodes)、网关(Gateway)、云服务器(LoRaWAN Server)和客户端(Client)组成。用于监测温室,大棚等局部环境变化。做到实时监控,提前预防。


先让我们一起看一下ChannelPipeline对事件流的拦截和处理流程                                       

每个ChannelHandler 被添加到ChannelPipeline 后,都会创建一个ChannelHandlerContext 并与之创建的ChannelHandler 关联绑定。

在ChannelHandler 添加到 ChannelPipeline 时会创建一个实例,就是接口 ChannelHandlerContext,它代表了 ChannelHandler 和ChannelPipeline 之间的关联。接口ChannelHandlerContext 主要是对通过同一个 ChannelPipeline 关联的 ChannelHandler 之间的交互进行管理。


那么我就不多说了,直接开干。

第一步:先启动线程。ServerServletListener

public void contextInitialized(ServletContextEvent arg0) {


Thread thread = new Thread(new Runnable() {


@Override


public void run() {


try {


nc = NettyClient.getInstance();


nc.setRecvCallback(new RecvData());


nc.setSendCallback(new SendData());


nc.connect("127.0.0.1", 9001);


} catch (Exception e) {


nc.shutdown();


log.error("启动Netty服务失败:" + e);


}


}


});


thread.start();


}


TcpHandler负责与管道打交道,是整个项目的最底层,他继承自ChannelInboundHandlerAdapter


是接收LoRa终端回传数据最底层的类。

TcpHandler

 //recvMessage方法接收从LoRa客户端传过来的参数


   private void recvMessage(ByteBuf buf) {


     byte[] cbBuf = new byte[buf.readableBytes()];


     buf.readBytes(cbBuf);


     logger.debug("硬件类型:" + cbBuf[0]);


     switch (cbBuf[0]) {


     case 3:


       recvTHSensor(cbBuf);


       break;


     }


   }


//cbBuf是包含了最原始的数据信息


   //这个类的主要作用是对原始数据包进行处理


   private void recvTHSensor(byte[] cbBuf) {


System.out.println("========TcpHandler==recvTHSensor==========");


     int length = cbBuf.length;


     if (6 == length) {


       MsgTHSensorStateNotify msg = new MsgTHSensorStateNotify();


       boolean b = msg.Unpacking(cbBuf);


       ICallbackRecv callback = NettyClient.getInstance().getRecvCallback();


       if ((b) && (callback != null))


         callback.onTHSensorStateNotify(msg);


       else


         logger.info("解包出错!");


     }


     else if (10 == length) {


       MsgTHSensorNotify msg = new MsgTHSensorNotify();


       boolean b = msg.Unpacking(cbBuf);


       ICallbackRecv callback = NettyClient.getInstance().getRecvCallback();


       if ((b) && (callback != null))


         callback.onTHSensorNotify(msg);


       else


         logger.info("解包出错!");


     }  


}


MsgTHSensorNotify

//这个类的主要作用是对原始数据包进行拆包处理处理,获取温度和湿度信息


  public boolean Unpacking(byte[] data)


   {


     int length = data.length;


     if (10 != length) return false;


     this.humidity = (Byte.toString(data[(length - 4)]) + "." + Byte.toString(data[(length - 3)]));


     this.temperature = (Byte.toString(data[(length - 2)]) + "." + Byte.toString(data[(length - 1)]));


     return true;


   }


RecvData


//收到传感器触发数据,把MsgTHSensorNotify解析出来的数据存入到数据库中


public void onTHSensorNotify(MsgTHSensorNotify arg0) {


logger.debug(arg0.toString());


try {


IDataRecordSetDao idrs = new DataRecordSetDao();


DataRecordSet drs = new DataRecordSet();


drs.setTemperature(arg0.getTemperature());


System.out.println("======onTHSensorNotify中getTemperature的值为"+arg0.getTemperature()+"==========");


drs.setHumidity(arg0.getHumidity());


System.out.println("=============onTHSensorNotify中getHumidity的值为"+arg0.getHumidity()+"=====");


drs.setData_time((new Date()).getTime());


idrs.add(drs);


} catch (SQLException e) {


logger.warn("数据新增出错"+e);


}


}


需要说明的一点是DataRecordSet是对MsgTHSensorNotify数据的封装,他的作用是将MsgTHSensorNotify的数据进行进一步的封装,以便数据库查询数据,接收数据,用在数据层。MsgTHSensorNotify是对TcpHandler传过来的数据进行提取封装的类。


DataRecordSetDao是数据库操作类


IDataRecordSetDao

DataRecordSet是对MsgTHSensorNotify数据的封装,他的作用是将MsgTHSensorNotify的数据进行进一步的封装,以便数据库查询数据,接收数据,用在数据层。


@Override


public void add(DataRecordSet drs) throws SQLException {


String sql = "insert into `data_record`(`temperature`,`humidity`,`dt`) values (?,?,?);";


PreparedStatement pstmt = connection.prepareStatement(sql);


pstmt.setString(1, drs.getTemperature());


pstmt.setString(2, drs.getHumidity());


pstmt.setLong(3, valueToLongs(drs.getData_time()));


System.out.println("===============插入数据库==============");


pstmt.executeUpdate();


pstmt.close();


connection.close();


}


这里必须强调一点的是drs.setData_time((new Date()).getTime());//设置的是毫秒数


他的数值已经很大了,细细一数已经到13位数了(1525513938762),这就涉及到存储的问题了。首先说java部分吧。


public void setData_time(long data_time),set方法是long类型的参数,然后从Long的包装类中valueOf(Long l)获得启发,便自己也写了一个类似的方法,在网上很多人都在问如何存储一个超过9位数的商品编号,那么这就是一个很好的例子。


public static Long valueToLongs(long l) {


return new Long(l);


}


数据库部分你需要定义一个bigint类型的time值。


现在数据已经存储到数据库了,接下来我们在通过其他类来取出数据,更加客观的展现给人们。


我们通过JSON传过来请求参数"code":0,"device":"Web","expression":{"field":"","start_time":start,"end_time":end,"length":length},"signature":"LoRa"


创建相应的实体类来接收数据,要强调一点的是实体类属性最好与AJAX传过来的键一致。类似POJO。这点很重要,否则会发生意想不到的错误,而且还不好修改。具体原因请了解@ResponseBody和@RequestBody的匹配规则。


先来看看AJAX请求的数据。




Url部分一定要修改成自己的url地址


Spring是位于前端控制器部分,他负责请求转发和数据处理。


@RequestMapping(value ="/getSensorRecord", method = {RequestMethod.POST }, produces = "application/json;charset=utf-8")


public @ResponseBody Map<String, Object> getSensorRecord(@RequestBody RecvJson recv) {


Map<String, Object> result = new HashMap<String, Object>();


if (null != recv) {


if (recv.getCode() == QueryCode.TemperatureHumidityCode && "LoRa".equals(recv.getSignature())) {


try {


//数据库接收数据,查询数据的类


IDataRecordSetDao drd = new DataRecordSetDao();


int length = (0 != recv.getExpression().getLength())?recv.getExpression().getLength():20 ;


ArrayList<DataRecordSet> list = drd.query(recv.getExpression().getStart_time(),


recv.getExpression().getEnd_time(), length);


int listLength = (list.size() > 20)?length:list.size();


for (int i = 0; i < listLength; i++) {


Map<String, Object> unit = new HashMap<String, Object>();


unit.put("id", i + 1);


if ("T".equals(recv.getExpression().getField().trim())) {


unit.put("temperature", list.get(i).getTemperature());


} else if ("H".equals(recv.getExpression().getField().trim())) {


unit.put("humidity", list.get(i).getHumidity());


} else {


unit.put("temperature", list.get(i).getTemperature());


unit.put("humidity", list.get(i).getHumidity());


}


unit.put("time",


(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")).format(new Date(list.get(i).getData_time())));


result.put(Integer.toString(i + 1), unit);


unit = null;


}


result.put("error", 0);


result.put("data", recv.getCode());


list = null;


drd = null;


} catch (Exception e) {


logger.error("查询数据出错:" + e);


}


} else {


// {"error":1,"data":"0","message":"code或signature错误!"}


result.put("error", 1);


result.put("data", recv.getCode());


result.put("message", "code或signature错误!");


}


}


return result;


}


这部分我来详细解释一下。


@RequestMapping


RequestMapping是一个用来处理请求地址映射的注解,可用于类或方法上。用于类上,表示类中的所有响应请求的方法都是以该地址作为父路径。


RequestMapping注解有六个属性,下面我们把她分成三类进行说明。


1、 value, method

value:     指定请求的实际地址,指定的地址可以是URI Template 模式;


method:  指定请求的method类型, GET、POST、PUT、DELETE等;


2、 consumes,produces;

consumes: 指定处理请求的提交内容类型(Content-Type),例如application/json, text/html;


produces:    指定返回的内容类型,仅当request请求头中的(Accept)类型中包含该指定类型才返回;


3、 params,headers;

params: 指定request中必须包含某些参数值是,才让该方法处理。


headers: 指定request中必须包含某些指定的header值,才能让该方法处理请求。


Bat启动类

入口函数IotHubServer,这个启动类是由Netty框架实现的,适合有一定基础的朋友,在这里我是参考李林锋编著的《Netty权威指南》当知识积累到一定程度,要学会看书,找资料,看论文,这是培养思维方式。很多同学有一个误区,当有一个新技术出现的时候,如果脑海里第一时间想到的是有没有视频,这就完了,出视频的时候基本上是有人已经研究透这东西了,随着时间和经验的增长,要去当领跑者,而不是局限于跟随者,要提升自己的认知。当然,遇到问题最好先看源码,这样提升很快。


首先让我们看一下入口main函数:


public static void main(String[] args) {


PropertyConfigurator.configure("config/log4j.properties");


System.out.println("===============IotHubServer==》》》main============================");


new IotHubServer(Port).run();



/**


 *用于启动服务端 IotHubChannelInitializer


 */


public void run() {


try {


IotHubChannelInitializer iothub = new IotHubChannelInitializer();


iothub.run(this.port);


System.out.println("=================run============================");


} catch (Exception e) {


logger.error("服务启动失败->" + e.getMessage());


}


}


IotHubChannelInitializer

需要说明的是,Netty协议通信双方链路建立成功之后,双方可以进行全双工通信,无论客户端还是服务端,都可以主动发送消息给对方,通信方式可以是TWO WAY或者ONE WAY。双方之间的心跳采用的是Ping-Pong机制,当链路处于空闲状态时,客户端主动发送Ping消息给服务端,服务端接收到消息后发送应答消息Pong给客户端。


如果客户端连续发送N条Ping消息都没有收到服务端返回的Pong消息,说明链路已经挂死或者双方处于异常状态,客户端主动关闭连接,间隔周期T后发起重连操作,直到重连成功。


public void run(int port) throws Exception {


//配置服务器端的NIO线程组,接受客户端的连接、TCP数据的读写


EventLoopGroup bossGroup = new NioEventLoopGroup();


EventLoopGroup workerGroup = new NioEventLoopGroup();


try {


ServerBootstrap bootstrap = new ServerBootstrap();


bootstrap.group(bossGroup, workerGroup);


bootstrap.channel(NioServerSocketChannel.class);


bootstrap.childHandler(new IotHubChannelInitializer()); 


bootstrap.option(ChannelOption.SO_BACKLOG, 1024);


bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);


bootstrap.option(ChannelOption.SO_REUSEADDR, true);


bootstrap.option(ChannelOption.SO_RCVBUF, 128);


bootstrap.option(ChannelOption.SO_SNDBUF, 128);


bootstrap.option(ChannelOption.SO_TIMEOUT, 5000);


bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);


logger.info("服务启动。");


ChannelFuture future = bootstrap.bind(port);


future.sync();


future.channel().closeFuture().sync();


} catch (InterruptedException e) {


throw e;


} finally {


workerGroup.shutdownGracefully();


bossGroup.shutdownGracefully();


}



/*


*这个函数的作用是当创建NioSocketChannel成功之后,在初始化它的时候将它的channelhandler设置到ChannelPipeline 中去,用于处理网络IO事件。


*/


@Override


protected void initChannel(SocketChannel ch) throws Exception {


ChannelPipeline pipeline = ch.pipeline();


ByteBuf cbDelimiter = ch.alloc().buffer(2);


cbDelimiter.writeBytes(Delimiter.getBytes());


pipeline.addLast(new DelimiterBasedFrameDecoder(128, true, false, cbDelimiter));


pipeline.addLast(new IotHubHandler());


}


IotHubHandler


/*


 * 当ChannelHandler被添加到一个ChannelPipeline时被调用


 */


@Override


public void handlerAdded(ChannelHandlerContext ctx) throws Exception {


SocketChannel channel = (SocketChannel)ctx.channel();


String clientIp = channel.remoteAddress().getAddress().getHostAddress();


String log = String.format("Connect, Ip:%s", clientIp);


logger.info(log);


super.handlerAdded(ctx);


}


/*


 * 当客户端和服务端TCP链路建立成功之后,Netty的NIO线程会调用channelActive方法,发送查询指令给服务器。


 */


@Override


public void channelActive(ChannelHandlerContext ctx) throws Exception {


super.channelActive(ctx);



/*


*  拦截处理器


*/


@Override


public boolean acceptInboundMessage(Object msg) throws Exception {


return super.acceptInboundMessage(msg);



/*


 * 当从Channel中读数据时被调用,channelRead0还有一个好处就是你不用关心释放资源,因为源码中已经帮你释放


 */


@Override


protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {


System.out.println("=================channelRead0============================");


recvMessage(ctx, msg);


}


/*


 * 当Channel上的某个读操作完成时被调用


 */


@Override


public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {


ctx.flush();


System.out.println("=================channelReadComplete============================");


super.channelReadComplete(ctx);



//这是用来接收数据的方法


private void recvMessage(ChannelHandlerContext ctx, Object msg) {


System.out.println("=================recvMessage============================");


SocketChannel socketchannel = (SocketChannel)ctx.channel();


String clientIp = socketchannel.remoteAddress().getAddress().getHostAddress();


logger.info("收到数据:" + clientIp);


try {


        ByteBuf byteBuf = (ByteBuf) msg;


        byteBuf.discardReadBytes();


        byte[] header = {0};


        header[0]= byteBuf.readByte();


        System.out.println("=================header[0]============================");


        switch (header[0]) {


        //硬件协议头


case com.nycent.iothub.entity.FrameType.ProtocolHead:


logger.debug("硬件连接");


mapContext.put(ctx, ClientType.ClientTypeThing); //记录硬件连接


recvFromThings(ctx, byteBuf);


break;


//默认软件控制系统


case com.nycent.iothub.entity.FrameType.ProtoApiTypeMobile:


mapContext.put(ctx, ClientType.ClientTypeMobile); //记录手机移动端


recvFromSystem(ctx, byteBuf);


break;


case com.nycent.iothub.entity.FrameType.ProtoApiTypeServer:


mapContext.put(ctx, ClientType.ClientTypeWeb); //记录web端


recvFromSystem(ctx, byteBuf);


break;


default:


ctx.close();


break;


}


} catch (Exception e) {


logger.error("->data exception:" + e.getMessage());


logger.error("->ip:" + clientIp + "->msg:" + msg);


ctx.close();


} finally {


//ReferenceCountUtil.release(msg);


}



/**


 * 收到硬件设备操作,心跳检测,数据处理


 */


private void recvFromThings(ChannelHandlerContext ctx, ByteBuf byteBuf) {


System.out.println("=================recvFromThings============================");


try {


byte cbBuf[] = new byte[byteBuf.readableBytes()];


byteBuf.readBytes(cbBuf);


//打印日志


if(logger.isDebugEnabled()){


logger.debug("接收到硬件数据:" + com.nycent.iothub.utils.HexUtil.Bytes2HexString(cbBuf));


}


//处理接收到的数据


int length = cbBuf.length-2;


int val_sum = 0;


for(int i=0; i<=length; i++){


val_sum += (0x0ff & cbBuf[i]);


}


if((0x0ff & cbBuf[0]) == length && val_sum ==(0x0ff & cbBuf[length+1])){


logger.debug("校验成功");


//数组拷贝


byte buf[] = new byte[length+1];


buf[0] = cbBuf[2];


System.arraycopy(cbBuf, 1, buf, 1, length);


if(logger.isDebugEnabled()){


logger.debug("发送第三方数据:" + com.nycent.iothub.utils.HexUtil.Bytes2HexString(buf));


}


sendToSystem(buf);


}


} catch (Exception e) {


logger.error(e.getMessage());


ctx.close();


}



/**


 * 转发数据到web服务器


 * @param buf


 */


public void sendToSystem(byte[] buf) {


System.out.println("=================sendToSystem============================");


for (Map.Entry<ChannelHandlerContext, Integer> entry : mapContext.entrySet()) {


if (entry.getValue() == ClientType.ClientTypeMobile) {


ChannelHandlerContext ctx = entry.getKey();


ByteBuf msg = ctx.alloc().buffer(buf.length + 1);


msg.writeByte(com.nycent.iothub.entity.FrameType.ProtoApiTypeMobile);


msg.writeBytes(buf);


msg.writeBytes(IotHubChannelInitializer.Delimiter.getBytes());


ctx.writeAndFlush(msg);


}else if(entry.getValue() == ClientType.ClientTypeWeb){


ChannelHandlerContext ctx = entry.getKey();


ByteBuf msg = ctx.alloc().buffer(buf.length + 1);


msg.writeByte(com.nycent.iothub.entity.FrameType.ProtoApiTypeServer);


msg.writeBytes(buf);


msg.writeBytes(IotHubChannelInitializer.Delimiter.getBytes());


ctx.writeAndFlush(msg);


}


}



/**


 * 转发数据到硬件


 * @param cbBuf


 */


public void sendToGateway(byte[] buf) {


System.out.println("=================sendToGateway============================");


for (Map.Entry<ChannelHandlerContext, Integer> entry : mapContext.entrySet()) {


if (entry.getValue() == ClientType.ClientTypeThing) {


ChannelHandlerContext ctx = entry.getKey();


ByteBuf msg = ctx.alloc().buffer(buf.length + 1);


msg.writeByte(com.nycent.iothub.entity.FrameType.ProtocolHead);


msg.writeBytes(buf);    


//msg.writeBytes(IotHubChannelInitializer.Delimiter.getBytes());


//以下四行代码是测试msg的信息


 ByteBuf byteBuf = (ByteBuf) msg;


        byte[] req = new byte[byteBuf.readableBytes()];


        for (byte b : req) {


System.out.println("sendToGateway中msg的信息为"+b);


}        


ctx.writeAndFlush(msg);


}


}


}


/**


 * 收到硬件设备操作


 */


private void recvFromThings(ChannelHandlerContext ctx, ByteBuf byteBuf) {


System.out.println("=================recvFromThings============================");


try {


byte cbBuf[] = new byte[byteBuf.readableBytes()];


byteBuf.readBytes(cbBuf);


//打印日志


if(logger.isDebugEnabled()){


logger.debug("接收到硬件数据:" + com.nycent.iothub.utils.HexUtil.Bytes2HexString(cbBuf));


}


//处理接收到的数据


int length = cbBuf.length-2;


int val_sum = 0;


for(int i=0; i<=length; i++){


val_sum += (0x0ff & cbBuf[i]);



if((0x0ff & cbBuf[0]) == length && val_sum ==(0x0ff & cbBuf[length+1])){


logger.debug("校验成功");


//数组拷贝


byte buf[] = new byte[length+1];


buf[0] = cbBuf[2];


System.arraycopy(cbBuf, 1, buf, 1, length);


if(logger.isDebugEnabled()){


logger.debug("发送第三方数据:" + com.nycent.iothub.utils.HexUtil.Bytes2HexString(buf));


}


sendToSystem(buf);


}


} catch (Exception e) {


logger.error(e.getMessage());


ctx.close();


}


}


 


public class HexUtil {


 


public HexUtil() {


}


 


/**


 * 将指定byte数组以16进制的形式打印到控制台


 */


public static void printHexString(String hint, byte[] b) {


System.out.print("=========》》》"+hint);


System.out.print("======printHexString===》》》");


for (int i = 0; i < b.length; i++) {


String hex = Integer.toHexString(b[i] & 0xFF);


if (hex.length() == 1) {


hex = '0' + hex;


}


System.out.print(hex.toUpperCase() + " ");


}


System.out.println("");


}


/**


 *


 */


public static String Bytes2HexString(byte[] b) {


System.out.print("======Bytes2HexString===");


String ret = "";


for (int i = 0; i < b.length; i++) {


String hex = Integer.toHexString(b[i] & 0xFF);


if (hex.length() == 1) {


hex = '0' + hex;


}


ret += hex.toUpperCase() + "  ";


}


return ret;


}

/**


 * 将两个ASCII字符合成一个字节; 如:"EF"–> 0xEF


 */


public static byte uniteBytes(byte src0, byte src1) {


byte _b0 = Byte.decode("0x" + new String(new byte[] { src0 }))


.byteValue();


_b0 = (byte) (_b0 << 4);


byte _b1 = Byte.decode("0x" + new String(new byte[] { src1 }))


.byteValue();


byte ret = (byte) (_b0 ^ _b1);


return ret;


}


/**


 * 将指定字符串src,以每两个字符分割转换为16进制形式 如:"2B44EFD9" –> byte[]{0x2B, 0x44, 0xEF,


 * 0xD9}


 */


public static byte[] HexString2Bytes(String src) {


int nLen = src.length();


byte[] ret = new byte[nLen/2];


byte[] tmp = src.getBytes();


for (int i = 0; i < nLen/2; i++) {


ret[i] = uniteBytes(tmp[i * 2], tmp[i * 2 + 1]);


}


return ret;


}


}


/*


 * 计算Map对象的个数


 */


public class MapSize<K, V> {


/*


 * 计算值重复个数


 */


public int mapRepeatSize(Map<K, V> map, V value) {


if(map.size() < 1){


return 0;


}


int count = 0;


for(Map.Entry<K, V> entry : map.entrySet()){


if(entry.getValue() == value)count++;


}


return count;


}


}


立即咨询有惊喜哦 !