jetty 为什么自己实现jetty 线程池 优化

jetty 线程池 - 我的异常网当前位置:& &&&jetty 线程池jetty 线程池&&网友分享于:&&&搜索量:46次
场景:线程池-jetty中QueuedThreadPool分析(1)线程池--jetty中QueuedThreadPool分析(一)
jetty版本:jetty-6.1.26
1.由于jetty中的许多组件都实现了LifeCycle接口,先了解下该接口的定义:
import java.util.EventL
public interface LifeCycle
public void start()
public void stop()
public boolean isRunning();
public boolean isStarted();
public boolean isStarting();
public boolean isStopping();
public boolean isStopped();
public boolean isFailed();
public void addLifeCycleListener(LifeCycle.Listener listener);
public void removeLifeCycleListener(LifeCycle.Listener listener);
/* ------------------------ */
/** Listener.
* A listener for Lifecycle events.
public interface Listener extends EventListener
public void lifeCycleStarting(LifeCycle event);
public void lifeCycleStarted(LifeCycle event);
public void lifeCycleFailure(LifeCycle event,Throwable cause);
public void lifeCycleStopping(LifeCycle event);
public void lifeCycleStopped(LifeCycle event);
2.AbstractLifeCycle的抽象类,该类实现了LifeCycle接口(其中start()和stop()两个方法在类中采用模板模式实现):
//========================================================================
//$Id: AbstractLifeCycle.java,v 1.3
22:55:41 gregwilkins Exp $
//Copyright
Mort Bay Consulting Pty. Ltd.
//------------------------------------
//Licensed under the Apache License, Version 2.0 (the "License");
//you may not use this file except in compliance with the License.
//You may obtain a copy of the License at
//http://www.apache.org/licenses/LICENSE-2.0
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//See the License for the specific language governing permissions and
//limitations under the License.
//========================================================================
import org.mortbay.log.L
import org.mortbay.util.LazyL
* Basic implementation of the life cycle interface for components.
* @author gregw
public abstract class AbstractLifeCycle implements LifeCycle
private Object _lock = new Object();
private final int FAILED = -1, STOPPED = 0, STARTING = 1, STARTED = 2, STOPPING = 3;
private volatile int _state = STOPPED;
protected LifeCycle.Listener[] _
protected void doStart() throws Exception
protected void doStop() throws Exception
public final void start() throws Exception
synchronized (_lock)
if (_state == STARTED || _state == STARTING)
setStarting();
doStart();
Log.debug("started {}",this);
setStarted();
catch (Exception e)
setFailed(e);
catch (Error e)
setFailed(e);
public final void stop() throws Exception
synchronized (_lock)
if (_state == STOPPING || _state == STOPPED)
setStopping();
Log.debug("stopped {}",this);
setStopped();
catch (Exception e)
setFailed(e);
catch (Error e)
setFailed(e);
public boolean isRunning()
return _state == STARTED || _state == STARTING;
public boolean isStarted()
return _state == STARTED;
public boolean isStarting()
return _state == STARTING;
public boolean isStopping()
return _state == STOPPING;
public boolean isStopped()
return _state == STOPPED;
public boolean isFailed()
return _state == FAILED;
public void addLifeCycleListener(LifeCycle.Listener listener)
_listeners = (LifeCycle.Listener[])LazyList.addToArray(_listeners,listener,LifeCycle.Listener.class);
public void removeLifeCycleListener(LifeCycle.Listener listener)
_listeners = (LifeCycle.Listener[])LazyList.removeFromArray(_listeners,listener);
private void setStarted()
_state = STARTED;
if (_listeners != null)
for (int i = 0; i & _listeners. i++)
_listeners[i].lifeCycleStarted(this);
private void setStarting()
_state = STARTING;
if (_listeners != null)
for (int i = 0; i & _listeners. i++)
_listeners[i].lifeCycleStarting(this);
private void setStopping()
_state = STOPPING;
if (_listeners != null)
for (int i = 0; i & _listeners. i++)
_listeners[i].lifeCycleStopping(this);
private void setStopped()
_state = STOPPED;
if (_listeners != null)
for (int i = 0; i & _listeners. i++)
_listeners[i].lifeCycleStopped(this);
private void setFailed(Throwable th)
Log.warn("failed "+this+": "+th);
Log.debug(th);
_state = FAILED;
if (_listeners != null)
for (int i = 0; i & _listeners. i++)
_listeners[i].lifeCycleFailure(this,th);
3.QueuedThreadPool的实现(在jetty7中该类采用了concurrent包中的许多特性,有空可以对比分析下)。
其中主要的方法为:doStart(),doStop(),newThread(),dispatch(),以及内部类PoolThread的run()和dispatch()方法。
// ========================================================================
// Copyright
Mort Bay Consulting Pty. Ltd.
// ------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ========================================================================
package org.mortbay.
import java.io.S
import java.util.ArrayL
import java.util.HashS
import java.util.I
import java.util.L
import java.util.S
import ponent.AbstractLifeC
import org.mortbay.log.L
/* ------------------------ */
/** A pool of threads.
* Avoids the expense of thread creation by pooling threads after
* their run methods exit for reuse.
* If an idle thread is available a job is directly dispatched,
* otherwise the job is queued.
After queuing a job, if the total
* number of threads is less than the maximum pool size, a new thread
* is spawned.
* @author Greg Wilkins &&
public class QueuedThreadPool extends AbstractLifeCycle implements Serializable, ThreadPool
private String _
private Set _//线程池里的所有poolThread
private List _//空闲的poolThread
private Runnable[] _//等待执行的job(即:工作队列)
private int _nextJ//工作队列中下一个出队的位置
private int _nextJobS//工作队列中下一个入队的位置
private int _//工作队列的实际长度
private int _maxQ
private boolean _
private int _
private final Object _lock = new Lock();//工作队列_jobs和空闲线程_idle队列的锁
private final Object _threadsLock = new Lock();//线程池所有线程_threads的锁
private final Object _joinLock = new Lock();//
private long _lastS
private int _maxIdleTimeMs=60000;
private int _maxThreads=250;
private int _minThreads=2;
private boolean _warned=
private int _lowThreads=0;
private int _priority= Thread.NORM_PRIORITY;
private int _spawnOrShrinkAt=0;
private int _maxStopTimeMs;
/* ------------------------------- */
/* Construct
public QueuedThreadPool()
_name="qtp-"+hashCode();
/* ------------------------------- */
/* Construct
public QueuedThreadPool(int maxThreads)
setMaxThreads(maxThreads);
/* ------------------------ */
/** Run job.
* @return true
public boolean dispatch(Runnable job)
if (!isRunning() || job==null)
PoolThread thread=
boolean spawn=
synchronized(_lock)
// Look for an idle thread
int idle=_idle.size();
if (idle&0)
thread=(PoolThread)_idle.remove(idle-1);
// queue the job
_queued++;//初始值为0
if (_queued&_maxQueued)//当入列的job数大于最大队列数时,更新最大队列数为当前入列个数.
_maxQueued=_
_jobs[_nextJobSlot++]=//_jobs[0]= _nextJobSlot = 1; _nextJobSlot表示下一个可以插入_jobs队列的位置。
if (_nextJobSlot==_jobs.length)//
_nextJobSlot=0;
if (_nextJobSlot==_nextJob)//_nextJob表示当前_jobs队列第一个可用的job的位置。_jobs队列已满时,重新扩容(倍增)。
// Grow the job queue
Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];//jobs队列倍增
int split=_jobs.length-_nextJ
if (split&0)
System.arraycopy(_jobs,_nextJob,jobs,0,split);
if (_nextJob!=0)
System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);
_nextJob=0;
_nextJobSlot=_
spawn=_queued&_spawnOrShrinkAt;
if (thread!=null)
thread.dispatch(job);
else if (spawn)
newThread();
/* ------------------------ */
/** Get the number of idle threads in the pool.
* @see #getThreads
* @return Number of threads
public int getIdleThreads()
return _idle==null?0:_idle.size();
/* ------------------------ */
* @return low resource threads threshhold
public int getLowThreads()
return _lowT
/* ------------------------ */
* @return maximum queue size
public int getMaxQueued()
return _maxQ
/* ------------------------ */
/** Get the maximum thread idle time.
* Delegated to the named or anonymous Pool.
* @see #setMaxIdleTimeMs
* @return Max idle time in ms.
public int getMaxIdleTimeMs()
return _maxIdleTimeMs;
/* ------------------------ */
/** Set the maximum number of threads.
* Delegated to the named or anonymous Pool.
* @see #setMaxThreads
* @return maximum number of threads.
public int getMaxThreads()
return _maxT
/* ------------------------ */
/** Get the minimum number of threads.
* Delegated to the named or anonymous Pool.
* @see #setMinThreads
* @return minimum number of threads.
public int getMinThreads()
return _minT
/* ------------------------ */
* @return The name of the BoundedThreadPool.
public String getName()
/* ------------------------ */
/** Get the number of threads in the pool.
* @see #getIdleThreads
* @return Number of threads
public int getThreads()
return _threads.size();
/* ------------------------ */
/** Get the priority of the pool threads.
@return the priority of the pool threads.
public int getThreadsPriority()
/* ------------------------ */
public int getQueueSize()
/* ------------------------ */
* @return the spawnOrShrinkAt
The number of queued jobs (or idle threads) needed
* before the thread pool is grown (or shrunk)
public int getSpawnOrShrinkAt()
return _spawnOrShrinkAt;
/* ------------------------ */
* @param spawnOrShrinkAt The number of queued jobs (or idle threads) needed
* before the thread pool is grown (or shrunk)
public void setSpawnOrShrinkAt(int spawnOrShrinkAt)
_spawnOrShrinkAt=spawnOrShrinkAt;
/* ------------------------ */
* @return maximum total time that stop() will wait for threads to die.
public int getMaxStopTimeMs()
return _maxStopTimeMs;
/* ------------------------ */
* @param stopTimeMs maximum total time that stop() will wait for threads to die.
public void setMaxStopTimeMs(int stopTimeMs)
_maxStopTimeMs = stopTimeMs;
/* ------------------------ */
* Delegated to the named or anonymous Pool.
public boolean isDaemon()
/* ------------------------ */
public boolean isLowOnThreads()
return _queued&_lowT
/* ------------------------ */
public void join() throws InterruptedException
synchronized (_joinLock)
while (isRunning()){
_joinLock.wait();
// TODO remove this semi busy loop!
while (isStopping()){
Thread.sleep(100);
/* ------------------------ */
* Delegated to the named or anonymous Pool.
public void setDaemon(boolean daemon)
/* ------------------------ */
* @param lowThreads low resource threads threshhold
public void setLowThreads(int lowThreads)
_lowThreads = lowT
/* ------------------------ */
/** Set the maximum thread idle time.
* Threads that are idle for longer than this period may be
* stopped.
* Delegated to the named or anonymous Pool.
* @see #getMaxIdleTimeMs
* @param maxIdleTimeMs Max idle time in ms.
public void setMaxIdleTimeMs(int maxIdleTimeMs)
_maxIdleTimeMs=maxIdleTimeMs;
/* ------------------------ */
/** Set the maximum number of threads.
* Delegated to the named or anonymous Pool.
* @see #getMaxThreads
* @param maxThreads maximum number of threads.
public void setMaxThreads(int maxThreads)
if (isStarted() && maxThreads&_minThreads)
throw new IllegalArgumentException("!minThreads&maxThreads");
_maxThreads=maxT
/* ------------------------ */
/** Set the minimum number of threads.
* Delegated to the named or anonymous Pool.
* @see #getMinThreads
* @param minThreads minimum number of threads
public void setMinThreads(int minThreads)
if (isStarted() && (minThreads&=0 || minThreads&_maxThreads))
throw new IllegalArgumentException("!0&=minThreads&maxThreads");
_minThreads=minT
synchronized (_threadsLock)
while (isStarted() && _threads.size()&_minThreads)
newThread();
/* ------------------------ */
* @param name Name of the BoundedThreadPool to use when naming Threads.
public void setName(String name)
/* ------------------------ */
/** Set the priority of the pool threads.
@param priority the new thread priority.
public void setThreadsPriority(int priority)
_priority=
/* ------------------------ */
/* Start the BoundedThreadPool.
* Construct the minimum number of threads.
protected void doStart() throws Exception
if (_maxThreads&_minThreads || _minThreads&=0)
throw new IllegalArgumentException("!0&minThreads&maxThreads");
_threads=new HashSet();
_idle=new ArrayList();
_jobs=new Runnable[_maxThreads];//按照最大线程数创建的工作队列
for (int i=0;i&_minTi++)//按最小线程数创建的poolThread
newThread();
/* ------------------------ */
/** Stop the BoundedThreadPool.
* New jobs are no longer accepted,idle threads are interrupted
* and stopJob is called on active threads.
* The method then waits
* min(getMaxStopTimeMs(),getMaxIdleTimeMs()), for all jobs to
* stop, at which time killJob is called.
protected void doStop() throws Exception
super.doStop();
long start=System.currentTimeMillis();
for (int i=0;i&100;i++)
synchronized (_threadsLock)
Iterator iter = _threads.iterator();
while (iter.hasNext())
((Thread)iter.next()).interrupt();
Thread.yield();
if (_threads.size()==0 || (_maxStopTimeMs&0 && _maxStopTimeMs & (System.currentTimeMillis()-start)))
Thread.sleep(i*100);
catch(InterruptedException e){}
// TODO perhaps force stops
if (_threads.size()&0)
Log.warn(_threads.size()+" threads could not be stopped");
synchronized (_joinLock)
_joinLock.notifyAll();
/* ------------------------ */
protected void newThread()
synchronized (_threadsLock)
if (_threads.size()&_maxThreads)
PoolThread thread =new PoolThread();
_threads.add(thread);//添加到线程池中
thread.setName(thread.hashCode()+"@"+_name+"-"+_id++);//线程name = 线程hashCode + @ + 线程池名字
+ 线程在线程池中的序号(递增)
thread.start();
else if (!_warned)
Log.debug("Max threads for {}",this);
/* ------------------------ */
/** Stop a Job.
* This method is called by the Pool if a job needs to be stopped.
* The default implementation does nothing and should be extended by a
* derived thread pool class if special action is required.
* @param thread The thread allocated to the job, or null if no thread allocated.
* @param job The job object passed to run.
protected void stopJob(Thread thread, Object job)
thread.interrupt();
/* ------------------------ */
public String dump()
StringBuffer buf = new StringBuffer();
synchronized (_threadsLock)
for (Iterator i=_threads.iterator();i.hasNext();)
Thread thread = (Thread)i.next();
buf.append(thread.getName()).append(" ").append(thread.toString()).append('\n');
return buf.toString();
/* ------------------------ */
* @param name The thread name to stop.
* @return true if the thread was found and stopped.
* @Deprecated Use
#interruptThread(long)} in preference
public boolean stopThread(String name)
synchronized (_threadsLock)
for (Iterator i=_threads.iterator();i.hasNext();)
Thread thread = (Thread)i.next();
if (name.equals(thread.getName()))
thread.stop();
/* ------------------------ */
* @param name The thread name to interrupt.
* @return true if the thread was found and interrupted.
public boolean interruptThread(String name)
synchronized (_threadsLock)
for (Iterator i=_threads.iterator();i.hasNext();)
Thread thread = (Thread)i.next();
if (name.equals(thread.getName()))
thread.interrupt();
/* ------------------------ */
/** Pool Thread class.
* The PoolThread allows the threads job to be
* retrieved and active status to be indicated.
public class PoolThread extends Thread
Runnable _job=//线程池通过内部类的成员变量进行交互
/* ------------------------ */
PoolThread()
setDaemon(_daemon);
setPriority(_priority);
/* ------------------------ */
/** BoundedThreadPool run.
* Loop getting jobs and handling them until idle or stopped.
public void run()
boolean idle=//
Runnable job=//独立于线程
while (isRunning())
// Run any job that we have.
if (job!=null)
final Runnable todo=
todo.run();
synchronized(_lock)
// is there a queued job?
if (_queued&0)
_queued--;
job=_jobs[_nextJob];
_jobs[_nextJob++]=
if (_nextJob==_jobs.length)
_nextJob=0;
// Should we shrink?
final int threads=_threads.size();
if (threads&_minThreads &&
(threads&_maxThreads ||
_idle.size()&_spawnOrShrinkAt))
long now = System.currentTimeMillis();
if ((now-_lastShrink)&getMaxIdleTimeMs())
_lastShrink=
_idle.remove(this);
if (!idle)
// Add ourselves to the idle set.
_idle.add(this);
// We are idle
// wait for a dispatched job
synchronized (this)
if (_job==null)
this.wait(getMaxIdleTimeMs());
catch (InterruptedException e)
Log.ignore(e);
synchronized (_lock)
_idle.remove(this);
synchronized (_threadsLock)
_threads.remove(this);
synchronized (this)
// we died with a job! reschedule it
if (job!=null)
//此处是因为内部类和外部类有同名的方法,否则,直接调用即可.
//调用外部类对象(线程池)的dispatch()方法,而不是内部类对象(PoolThread线程)的dispatch()方法
QueuedThreadPool.this.dispatch(job);
/* ------------------------ */
void dispatch(Runnable job)
synchronized (this)
this.notify();
private class Lock{}
4.QueuedThreadPoolTest测试类,调用QueuedThreadPool对象的start()方法启动线程池,调用dispatch()方法分发任务。
package com.iteye.suo.jetty.
import org.mortbay.thread.QueuedThreadP
public class QueuedThreadPoolTest {
public static void main(String[] args) throws Exception {
QueuedThreadPool pool = new QueuedThreadPool();
pool.start();
for(int i=0;i&20;i++){
final int num =
pool.dispatch(new Runnable(){
public void run() {
System.out.println(Thread.currentThread().getName() + " loop of " + num);
System.out.println("done!");
//如何停止pool?这样停止的话,若线程池里有任务,将会被中断。
pool.stop();
12345678910
12345678910
12345678910 上一篇:下一篇:文章评论相关文章 12345678910 Copyright & &&版权所有之前写一个简单易用Jetty文章。Jetty对于做JAVA Web发展的方面来说并不陌生,他是一个servlet集装箱,只有相对Tomcat这是比较简单的设计,并且也相对简单,使用灵活,我是学习和使用openfire触的Jetty。openfire使用Jetty开发其强大的管理后台。
在我近期的一个项目里我也想用Jetty来开发一个后台管理程序,只是用Jetty来开发后台管理程序的缺点在于集群环境下。对于集群环境下的管理后台最好用Tomcat这种单独部署起来。方便管理。
先从代码简单地介绍下Jetty的几个核心:
核心类:org.mortbay.jetty.Server
核心接口:ponent.LifeCycle
核心线程池封装:org.mortbay.thread.QueuedThreadPool
核心IO处理类:org.mortbay.jetty.nio.SelectChannelConnector
核心Servlet处理类:org.mortbay.jetty.servlet.ServletHandler
依据对源代码的Debug跟踪,我画了一个简单的执行原理图,该图说明了Jetty怎样HTTP请求:
<img src="http://img.blog.csdn.net/01250?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvemhhb3dlbjI1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast" alt="">
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& 图1-- Jetty执行原理图
我在windows上调试的Jetty。所以这里没有epoll,仅仅有selector的IO多路复用模型。
下面是我调试时的截图:
<img src="http://img.blog.csdn.net/05140?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvemhhb3dlbjI1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast" alt="">
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& 图2-- 调试截图
图2要从下往上看线程模型。
能够看出尽管处理连接请求的线程处理详细请求数据的线程都由QueuedThreadPool管理,但这却是两个全然不同的线程,在设计模式里这事实上是观察者模式的体现。也叫公布与订阅模式。在NIO里,流程的进行基本上是事件驱动的。也就是没有事件就歇息。
Jetty的原理基于NIO。这里简单地说一下selector的IO多路复用思想:
1,监听者注冊要监听的事件类型到管理器中;
2。当有事件注冊到管理器里时,若有监听者注冊的类型的事件。管理器就将此事件通知给之前注冊的监听者。
3,监听者拿到事件后将其从管理器中删除并对其进行相关的处理。
这里建议读者去了解下观察者模式,毕竟他的思想还是应用挺广的。
Jetty里还有两个关键的工具就是将Http的请求数据解析的工具类:org.mortbay.jetty.HttpParser 和用于生成响数据的工具类:org.mortbay.jetty.HttpGenerator,读这两个类的同一时候能够了解下HTTP协议和Servlet规范。
因为HTTP的底层是TCP/IP。所以这里给出一个直观的TCP连接,当然这里用的是Socket实现的:
<img src="http://img.blog.csdn.net/24000?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvemhhb3dlbjI1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast" alt="">
这是我用本地的浏览器连接測试的。能够看出一些连接相关的參数。
当然,Jetty还支持HTTPS,例如以下:
<img src="http://img.blog.csdn.net/18515?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvemhhb3dlbjI1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast" alt="">
和Tomcat同样。须要安全证书,能够用JDK生成一个证书。然后測试一下Jetty的HTTPS连接,这个以后再续,感兴趣能够研究下TLS。近期暴SSL3.0和openssl都不安全,尽管TLS是在SSL3.0上标准化的,但TLS对SSL3.0的不合理之处进行了改动,毕竟标准化了还是可靠的。
关于TLS的题外话:苹果宣布其APNS对SSL3.0关闭了,我细致看了一下我们项目中的代码,原来我们一直用的都是TLS,可见标准化是多么重要。
版权声明:本文博主原创文章,博客,未经同意不得转载。
阅读(...) 评论()

我要回帖

更多关于 jetty非阻塞线程池 的文章

 

随机推荐