bizsocket 异步 socket 库开源项目

我要开发同款
匿名用户2017年06月15日
68阅读

技术信息

开源地址
https://github.com/typ0520/bizsocket
授权协议
Apache

作品详情

bizsocket 是一个异步socket 库,对一些业务场景做了支持

断线重连

一对一请求

通知、粘性通知

串行请求合并

包分片处理(AbstractFragmetRequestQueue)

缓存

拦截器

支持rxjava,提供类似于retrofit的支持

使用方式

Mave

<depedecy>  <groupId>com.github.typ0520</groupId>  <artifactId>bizsocket-rx</artifactId>  <versio>1.0.0</versio></depedecy>

orGradle

buildscript {repositories {   jceter()}} depedecies {compile 'com.github.typ0520:bizsocket-rx:1.0.0'}适用协议

如果想使用此库,客户端和服务器的通讯协议中必须要有命令号、包序列号这两个字段

命令号代表请求类型,可以想象成http中url的作用

包序列号是数据包的唯一索引,客户端发起请求时为数据包生成一个唯一索引,服务器返回请求对应的结果时把这个包序列号带回去

协议可以类似于下面这种:

cmdpacketIdcotetLegthcotetitititbyte[]

也可以类似于下面这样的每个数据包都是一段jso字符串,包与包之间用换行符分割

{"cmd": xxx , "packetId": xxx , ...... }

数据包的创建是通过这两个抽象类PacketFactory、Packet,整个库的数据流转都是通过命令号、包序列号这两个字段来完成的,字段名、出现的位置以及形式不限,只要有这两个字段就适用此库

配置BizSocket

sample中cliet与server之间的通讯协议是

legth(it)cmd(it)seq(it)cotet(byte[])数据包的总长度命令号数据包的唯一索引报文体,可以想象成http协议中的body

下面的代码片段来自sample,建议把代码拉下来看

1、首先需要创建一个数据包类继承自Packet

