scalable io in java io详解为什么加锁

一个高可扩展的基于非阻塞IO的服务器架构
-------------
新增文件夹...
新增文件夹
(多个标签用逗号分隔)
线程体系结构
反应堆模式
分配器级别事件处理器
应用程序级别事件处理器
如果你被要求去写一个高可扩展性的基于JAVA的服务器,你很快就会决定使用JAVA NIO包。为了让服务器跑起来,你可能会花很多时间阅读博客和教程来了解线程同步需要NIO SELECTOR类以及处理一些常见的陷阱。本文描述了一个面向连接基于NIO的服务器的基本架构。本文会先看一下一个首选的线程模型然后讨论服务器的一些基本组件。
Threading Architecture线程体系结构
第一种也是最直观的方式去实现一个多线程的服务器是每个连接一个线程的方式。这是JAVA1.4以前的解决方案,由于老版本的JAVA缺少非阻塞的I/O支持。每个连接一个线程的方法分配一个独家的工作线程给每个连接。在处理循环中,工作线程等待新进入的数据,处理这个请求,返回响应数据,然后再调用阻塞socket的read方法。
public class Server {
private ExecutorService executors = Executors.newFixedThreadPool(10);
private boolean isRunning =
public static void main(String... args) throws ... {
new Server().launch(Integer.parseInt(args[0]));
public void launch(int port) throws ... {
ServerSocket sso = new ServerSocket(port);
while (isRunning) {
Socket s = sso.accept();
executors.execute(new Worker(s));
private class Worker implements Runnable {
private LineNumberReader in =
Worker(Socket s) throws ... {
in = new LineNumberReader(new InputStreamReader(...));
public void run() {
while (isRunning) {
// blocking read of a request (line)
String request = in.readLine();
// processing the request
String response = ...
// return the response
out.write(resonse);
out.flush();
} catch (Exception e ) {
in.close();
} 在同时发生的客户端连接和多个同步工作线程之间通常有一个单对单的关系。因为每个连接都有一个相关联的服务端等待线程,因此可以有很好的响应时间。然而,高负载需要更多的同步运行的线程,这些限制了可扩展性。尤其是,长时间存活的连接像持久化的HTTP连接导致大量的同步工作线程存在,有浪费时间等待新的客户端请求的趋势。此外,成百上千的同步线程会浪费大量的栈空间。注意,举例来说,Solaris/Sparc默认的JAVA栈空间是512KB.
如果server不得不处理大量同时发生的客户端,并且能容忍慢,无反应的客户端,就需要一种供替代的线程架构。每个事件一个线程的方式通过一种非常高效地方式实现了这样的需求。工作线程和连接独立,仅被用来处理特定的事件。举例来说,如果一个数据接收事件发生了,一个工作线程将会用来处理特定于应用程序的编码和服务任务(或至少启动这些任务)。任务一结束,工作线程就会回到线程池中。这种方式需要无阻塞的处理socket的I/O。调用socket的read或write方法需要时无阻塞的。此外,一个事件系统是必须的;它会发信号表明是否有新数据,轮流发起socket的read方法。这种方式移除了等待线程和工作线程之间的一对一关系。这样一个事件驱动的I/0系统的设计将会在反应堆模式中描述。
The Reactor Pattern反应堆模式
反应堆模式,如图1所示,把事件的检测例如准备就绪读或者准备就绪接受数据和事件的处理分离。如果一个准备就绪的事件发生了,专用工作线程内的一个事件处理器就会被通知去执行适当的处理。
Figure 1. A NIO-based Reactor pattern implementation
连接通道需要先在Selector类中注册才能参与事件的架构。这可以通过调用regisster()方法来实现。虽然这个方法是SocketChannel的一部分,这个通道将会在Selector中注册,没有其它的方法。
SocketChannel channel = serverChannel.accept();
channel.configureBlocking(false);
// register the connection
SelectionKey sk = channel.register(selector, SelectionKey.OP_READ);
... 为了检测新的事件,Selector类提供了请求已注册的通道就绪事件的能力。通过调用select方法 ,Selector收集已注册通道的就绪事件。这个方法的调用会阻塞,直到至少一个事件已经发生。在这种情况下,方法返回了自上次调用之后就绪的I/O操作的连接数。所选的连接可以通过调用Selector的selectedkey方法来检测。这个方法返回一个Selectionkey对象集合,里面存放了IO事件的状态和连接通道的引用。
一个Selector存在于Dispatcher中。这是一个单线程的活动类围饶着Selector类。Dispatcher类的职责是检测事件然后分发消费事件的处理给EventHandler类。在这个分发循环中,Dispatcher类调用Selector类的select方法等待新的事件。如果至少一个事件发生了,这个方法就返回,每个事件相关的通道可以通过调用selectedkeys方法获得。
while (isRunning) {
// blocking call, to wait for new readiness events
int eventCount = selector.select();
// get the events
Iterator&SelectionKey& it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
// readable event?
if (key.isValid() && key.isReadable()) {
eventHandler.onReadableEvent(key.channel());
// writable event?
if (key.isValid() && key.isWritable()) {
key.interestOps(SelectionKey.OP_READ); // reset to read only
eventHandler.onWriteableEvent(key.channel());
} 基于一个事件,类似于就绪读或就绪写,EventHandler会被Dispatcher调用来处理这个事件。EventHandler解码请求数据,处理必须的服务活动,编码响应数据。由于工作线程没有被强制去浪费时间等待新的请求然后建立一个连接,这种方式的可扩展性和吞吐量理论上只限制于系统资源像CPU和内存。这既便是说,响应时间将没有每个连接一个线程的方式快,由于参与线程间的切换和同步。事件驱动方法的挑战因此是最少化同步和优化线程管理,以致于这些影响可以被忽略。
Component Architecture组件架构
大多数具有高可扩展性的JAVA服务器都是建立在反应堆模式上的。这样做,反应堆模式中的类将会被增强,因为需要额外的类来连接管理,缓冲区管理,以及负载均衡。这个服用器的入口类是一个Acceptor。这个安排如图2所示。
Figure 2. Major components of a connection-oriented server
Acceptor接收器
一个服务器每个新的客户端连接将会被单个Acceptor所接收,Acceptor与服务器的端口绑定。接收器是一个单线程的活动类。由于Acceptor仅负责处理历时非常短的客户端连接请求,经常只要用阻塞I/0模式实现Acceptor就足够了。Acceptor通过调用Serversocketchannel的阻塞accept方法来处理新请求。新请求将会注册到Dispatcher,这之后,请求就可以参与到事件处理中了。
由于一个Dispatcher的可扩展性非常有限,通常都会使用一个小的Dispatchers的池。这个限制当中的一个原因是特定的操作系统实现的Selector。大多数的操作系统一对一的映射SocketChannel和文件处理。取决于具体的系统,每个Selector的最大文件处理数的限制也是不同的。
class Acceptor implements Runnable {
void init() {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(true);
serverChannel.socket().bind(new InetSocketAddress(serverPort));
public void run() {
while (isRunning) {
SocketChannel channel = serverChannel.accept();
Connection con = new Connection(channel, appHandler);
dispatcherPool.nextDispatcher().register(con);
} catch (...) {
} 在示例代码中,一个连接对象持有SocketChannel和应用级别的事件处理器。我们将会在下面描述这些类。
Dispatcher分配器
通过调用Dispatcher的register方法,SocketChannel将会注册到相关的Selector上。这里就是问题的来源。Selector在内部使用key集合来管理注册的通道。这意味着每次注册一个通道,一个相关连的SelectionKey会被创建并被加入到Selector的注册key集合。同时,并发的分发线程可以调用Selector的select方法,也会访问这个key集合。由于key集合是非线程安全的,一个非同步的Acceptor上下文注册会导致死锁和竞争。这个可以通过实现selector guard object idiom来解决,它允许暂时的挂起分配线程。参考”” (PDF)来查看这个方法的解释。
class Dispatcher implements Runnable {
private Object guard = new Object();
void register(Connection con) {
// retrieve the guard lock and wake up the dispatcher thread
// to register the connection's channel
synchronized (guard) {
selector.wakeup();
con.getChannel().register(selector, SelectionKey.OP_READ, con);
// notify the application EventHandler about the new connection
void announceWriteNeed(Connection con) {
SelectionKey key = con.getChannel().keyFor(selector);
synchronized (guard) {
selector.wakeup();
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
public void run() {
while (isRunning) {
synchronized (guard) {
// suspend the dispatcher thead if guard is locked
int eventCount = selector.select();
Iterator&SelectionKey& it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
// read event?
if (key.isValid() && key.isReadable()) {
Connection con = (Connection) key.attachment();
disptacherEventHandler.onReadableEvent(con);
// write event?
} 在这个连接注册之后,Selector监听这个连接的就绪事件。如果一个事件发生了,通过传递相关的连接,这个Dispatcher的事件处理类的合适的回调方法将会被调用。
分配器级别事件处理器
处理一个就绪读事件的第一个行为是调用通道的读方法。与流接口相反,通道接口需要忽略读缓冲接口。通常会使用直接分配的ByteBuffer。直接缓冲区存在于本地内存,绕过JAVA堆内存。通过使用直接缓冲,socket的IO操作不再需要创建内部中间缓冲器。
通常情况下,读请求会被非常快的执行。Socket的读操作通常只是把一份接收到的数据从内核内存空间拷贝到读缓冲区,这个数据会存在于用户控制的内存空间。这些接收的数据将会被添加到连接的线程安全的读队列作进一步的处理。基于I/O操作的结果,特定于应用程序的任务会被执行。这些任务会被分配的应用级别的事件处理器处理。这类处理器通常被称为工作线程。
class DispatcherEventHandler {
void onReadableEvent(final Connection con) {
// get the received data
ByteBuffer readBuffer = allocateMemory();
con.getChannel().read(readBuffer);
ByteBuffer data = extractReadAndRecycleRenaming(readBuffer);
// append it to read queue
con.getReadQueue().add(data);
// perform further operations (encode, process, decode)
// by a worker thread
if (con.getReadQueue().getSize() & 0) {
workerPool.execute(new Runnable() {
public void run() {
synchronized (con) {
con.getAppHandler().onData(con);
void onWriteableEvent(Connection con) {
ByteBuffer[] data = con.getWriteQueue().drain();
con.getChannel().write(data); // write the data
if (con.getWriteQueue().isEmpty()) {
if (con.isClosed()) {
dispatcher.deregister(con);
// there is remaining data to write
dispatcher.announceWriteNeed(con);
} 在特定于应用程序的任务中,数据会被编码,服务会被执行,数据会被写入。在写数据的时候,要被发送的数据会加入到写队列,然后调用Dispatcher类的announceWriteNeed方法。这个方法让Selector开始监听就绪读事件。如果这种事件发生,分配器级别的事件处理器就会执行onWriteableEvent方法。这从通道的写队列获取数据然后执行必要的写I/O操作。试图直接写数据,通过这种方法,将会导致死锁和竞争。
应用级别事件处理器
与分配器事件处理器相比,特定于应用的事件处理器监听高级别的面向连接的事件,例如建立连接,数据接收或者是关闭连接。具体的事件处理设计是NIO服务器框架像SEDA,MINA还有emberIO之间最大的不同。这些框架通常实现了多级的架构,这样事件处理链就可以使用。它允许增加像SSLHandler或DelayerWriteHandler之类可以拦截请求/响应处理的处理器。下面的例子展示了一个基于xSocket框架的应用级别的处理器。xScoket框架支持不同的处理器接口,这些接口里面定义了需要被实现的特定于应用的回调方法代码。
class POP3ProtocolHandler implements IConnectHandler, IDataHandler, ... {
private static final String DELIMITER = ...
private Mailbox mailbox = ...
public static void main(String... args) throws ... {
new MultithreadedServer(110, new POP3ProtocolHandler()).run();
public boolean onConnect(INonBlockingConnection con) throws ... {
if (gatekeeper.isSuspiciousAddress(con.getRemoteAddress())) {
con.setWriteTransferRate(5);
// reduce transfer: 5byte/sec
con.write("+OK My POP3-Server" + DELIMITER);
public boolean onData(INonBlockingConnection con) throws ... {
String request = con.readStringByDelimiter(DELIMITER);
if (request.startsWith("QUIT")) {
mailbox.close();
con.write("+OK POP3 server signing off" + DELIMITER);
con.close();
} else if (request.startsWith("USER")) {
this.user = request.substring(4, request.length());
con.write("+OK enter password" + DELIMITER);
} else if (request.startsWith("PASS")) {
String pwd = request.substring(4, request.length());
boolean isAuthenticated = authenticator.check(user, pwd);
if (isAuthenticated) {
mailbox = MailBox.openAndLock(user);
con.write("+OK mailbox locked and ready" + DELIMITER);
} else if (...) {
} 为了更简便的访问底层的读写队列,Connection对象提供了一些便利的面向流和通道的读写方法。
通过关闭连接,底层实现初始化一个可写事件往返的刷新写队列。连接会在遗留的数据被写完之后终止。除了这样一个控制终端,连接还能因为其它的原因关闭。例如,硬件故障可能导致基于TCP的连接中断。这样的情况只有在socket上执行读写操作或空闲超时的时候检测到。大多数的NIO框架提供一个内置的程序来处理这些不受控制的中断。
Conclusion总结
一个事件驱动的非阻塞架构是实现高效,高扩展性和高稳定性服务器的一个基本的层。其中的挑战就是最小化线程同步开销和优化连接和缓冲区的管理。这会是编程中最困难的部分。
但是没有必要重复发明轮子。一些框架像xSocket,emberIO,SEDA或MINA都抽象了低层次的事件处理和线程管理来简化创建高可扩展性的服务器。以上大部分的框架都支持SSL和UDP,本文中未提及这两点。
Resources参考资料
” (PDF) describes event-driven processing by using Java NIO
“” describes how a memory leak occurs by a unwary use of the
SelectionKey’s attach
“” shows the problems with
large-scale threaded programming and event-based techniques.
description by Douglas C. Schmidt (PDF)
gives a good overview about network programming in general, and gives a good impression what happens behind the Java I/O operations on the operating-system level.
is a LGPL NIO-based library to build network applications. Most example code of this article has been written based on xSocket.
works as a software architect at United Internet group, a leading European Internet Service Provider to which GMX, 1&1, and Web.de belong. His areas of interest include software and system architecture, enterprise architecture management, object-oriented design, distributed computing, and development methodologies.
相关资讯  — 
相关文档  — 
发布时间: 15:23:28
同类热门经验
27275次浏览
21129次浏览
20579次浏览
17591次浏览
15138次浏览
19889次浏览
OPEN-OPEN, all rights reserved.Scalable IO in Java
-Doug Lea - 下载频道
- CSDN.NET
&&&&Scalable IO in Java
Scalable IO in Java
Scalable IO in Java
描述java nio 和reactor 设计模式之间的关系
若举报审核通过,可奖励20下载分
被举报人:
sunning9001
举报的资源分:
请选择类型
资源无法下载
资源无法使用
标题与实际内容不符
含有危害国家安全内容
含有反动色情等内容
含广告内容
版权问题,侵犯个人或公司的版权
*详细原因:
您可能还需要
开发技术下载排行Doug Lea Scalable IO in Java - 下载频道 - CSDN.NET
&&&&Doug Lea Scalable IO in Java
&Doug Lea Scalable IO in Java
非常经典的关于Java NIO的文档,包含了Reactor模式的详细说明,另外,还对JDK的NIO的API进行了介绍,非常权威。
作者Doug Lea就是大名鼎鼎的util.concurrent包的作者,Java世界里绝对的大爷!!!
若举报审核通过,可奖励20下载分
被举报人:
liuweiqiang_neu
举报的资源分:
请选择类型
资源无法下载
资源无法使用
标题与实际内容不符
含有危害国家安全内容
含有反动色情等内容
含广告内容
版权问题,侵犯个人或公司的版权
*详细原因:
您可能还需要
Q.为什么我点的下载下不了,但积分却被扣了
A. 由于下载人数众多,下载服务器做了并发的限制。若发现下载不了,请稍后再试,多次下载是不会重复扣分的。
Q.我的积分不多了,如何获取积分?
A. 获得积分,详细见。
完成任务获取积分。
评价资源返积分。
论坛可用分兑换下载积分。
第一次绑定手机,将获得5个C币,C币可。
下载资源意味着您已经同意遵守以下协议
资源的所有权益归上传用户所有
未经权益所有人同意,不得将资源中的内容挪作商业或盈利用途
CSDN下载频道仅提供交流平台,并不能对任何下载资源负责
下载资源中如有侵权或不适当内容,
本站不保证本站提供的资源的准确性,安全性和完整性,同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
开发技术下载排行
积分不够下载该资源
如何快速获得积分?
你下载资源过于频繁,请输入验证码
如何快速获得积分?
你已经下载过该资源,再次下载不需要扣除积分
Doug Lea Scalable IO in Java
所需积分:0
剩余积分:
VIP会员,免积分下载
会员到期时间:日
剩余下载次数:1000
VIP服务公告:5671人阅读
Netty实现原理浅析(转)
其中,MyProtocolDecoder是ChannelUpstreamHandler类型,MyProtocolEncoder是 ChannelDownstreamHandler类型,MyBusinessLogicHandler既可以是 ChannelUpstreamHandler类型,也可兼ChannelDownstreamHandler类型,视其是服务端程序还是客户端程序以及 应用需要而定。
补充一点,Netty对抽象和实现做了很好的解耦。像org.jboss.netty.channel.socket包, 定义了一些和socket处理相关的接口,而org.jboss.netty.channel.socket.nio、 org.jboss.netty.channel.socket.oio等包,则是和协议相关的实现。
7、codec framework
对于请求协议的编码解码,当然是可以按照协议格式自己操作ChannelBuffer中的字节数据。另一方面,Netty也做了几个很实用的codec helper,这里给出简单的介绍。
1)FrameDecoder:FrameDecoder内部维护了一个 DynamicChannelBuffer成员来存储接收到的数据,它就像个抽象模板,把整个解码过程模板写好了,其子类只需实现decode函数即可。 FrameDecoder的直接实现类有两个:(1)DelimiterBasedFrameDecoder是基于分割符 (比如/r/n)的解码器,可在构造函数中指定分割符。(2)LengthFieldBasedFrameDecoder是基于长度字段的解码器。如果协 议 格式类似“内容长度”+内容、“固定头”+“内容长度”+动态内容这样的格式,就可以使用该解码器,其使用方法在API
DOC上详尽的解释。
2)ReplayingDecoder: 它是FrameDecoder的一个变种子类,它相对于FrameDecoder是非阻塞解码。也就是说,使用 FrameDecoder时需要考虑到读到的数据有可能是不完整的,而使用ReplayingDecoder就可以假定读到了全部的数据。
3)ObjectEncoder 和ObjectDecoder:编码解码序列化的Java对象。
4)HttpRequestEncoder和 HttpRequestDecoder:http协议处理。
下面来看使用FrameDecoder和ReplayingDecoder的两个例子:
IntegerHeaderFrameDecoder extends
FrameDecoder {
ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buf)
buf.readableBytes
buf.markReaderIndex
buf.readInt
buf.readableBytes
buf.resetReaderIndex
buf.readBytes
而使用ReplayingDecoder的解码片断类似下面的,相对来说会简化很多。
IntegerHeaderFrameDecoder2 extends
ReplayingDecoder {
ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buf, VoidEnum state)
buf.readBytes
buf.readInt
就实现来说,当在ReplayingDecoder子类的decode函数中调用ChannelBuffer读数据时,如果读失败,那么 ReplayingDecoder就会catch住其抛出的Error,然后ReplayingDecoder接手控制权,等待下一次读到后续的数据后继 续decode。
尽管该文行至此处将止,但该文显然没有将Netty实现原理深入浅出的说全说透。当我打算写这篇文章时,也是一边看Netty的代码,一边总结些可写的东 西,但前后断断续续,到最后都没了多少兴致。我还是爱做一些源码分析的事情,但精力终究有限,并且倘不能把源码分析的结果有条理的托出来,不能产生有意义 的心得,这分析也没什么价值和趣味。而就分析Netty代码的感受来说,Netty的代码很漂亮,结构上层次上很清晰,不过这种面向接口及抽象层次对代码 跟踪很是个问题,因为跟踪代码经常遇到接口和抽象类,只能借助于工厂类和API
DOC,反复对照接口和实现类的对应关系。就像几乎任何优秀的Java开源项目都会用上一系列优秀的设计模式,也完全可以从模式这一点单独拿出一篇分析文 章来,尽管我目前没有这样的想法。而在此文完成之后,我也没什么兴趣再看Netty的代码了。
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:168215次
积分:3358
积分:3358
排名:第4693名
原创:153篇
转载:145篇
评论:10条
(7)(5)(61)(33)(12)(6)(2)(3)(21)(4)(14)(3)(12)(3)(3)(5)(4)(6)(18)(15)(58)(3)

我要回帖

更多关于 java.io.ioexception 的文章

 

随机推荐