断 线重连一 对一请求通知 、粘性 通知 串 行 请求合 并包 分 片 处理(AbstractFragmentRequestQueue)- 缓存
- 拦截
器 支持 rxjava,提供 类似于retrofit的 支持 提供 rxjava和 rxjava2两种使用 方式
Maven
<dependency>
<groupId>com.github.typ0520</groupId>
<artifactId>bizsocket-rx2</artifactId>
<version>2.0.0</version>
</dependency>
or Gradle
buildscript {
repositories {
jcenter()
}
}
dependencies {
compile 'com.github.typ0520:bizsocket-rx2:2.0.0'
}
Maven
<dependency>
<groupId>com.github.typ0520</groupId>
<artifactId>bizsocket-rx1</artifactId>
<version>2.0.0</version>
</dependency>
or Gradle
buildscript {
repositories {
jcenter()
}
}
dependencies {
compile 'com.github.typ0520:bizsocket-rx1:2.0.0'
}
如果
命令 号 代表 请求类型,可 以想象 成 http中 url的 作用 包 序列 号 是 数 据 包 的 唯 一 索引 ,客 户端发起请求时为数 据 包 生成 一 个唯一 索引 ,服 务器返 回 请求对应的 结果时把这个包 序列 号 带回去
协议
cmd | packetId | contentLength | content |
---|---|---|---|
int | int | int | byte[] |
也可以类
{"cmd": xxx , "packetId": xxx , ...... }
sample
length(int) | cmd(int) | seq(int) | content(byte[]) |
---|---|---|---|
报文 |
- 1、
首 先 需要 创建一个数据包类继承自Packet
public class SamplePacket extends Packet {
static volatile int currentSeq = 0;
public int length;
public int cmd;
public int seq;
public String content;
@Override
public int getCommand() {
//覆 盖父类的抽象 方法
return cmd;
}
@Override
public String getPacketID() {
//覆 盖父类的抽象 方法
return String.valueOf(seq);
}
//获取请求数 据 包 byte[],写 给服务器
public byte[] toBytes() {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
BufferedSink bufferedSink = Okio.buffer(Okio.sink(bos));
try {
//包 长 = 内容 长度 + 包 头固定 的 12个字节
ByteString byteString = ByteString.encodeUtf8(content);
bufferedSink.writeInt(byteString.size() + 12);
bufferedSink.writeInt(cmd);
bufferedSink.writeInt(seq);
bufferedSink.write(byteString);
bufferedSink.flush();
} catch (IOException e) {
e.printStackTrace();
}
return bos.toByteArray();
}
}
- 2、创建PacketFactory,
主要 用 来 从流中 解析 出 server发给client的 数 据 包
public class SamplePacketFactory extends PacketFactory {
@Override
public Packet getRequestPacket(Packet reusable,Request request) {
return new SamplePacket(request.command(),request.body());
}
@Override
public Packet getHeartBeatPacket(Packet reusable) {
return new SamplePacket(SampleCmd.HEARTBEAT.getValue(), ByteString.encodeUtf8("{}"));
}
@Override
public Packet getRemotePacket(Packet reusable,BufferedSource source) throws IOException {
SamplePacket packet = new SamplePacket();
packet.length = reader.readInt();
packet.cmd = reader.readInt();
packet.seq = reader.readInt();
//减去协议头的12个字节长度
packet.content = reader.readString(packet.length - 12, Charset.forName("utf-8"));
return packet;
}
}
- 3、
配置 client
public class SampleClient extends AbstractBizSocket {
public SampleClient(Configuration configuration) {
super(configuration);
}
@Override
protected PacketFactory createPacketFactory() {
return new SamplePacketFactory();
}
}
- 3、启动client,以j2se为例
写 一 个main方法
public static void main(String[] args) {
SampleClient client = new SampleClient(new Configuration.Builder()
.host("127.0.0.1")
.port(9103)
.readTimeout(TimeUnit.SECONDS,30)
.heartbeat(60)
.build());
client.getInterceptorChain().addInterceptor(new Interceptor() {
@Override
public boolean postRequestHandle(RequestContext context) throws Exception {
System.out.println("发现一 个请求 postRequestHandle: " + context);
return false;
}
@Override
public boolean postResponseHandle(int command, Packet responsePacket) throws Exception {
System.out.println("收 到 一 个包postResponseHandle: " + responsePacket);
return false;
}
});
try {
//连接
client.connect();
//启动断 线重连
client.getSocketConnection().bindReconnectionManager();
//开启心 跳
client.getSocketConnection().startHeartBeat();
} catch (Exception e) {
e.printStackTrace();
}
//注 册 通知 ,接收 服 务端的 推送
client.subscribe(client, SampleCmd.NOTIFY_PRICE.getValue(), new ResponseHandler() {
@Override
public void sendSuccessMessage(int command, ByteString requestBody, Packet responsePacket) {
System.out.println("cmd: " + command + " ,requestBody: " + requestBody + " responsePacket: " + responsePacket);
}
@Override
public void sendFailureMessage(int command, Throwable error) {
System.out.println(command + " ,err: " + error);
}
});
//发起一 对一请求
String json = "{\"productId\" : \"1\",\"isJuan\" : \"0\",\"type\" : \"2\",\"sl\" : \"1\"}";
client.request(new Request.Builder().command(SampleCmd.CREATE_ORDER.getValue()).utf8body(json).build(), new ResponseHandler() {
@Override
public void sendSuccessMessage(int command, ByteString requestBody, Packet responsePacket) {
System.out.println("cmd: " + command + " ,requestBody: " + requestBody + " attach: " + " responsePacket: " + responsePacket);
}
@Override
public void sendFailureMessage(int command, Throwable error) {
System.out.println(command + " ,err: " + error);
}
});
//如果想 用 rxjava的 形式 调用也是支持 的 ,提供 了 类似于retrofit通 过动态代理 创建的 service类来调用
BizSocketRxSupport rxSupport = new BizSocketRxSupport.Builder()
.requestConverter(new JSONRequestConverter())
.responseConverter(new JSONResponseConverter())
.bizSocket(client)
.build();
SampleService service = rxSupport.create(SampleService.class);
JSONObject params = new JSONObject();
try {
params.put("pageSize","10000");
} catch (JSONException e) {
e.printStackTrace();
}
//rxjava范例,使用 rxjava2需修改 Subscriber
service.queryOrderList(params).subscribe(new Subscriber<JSONObject>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(JSONObject jsonObject) {
System.out.println("rx response: " + jsonObject);
}
});
//阻塞主 线程,防止 程 序 退出 ,可 以想象 成 android中 的 Looper类
while (true) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}