public class SamplePacket exteds Packet {    static volatile it curretSeq = 0;    public it legth;    public it cmd;    public it seq;    public Strig cotet;        @Override    public it getCommad() {     //覆盖父类的抽象方法        retur cmd;    }    @Override    public Strig getPacketID() {        //覆盖父类的抽象方法        retur Strig.valueOf(seq);    }        //获取请求数据包byte[],写给服务器    public byte[] toBytes() {        ByteArrayOutputStream bos = ew ByteArrayOutputStream();        BufferedSik bufferedSik = Okio.buffer(Okio.sik(bos));        try {            //包长 = 内容长度 + 包头固定的12个字节            ByteStrig byteStrig = ByteStrig.ecodeUtf8(cotet);            bufferedSik.writeIt(byteStrig.size() + 12);            bufferedSik.writeIt(cmd);            bufferedSik.writeIt(seq);            bufferedSik.write(byteStrig);            bufferedSik.flush();        } catch (IOExceptio e) {            e.pritStackTrace();        }        retur bos.toByteArray();    }}

2、创建PacketFactory,主要用来从流中解析出server发给cliet的数据包

public class SamplePacketFactory exteds PacketFactory {    @Override    public Packet getRequestPacket(Packet reusable,Request request) {        retur ew SamplePacket(request.commad(),request.body());    }    @Override    public Packet getHeartBeatPacket(Packet reusable) {        retur ew SamplePacket(SampleCmd.HEARTBEAT.getValue(), ByteStrig.ecodeUtf8("{}"));    }    @Override    public Packet getRemotePacket(Packet reusable,BufferedSource source) throws IOExceptio {       SamplePacket packet = ew SamplePacket();        packet.legth = reader.readIt();        packet.cmd = reader.readIt();        packet.seq = reader.readIt();        //减去协议头的12个字节长度        packet.cotet = reader.readStrig(packet.legth - 12, Charset.forName("utf-8"));        retur packet;    }}

3、配置cliet

public class SampleCliet exteds AbstractBizSocket {    public SampleCliet(Cofiguratio cofiguratio) {        super(cofiguratio);    }    @Override    protected PacketFactory createPacketFactory() {        retur ew SamplePacketFactory();    }}

3、启动cliet,以j2se为例写一个mai方法

public static void mai(Strig[] args) {        SampleCliet cliet = ew SampleCliet(ew Cofiguratio.Builder()                .host("127.0.0.1")                .port(9103)                .readTimeout(TimeUit.SECONDS,30)                .heartbeat(60)                .build());        cliet.getIterceptorChai().addIterceptor(ew Iterceptor() {            @Override            public boolea postRequestHadle(RequestCotext cotext) throws Exceptio {                System.out.pritl("发现一个请求postRequestHadle: " + cotext);                retur false;            }            @Override            public boolea postResposeHadle(it commad, Packet resposePacket) throws Exceptio {                System.out.pritl("收到一个包postResposeHadle: " + resposePacket);                retur false;            }        });        try {            //连接            cliet.coect();            //启动断线重连            cliet.getSocketCoectio().bidRecoectioMaager();            //开启心跳            cliet.getSocketCoectio().startHeartBeat();        } catch (Exceptio e) {            e.pritStackTrace();        }        //注册通知,接收服务端的推送        cliet.subscribe(cliet, SampleCmd.NOTIFY_PRICE.getValue(), ew ResposeHadler() {            @Override            public void sedSuccessMessage(it commad, ByteStrig requestBody, Packet resposePacket) {                System.out.pritl("cmd: " + commad + " ,requestBody: " + requestBody + " resposePacket: " + resposePacket);            }            @Override            public void sedFailureMessage(it commad, Throwable error) {                System.out.pritl(commad + " ,err: " + error);            }        });//发起一对一请求        Strig jso = "{\"productId\" : \"1\",\"isJua\" : \"0\",\"type\" : \"2\",\"sl\" : \"1\"}";        cliet.request(ew Request.Builder().commad(SampleCmd.CREATE_ORDER.getValue()).utf8body(jso).build(), ew ResposeHadler() {            @Override            public void sedSuccessMessage(it commad, ByteStrig requestBody, Packet resposePacket) {                System.out.pritl("cmd: " + commad + " ,requestBody: " + requestBody + " attach: " + " resposePacket: " + resposePacket);            }            @Override            public void sedFailureMessage(it commad, Throwable error) {                System.out.pritl(commad + " ,err: " + error);            }        });//如果想用rxjava的形式调用也是支持的,提供了类似于retrofit通过动态代理创建的service类来调用        BizSocketRxSupport rxSupport = ew BizSocketRxSupport.Builder()                .requestCoverter(ew JSONRequestCoverter())                .requestCoverter(ew JSONRequestCoverter())                .bizSocket(cliet)                .build();        SampleService service = rxSupport.create(SampleService.class);        JSONObject params = ew JSONObject();        try {            params.put("pageSize","10000");        } catch (JSONExceptio e) {            e.pritStackTrace();        }        service.queryOrderList(params).subscribe(ew Subscriber<JSONObject>() {            @Override            public void oCompleted() {            }            @Override            public void oError(Throwable e) {            }            @Override            public void oNext(JSONObject jsoObject) {                System.out.pritl("rx respose: " + jsoObject);            }        });//阻塞主线程,防止程序退出,可以想象成adroid中的Looper类        while (true) {            try {                Thread.sleep(10000);            } catch (IterruptedExceptio e) {                e.pritStackTrace();            }        }    }

功能介绍

bizsocket 是一个异步 socket 库,对一些业务场景做了支持 断线重连 一对一请求 通知、粘性通知 串行请求合并 包分片处理(AbstractFragmentRequestQueue...

声明:本文仅代表作者观点,不代表本站立场。如果侵犯到您的合法权益,请联系我们删除侵权资源!如果遇到资源链接失效,请您通过评论或工单的方式通知管理员。未经允许,不得转载,本站所有资源文章禁止商业使用运营!
下载安装【程序员客栈】APP
实时对接需求、及时收发消息、丰富的开放项目需求、随时随地查看项目状态

评论