netty client怎么netty 客户端接收数据据

纯Java的高性能长连接RPC解决方案 - dragonsoar - ITeye技术网站
博客分类:
简介:轻量封装Ibatis3
因为本人在国内最大的电子商务公司工作期间,深感一个好的分表分库框架可以大大提高系统的承载能力及系统的灵活性,而一个不好的分表分库方案,则让系统在大数据量处理的时候非常郁闷。所以, 在根据笔者在从事电子商务开发的这几年中,对各个应用场景而开发的一个轻量封装Ibatis3的一个分表分库框架。
笔者工作的这几年之中,总结并开发了如下几个框架: summercool(Web 框架,已经应用于某国内大型网络公司的等重要应用)、summercool-hsf(基于Netty实现的RPC框架,已经应用国内某移动互联网公司)、 summercool-ddl(基于Mybaits的分表分库框架,已经应用国内某移动互联网公司);相继缓存方案、和消息系统解决方案也会慢慢开源。 Summercool框架做为笔者的第一个开源框架
summercool-hsf:http://summercool-/svn/trunk
1. 什么是HSF框架
HSF框架是一个高性能远程通信框架,底层基于Netty实现TCP通信,对上层进行封装,提供易于使用和高度可扩展能力。
名词解译:
1)Channel:可以理解为一个通道,或者连接
2)ChannelGroup:多个通道组合成为一个ChannelGroup
2.HSF工作流程
3.消息协议设计
消息协议这里是指对消息编码和解码的规范的一种定义,HSF内置的消息协议采用如下结构:
Length:以4个字节表示,是指ID + Content的长度。
ID:以1个字节表示,1表示Content部分被压缩,0表示未被压缩。
Content:真实的消息内容。
Netty框架原生提供一个处理器链对事件进行处理,每个处理器均实现ChannelHandler接口,ChannelHandler是个空接口,拥有三个子接口:ChannelDownstreamHandler, ChannelUpstreamHandler和LifeCycleAwareChannelHandler。这里我们主要关注前两个接口,因为它们被用来处理读与写的消息。
事件主要分为三种:ChannelEvent、MessageEvent和ExceptionEvent,一旦这些事件被触发,它们将从处理器链的一端到另一端,被逐个处理器处理,注意,整个过程是单线程场景。一般而言,ChannelEvent和ExceptionEvent事件都是从底层被触发,因此,它们会被ChannelUpstreamHandler处理。而MessageEvent则需要根据读与写方式的不同,分别从两个方向被ChannelUpstreamHandler和ChannelDownstreamHandler处理。
HSF内置的编(解)码处理器、压缩(解压)处理器及序列化(反序列化)处理器等都是直接或间接实现ChannelHandler。
? ChannelDownstreamHandler
public interface ChannelDownstreamHandler extends ChannelHandler {
* Handles the specified downstream event.
* @param ctx
the context object for this handler
* @param e
the downstream event to process or intercept
void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws E
? ChannelUpstreamHandler
public interface ChannelUpstreamHandler extends ChannelHandler {
* Handles the specified upstream event.
* @param ctx
the context object for this handler
* @param e
the upstream event to process or intercept
void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws E
4.1.Encoding流程
HSF内置的encoding过程由三个Handler组合完成,流程如下:
1) SerializeDownstreamHandler
* @Title: SerializeDownstreamHandler.java
* @Package com.gexin.hsf.netty.channelhandler.downstream
* @Description: 序列化
下午4:45:59
* @version V1.0
public class SerializeDownstreamHandler implements ChannelDownstreamHandler {
Logger logger = LoggerFactory.getLogger(getClass());
private Serializer serializer = new KryoSerializer();
public SerializeDownstreamHandler() {
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (!(e instanceof MessageEvent)) {
ctx.sendDownstream(e);
MessageEvent event = (MessageEvent)
Object originalMessage = event.getMessage();
Object encodedMessage = originalM
if (!(originalMessage instanceof Heartbeat)) {
encodedMessage = serializer.serialize(originalMessage);
encodedMessage = Heartbeat.BYTES;
if (originalMessage == encodedMessage) {
ctx.sendDownstream(e);
} else if (encodedMessage != null) {
write(ctx, e.getFuture(), encodedMessage, event.getRemoteAddress());
public void setSerializer(Serializer serializer) {
this.serializer =
2)CompressionDownstreamHandler
* @Title: CompressionDownstreamHandler.java
* @Package com.gexin.hsf.netty.channelhandler.downstream
* @Description: 压缩处理器
下午4:45:59
* @version V1.0
public class CompressionDownstreamHandler implements ChannelDownstreamHandler {
private CompressionStrategy compressionStrategy = new ThresholdCompressionStrategy();
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (!(e instanceof MessageEvent)) {
ctx.sendDownstream(e);
MessageEvent event = (MessageEvent)
Object originalMessage = event.getMessage();
if (originalMessage instanceof byte[]) {
CompressionResult compressionResult = press((byte[]) originalMessage);
byte[] resBuffer = compressionResult.getBuffer();
int length = resBuffer.
byte[] bytes = new byte[length + 1];
bytes[0] = compressionResult.isCompressed() ? (byte) 1 : (byte) 0;
for (int i = 0; i & i++) {
bytes[i + 1] = resBuffer[i];
DownstreamMessageEvent evt = new DownstreamMessageEvent(event.getChannel(), event.getFuture(), bytes,
event.getRemoteAddress());
ctx.sendDownstream(evt);
ctx.sendDownstream(e);
public void setCompressionStrategy(CompressionStrategy compressionStrategy) {
pressionStrategy = compressionS
3)LengthBasedEncoder
* @ClassName: LengthBasedEncoder
* @Description: 基于长度的编码器
下午1:43:41
public class LengthBasedEncoder extends ObjectEncoder {
Logger logger = LoggerFactory.getLogger(getClass());
private final int estimatedL
public LengthBasedEncoder() {
this(512);
public LengthBasedEncoder(int estimatedLength) {
if (estimatedLength & 0) {
throw new IllegalArgumentException("estimatedLength: " + estimatedLength);
this.estimatedLength = estimatedL
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
if (msg instanceof byte[]) {
byte[] bytes = (byte[])
ChannelBuffer ob = ChannelBuffers.dynamicBuffer(estimatedLength, channel.getConfig().getBufferFactory());
ob.writeInt(bytes.length);
ob.writeBytes(bytes);
throw new IllegalArgumentException("msg must be a byte[], but " + msg);
4.2.Decoding流程
decoding流程与encoding正好相反,流程如下:
1)LengthBasedDecoder
对于TCP通信而言,粘包是很正常的现象,因此decoder必须处理粘包问题。HsfFrameDecoder是一个支持粘包处理的decoder类抽象。
* @ClassName: LengthBasedDecoder
* @Description: 基于长度的解码器
下午1:42:59
public class LengthBasedDecoder extends HsfFrameDecoder {
private Logger logger = LoggerFactory.getLogger(getClass());
private int headerFieldLength = 4;
public LengthBasedDecoder() {
public LengthBasedDecoder(int headerFieldLength) {
this.headerFieldLength = headerFieldL
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
if (buffer.readableBytes() &= headerFieldLength) {
buffer.markReaderIndex();
int length = buffer.readInt();
if (length & 0) {
logger.error("msg length must &= 0. but length={}", length);
} else if (length == 0) {
return Heartbeat.BYTES;
} else if (buffer.readableBytes() &= length) {
byte[] bytes = new byte[length];
buffer.readBytes(bytes);
buffer.resetReaderIndex();
2)DecompressionUpstreamHandler
* @Title: DecompressionUpstreamHandler.java
* @Package com.gexin.hsf.netty.channelhandler.downstream
* @Description: 解压缩处理器
下午4:45:59
* @version V1.0
public class DecompressionUpstreamHandler extends SimpleChannelUpstreamHandler {
private CompressionStrategy compressionStrategy = new ThresholdCompressionStrategy();
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
if (e.getMessage() instanceof byte[]) {
byte[] bytes = (byte[]) e.getMessage();
int length = bytes.
if (length & 0) {
byte[] buffer = new byte[length - 1];
for (int i = 1; i & i++) {
buffer[i - 1] = bytes[i];
if (bytes[0] == 1) {
buffer = compressionStrategy.decompress(buffer);
UpstreamMessageEvent event = new UpstreamMessageEvent(e.getChannel(), buffer, e.getRemoteAddress());
super.messageReceived(ctx, event);
super.messageReceived(ctx, e);
public void setCompressionStrategy(CompressionStrategy compressionStrategy) {
pressionStrategy = compressionS
3)DeserializeUpstreamHandler
* @Title: DeserializeUpstreamHandler.java
* @Package com.gexin.hsf.netty.channelhandler.downstream
* @Description: 反序列化
下午4:45:59
* @version V1.0
public class DeserializeUpstreamHandler extends SimpleChannelUpstreamHandler {
private Logger logger = LoggerFactory.getLogger(getClass());
private Serializer serializer = new KryoSerializer();
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
if (e.getMessage() == null) {
} else if (e.getMessage() instanceof byte[]) {
byte[] bytes = (byte[]) e.getMessage();
if (bytes.length == 0) {
msg = Heartbeat.getSingleton();
msg = serializer.deserialize(bytes);
} catch (Exception ex) {
UpstreamMessageEvent event = new UpstreamMessageEvent(e.getChannel(), msg, e.getRemoteAddress());
super.messageReceived(ctx, event);
super.messageReceived(ctx, e);
public void setSerializer(Serializer serializer) {
this.serializer =
4.3.处理器链的建立
HSF使用如下的方式构建处理器链:
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
// 注册各种自定义Handler
for (String key : handlers.keySet()) {
pipeline.addLast(key, handlers.get(key));
// 注册链路空闲检测Handler
Integer writeIdleTime = LangUtil.parseInt(options.get(HsfOptions.WRITE_IDLE_TIME));
Integer readIdleTime = LangUtil.parseInt(options.get(HsfOptions.READ_IDLE_TIME));
if (writeIdleTime == null) {
writeIdleTime = 10;
if (readIdleTime == null) {
// 默认为写空闲的3倍
readIdleTime = writeIdleTime * 3;
pipeline.addLast("timeout", new IdleStateHandler(idleTimer, readIdleTime, writeIdleTime, 0));
pipeline.addLast("idleHandler", new StateCheckChannelHandler(HsfAcceptorImpl.this));
// 注册事件分发Handler
pipeline.addLast("dispatchHandler", new DispatchUpStreamHandler(eventDispatcher));
5.Dispatcher
消息经过Handler链处理后,将被Dispatcher转发,并进入EventListener链处理。
Dispatcher内置两个线程池:channelExecutor和msgExecutor。
channelExecutor用于处理通道事件和异常事件,考虑到在通道事件可能需要同步调用远程服务,因此此线程池不设上线(因为同步调用将会阻塞当前线程)。
msgExecutor用于处理消息事件,根据经验值,缺省最大线程数为150,该值可以通过Option参数修改。
6.EventListener
EventListener有以下三种:
1)ChannelEventListener
* @Title: ChannelEventListener.java
* @Package com.gexin.hsf.netty.listener
* @Description: 通道事件监听类
上午11:45:50
* @version V1.0
public interface ChannelEventListener extends EventListener {
* Invoked when a {@link Channel} was closed and all its related resources were released.
* @param ctx
* @param channel
* @param e
* @return EventBehavior Whether to continue the events deliver
public EventBehavior channelClosed(ChannelHandlerContext ctx, HsfChannel channel, ChannelStateEvent e);
* Invoked when a {@link Channel} is open, bound to a local address, and connected to a remote address.
* @param ctx
* @param channel
* @param e
* @return EventBehavior Whether to continue the events deliver
public EventBehavior channelConnected(ChannelHandlerContext ctx, HsfChannel channel, ChannelStateEvent e);
* Invoked when a group is created.
* @param ctx
* @param channel
* @param groupName
* @return EventBehavior Whether to continue the events deliver
public EventBehavior groupCreated(ChannelHandlerContext ctx, HsfChannel channel, String groupName);
* Invoked when a group is removed.
* @param ctx
* @param channel
* @param groupName
* @return EventBehavior Whether to continue the events deliver
public EventBehavior groupRemoved(ChannelHandlerContext ctx, HsfChannel channel, String groupName);
2)MessageEventListener
* @Title: MessageListener.java
* @Package com.gexin.hsf.netty.listener
* @Description: 消息监听接口
上午11:36:22
* @version V1.0
public interface MessageEventListener extends EventListener {
* Invoked when a message object (e.g: {@link ChannelBuffer}) was received
* from a remote peer.
public EventBehavior messageReceived(ChannelHandlerContext ctx, HsfChannel channel, MessageEvent e);
3)ExceptionEventListener
* @Title: ExceptionEventListener.java
* @Package com.gexin.hsf.netty.listener
* @Description: 异常监听接口
上午11:48:09
* @version V1.0
public interface ExceptionEventListener extends EventListener {
* Invoked when an exception was raised by an I/O thread or a {@link ChannelHandler}.
public EventBehavior exceptionCaught(ChannelHandlerContext ctx, Channel channel, ExceptionEvent e);
Hsf框架会预先在EventListener链末端注册ServiceMessageEventListener,该Listener负责调用被注册的Service,并将返回值或异常回传。
1)RemoteServiceContract注解
所有实现了拥有RemoteServiceContract注解的Java类都可以直接注册到HsfService,示例如下:
@RemoteServiceContract
public interface TestService {
String test(String ctx);
public class TestServiceImpl implements TestService {
public String test(String ctx) {
return String.valueOf("hello " + ctx);
2)ServiceEntry
对于未添加RemoteServiceContract注解的接口,Hsf框架使用org.summercool.hsf.pojo.ServiceEntry类实现注册。
3)注册Service
服务提供方需要向Hsf注册Service方可被远程调用,示例如下:
? 注册Service
HsfAcceptor acceptor = new HsfAcceptorImpl();
// 注册Service
acceptor.registerService(new TestServiceImpl());
// 监听端口
acceptor.bind(8082);
? 远程调用Service
HsfConnector connector = new HsfConnectorImpl();
connector.connect(new InetSocketAddress("127.0.0.1",8082));
// 同步方式
TestService testService = ServiceProxyFactory.getRoundFactoryInstance(connector).wrapSyncProxy(TestService.class);
System.out.println(testService.test("HSF"));
3)同步与异步
7.Handshake
当通道建立后,Client和Server会进行三次握手,以完成初始化
初次握手步骤
1)Client与Server建立连接成功
2)Client向Server发送握手请求包(handshake request)
3)Server接收到握手请求包后,生成group信息,然后触发groupCreated事件,接着向client发送握手反馈包(handshake ack)
4)Client接收到握手反馈包后,生成group信息,然后触发groupCreated事件,接着向server发送握手完成包(handshake finish)
非初次握手步骤
1)Client与Server建立连接成功
2)Client向Server发送握手请求包(handshake request)
3)Server接收到握手请求包后,添加该连接到Group,接着向client发送握手反馈包(handshake ack)
4)Client接收到握手反馈包后,添加该连接到Group,接着向server发送握手完成包(handshake finish)
以上三次握手所发送的包都只包含本身的group信息,但Hsf对外提供了握手的扩展接口,应用可以使用该接口结合自身的业务,以完成连接建立后的初始化工作。
Client握手扩展接口
8.Heartbeat、超时及重连机制
Heartbeat和超时机制依赖于Netty的读空闲和写空闲回调。
当发生写空闲时,会向对方发送Heartbeat消息,写空闲时间可以通过参数HsfOptions.WRITE_IDLE_TIME设定,缺省为10秒。
当发生读空闲时,即判定为超时,主动关闭连接,读空闲时间可以通过参数HsfOptions.READ_IDLE_TIME设定,缺省为60秒。
对于断开的连接,Hsf会为其重连,重连频率通过HsfOptions.RECONNECT_INTERVAL参数设定,缺省为10000毫秒。
9.Option参数
Hsf支持以参数配置:
HsfOptions.TCP_NO_DELAY
TCP参数,是否关闭延迟发送消息包
HsfOptions.KEEP_ALIVE
TCP参数,是否保持连接
HsfOptions.REUSE_ADDRESS
TCP参数,是否重用端口
HsfOptions.WRITE_IDLE_TIME
写空闲时间(秒)
HsfOptions.READ_IDLE_TIME
读空闲时间(秒)
HsfOptions.SYNC_INVOKE_TIMEOUT
同步调用超时时间(毫秒)
HsfOptions.HANDSHAKE_TIMEOUT
握手超时时间(毫秒)
HsfOptions.FLOW_LIMIT
HsfOptions.TIMEOUT_WHEN_FLOW_EXCEEDED
申请流量超时时间(毫秒)
HsfOptions.MAX_THREAD_NUM_OF_DISPATCHER
分发器的最大线程数
HsfOptions.CHANNEL_NUM_PER_GROUP
每个Group建立的通道数
Runtime.getRuntime().availableProcessors()
HsfOptions.RECONNECT_INTERVAL
重连频率(毫秒)
HsfOptions.CONNECT_TIMEOUT
建立连接超时时间(毫秒)
HsfOptions.HOLD_CALLBACK_MESSAGE
是否缓存Callback方式发送的消息,缓存后将会在发送失败时回调doException方法参数传入
这些参数可以通过如下方式调整:
HsfConnector connector = new HsfConnectorImpl();
connector.setOption(HsfOptions.CHANNEL_NUM_PER_GROUP, 1);
10.Hsf的使用
待续(大家可以先看代码中的test包)
浏览 42768
论坛回复 /
(61 / 39235)
你好,我现在也在研究这块,但是现在& http://summercool-/svn/trunk/ 下载不了代码了。所以请分享一份代码 ,谢谢。我的邮箱是 暂时在迁到gitcafe上面,等迁号了会通知你的~
thank you& 我还没看代码,我想问下你这个可以加SSL吗,可以用来传文件吗。加不了SSL
能不能多点文档啊?比如概要设计和详细设计文档。正在弄中~
看您的代码就是一种享受!呵呵。还有您的缓存方案和消息系统框架什么时候开源?andye 写道您这个框架是基于点对点调用模式实现的,如果实现多对多的调用,如Dubbo框架,改动会不会比较大?暂时不会开源,因为堆外大缓存在国内还没有好的解决方案,这个解决方案用处比较广,所以可能会做为我们这样创业公司一个非常好的卖点。先谢谢各位的支持,呵~
您这个框架是基于点对点调用模式实现的,如果实现多对多的调用,如Dubbo框架,改动会不会比较大?怎么是点对点呢? 楼主下面已经说了,configServer包就是处理多点调用的;如果你认为configServer包太简陋,可以改用dubbo的zokeeper方式来管理多个服务提供者,以提供工业强度。不过原理都是一样的。
您这个框架是基于点对点调用模式实现的,如果实现多对多的调用,如Dubbo框架,改动会不会比较大?
请问一下,phone的测试包的client和server如何进行交互,client向server发送消息,server接受client的消息,并向client发送消息?是的,这个只是写的一个思路测试包,不严格,只是能跑通而已
稳定性和性能怎么样??假如客户端是其他语言调用也可以?肯定可以了; 稳定性和性能还是不错的,某些大公司内部一直在用的~
楼主,谢谢分享。提点意见。你这里提到的HSF似乎与网上其他介绍的tb-hsf方案不一样,这里提到的似乎没有负载和集群方面的考虑吧?也就是server无法简单的通过增加机器来扩展性能。对否?是可以通过加机器来实现扩展性能的,里面有一个configserver包,里面的解决方案和tb的一样的~
& 上一页 1
dragonsoar
浏览: 140282 次
来自: 杭州
归纳的很好。
服务器端如果要向某个客户端发送消息是否是需要服务端通过一个Ma ...
请问分布式事务是如何处理的
前来膜拜。。
背景是透明的gif 处理之后背景就变黑色了是什么情况?一个简单 netty client pojo通讯实现 - guoc - ITeye技术网站
------------------------------------
------------------------------------
package admin.
import org.jboss.netty.channel.C
import org.jboss.netty.channel.ChannelF
import org.jboss.netty.channel.ChannelHandlerC
import org.jboss.netty.channel.ExceptionE
import org.jboss.netty.channel.MessageE
import org.jboss.netty.channel.SimpleChannelH
public class TimeClientHandler extends SimpleChannelHandler {
private String returnFlag =
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
CtrlProtocol m = (CtrlProtocol) e.getMessage();
this.returnFlag = m.getFlag();
//System.out.println(returnFlag);
e.getChannel().close();
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
e.getCause().printStackTrace();
e.getChannel().close();
public ChannelFuture process(Channel channel,CtrlProtocol requestParameter) {
return channel.write(requestParameter);
public String getReturnFlag(){
return this.returnF
-------------------------------
-------------------------------
package admin.
import org.jboss.netty.channel.ChannelP
import org.jboss.netty.channel.ChannelPipelineF
import org.jboss.netty.channel.C
public class TimeClientPipelineFactory implements ChannelPipelineFactory {
public ChannelPipeline getPipeline() {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", new CtrlProtocolDecoder());
pipeline.addLast("encoder", new CtrlProtocolEncoder());
pipeline.addLast("handler", new TimeClientHandler());
-------------------------------
-------------------------------
package admin.
import org.jboss.netty.buffer.ChannelB
import org.jboss.netty.channel.C
import org.jboss.netty.channel.ChannelHandlerC
import org.jboss.netty.handler.codec.frame.FrameD
import mons.logging.L
import mons.logging.LogF
public class CtrlProtocolDecoder extends FrameDecoder {
private static final Log
= LogFactory.getLog(CtrlProtocolDecoder.class);
* (non-Javadoc)
* @see org.jboss.netty.handler.codec.frame.FrameDecoder#decode(org.jboss.netty.channel.ChannelHandlerContext,
* org.jboss.netty.channel.Channel, org.jboss.netty.buffer.ChannelBuffer)
protected CtrlProtocol decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer arg2) throws Exception {
ChannelBuffer buf = arg2;
if (buf.readableBytes() & 8) {
ilog.error("收到的协议数据不完整,无法解析协议头.");
short head_flag = buf.readShort();
short head_code = buf.readShort();
int head_length = buf.readInt();
CtrlProtocol protocol = new CtrlProtocol(head_flag, head_code, head_length);
("收到的协议:" + head_flag + "
" + head_code + "
" + head_length + "\n");
-------------------------------
-------------------------------
package admin.
import org.jboss.netty.buffer.ChannelB
import org.jboss.netty.buffer.ChannelB
import org.jboss.netty.channel.C
import org.jboss.netty.channel.ChannelHandlerC
import org.jboss.netty.handler.codec.oneone.OneToOneE
public class CtrlProtocolEncoder extends OneToOneEncoder {
* (non-Javadoc)
* @see org.jboss.netty.handler.codec.oneone.OneToOneEncoder#encode(org.jboss.netty.channel.ChannelHandlerContext,
* org.jboss.netty.channel.Channel, java.lang.Object)
protected Object encode(ChannelHandlerContext arg0, Channel arg1, Object arg2) throws Exception {
CtrlProtocol protocol = (CtrlProtocol) arg2;
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
buf.writeShort((short) protocol.getFlag());
buf.writeShort((short) protocol.getCode());
buf.writeInt(protocol.getLength());
-------------------------------
-------------------------------
package admin.
public class CtrlProtocol {
private short
private short
private int
public CtrlProtocol(short f, short c, int l) {
public static CtrlProtocol getCtrlInstance() {
return new CtrlProtocol((short) 1, (short) 1, (short) 1);
public short getFlag() {
public void setFlag(short flag) {
this.flag =
public short getCode() {
public void setCode(short code) {
this.code =
public int getLength() {
public void setLength(int length) {
this.length =
-------------------------------
6 客户端执行异步发送接收(服务端返回数据与客户端发送数据结构一致)
-------------------------------
import org.jboss.netty.bootstrap.ClientB
import org.jboss.netty.channel.C
import org.jboss.netty.channel.ChannelF
import org.jboss.netty.channel.ChannelF
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelF
CtrlProtocol cp = CtrlProtocol.getCtrlInstance();
String flag = this.toDo
///////////////////////////////////////////////
//Socket异步发送接收通讯协议
//参数:CtrlProtocol 协议
private String toDo
(CtrlProtocol cp) throws Exception{
String flag = "0";//结果
String host ="127.0.0.1";
int port = 7788;
ChannelFactory factory = new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
ClientBootstrap bootstrap = new ClientBootstrap(factory);
bootstrap.setPipelineFactory(new TimeClientPipelineFactory());
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("keepAlive", true);
ChannelFuture future = bootstrap.connect(new InetSocketAddress(
host, port));
Channel channel = future.awaitUninterruptibly().getChannel();
TimeClientHandler handler = channel.getPipeline().get(TimeClientHandler.class);
handler.process(channel, cp);
future.awaitUninterruptibly();
if (!future.isSuccess()) {
future.getCause().printStackTrace();
future.getChannel().getCloseFuture().awaitUninterruptibly();
factory.releaseExternalResources();
flag = handler.getReturnFlag();
浏览: 19656 次
来自: 北京

我要回帖

更多关于 netty 接收不到数据 的文章

 

随机推荐