bizsocket 是一个异步socket 库,对一些业务场景做了支持
断线重连
一对一请求
通知、粘性通知
串行请求合并
包分片处理(AbstractFragmetRequestQueue)
缓存
拦截器
支持rxjava,提供类似于retrofit的支持
使用方式Mave
<depedecy> <groupId>com.github.typ0520</groupId> <artifactId>bizsocket-rx</artifactId> <versio>1.0.0</versio></depedecy>orGradle
buildscript {repositories { jceter()}} depedecies {compile 'com.github.typ0520:bizsocket-rx:1.0.0'}适用协议如果想使用此库,客户端和服务器的通讯协议中必须要有命令号、包序列号这两个字段
命令号代表请求类型,可以想象成http中url的作用
包序列号是数据包的唯一索引,客户端发起请求时为数据包生成一个唯一索引,服务器返回请求对应的结果时把这个包序列号带回去
协议可以类似于下面这种:
cmdpacketIdcotetLegthcotetitititbyte[]也可以类似于下面这样的每个数据包都是一段jso字符串,包与包之间用换行符分割
{"cmd": xxx , "packetId": xxx , ...... }数据包的创建是通过这两个抽象类PacketFactory、Packet,整个库的数据流转都是通过命令号、包序列号这两个字段来完成的,字段名、出现的位置以及形式不限,只要有这两个字段就适用此库
配置BizSocketsample中cliet与server之间的通讯协议是
legth(it)cmd(it)seq(it)cotet(byte[])数据包的总长度命令号数据包的唯一索引报文体,可以想象成http协议中的body下面的代码片段来自sample,建议把代码拉下来看
1、首先需要创建一个数据包类继承自Packet
public class SamplePacket exteds Packet { static volatile it curretSeq = 0; public it legth; public it cmd; public it seq; public Strig cotet; @Override public it getCommad() { //覆盖父类的抽象方法 retur cmd; } @Override public Strig getPacketID() { //覆盖父类的抽象方法 retur Strig.valueOf(seq); } //获取请求数据包byte[],写给服务器 public byte[] toBytes() { ByteArrayOutputStream bos = ew ByteArrayOutputStream(); BufferedSik bufferedSik = Okio.buffer(Okio.sik(bos)); try { //包长 = 内容长度 + 包头固定的12个字节 ByteStrig byteStrig = ByteStrig.ecodeUtf8(cotet); bufferedSik.writeIt(byteStrig.size() + 12); bufferedSik.writeIt(cmd); bufferedSik.writeIt(seq); bufferedSik.write(byteStrig); bufferedSik.flush(); } catch (IOExceptio e) { e.pritStackTrace(); } retur bos.toByteArray(); }}2、创建PacketFactory,主要用来从流中解析出server发给cliet的数据包
public class SamplePacketFactory exteds PacketFactory { @Override public Packet getRequestPacket(Packet reusable,Request request) { retur ew SamplePacket(request.commad(),request.body()); } @Override public Packet getHeartBeatPacket(Packet reusable) { retur ew SamplePacket(SampleCmd.HEARTBEAT.getValue(), ByteStrig.ecodeUtf8("{}")); } @Override public Packet getRemotePacket(Packet reusable,BufferedSource source) throws IOExceptio { SamplePacket packet = ew SamplePacket(); packet.legth = reader.readIt(); packet.cmd = reader.readIt(); packet.seq = reader.readIt(); //减去协议头的12个字节长度 packet.cotet = reader.readStrig(packet.legth - 12, Charset.forName("utf-8")); retur packet; }}3、配置cliet
public class SampleCliet exteds AbstractBizSocket { public SampleCliet(Cofiguratio cofiguratio) { super(cofiguratio); } @Override protected PacketFactory createPacketFactory() { retur ew SamplePacketFactory(); }}3、启动cliet,以j2se为例写一个mai方法
public static void mai(Strig[] args) { SampleCliet cliet = ew SampleCliet(ew Cofiguratio.Builder() .host("127.0.0.1") .port(9103) .readTimeout(TimeUit.SECONDS,30) .heartbeat(60) .build()); cliet.getIterceptorChai().addIterceptor(ew Iterceptor() { @Override public boolea postRequestHadle(RequestCotext cotext) throws Exceptio { System.out.pritl("发现一个请求postRequestHadle: " + cotext); retur false; } @Override public boolea postResposeHadle(it commad, Packet resposePacket) throws Exceptio { System.out.pritl("收到一个包postResposeHadle: " + resposePacket); retur false; } }); try { //连接 cliet.coect(); //启动断线重连 cliet.getSocketCoectio().bidRecoectioMaager(); //开启心跳 cliet.getSocketCoectio().startHeartBeat(); } catch (Exceptio e) { e.pritStackTrace(); } //注册通知,接收服务端的推送 cliet.subscribe(cliet, SampleCmd.NOTIFY_PRICE.getValue(), ew ResposeHadler() { @Override public void sedSuccessMessage(it commad, ByteStrig requestBody, Packet resposePacket) { System.out.pritl("cmd: " + commad + " ,requestBody: " + requestBody + " resposePacket: " + resposePacket); } @Override public void sedFailureMessage(it commad, Throwable error) { System.out.pritl(commad + " ,err: " + error); } });//发起一对一请求 Strig jso = "{\"productId\" : \"1\",\"isJua\" : \"0\",\"type\" : \"2\",\"sl\" : \"1\"}"; cliet.request(ew Request.Builder().commad(SampleCmd.CREATE_ORDER.getValue()).utf8body(jso).build(), ew ResposeHadler() { @Override public void sedSuccessMessage(it commad, ByteStrig requestBody, Packet resposePacket) { System.out.pritl("cmd: " + commad + " ,requestBody: " + requestBody + " attach: " + " resposePacket: " + resposePacket); } @Override public void sedFailureMessage(it commad, Throwable error) { System.out.pritl(commad + " ,err: " + error); } });//如果想用rxjava的形式调用也是支持的,提供了类似于retrofit通过动态代理创建的service类来调用 BizSocketRxSupport rxSupport = ew BizSocketRxSupport.Builder() .requestCoverter(ew JSONRequestCoverter()) .requestCoverter(ew JSONRequestCoverter()) .bizSocket(cliet) .build(); SampleService service = rxSupport.create(SampleService.class); JSONObject params = ew JSONObject(); try { params.put("pageSize","10000"); } catch (JSONExceptio e) { e.pritStackTrace(); } service.queryOrderList(params).subscribe(ew Subscriber<JSONObject>() { @Override public void oCompleted() { } @Override public void oError(Throwable e) { } @Override public void oNext(JSONObject jsoObject) { System.out.pritl("rx respose: " + jsoObject); } });//阻塞主线程,防止程序退出,可以想象成adroid中的Looper类 while (true) { try { Thread.sleep(10000); } catch (IterruptedExceptio e) { e.pritStackTrace(); } } }
评论