zmq push/pullgit pull和push是单线程吗

消息队列库——ZeroMQ
时间: 15:37:29
&&&& 阅读:669
&&&& 评论:
&&&& 收藏:0
标签:&&&&&&&&&&&&&&&&&&&&&&&&&&&消息队列库——ZeroMQ
(简称ZMQ)是一个基于消息队列的多线程网络库,其对套接字类型、连接处理、帧、甚至路由的底层细节进行抽象,提供跨越多种传输协议的套接字。
ZMQ是网络通信中新的一层,介于应用层和传输层之间(按照TCP/IP划分),其是一个可伸缩层,可并行运行,分散在分布式系统间。
ZMQ不是单独的服务,而是一个嵌入式库,它封装了网络通信、消息队列、线程调度等功能,向上层提供简洁的API,应用程序通过加载库文件,调用API函数来实现高性能网络通信。
主线程与I/O线程:
I/O线程,ZMQ根据用户调用zmq_init函数时传入的参数,创建对应数量的I/O线程。每个I/O线程都有与之绑定的Poller,Poller采用经典的Reactor模式实现。
Poller根据不同操作系统平台使用不同的网络I/O模型(select、poll、epoll、devpoll、kequeue等),所有的I/O操作都是异步的,线程不会被阻塞。。
主线程与I/O线程通过Mail Box传递消息来进行通信。
Server,在主线程创建zmq_listener,通过Mail Box发消息的形式将其绑定到I/O线程,I/O线程把zmq_listener添加到Poller中用以侦听读事件。
Client,在主线程中创建zmq_connecter,通过Mail Box发消息的形式将其绑定到I/O线程,I/O线程把zmq_connecter添加到Poller中用以侦听写事件。
Client与Server第一次通信时,会创建zmq_init来发送identity,用以进行认证。认证结束后,双方会为此次连接创建Session,以后双方就通过Session进行通信。
每个Session都会关联到相应的读/写管道, 主线程收发消息只是分别从管道中读/写数据。Session并不实际跟kernel交换I/O数据,而是通过plugin到Session中的Engine来与kernel交换I/O数据。
ZMQ将消息通信分成4种模型:
一对一结对模型(Exclusive-Pair),可以认为是一个TCP Connection,但是TCP Server只能接受一个连接。数据可以双向流动,这点不同于后面的请求回应模型。
请求回应模型(Request-Reply),由Client发起请求,并由Server响应,跟一对一结对模型的区别在于可以有多个Client。
发布订阅模型(Publish-Subscribe),Publish端单向分发数据,且不关心是否把全部信息发送给Subscribe端。如果Publish端开始发布信息时,Subscribe端尚未连接进来,则这些信息会被直接丢弃。Subscribe端只能接收,不能反馈,且在Subscribe端消费速度慢于Publish端的情况下,会在Subscribe端堆积数据。
管道模型(Push-Pull),从 PUSH 端单向的向 PULL 端单向的推送数据流。如果有多个PULL端同时连接到PUSH端,则PUSH端会在内部做一个负载均衡,采用平均分配的算法,将所有消息均衡发布到PULL端上。与发布订阅模型相比,管道模型在没有消费者的情况下,发布的消息不会被消耗掉;在消费者能力不够的情况下,能够提供多消费者并行消费解决方案。该模型主要用于多任务并行。
这4种模型总结出了通用的网络通信模型,在实际中可以根据应用需要,组合其中的2种或多种模型来形成自己的解决方案。
ZMQ提供进程内(inproc://)、进程间(ipc://)、机器间(tcp://)、广播(pgm://)等四种通信协议。
ZMQ提供的所有API均以zmq_开头,
#include &zmq.h&
gcc [flags] files -lzmq [libraries]
例如,返回当前ZMQ库的版本信息
void zmq_version (int *major, int *minor, int *patch);
在使用任何ZQM库函数之前,必须首先创建ZMQ context(上下文),程序终止时,也需要销毁context。
创建context
void *zmq_ctx_new ();
ZMQ context是线程安全的,可以在多线程环境使用,而不需要程序员对其加/解锁。
在一个进程中,可以有多个ZMQ context并存。
设置context选项
int zmq_ctx_set (void *context, int option_name, int option_value);
int zmq_ctx_get (void *context, int option_name);
销毁context
int zmq_ctx_term (void *context);
ZMQ Sockets 是代表异步消息队列的一个抽象,注意,这里的ZMQ socket和POSIX套接字的socket不是一回事,ZMQ封装了物理连接的底层细节,对用户不透明。
传统的POSIX套接字只能支持1对1的连接,而ZMQ socket支持多个Client的并发连接,甚至在没有任何对端(peer)的情况下,ZMQ sockets上也能放入消息;
ZMQ sockets不是线程安全的,因此,不要在多个线程中并行操作同一个sockets。
创建ZMQ &Sockets
void *zmq_socket (void *context, int type);
注意,ZMQ socket在bind之前还不能使用。
type参数含义
description
一对一结对模型
请求回应模型
client端使用
server端使用
ZMQ_DEALER
将消息以轮询的方式分发给所有对端(peers)
ZMQ_ROUTER
发布订阅模型
publisher端使用
subscriber端使用
push端使用
pull端使用
ZMQ_STREAM
设置socket选项
int zmq_getsockopt (void *socket, int option_name, void *option_value, size_t *option_len);
int zmq_setsockopt (void *socket, int option_name, const void *option_value, size_t option_len);
关闭socket
int zmq_close (void *socket);
创建一个消息流
int zmq_bind (void *socket, const char *endpoint);
int zmq_connect (void *socket, const char *endpoint);
bind函数是将socket绑定到本地的端点(endpoint),而connect函数连接到指定的peer端点。
endpoint支持的类型:
transports
description
uri example
TCP的单播通信
tcp://*:8080
本地进程间通信
zmp_inproc
本地线程间通信
PGM广播通信
int zmq_send (void *socket, void *buf, size_t len, int flags);
int zmq_recv (void *socket, void *buf, size_t len, int flags);
int zmq_send_const (void *socket, void *buf, size_t len, int flags);
zmq_recv()函数的len参数指定接收buf的最大长度,超出部分会被截断,函数返回的值是接收到的字节数,返回-1表示出错;
zmq_send()函数将指定buf的指定长度len的字节写入队列,函数返回值是发送的字节数,返回-1表示出错;
zmq_send_const()函数表示发送的buf是一个常量内存区(constant-memory),这块内存不需要复制、释放。
socket事件监控
int zmq_socket_monitor (void *socket, char * *addr, int events);
zmq_socket_monitor()函数会生成一对sockets,publishers端通过inproc://协议发布 sockets状态改变的events;消息包含2帧,第1帧包含events id和关联值,第2帧表示受影响的endpoint。
监控支持的events:
ZMQ_EVENT_CONNECTED: 建立连接ZMQ_EVENT_CONNECT_DELAYED: 连接失败ZMQ_EVENT_CONNECT_RETRIED: 异步连接/重连ZMQ_EVENT_LISTENING: bind到端点ZMQ_EVENT_BIND_FAILED: bind失败ZMQ_EVENT_ACCEPTED: 接收请求ZMQ_EVENT_ACCEPT_FAILED: 接收请求失败ZMQ_EVENT_CLOSED: 关闭连接ZMQ_EVENT_CLOSE_FAILED: 关闭连接失败ZMQ_EVENT_DISCONNECTED: 会话(tcp/ipc)中断
I/O多路复用
int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
对sockets集合的I/O多路复用,使用水平触发。
与epoll类似,items参数指定一个结构体数组(结构体定义如下),nitems指定数组的元素个数,timeout参数是超时时间(单位:ms,0表示不等待立即返回,-1表示阻塞等待)。
typedef struct
} zmq_pollitem_t;
对于每个zmq_pollitem_t元素,ZMQ会同时检查其socket(ZMQ套接字)和fd(原生套接字)上是否有指定的events发生,且ZMQ套接字优先。
events指定该sockets需要关注的事件,revents返回该sockets已发生的事件,它们的取值为:
ZMQ_POLLIN,可读;
ZMQ_POLLOUT,可写;
ZMQ_POLLERR,出错;
一个ZMQ消息就是一个用于在消息队列(进程内部或跨进程)中进行传输的数据单元,ZMQ消息本身没有数据结构,因此支持任意类型的数据,这完全依赖于程序员如何定义消息的数据结构。
一条ZMQ消息可以包含多个消息片(multi-part messages),每个消息片都是一个独立zmq_msg_t结构。
ZMQ保证以原子方式传递消息,要么所有消息片都发送成功,要么都不成功。
初始化消息
typedef void (zmq_free_fn) (void *data, void *hint);
int zmq_msg_init (zmq_msg_t *msg);
int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn, void *hint);
int zmq_msg_init_size (zmq_msg_t *msg, size_t size);
zmq_msg_init()函数初始化一个消息对象zmq_msg_t ,不要直接访问zmq_msg_t对象,可以通过zmq_msg_* 函数来访问它。 zmq_msg_init()、zmq_msg_init_data()、zmq_msg_init_size() 三个函数是互斥的,每次使用其中一个即可。
设置消息属性
int zmq_msg_get (zmq_msg_t *message, int property);
int zmq_msg_set (zmq_msg_t *message, int property, int value);
int zmq_msg_close (zmq_msg_t *msg);
int zmq_msg_send (zmq_msg_t *msg, void *socket, int flags);
int zmq_msg_recv (zmq_msg_t *msg, void *socket, int flags);
&其中,flags参数如下:
ZMQ_DONTWAIT,非阻塞模式,如果没有可用的消息,将errno设置为EAGAIN;ZMQ_SNDMORE,发送multi-part messages时,除了最后一个消息片外,其它每个消息片都必须使用 ZMQ_SNDMORE 标记位。
获取消息内容
void *zmq_msg_data (zmq_msg_t *msg);
int zmq_msg_more (zmq_msg_t *message);
size_t zmq_msg_size (zmq_msg_t *msg);
zmq_msg_data()返回指向消息对象所带内容的指针;zmq_msg_size()返回消息的字节数;zmq_msg_more()标识该消息片是否是整个消息的一部分,是否还有更多的消息片待接收;
int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);
int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);
zmq_msg_copy()函数实现的是浅拷贝;zmq_msg_move()函数中,将dst指向src消息,然后src被置空。
eg,接收消息的代码示例:
while (true) {
Create an empty ?MQ message to hold the message part
int rc = zmq_msg_init (&part);
assert (rc == <span style="color: #);
Block until a message is available to be received from socket
rc = zmq_msg_recv (socket, &part, <span style="color: #);
assert (rc != -<span style="color: #);
if (zmq_msg_more (&part))
fprintf (stderr, "more\n");
fprintf (stderr, "end\n");
zmq_msg_close (&part);
ZMQ提供代理功能,代理可以在前端socket和后端socket之间转发消息。
int zmq_proxy (const void *frontend, const void *backend, const void *capture);
int zmq_proxy_steerable (const void *frontend, const void *backend, const void *capture, const void *control);
共享队列(shared queue),前端是ZMQ_ROUTER socket,后端是ZMQ_DEALER socket,proxy会把clients发来的请求,公平地分发给services;转发队列(forwarded),前端是ZMQ_XSUB socket, 后端是ZMQ_XPUB socket, proxy会把从publishers收到的消息转发给所有的subscribers;流(streamer),前端是ZMQ_PULL socket, 后端是ZMQ_PUSH socket.
proxy使用的一个示例:
Create frontend and backend sockets
void *frontend = zmq_socket (context, ZMQ_ROUTER);
assert (backend);
void *backend = zmq_socket (context, ZMQ_DEALER);
assert (frontend);
Bind both sockets to TCP ports
assert (zmq_bind (frontend, "tcp://*:5555") == <span style="color: #);
assert (zmq_bind (backend, "tcp://*:5556") == <span style="color: #);
Start the queue proxy, which runs until ETERM zmq_proxy frontend, backend, NULL);
ZMQ库使用POSIX处理函数错误,返回NULL指针或者负数时表示调用出错。
int zmq_errno (void);
const char *zmq_strerror (int errnum);
zmq_errno()函数返回当前线程的错误码errno变量的值;
zmq_strerror()函数将错误映射成错误字符串。
ZQM可以为IPC和TCP连接提供安全机制:
不加密,zmq_null
使用用户名/密码授权,zmq_plain
椭圆加密,zmq_curve
这些通过 zmq_setsockopt()函数设置socket选项的时候配置。
1、仅仅提供24个API接口,风格类似于BSD Socket。
2、处理了网络异常,包括连接异常中断、重连等。
3、改变TCP基于字节流收发数据的方式,处理了粘包、半包等问题,以msg为单位收发数据,结合Protocol Buffers,可以对应用层彻底屏蔽网络通信层。
4、对大数据通过SENDMORE/RECVMORE提供分包收发机制。
5、通过线程间数据流动来保证同一时刻任何数据都只会被一个线程持有,以此实现多线程的“去锁化”。
6、通过高水位HWM来控制流量,用交换SWAP来转储内存数据,弥补HWM丢失数据的缺陷。
7、服务器端和客户端的启动没有先后顺序。
&标签:&&&&&&&&&&&&&&&&&&&&&&&&&&&
&&国之画&&&& &&&&chrome插件&&
版权所有 京ICP备号-2
迷上了代码!Keyboard Shortcuts?
Next menu item
Previous menu item
Previous man page
Next man page
Scroll to bottom
Scroll to top
Goto homepage
Goto search(current page)
Focus search box
Change language:
Brazilian Portuguese
Chinese (Simplified)
The ZMQ class
Introduction
Class synopsis
Predefined Constants
ZMQ Constant Types
ZMQ::SOCKET_PAIR
Exclusive pair pattern
ZMQ::SOCKET_PUB
Publisher socket
ZMQ::SOCKET_SUB
Subscriber socket
ZMQ::SOCKET_REQ
Request socket
ZMQ::SOCKET_REP
Reply socket
ZMQ::SOCKET_XREQ
Alias for SOCKET_DEALER
ZMQ::SOCKET_XREP
Alias for SOCKET_ROUTER
ZMQ::SOCKET_PUSH
Pipeline upstream push socket
ZMQ::SOCKET_PULL
Pipeline downstream pull socket
ZMQ::SOCKET_ROUTER
Extended REP socket that can route replies to requesters
ZMQ::SOCKET_DEALER
Extended REQ socket that load balances to all connected peers
ZMQ::SOCKET_XPUB
Similar to SOCKET_PUB, except you can receive subscriptions as messages.
The subscription message is 0 (unsubscribe) or 1 (subscribe) followed by the topic.
ZMQ::SOCKET_XSUB
Similar to SOCKET_SUB, except you can send subscriptions as messages. See SOCKET_XPUB for format.
ZMQ::SOCKET_STREAM
Used to send and receive TCP data from a non-?MQ peer. Available if compiled against ZeroMQ 4.x or higher (Value: ).
ZMQ::SOCKOPT_HWM
The high water mark for inbound and outbound messages is a hard limit on the maximum number of outstanding messages ?MQ shall queue in memory for any single peer that the specified socket is communicating with. Setting this option on a socket will only affect connections made after the option has been set. On ZeroMQ 3.x this is a wrapper for setting both SNDHWM and RCVHWM. (Value: ).
ZMQ::SOCKOPT_SNDHWM
The ZMQ_SNDHWM option shall set the high water mark for outbound messages on the specified socket. Available if compiled against ZeroMQ 3.x or higher (Value: ).
ZMQ::SOCKOPT_RCVHWM
The SOCKOPT_RCVHWM option shall set the high water mark for inbound messages on the specified socket. Available if compiled against ZeroMQ 3.x or higher (Value: ).
ZMQ::SOCKOPT_AFFINITY
Set I/O thread affinity (Value: )
ZMQ::SOCKOPT_IDENTITY
Set socket identity (Value: )
ZMQ::SOCKOPT_SUBSCRIBE
Establish message filter. Valid for subscriber socket (Value: )
ZMQ::SOCKOPT_UNSUBSCRIBE
Remove message filter. Valid for subscriber socket (Value: )
ZMQ::SOCKOPT_RATE
Set rate for multicast sockets (pgm) (Value:
ZMQ::SOCKOPT_RECOVERY_IVL
Set multicast recovery interval (Value:
ZMQ::SOCKOPT_RECONNECT_IVL
Set the initial reconnection interval (Value:
ZMQ::SOCKOPT_RECONNECT_IVL_MAX
Set the max reconnection interval (Value:
ZMQ::SOCKOPT_MCAST_LOOP
Control multicast loopback (Value:
ZMQ::SOCKOPT_SNDBUF
Set kernel transmit buffer size (Value:
ZMQ::SOCKOPT_RCVBUF
Set kernel receive buffer size (Value:
ZMQ::SOCKOPT_RCVMORE
Receive multi-part messages (Value: )
ZMQ::SOCKOPT_TYPE
Get the socket type. Valid for getSockOpt (Value: )
ZMQ::SOCKOPT_LINGER
The linger value of the socket. Specifies how long the socket blocks
trying flush messages after it has been closed (Value: )
ZMQ::SOCKOPT_BACKLOG
The SOCKOPT_BACKLOG option shall set the maximum length of the queue of outstanding peer connections for
this only applies to connection-oriented transports. (Value: )
ZMQ::SOCKOPT_MAXMSGSIZE
Limits the maximum size of the inbound message. Value -1 means no limit. Available if compiled against ZeroMQ 3.x or higher (Value: )
Sets the timeout for send operation on the socket. Value -1 means no limit. Available if compiled against ZeroMQ 3.x or higher (Value: )
Sets the timeout for receive operation on the socket. Value -1 means no limit. Available if compiled against ZeroMQ 3.x or higher (Value: )
ZMQ::SOCKOPT_IPV4ONLY
Disable IPV6 support if 1. Available if compiled against ZeroMQ 3.x (Value: )
ZMQ::SOCKOPT_LAST_ENDPOINT
Retrieve the last connected endpoint - for use with * wildcard ports. Available if compiled against ZeroMQ 3.x or higher (Value: )
ZMQ::SOCKOPT_TCP_KEEPALIVE_IDLE
Idle time for TCP keepalive. Available if compiled against ZeroMQ 3.x or higher (Value: )
ZMQ::SOCKOPT_TCP_KEEPALIVE_CNT
Count time for TCP keepalive. Available if compiled against ZeroMQ 3.x or higher (Value: )
ZMQ::SOCKOPT_TCP_KEEPALIVE_INTVL
Interval for TCP keepalive. Available if compiled against ZeroMQ 3.x or higher (Value: )
ZMQ::SOCKOPT_DELAY_ATTACH_ON_CONNECT
Set a CIDR string to match against incoming TCP connections. Available if compiled against ZeroMQ 3.x or higher (Value: )
ZMQ::SOCKOPT_TCP_ACCEPT_FILTER
Set a CIDR string to match against incoming TCP connections. Available if compiled against ZeroMQ 3.x or higher (Value: )
ZMQ::SOCKOPT_XPUB_VERBOSE
Set the XPUB to receive an application message on each instance of a subscription. Available if compiled against ZeroMQ 3.x or higher (Value: )
ZMQ::SOCKOPT_ROUTER_RAW
Sets the raw mode on the ROUTER, when set to 1. In raw mode when using tcp:// transport the socket will read and write without ZeroMQ framing.
Available if compiled against ZeroMQ 4.0 or higher (Value: )
ZMQ::SOCKOPT_IPV6
Enable IPV6. Available if compiled against ZeroMQ 4.0 or higher (Value: )
ZMQ::CTXOPT_MAX_SOCKETS
The socket limit for this context. Available if compiled against ZeroMQ 3.x or higher (Value: )
ZMQ::POLL_IN
Poll for incoming data
ZMQ::POLL_OUT
Poll for outgoing data
ZMQ::MODE_NOBLOCK
Non-blocking operation. Deprecated, use ZMQ::MODE_DONTWAIT instead
ZMQ::MODE_DONTWAIT
Non-blocking operation
ZMQ::MODE_SNDMORE
Send multi-part message
ZMQ::DEVICE_FORWARDER
Forwarder device
ZMQ::DEVICE_QUEUE
Queue device
ZMQ::DEVICE_STREAMER
Streamer device
ZMQ::ERR_INTERNAL
ZMQ extension internal error
ZMQ::ERR_EAGAIN
Implies that the operation would block when ZMQ::MODE_DONTWAIT is used
ZMQ::ERR_ENOTSUP
The operation is not supported by the socket type
ZMQ::ERR_EFSM
The operation can not be executed because the socket is not in correct state
ZMQ::ERR_ETERM
The context has been terminated
Table of Contents — ZMQ constructor
There are no user contributed notes for this page.博客分类:
关于ZeroMq(简称ZMQ)的定义、作用和强大这里就不再赘述了。总结一下它常见的几种经典模式,然后顺便提下我最近在高并发环境下使用它出现的一个异常及其解决过程吧!
(PS:图是盗的。。。。。)
一、REQ/REP模式
这是最常见的请求/响应模式。服务端作为发送方,客户端作为请求方。
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.REP);
String url = "tcp://*:9999";
socket.bind(url);
boolean wait =
while (wait) {
request = socket.recv(0);
socket.send("OK".getBytes(), 1);
} catch (ZMQException e) {
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.REQ);
System.out.println("Connecting to hello world server...");
socket.connect("tcp://localhost:9999");
String requestString = "Hello" + " ";
byte[] request = requestString.getBytes();
socket.send(request, ZMQ.NOBLOCK);
byte[] reply = socket.recv(0);
System.out.println("Received reply
[" + new String(reply) + "]");
说明:服务端bind一个端口,客户端则connect一个ip地址(服务端所在的ip)和相应的端口。服务端通过context.socket(ZMQ.REP);表明是服务端,同理,客户端通过context.socket(ZMQ.REQ);表明是客户端。服务端为了持续监听,必须要把recv写在一个循环里。一般是while(true)!
二、PUB/SUB模式
发布/订阅模式,这种模式大家想必很熟悉,比如微博消息的推送等。
发布方作为服务端,多个订阅方作为客户端。代码略,总结一下:
发布端bind一个端口,订阅端则connect一个ip地址(服务端所在的ip)和相应的端口。服务端通过context.socket(ZMQ.PUB);表明是发布端,同理,订阅端通过context.socket(ZMQ.SUB);表明是客户端。服务端为了持续监听,必须要把recv写在一个循环里。一般是while(true)!——这里发布方作为服务端,订阅方作为客户端。
三、PUSH/PULL模式
总结:push端bind一个端口,pull端则connect一个ip地址(push端端所在的ip)和相应的端口。push端端通过context.socket(ZMQ.PUSH);表明是push端,同理,订阅端通过context.socket(ZMQ.PULL);表明是客户端。服务端为了持续监听,必须要把recv写在一个循环里。一般是while(true)!——这里push端作为服务端,pull端作为客户端。
后两种模式写法类似第一种。
---------------------------------------
高并发下,出现的一个异常:
zmq.ZError$IOException: java.io.IOException: Unable to establish loopback connection
at zmq.Signaler.make_fdpair(Signaler.java:87)
at zmq.Signaler.&init&(Signaler.java:48)
at zmq.Mailbox.&init&(Mailbox.java:55)
at zmq.Ctx.&init&(Ctx.java:132)
at zmq.ZMQ.zmq_ctx_new(ZMQ.java:225)
……………………
Caused by: java.io.IOException: Unable to establish loopback connection
at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:106)
at java.security.AccessController.doPrivileged(Native Method)
at sun.nio.ch.PipeImpl.&init&(PipeImpl.java:122)
at sun.nio.ch.SelectorProviderImpl.openPipe(SelectorProviderImpl.java:27)
at java.nio.channels.Pipe.open(Pipe.java:133)
at zmq.Signaler.make_fdpair(Signaler.java:85)
... 36 more
Caused by: java.net.SocketException: No buffer space available (maximum connections reached?): bind
at sun.nio.ch.Net.bind(Native Method)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:72)
... 41 more
说明ZMQ的连接不够!
解决办法:发现自己的一个类,是并发情况下每笔数据都会经过的类,而我把
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket
reg = context.socket(ZMQ.PUSH);
reg.connect("tcp://localhost:xxx");
写在这个类的方法里,造成每次经过该类的该方法,都会创建一次context和socket,也许在并发情况还不是很明显的情况下,这个异常不会出现。但在高并发的情形下,频繁地创建ZMQ.Context和ZMQ.Socket并且再connect时,会造成很大的网络开销。故把该代码块放在static{...}类的静态代码块中——无论并发情况如何,只创建和连接一次。while循环里面有的代码,只能是send操作和recv操作!避免把ZMQ的Context,Socket的创建和connect操作放在循环里面。
改完之后,压力测试48w+数据,ZMQ终于没有挂了。。
Everyday都不同
浏览: 205115 次
来自: 江西
GoEasy实时Web推送,支持后台推送和前台推送两种:后台推 ...
Str5=Str1+Str2+Str3+Str4,这条语句执行 ...
写的不对,http://blog.csdn.net/lian_ ...
嗯,一针见血的指出了问题,谢谢博主
poi实现excel的导入导出就是好复杂啊 还是pageoff ...
(window.slotbydup=window.slotbydup || []).push({
id: '4773203',
container: s,
size: '200,200',
display: 'inlay-fix'

我要回帖

更多关于 push和pull 的文章

 

随机推荐