如何建立和运用socket客户端连接池链接池

后端实时通信 socket的使用以及socket池的建立 - CSDN博客
后端实时通信 socket的使用以及socket池的建立
先可以构建一个Socket池,方便调用
import java.util.Q
import java.util.concurrent.ArrayBlockingQ
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.C
import org.zeromq.ZMQ.S
public class ZMQUtils {
&& &private static final Context context = ZMQ.context(1); // 创建一个I/O线程的上下文
&& &private static final int DEFAULT_POOL_SIZE = 5;// 默认池大小
&& &private static final Queue&Socket&// 池
&& &static {
&& &&& &pool = new ArrayBlockingQueue&Socket&(DEFAULT_POOL_SIZE);
&& & * 获取zmq的socket(req)
&& & * @return
&& &public static Socket getZMQSocket() {
&& &&& &Socket socket = pool.poll();
&& &&& &if (socket == null) {
&& &&& &&& &socket = context.socket(ZMQ.REQ);// 创建一个request类型的socket,这里可以将其简单的理解为客户端,用于向response端发送数据
&& & * 将zmq的socket(req)归还池()
&& & * @param socket
&& &public static void releaseZMQSocket(Socket socket) {
&& &&& &if (socket != null) {
&& &&& &&& &if (!pool.offer(socket)) {
&& &&& &&& &&& &socket.close();
&& &&& &&& &}
通讯调用实例代码如下:
Map&String, Object& map = new HashMap&&();
&& &&& &&& &map.put(&id&, arrayList);
&& &&& &&& &JSONObject requJson = new JSONObject();
&& &&& &&& &requJson.put(&importpcap&, map);
&& &&& &&& &String request = requJson.toString();
&& &&& &&& &Socket socket = ZMQUtils.getZMQSocket();
&& &&& &&& &try {
&& &&& &&& &&& &socket.setReceiveTimeOut(5000);
&& &&& &&& &&& &socket.connect(&tcp://& + decodeIp + &:& + decodePort); // 与response端建立连接
&& &&& &&& &&& &socket.send(request.getBytes()); // 向reponse端发送数据
&& &&& &&& &&& &byte[] responseBytes = socket.recv(); // 接收response发送回来的数据
&& &&& &&& &&& &if (null == responseBytes) {
&& &&& &&& &&& &&& &throw new ApplicationException(&后台通信异常!&);
&& &&& &&& &&& &} else {
&& &&& &&& &&& &&& &ZMQUtils.releaseZMQSocket(socket);
&& &&& &&& &&& &&& &Integer result =
&& &&& &&& &&& &&& &String response = new String(responseBytes, &UTF8&);
&& &&& &&& &&& &&& &JSONObject respJson = JSONObject.fromObject(response);
&& &&& &&& &&& &&& &if (null != respJson) {
&& &&& &&& &&& &&& &&& &JSONObject conftestplan = respJson
&& &&& &&& &&& &&& &&& &&& &&& &.getJSONObject(&importpcap&);
&& &&& &&& &&& &&& &&& &if (null != conftestplan) {
&& &&& &&& &&& &&& &&& &&& &result = conftestplan.getInt(&result&);
&& &&& &&& &&& &&& &&& &&& &// System.out.println(result);
&& &&& &&& &&& &&& &&& &}
&& &&& &&& &&& &&& &}
&& &&& &&& &&& &&& &if (null == result) {
&& &&& &&& &&& &&& &&& &throw new ApplicationException(&后台通信异常!&);
&& &&& &&& &&& &&& &} else {
&& &&& &&& &&& &&& &&& &switch (result) {
&& &&& &&& &&& &&& &&& &case 0:
&& &&& &&& &&& &&& &&& &&& &// 通信成功清空缓存ID
&& &&& &&& &&& &&& &&& &&& &context.setAttribute(&importpcapIds&, null);
&& &&& &&& &&& &&& &&& &&& &System.out.println(&推送完毕!!!!&);
&& &&& &&& &&& &&& &&& &&& &
&& &&& &&& &&& &&& &&& &default:
&& &&& &&& &&& &&& &&& &&& &
&& &&& &&& &&& &&& &&& &}
&& &&& &&& &&& &&& &}
&& &&& &&& &&& &}
&& &&& &&& &} catch (Exception e) {
&& &&& &&& &&& &if (e instanceof ApplicationException) {
&& &&& &&& &&& &&& &// throw new ApplicationException(e.getMessage());
&& &&& &&& &&& &&& &context.setAttribute(&importpcapIds&, null);
&& &&& &&& &&& &&& &System.out.println(e.getMessage());
&& &&& &&& &&& &} else {
&& &&& &&& &&& &&& &socket.close();
&& &&& &&& &&& &&& &// throw new ApplicationException(&后台通信异常!&);
&& &&& &&& &&& &&& &context.setAttribute(&importpcapIds&, null);
&& &&& &&& &&& &&& &System.out.println(&后台通信异常!&);
&& &&& &&& &&& &}
&& &&& &&& &}
本文已收录于以下专栏:
相关文章推荐
socket是通信的桥梁,(=@__@=)拿手机和电脑来说,通过socket可以建立两者之间的通信,一般在电脑上是服务器端,写一段java代码如下public class MySocket {
tcp是一种通讯方式,也叫做tcp协议,是用来规范socket数据交互的。
socket是用来做通讯的。通讯需要通讯双方的ip,socket。举例说两个人打电话,电话就相当于是socket,而打电话的...
Android端通过Usb建立Socket通讯(实时发送视频数据)1.创建出 ServerSocket
serverSocket = new ServerSocket(端口10086);2....
套接口(Socket)为目前Linux上最为广泛使用的一种的进程间通信机制,与其他的Linux通信机制不同之处在于除了它可用于单机内的进程间通信以外,还可用于不同机器之间的进程间通信。但是由于Sock...
原文地址::http://blogold.chinaunix.net/u/27131/showart_343197.html
套接口(Socket)为目前 Linux 上最为...
什么是 Socket?
Socket 英文直译为“孔或插座”,也称为套接字。用于描述 IP 地址和端口号,是一种进程间的通信机制。你可以理解为
IP 地址确定了网内的唯一计算机,而端口号则指定了将...
在 Linux 上实现基于 Socket 的多进程实时通信
周欣 (jones_.cn),
软件工程师
简介: 套接口(Socket)为目前 Lin...
简介: 套接口(Socket)为目前 Linux 上最为广泛使用的一种的进程间通信机制。但是它不能直接用来多进程之间的相互实时通信。本文提出一个基于 Socket 的多进程之间通信的实现方法,并给出样...
文章转载自:/developerworks/cn/linux/l-socket-ipc/
套接口(Socket)为目前Linux上最为广泛使用的一种的进程间通...
他的最新文章
讲师:何宇健
讲师:董岩
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)本帖子已过去太久远了,不再提供回复功能。随笔 - 22&
&&&&&&&&&&&
一开始,选用Vector&E&来存放连接。由于这个容器不是并发安全的,于是,每个方法都加一个synchronized来保持并发时的同步操作,并发效率很差,果断放弃。空余时间研究了下多线程的并发知识,决定用并发安全的阻塞队列(LinkedBlockingQueue&E&),这个容器可以自动维护容量的大小,就免去了再起一个线程去维护线程池的大小。为了保证每个连接是可用的,为线程池启动了一个守护线程去定时发送心跳。
程序如下,欢迎高手们指点缺陷:
1 public class SocketConnectionPool {
private ConnectionPoolConfig poolC // 连接池配置
private LinkedBlockingQueue&SocketConnection& freeC // 空闲池连接
public ConnectionPoolConfig getPoolConfig() {
return poolC
public LinkedBlockingQueue&SocketConnection& getFreeConnections() {
return freeC
* 功能描述:连接池构造方法
* @author mihu
* @version mihu
* @param config
* @throws UnknownHostException
* @throws IOException
public SocketConnectionPool(ConnectionPoolConfig config){
if(config == null){
log.error("func[init] ConnectionPoolConfig is null ......");
this.poolConfig =
this.freeConnections = new LinkedBlockingQueue&SocketConnection&(config.getMaxConn());
//构造指定大小的阻塞队列
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (ConnectException e){
e.printStackTrace();
} catch (SocketTimeoutException e){
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally{
//心跳检测守护线程
if(config != null){
ACheckThread aCheckThread = new ACheckThread(this);
aCheckThread.setDaemon(true);
aCheckThread.start();
* 功能描述:初始化连接池
* @throws UnknownHostException
* @throws IOException
* @author mihu
* @version mihu
* @throws InterruptedException
public void init() throws UnknownHostException, IOException,SocketTimeoutException,ConnectException, InterruptedException{
for(int i=0; i&poolConfig.getMinConn(); i++){
SocketConnection client = new SocketConnection(poolConfig.getHost(), poolConfig.getPort());
freeConnections.offer(client);
* 功能描述:新建连接
* @param totalConnSize
* @throws UnknownHostException
* @throws IOException
* @author mihu
* @version mihu
public SocketConnection newConnection() throws UnknownHostException, IOException{
SocketConnection client = new SocketConnection(this.poolConfig.getHost(), this.poolConfig.getPort());
* 功能描述:获取连接
* @return SocketConnection连接
* @throws UnknownHostException
* @throws IOException
* @author mihu
* @version mihu
* @throws InterruptedException
public SocketConnection getConnection() throws UnknownHostException, IOException, InterruptedException{
if(freeConnections.size()==0){
synchronized (freeConnections) {
int freeConnCount = freeConnections.size();
if(freeConnCount == 0 && freeConnCount & this.poolConfig.getMaxConn()){
SocketConnection client = newConnection();
SocketConnection client = freeConnections.poll(2000,TimeUnit.MILLISECONDS);
* 功能描述:将连接还回连接池
* @param client
* @throws IOException
* @author mihu
* @version mihu
* @throws InterruptedException
public void freeConnection(SocketConnection client) throws IOException, InterruptedException{
if(null != client && !client.isClosed()){
freeConnections.offer(client);
&以上连接池的代码已经被废弃了,多线程下问题多多。
&某天在网上看到数据库连接池的开源代码boneCP,据说是当前最高效的数据库连接池。于是下载了它的源码学习了一番,发现它的代码简洁,可阅读性强,没有过多的使用锁机制来避免临界竞争,这一点和c3p0很不同。
&最后根据boneCP的思维重构了自己的socket连接池,完全满足了应用的需求,哈哈~~~
阅读(...) 评论()转 IOCP+WinSock2新函数打造高性能SOCKET池 - 我要穿越 - 博客园
17:47:16|  分类:
|  标签: |字号大中小 订阅
在前一篇文章《 》中,介绍了WinSock2的一些新函数,并重点详细介绍了什么是SOCKET池,有了这个概念,现在就接着展开更深入的讨论。
首先这里要重点重申一下就是,SOCKET池主要指的是使用面向连接的协议的情况下,最常用的就是需要管理大量的TCP连接的时候。常见的就是Web服务器、FTP服务器等。
下面就分步骤的详细介绍如何最终实现SOCKET池。
一、WinSock2环境的初始化:
要使用WinSock2就需要先初始化Socket2.0的环境,不废话,上代码:
WSADATA wd = {0};
int iError = WSAStartup(MAKEWORD(2,0), &wd);
if( 0 != iError )
{//出现错误,最好跟踪看下错误码是多少
       return FALSE;
if ( LOBYTE(lpwsaData-&wVersion) != 2 )
{//非2.0以上环境 退出了事 可能是可怜的WinCE系统
       WSACleanup();
       return FALSE;
最后再不使用WinSock之后都要记得调用一下WSACleanup()这个函数;
二、装载WinSock2函数:
上一篇文章中给出了一个装载WinSock2函数的类,这里分解介绍下装载的具体过程,要提醒的就是,凡是类里面演示了动态装载的函数,最好都像那样动态载入,然后再调用。以免出现上网发帖跪求高手赐教为什么AcceptEx函数无法编译通过等问题。看完这篇文章详细你不会再去发帖找答案了,呵呵呵,好了,上代码:
//定义一个好用的载入函数 摘自CGRSMsSockFun 类
BOOL LoadWSAFun(GUID&funGuid,void*& pFun)
{//本函数利用参数返回函数指针
       DWORD dwBytes = 0;
       pFun = NULL;
       //随便创建一个SOCKET供WSAIoctl使用 并不一定要像下面这样创建
       SOCKET skTemp = ::WSASocket(AF_INET,
                     SOCK_STREAM, IPPROTO_TCP, NULL,
                     0, WSA_FLAG_OVERLAPPED);
       if(INVALID_SOCKET == skTemp)
       {//通常表示没有正常的初始化WinSock环境
              return FALSE;
       }
       ::WSAIoctl(skTemp, SIO_GET_EXTENSION_FUNCTION_POINTER,
                     &funGuid,sizeof(funGuid),&pFun,
                     sizeof(pFun), &dwBytes, NULL,NULL);
       ::closesocket(skTemp);
       return NULL != pF
//演示如何动态载入AcceptEx函数
LPFN_ACCEPTEX pfnAcceptEx; //首先声明函数指针
GUID GuidAcceptEx = WSAID_ACCEPTEX;
LoadWSAFun(GuidAcceptEx,(void*&)pfnAcceptEx); //载入
//使用丰富的参数调用
pfnAcceptEx(sListenSocket,sAcceptSocket,lpOutputBuffer,
              dwReceiveDataLength,dwLocalAddressLength,dwRemoteAddressLength,
lpdwBytesReceived,lpOverlapped);
              //或者:
              SOCKET skAccept = ::WSASocket(AF_INET,SOCK_STREAM,IPPROTO_TCP,
                                                 NULL, 0,WSA_FLAG_OVERLAPPED);
              PVOID pBuf = new BYTE[sizeof(sockaddr_in) + 16];
              pfnAcceptEx(skServer, skAccept,pBuf,
0,//将接收缓冲置为0,令AcceptEx直接返回,防止拒绝服务攻击
                     sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16, NULL,
                                                 (LPOVERLAPPED)pAcceptOL);
       以上是一个简单的演示,如何动态载入一个WinSock2扩展函数,并调用之,其它函数的详细例子可以看前一篇文章中CGRSMsSockFun类的实现部分。如果使用CGRSMsSockFun 类的话当然更简单,像下面这样调用即可:
              CGRSMsSockFun MsSockFun;
              MsSockFun.AcceptEx(skServer, skAccept,pBuf,
0,//将接收缓冲置为0,令AcceptEx直接返回,防止拒绝服务攻击
                     sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16, NULL,
                     (LPOVERLAPPED)pAcceptOL);
如果要使用这个类,那么需要一些修改,主要是异常处理部分,自己注释掉,或者用其它异常代替掉即可,这个对于有基础的读者来说不是什么难事。
三、定义OVERLAPPED结构:
要想“IOCP”就要自定义OVERLAPPED,这是彻底玩转IOCP的不二法门,可以这么说:“江湖上有多少种自定义的OVERLAPPED派生结构体,就有多少种IOCP的封装!”
OVERLAPPED本身是Windows IOCP机制内部需要的一个结构体,主要用于记录每个IO操作的“完成状态”,其内容对于调用者来说是没有意义的,但是很多时候我们把它当做一个“火车头”,因为它可以方便的把每个IO操作的相关数据简单的“从调用处运输到完成回调函数中”,这是一个非常有用的特性,哪么如何让这个火车头发挥运输的作用呢?其实很简单:让它成为一个自定义的更大结构体的第一个成员。然后用强制类型转换,将自定义的结构体转换成OVERLAPPED指针即可。当然不一定非要是新结构体的第一个成员,也可以是任何第n个成员,这时使用VC头文件中预定义的一个宏CONTAINING_RECORD再反转回来即可。
说到这里一些C++基础差一点的读者估计已经很头晕了,更不知道我再说什么,那么我就将好人做到底吧,来解释下这个来龙去脉。
首先就以我们将要使用的AcceptEx函数为例子看看它的原型吧(知道孙悟空的火眼金睛用来干嘛的吗?就是用来看原型的,哈哈哈):
BOOL AcceptEx(
  __in          SOCKET sListenSocket,
  __in          SOCKET sAcceptSocket,
  __in          PVOID lpOutputBuffer,
  __in          DWORD dwReceiveDataLength,
  __in          DWORD dwLocalAddressLength,
  __in          DWORD dwRemoteAddressLength,
  __out         LPDWORD lpdwBytesReceived,
  __in          LPOVERLAPPED lpOverlapped
注意最后一个参数,是一个OVERLAPPED结构体的指针(LP的意思是Long Pointer,即指向32位地址长指针,注意不是“老婆”拼音的缩写),本身这个参数的意思就是分配一块OVERLAPPED大小的内存,在IOCP调用方式下传递给AcceptEx函数用,调用者不用去关心里面的任何内容,而在完成过程中(很多时候是另一个线程中的事情了),通常调用GetQueuedCompletionStatus函数后,会再次得到这个指针,接着让我们也看看它的原型:
BOOL WINAPI GetQueuedCompletionStatus(
  __in          HANDLE CompletionPort,
  __out         LPDWORD lpNumberOfBytes,
  __out         PULONG_PTR lpCompletionKey,
  __out         LPOVERLAPPED* lpOverlapped,
  __in          DWORD dwMilliseconds
注意这里的LPOVERLAPPED多了一个*变成了指针的指针,并且前面的说明很清楚Out!很明白了吧,不明白就真的Out了。这里就可以重新得到调用AcceptEx传入的LPOVERLAPPED指针,也就是得到了这个“火车头”,因为只是一个指针,并没有详细的限定能有多大,所以可以在火车头的后面放很多东西。
再仔细观察GetQueuedCompletionStatus函数的参数,会发现,这时只能知道一个IO操作结束了,但是究竟是哪个操作结束了,或者是哪个SOCKET句柄上的操作结束了,并没有办法知道。通常这个信息非常重要,因为只有在IO操作实际完成之后才能释放发送或接收等操作的缓冲区。
这些信息可以定义成如下的一个扩展OVERLAPPED结构:
struct MYOVERLAPPED
    OVERLAPPED m_          
    int                    m_iOpT      
//操作类型 0=AcceptEx 1=DisconnectEx       2=ConnectEx 3=WSARecv等等
    SOCKET          m_skS       //服务端SOCKET
    SOCKET         m_skC        //客户端SOCKET
    LPVOID           m_pB            //本次IO操作的缓冲指针
    ......                                           //其它需要的信息
MYOVERLAPPED* pMyOL = new MYOVERLAPPED;
ZeroMemory(pMyOL,sizeof(MYOVERLAPPED));
pMyOL-&m_iOpType = 0;        //AcceptEx操作
pMyOL-&m_skServer = skS
pMyOL-&m_skClient = skC
BYTE* pBuf = new BYTE[256];//一个缓冲
.................. //朝缓冲中写入东西
pMyOL-&m_pBuf = pB
...............//其它的代码
AcceptEx(skServer, skClient,pBuf,
0,//将接收缓冲置为0,令AcceptEx直接返回                     
256,256,NULL,(LPOVERLAPPED)pMyOL));//注意最后这个强制类型转换
       在完成过程回调线程函数中,这样使用:
UINT CALLBACK Client_IOCPThread(void* pParam)
       {//IOCP线程函数
              .....................
              DWORD dwBytesTrans = 0;
              DWORD dwPerData = 0;
              LPOVERLAPPED lpOverlapped = NULL;
              while(1)
              {//又见死循环 呵呵呵
                     BOOL bRet = GetQueuedCompletionStatus(
                            pThis-&m_IOCP,&dwBytesTrans,&dwPerData,
                            &lpOverlapped,INFINITE);
                     if( NULL == lpOverlapped )
                     {//没有真正的完成
                            SleepEx(20,TRUE);//故意置成可警告状态
                           
                     }
                     //找回“火车头”以及后面的所有东西
                     MYOVERLAPPED*  pOL = CONTAINING_RECORD(lpOverlapped
, MYOVERLAPPED, m_ol);
                     switch(pOL-&m_iOpType)
case 0: //AcceptEx结束
{//有链接进来了 SOCKET句柄就是 pMyOL-&m_skClient
............................
........................
} //end while
........................
       }//end fun
至此,关于这个“火车头”如何使用,应该是看明白了,其实就是从函数传入,又由函数返回。只不过其间可能已经转换了线程环境,是不同的线程了。
这里再补充一个AcceptEx容易被遗漏的一个细节问题,那就是在AcceptEx完成返回之后,如下在那个连入的客户端SOCKET上调用一下:
       int nRet = ::setsockopt(
              pOL-&m_skClient,SOL_SOCKET,SO_UPDATE_ACCEPT_CONTEXT,
              (char *)&pOL-&m_skServer,sizeof(SOCKET));
这样才可以继续在这个代表客户端连接的pOL-&m_skClient上继续调用WSARecv和WSASend。
另外,在AcceptEx完成之后,通常可以用:
LPSOCKADDR addrHost = NULL;      //服务端地址
LPSOCKADDR addrClient = NULL;     //客户端地址
int lenHost = 0;
int lenClient = 0;
GetAcceptExSockaddrs(
       pOL-&m_pBuf,0,sizeof(sockaddr_in) + 16,sizeof(sockaddr_in) + 16,
       (LPSOCKADDR*) &addrHost,&lenHost,(LPSOCKADDR*) &addrClient,&lenClient);
这样来得到连入的客户端地址,以及连入的服务端地址,通常这个地址可以和这个客户端的SOCKET绑定在一起用map或hash表保存,方便查询,就不用再调用那个getpeername得到客户端的地址了。要注意的是GetAcceptExSockaddrs也是一个WinSock2扩展函数,专门配合AcceptEx使用的,需要像AcceptEx那样动态载入一下,然后再调用,详情请见前一篇文章中的CGRSMsSockFun类。
至此AcceptEx算讨论完整了,OVERLAPPED的派生定义也讲完了,让我们继续下一步。
四、编写线程池回调函数:
在讨论扩展定义OVERLAPPED结构体时,给出了非线程池版的线程函数的大概框架,也就是传统IOCP使用的自建线程使用方式,这种方式要自己创建完成端口句柄,自己将SOCKET句柄绑定到完成端口,这里就不在赘述,主要介绍下调用BindIoCompletionCallback函数时,应如何编写这个线程池的回调函数,其实它与前面那个线程函数是很类似的。先来看看回调函数长个什么样子:
VOID CALLBACK FileIOCompletionRoutine(
  [in]                 DWORD dwErrorCode,
  [in]                 DWORD dwNumberOfBytesTransfered,
  [in]                 LPOVERLAPPED lpOverlapped
第一个参数就是一个错误码,如果是0恭喜你,操作一切ok,如果有错也不要慌张,前一篇文章中已经介绍了如何翻译和看懂这个错误码。照着做就是了。
第二个参数就是说这次IO操作一共完成了多少字节的数据传输任务,这个字段有个特殊含义,如果你发现一个Recv操作结束了,并且这个参数为0,那么就是说,客户端断开了连接(注意针对的是TCP方式,整个SOCKET池就是为TCP方式设计的)。如果这个情况发生了,在SOCKET池中就该回收这个SOCKET句柄。
第三个参数现在不用多说了,立刻就知道怎么用它了。跟刚才调用GetQueuedCompletionStatus函数得到的指针是一个含义。
下面就来看一个实现这个回调的例子:
VOID CALLBACK MyIOCPThread(DWORD dwErrorCode
,DWORD dwBytesTrans,LPOVERLAPPED lpOverlapped)
       {//IOCP回调函数
              .....................
              if( NULL == lpOverlapped )
              {//没有真正的完成
                     SleepEx(20,TRUE);//故意置成可警告状态
                    
              }
              //找回“火车头”以及后面的所有东西
              MYOVERLAPPED*  pOL = CONTAINING_RECORD(lpOverlapped
, MYOVERLAPPED, m_ol);
              switch(pOL-&m_iOpType)
case 0: //AcceptEx结束
{//有链接进来了 SOCKET句柄就是 pMyOL-&m_skClient
............................
........................
       }//end fun
看起来很简单吧?好像少了什么?对了那个该死的循环,这里不用了,因为这个是由线程池回调的一个函数而已,线程的活动状态完全由系统内部控制,只管认为只要有IO操作完成了,此函数就会被调用。这里关注的焦点就完全的放到了完成之后的操作上,而什么线程啊,完成端口句柄啊什么的就都不需要了(甚至可以忘记)。
这里要注意一个问题,正如在《IOCP编程之“双节棍”》中提到的,这个函数执行时间不要过长,否则会出现掉线啊,连接不进来啊等等奇怪的事情。
另一个要注意的问题就是,这个函数最好套上结构化异常处理,尽可能的多拦截和处理异常,防止系统线程池的线程因为你糟糕的回调函数而壮烈牺牲,如果加入了并发控制,还要注意防止死锁,不然你的服务器会“死”的很难看。
理论上来说,你尽可以把这个函数看做一个与线程池函数等价的函数,只是他要尽可能的“短”(指执行时间)而紧凑(结构清晰少出错)。
最后,回调函数定义好了,就可以调用BindIoCompletionCallback函数,将一个SOCKET句柄丢进完成端口的线程池了:
BindIoCompletionCallback((HANDLE)skClient,MyIOCPThread,0);
注意最后一个参数到目前为止,你就传入0吧。这个函数的神奇就是不见了CreateIoCompletionPort的调用,不见了CreateThread的调用,不见了GetQueuedCompletionStatus等等的调用,省去了n多繁琐且容易出错的步骤,一个函数就全部搞定了。
五、服务端调用:
以上的所有步骤在完全理解后,最终让我们看看SOCKET池如何实现之。
1、按照传统,要先监听到某个IP的指定端口上:
SOCKADDR_IN    saServer = {0};
//创建监听Socket
SOCKET skServer = ::WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP
, NULL, 0, WSA_FLAG_OVERLAPPED);
//把监听SOCKET扔进线程池,这个可以省略               ::BindIoCompletionCallback((HANDLE)skServer,MyIOCPThread, 0);
//必要时打开SO_REUSEADDR属性,重新绑定到这个监听地址
BOOL   bReuse=TRUE;                  ::setsockopt(m_skServer,SOL_SOCKET,SO_REUSEADDR
,(LPCSTR)&bReuse,sizeof(BOOL));
saServer.sin_family = AF_INET;
saServer.sin_addr.s_addr = INADDR_ANY;
// INADDR_ANY这个值的魅力是监听所有本地IP的相同端口
saServer.sin_port = htons(80);      //用80得永生
::bind(skServer,(LPSOCKADDR)&saServer,sizeof(SOCKADDR_IN));
//监听,队列长为默认最大连接SOMAXCONN
listen(skServer, SOMAXCONN);
2、就是发出一大堆的AcceptEx调用:
for(UINT i = 0; i & 1000; i++)
{//调用1000次
//创建与客户端通讯的SOCKET,注意SOCKET的创建方式
skAccept = ::WSASocket(AF_INET,
                                              SOCK_STREAM,
                                              IPPROTO_TCP,
                                              NULL,
                                              0,
                                              WSA_FLAG_OVERLAPPED);
//:以上为原文,下面为改写后的代码
skClient = ::WSASocket(AF_INET,SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED );
//丢进线程池中
BindIoCompletionCallback((HANDLE)skAccept ,MyIOCPThread,0);
//:以上为原文写法,下面为修改后代码,主要为了更正变量名,方便大家理解
BindIoCompletionCallback((HANDLE)skClient ,MyIOCPThread,0);
//创建一个自定义的OVERLAPPED扩展结构,使用IOCP方式调用
pMyOL= new MYOVERLAPPED;
pMyOL-&m_iOpType = 0;        //AcceptEx操作
pMyOL-&m_skServer = skS
pMyOL-&m_skClient = skC
BYTE* pBuf = new BYTE[256];//一个缓冲
ZeroMemory(pBuf,256*sizeof(BYTE));
pMyOL-&m_pBuf = pB
//发出AcceptEx调用
//注意将AcceptEx函数接收连接数据缓冲的大小设定成了0
//这将导致此函数立即返回,虽然与不设定成0的方式而言,
//这导致了一个较低下的效率,但是这样提高了安全性
//所以这种效率牺牲是必须的
//=================================================================================
//日修改了下面的代码 把原来的第二个参数skAccept 改为 skClient 为方便大家阅读和理解
AcceptEx(skServer, skClient,pBuf,
    0,//将接收缓冲置为0,令AcceptEx直接返回,防止拒绝服务攻击
    256,256,NULL,(LPOVERLAPPED)pMyOL);
这样就有1000个AcceptEx在提前等着客户端的连接了,即使1000个并发连接也不怕了,当然如果再BT点那么就放1w个,什么你要放2w个?那就要看看你的这个IP段的端口还够不够了,还有你的系统内存够不够用。一定要注意同一个IP地址上理论上端口最大值是65535,也就是6w多个,这个要合理的分派,如果并发管理超过6w个以上的连接时,怎么办呢?那就再插块网卡租个新的IP,然后再朝那个IP端绑定并监听即可。因为使用了INADDR_ANY,所以一监听就是所有本地IP的相同端口,如果服务器的IP有内外网之分,为了安全和区别起见可以明确指定监听哪个IP,单IP时就要注意本IP空闲端口的数量问题了。
3、AcceptEx返回后,也就是线程函数中,判定是AcceptEx操作返回后,首先需要的调用就是:
GetAcceptExSockaddrs(pBuf,0,sizeof(sockaddr_in) + 16,
       sizeof(sockaddr_in) + 16,(LPSOCKADDR*) &addrHost,&lenHost,
       (LPSOCKADDR*) &addrClient,&lenClient);
int nRet = ::setsockopt(pOL-&m_skClient, SOL_SOCKET,
       SO_UPDATE_ACCEPT_CONTEXT,(char *)&m_skServer,sizeof(m_skServer));
之后就可以WSASend或者WSARecv了。
       4、这些调用完后,就可以在这个m_skClient上收发数据了,如果收发数据结束或者IO错误,那么就回收SOCKET进入SOCKET池:
       DisconnectEx(m_skClient,&pData-&m_ol, TF_REUSE_SOCKET, 0);
       5、当DisconnectEx函数完成操作之后,在回调的线程函数中,像下面这样重新让这个SOCKET进入监听状态,等待下一个用户连接进来,至此组建SOCKET池的目的就真正达到了:
//创建一个自定义的OVERLAPPED扩展结构,使用IOCP方式调用
pMyOL= new MYOVERLAPPED;
pMyOL-&m_iOpType = 0;        //AcceptEx操作
pMyOL-&m_skServer = skS
pMyOL-&m_skClient = skC
BYTE* pBuf = new BYTE[256];//一个缓冲
ZeroMemory(pBuf,256*sizeof(BYTE));
pMyOL-&m_pBuf = pB
AcceptEx(skServer, skClient,pBuf , 0,256,256,NULL,
    (LPOVERLAPPED)pMyOL);
//注意在这个SOCKET被重新利用后,后面的再次捆绑到完成端口的操作会返回一个已设置//的错误,这个错误直接被忽略即可
         ::BindIoCompletionCallback((HANDLE)skClient,Server_IOCPThread, 0);
       至此服务端的线程池就算搭建完成了,这个SOCKET池也就是围绕AcceptEx和DisconnectEx展开的,而创建操作就全部都在服务启动的瞬间完成,一次性投递一定数量的SOCKET进入SOCKET池即可,这个数量也就是通常所说的最大并发连接数,你喜欢多少就设置多少吧,如果连接多数量就大些,如果IO操作多,连接断开请求不多就少点,剩下就是调试了。
六、客户端调用:
1、  主要是围绕利用ConnectEx开始调用:
SOCKET skConnect = ::WSASocket(AF_INET,SOCK_STREAM,IPPROTO_IP,
                            NULL,0,WSA_FLAG_OVERLAPPED);
//把SOCKET扔进IOCP
BindIoCompletionCallback((HANDLE)skConnect,MyIOCPThread,0);
//本地随便绑个端口
SOCKADDR_IN LocalIP = {};
LocalIP.sin_family = AF_INET;
LocalIP.sin_addr.s_addr = INADDR_ANY;
LocalIP.sin_port = htons( (short)0 );    //使用0让系统自动分配
int result =::bind(skConnect,(LPSOCKADDR)&LocalIP,sizeof(SOCKADDR_IN));
pMyOL= new MYOVERLAPPED;
pMyOL-&m_iOpType = 2;            //ConnectEx操作
pMyOL-&m_skServer = NULL;    //没有服务端的SOCKET
pMyOL-&m_skClient = skC
ConnectEx(skConnect,(const sockaddr*)pRemoteAddr,sizeof(SOCKADDR_IN),
       NULL,0,NULL,(LPOVERLAPPED)pOL) )
如果高兴就可以把上面的过程放到循环里面去,pRemoteAddr就是远程服务器的IP和端口,你可以重复连接n多个,然后疯狂下载东西(别说我告诉你的哈,人家的服务器宕机了找你负责)。注意那个绑定一定要有,不然调用会失败的。
2、  接下来就在线程函数中判定是ConnectEx操作,通过判定m_iOpType == 2就可以知道,然后这样做:
setsockopt( pOL-&m_skClient, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT,
                     NULL, 0 );
然后就是自由的按照需要调用WSASend或者WSARecv。
3、  最后使用和服务端相似的逻辑调用DisconnectEx函数,收回SOCKET并直接再次调用ConnectEx连接到另一服务器或相同的同一服务器即可。
至此客户端的SOCKET池也搭建完成了,创建SOCKET的工作也是在一开始的一次性就完成了,后面都是利用ConnectEx和DisconnectEx函数不断的连接-收发数据-回收-再连接来进行的。客户端的这个SOCKET池可以用于HTTP下载文件的客户端或者FTP下载的服务端(反向服务端)或者客户端,甚至可以用作一个网游的机器人系统,也可以作为一个压力测试的客户端核心的模型。
七、总结和提高:
以上就是比较完整的如何具体实现SOCKET池的全部内容,因为篇幅的原因就不贴全部的代码了,我相信各位看客看完之后心中应该有个大概的框架,并且也可以进行实际的代码编写工作了。可以用纯c来实现也可以用C++来实现。但是这里要说明一点就是DisconnectEx函数和ConnectEx函数似乎只能在XP SP2以上和2003Server以上的平台上使用,对于服务端来说这不是什么问题,但是对于客户端来说,使用SOCKET池时还要考虑一个兼容性问题,不得已还是要放弃在客户端使用SOCKET池。
SOCKET池的全部精髓就在于提前创建一批SOCKET,然后就是不断的重复回收再利用,比起传统的非SOCKET池方式,节省了大量的不断创建和销毁SOCKET对象的内核操作,同时借用IOCP函数AcceptEx、ConnectEx和DisconnectEx等的异步IO完成特性提升了整体性能,非常适合用于一些需要大规模TCP连接管理的场景,如:HTTP Server FTP Server和游戏服务器等。
SOCKET池的本质就是充分的利用了IOCP模型的几乎所有优势,因此要用好SOCKET池就要深入的理解IOCP模型,这是前提。有问题请跟帖讨论。
(本文原创,转载请注明出处。)

我要回帖

更多关于 socket 链接池 多线程 的文章

 

随机推荐