golang mqtt brserver 哪个靠谱

各种MQTT Server(如Apollo、Mosquitto)单台能支持多少client? - 知乎82被浏览15256分享邀请回答97 条评论分享收藏感谢收起22 条评论分享收藏感谢收起查看更多回答1、node.js实战--一个极其简单的MQTT服务器及通信 - 简书
1、node.js实战--一个极其简单的MQTT服务器及通信
,是IBM开发的一个及时通讯协议。关于更多的MQTT协议的内容,请自行百度、google之。本文主要介绍如何使用node.js搭建一个极其简单的MQTT服务器,并和nodeMCU进行通信。
服务端的实现
这里使用安装了工具的visual studio 2015作为开发环境。创建一个空白的Web应用。当然,你也可以创建express的工程。
新建的工程附带了一个hello world的例子,先把代码删掉。想要用node.js做mqtt服务器,我们还需要个名字叫的module。这里以Embedded的方式来install。在vs的命令行窗口里面输入
.npm install mosca --save
注意npm前面有个点。打开命令行窗口的方法见下图。
使用命令行加入module
加入mosca模块
var mosca = require('mosca')
var moscaSettings = {
port: 1883,
这里我们不需要使用数据库、设置密码之类的东西,只设置端口就行。接下来创建一个server
var server = new mosca.Server(moscaSettings);
绑定事件回调,mosca有下面几种事件,按需绑定。
clientConnected, 客户端已经连接,
参数:client;
clientDisconnecting, 客户端正在断开连接, 参数:client;
clientDisconnected, 客户端已经断开连接,
参数:client;
published, 新消息发布,
参数:packet, client;
subscribed, 客户端订阅信息,
参数:topic,
unsubscribed, 客户端取消订阅信息,
参数:topic,
server.on('ready', setup);
function setup() {
console.log('Mosca server is up & running');
server.on('clientConnected', function (client) {
console.log('Client Connected: ', client.id);
server.on('clientDisconnected', function (client) {
console.log('Client Disconnected: ', client.id);
published事件单独拿出来说一下。packet包含了下面几个内容:
topic,主题
payload,内容
server.on('published', function (packet, client) {
//console.log('Published: ', packet);
switch (packet.topic) {
case 'test':
console.log("payload: ", packet.payload.toString());
var msg = {
topic: 'repeat',
payload: packet.payload,
retain: false
server.publish(msg, function () {
console.log('repeat!
这里接收到topic为test就把原文中的payload publish出去。到此,工作就完成了。编译,调试-开始执行(不调试)。接着用来测试效果。先创建一个连接,在server URL那添加端口,其他默认即可,配置如下:
接下来,用nodeMCU来做客户端,把串口的数据转发个服务端。首先,我们要让nodeMCU模块可以连入我们的局域网。非常简单,只需要几句语句就可以解决。
wifi.setmode(wifi.STATION)
wifi.sta.autoconnect(1)
wifi.sta.config("ssid", "password")
连入WiFi后,可能需要一点时间才能连接成功。接着创建一个MQTT的客户端,绑定connect事件回调和message事件回调。
m = mqtt.Client("nodeMCU", 120)
m:on("connect", function(client) print("connected") end)
m:on("message", function(client, topic, data)
print(topic .. ":")
if data ~= nil then
print(data)
函数m:connect用来连接到node.js的MQTT服务器。不过,连接可能需要点事件,也不一定一次就连接成功。所以,重新封装了一下用一个定时器来回调轮询。
function connect()
m:connect("192.168.199.202",1883,
function(client)
print("connected")
function(client, reason)
print("fail reason: " .. reason)
function publish()
tmr.stop(0)
m:subscribe("repeat", 0, function(client) print("subscribe success") end)
tmr.alarm(0, 2000, tmr.ALARM_AUTO, function()
m:publish("test", "SOS", 0, 0)
tmr.alarm(0, 2000, tmr.ALARM_AUTO, connect)
m:subscribe函数用来订阅某个topic。m:publish用来发布一个消息。这里使用定时器定时发布消息。服务器会在收到消息后,转发出来,topic为repeat。于是,我们可以看到nodeMCU一直在print自己发布出去的消息。效果如下:
在嵌入式开发中(特别是物联网相关的产品),可能会用到串口来和其他模块进行通信。如果通信协议是自定义,调试中可能要和上层服务通信,增加了整个调试的复杂性。这种情况下,就可以借助nodemcu的串口插入到单片机的串口与模块中间。接收并转发单片机发送出来的串口数据,同时又可以把数据publish到前面实现的mqtt服务器上。做到既不影响单片机与模块通信,又可以分析数据。借助node.js丰富的模块,可以实现更加复杂高效的调试工具,告别使用串口调试软件看hex数据这种虐心的调试手段。
这里给个例子供参考。主要用到了中的uart、wifi、mqtt几个模块。nodeMCU接收到串口数据后,把数据又通过串口发送出去。可以将nodeMCU接到单片机的TX与模块的RX管脚中间。与此同时,nodeMCU还将收到的串口数据打包成mqtt数据发往服务器。
gpio.mode(1, gpio.INPUT, gpio.PULLUP)
wifi.setmode(wifi.STATION)
wifi.sta.autoconnect(1)
wifi.sta.config("ssid", "password")
m = mqtt.Client("nodeMCU", 120)
m:on("connect", function(client) print("connected") end)
function connect()
m:connect("192.168.199.202",1883,
function(client)
print("connected")
function(client, reason)
print("fail reason: " .. reason)
tmr.register(1, 1, tmr.ALARM_SEMI, function()
uart.write(0, buf)
m:publish("test", buf, 0, 0)
tmr.alarm(2, 10000, tmr.ALARM_SINGLE, function()
if(gpio.read(1) == 0) then
uart.setup(0, 38400, 8, uart.PARITY_NONE, uart.STOPBITS_1, 0)
print("38400 8-n-1")
uart.on("data", function(data)
buf = buf .. data
tmr.stop(1)
tmr.interval(1, 1)
tmr.start(1)
uart.setup(0, , uart.PARITY_NONE, uart.STOPBITS_1, 1)
print("-n-1")
uart.on("data")
捕捉串口数据
服务端接收到publish的数据后,可以解析成可以读懂的字符数后,既通过console打印出来,也可以在web页面上显示。具体实现方法不在这里赘述。
爱折腾,不务正业,不鸡汤的90后程序猿38977人阅读
android(225)

最近公司做的项目中有用到消息推送,经过多方面的筛选之后确定了使用MQTT协议,相对于XMPP,MQTT更加轻量级,并且占用用户很少的带宽。
MQTT是IBM推出的一种针对移动终端设备的基于TCP/IP的发布/预订协议,可以连接大量的远程传感器和控制设备。
MQTT的官网见:。其中里面提供了官方推荐的各种服务器和客户端使用的各种语言版本的API。
下面以服务器Apollo 1.6为例,之前尝试过使用ActiveMQ,效果很不理想,只能实现服务器和客户端一对一的通信,从上了解到Apollo属于activemq的一个子工程。先不管这些了,言归正传,以下在windows环境下。
1、在下载Apollo服务器,下载后解压,然后运行apache-apollo-1.6\bin\apollo.cmd,输入create mybroker(名字任意取,这里是根据介绍的来取的)创建服务器实例,服务器实例包含了所有的配置,运行时数据等,并且和一个服务器进程关联。
2、create mybroker之后会在bin目录下生成mybroker文件夹,里面包含有很多信息,其中etc\apollo.xml文件下是配置服务器信息的文件,etc\users.properties文件包含连接MQTT服务器时用到的用户名和密码,后面会介绍,可以修改原始的admin=password,可以接着换行添加新的用户名密码。
3、打开cmd,运行…apache-apollo-1.6\bin\mybroker\bin\apollo-broker.cmd run 开启服务器,可以在浏览器中输入查看是否安装成功,该界面展示了topic,连接数等很多信息。
经过上面的简单步骤,服务器基本上就已经完成,下一篇将介绍Android客户端的编写和注意事项。
客户端使用的API,开始我使用的是mqtt-client,使用过后发现问题百出,不能很好的满足要求,后来使用了官方推荐的,下面开始客户端代码的编写,为了方便测试这里有android和j2se两个工程:
1、新建android工程MQTTClient
2、MainActivity代码如下:package ldw.
import java.util.concurrent.E
import java.util.concurrent.ScheduledExecutorS
import java.util.concurrent.TimeU
import org.eclipse.paho.client.mqttv3.IMqttDeliveryT
import org.eclipse.paho.client.mqttv3.MqttC
import org.eclipse.paho.client.mqttv3.MqttC
import org.eclipse.paho.client.mqttv3.MqttConnectO
import org.eclipse.paho.client.mqttv3.MqttE
import org.eclipse.paho.client.mqttv3.MqttM
import org.eclipse.paho.client.mqttv3.persist.MemoryP
import android.app.A
import android.os.B
import android.os.H
import android.os.M
import android.view.KeyE
import android.widget.TextV
import android.widget.T
public class MainActivity extends Activity {
private TextView resultTv;
private String host = &tcp://127.0.0.1:1883&;
private String userName = &admin&;
private String passWord = &password&;
private MqttC
private String myTopic = &test/topic&;
private MqttConnectO
private ScheduledExecutorS
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.main);
resultTv = (TextView) findViewById(R.id.result);
handler = new Handler() {
public void handleMessage(Message msg) {
super.handleMessage(msg);
if(msg.what == 1) {
Toast.makeText(MainActivity.this, (String) msg.obj,
Toast.LENGTH_SHORT).show();
System.out.println(&-----------------------------&);
} else if(msg.what == 2) {
Toast.makeText(MainActivity.this, &连接成功&, Toast.LENGTH_SHORT).show();
client.subscribe(myTopic, 1);
} catch (Exception e) {
e.printStackTrace();
} else if(msg.what == 3) {
Toast.makeText(MainActivity.this, &连接失败,系统正在重连&, Toast.LENGTH_SHORT).show();
startReconnect();
private void startReconnect() {
scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
if(!client.isConnected()) {
connect();
}, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
private void init() {
//host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
client = new MqttClient(host, &test&,
new MemoryPersistence());
//MQTT的连接设置
options = new MqttConnectOptions();
//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
//设置连接的用户名
options.setUserName(userName);
//设置连接的密码
options.setPassword(passWord.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
//设置回调
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
//连接丢失后,一般在这里面进行重连
System.out.println(&connectionLost----------&);
public void deliveryComplete(IMqttDeliveryToken token) {
//publish后会执行到这里
System.out.println(&deliveryComplete---------&
+ token.isComplete());
public void messageArrived(String topicName, MqttMessage message)
throws Exception {
//subscribe后得到的消息会执行到这里面
System.out.println(&messageArrived----------&);
Message msg = new Message();
msg.what = 1;
msg.obj = topicName+&---&+message.toString();
handler.sendMessage(msg);
connect();
} catch (Exception e) {
e.printStackTrace();
private void connect() {
new Thread(new Runnable() {
public void run() {
client.connect(options);
Message msg = new Message();
msg.what = 2;
handler.sendMessage(msg);
} catch (Exception e) {
e.printStackTrace();
Message msg = new Message();
msg.what = 3;
handler.sendMessage(msg);
}).start();
public boolean onKeyDown(int keyCode, KeyEvent event) {
if(client != null && keyCode == KeyEvent.KEYCODE_BACK) {
client.disconnect();
} catch (Exception e) {
e.printStackTrace();
return super.onKeyDown(keyCode, event);
protected void onDestroy() {
super.onDestroy();
scheduler.shutdown();
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
由于项目需要,我用到了心跳重连。根据的解释设置apollo.xml,主要有设置主机连接的地址。另外,options还有个setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。
3、新建j2se工程MQTTServer
4、Server代码如下:import java.awt.C
import java.awt.event.ActionE
import java.awt.event.ActionL
import javax.swing.JB
import javax.swing.JF
import javax.swing.JP
import org.eclipse.paho.client.mqttv3.IMqttDeliveryT
import org.eclipse.paho.client.mqttv3.MqttC
import org.eclipse.paho.client.mqttv3.MqttC
import org.eclipse.paho.client.mqttv3.MqttConnectO
import org.eclipse.paho.client.mqttv3.MqttDeliveryT
import org.eclipse.paho.client.mqttv3.MqttM
import org.eclipse.paho.client.mqttv3.MqttT
import org.eclipse.paho.client.mqttv3.persist.MemoryP
public class Server extends JFrame {
private static final long serialVersionUID = 1L;
private JP
private JB
private MqttC
private String host = &tcp://127.0.0.1:1883&;
// private String host = &tcp://localhost:1883&;
private String userName = &test&;
private String passWord = &test&;
private MqttT
private MqttM
private String myTopic = &test/topic&;
public Server() {
client = new MqttClient(host, &Server&,
new MemoryPersistence());
connect();
} catch (Exception e) {
e.printStackTrace();
Container container = this.getContentPane();
panel = new JPanel();
button = new JButton(&发布话题&);
button.addActionListener(new ActionListener() {
public void actionPerformed(ActionEvent ae) {
MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
System.out.println(token.isComplete()+&========&);
} catch (Exception e) {
e.printStackTrace();
panel.add(button);
container.add(panel, &North&);
private void connect() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
// 设置超时时间
options.setConnectionTimeout(10);
// 设置会话心跳时间
options.setKeepAliveInterval(20);
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
System.out.println(&connectionLost-----------&);
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println(&deliveryComplete---------&+token.isComplete());
public void messageArrived(String topic, MqttMessage arg1)
throws Exception {
System.out.println(&messageArrived----------&);
topic = client.getTopic(myTopic);
message = new MqttMessage();
message.setQos(1);
message.setRetained(true);
System.out.println(message.isRetained()+&------ratained状态&);
message.setPayload(&eeeeeaaaaaawwwwww---&.getBytes());
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
public static void main(String[] args) {
Server s = new Server();
s.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
s.setSize(600, 370);
s.setLocationRelativeTo(null);
s.setVisible(true);
上面代码跟客户端的代码差不多,这里就不做解释了。
没什么好说的,MQTT就是这么简单,但开始在使用的时候要注意一些参数的设置来适应项目的需求。
jar包下载地址:
https://repo.eclipse.org/content/repositories/paho/org/eclipse/paho/mqtt-client/0.4.0/
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:1458127次
积分:11685
积分:11685
排名:第1336名
原创:112篇
转载:238篇
评论:164条
(2)(1)(7)(23)(9)(8)(20)(6)(17)(59)(52)(51)(58)(4)(1)(1)(2)(9)(1)(3)(2)(4)(8)(2)MQTT Server 性能测试分析
我的图书馆
MQTT Server 性能测试分析
& &/articles/rmUBJf2012&&MQTT (Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器的通信协议。
apollo是一个基于java的开源消息服务器,通过安装插件,可以支持MQTT消息。mosquitto是另一个用C语言开发的开源MQTT服务器。为了测试这两种服务器的性能,我下载了一个
的基准测试程序,运行在亚马逊EC2的一个large实例上。运行时间大约为3个小时。 实例启动后,先安装jdk1.6,然后安装,运行MQTT Benchmark。
sudo apt-get install screen
curl /chirino/mqtt-benchmark/tarball/master | tar -zxv
mv chirino-mqtt-benchmark-* mqtt-benchmark
screen ./mqtt-benchmark/bin/benchmark-all
测试后的结果见
,测试结果表明,apollo在重负载的情况下,性能要高于mosquitto,而且更加稳定。 &&
TA的最新馆藏
喜欢该文的人也喜欢MQTT -&MQ Telemetry Transport
轻量级的 machine-to-machine 通信协议。
publish/subscribe模式。
基于TCP/IP。
适合于低带宽、不可靠连接、嵌入式设备、CPU内存资源紧张。
是一种比较不错的Android消息推送方案。
FacebookMessenger采用了MQTT。
MQTT有可能成为物联网的重要协议。
MessageType
TCP连接建立完毕后,Client向Server发出一个Request。
如果一段时间内接收不到Server的Response,则关闭socket,重新建立一个session连接。
如果一个ClientID已经与服务器连接,则持有同样ClientID的旧有连接必须由服务器关闭后,新建立才能建立。
Server发出Response响应。
0x00 Connection Accepted
0x01 Connection Refused: unacceptable protocol version
0x02 Connection Refused: identifier rejected
0x03 Connection Refused: server unavailable
0x04 Connection Refused: bad user name or password
0x05 Connection Refused: not authorized
PUBLISH 发布消息
Client/Servier均可以进行PUBLISH。
publish message 应该包含一个TopicName(Subject/Channel),即订阅关键词。
关于Topic通配符
/:用来表示层次,比如a/b,a/b/c。
#:表示匹配&=0个层次,比如a/#就匹配a/,a/b,a/b/c。
单独的一个#表示匹配所有。
不允许 a#和a/#/c。
+:表示匹配一个层次,例如a/+匹配a/b,a/c,不匹配a/b/c。
单独的一个+是允许的,a+不允许,a/+/b不允许
PUBACK 发布消息后的确认
QoS=1时,Server向Client发布该确认(Client收到确认后删除),订阅者向Server发布确认。
PUBREC / PUBREL / PUBCOMP
1. Server-&Client发布PUBREC(已收到);
2. Client-&Server发布PUBREL(已释放);
3. Server-&Client发布PUBCOMP(已完成),Client删除msg;
订阅者也会向Server发布类似过程确认。
PINGREQ / PINGRES 心跳
Client有责任发送KeepAliveTime时长告诉给Server。在一个时长内,发送PINGREQ,Server发送PINGRES确认。
Server在1.5个时长内未收到PINGREQ,就断开连接。
Client在1个时长内未收到PINGRES,断开连接。
一般来说,时长设置为几个分钟。最大18hours,0表示一直未断开。
QoS=0:最多一次,有可能重复或丢失。
QoS=1:至少一次,有可能重复。
Client[Qos=1,DUP=0/*重复次数*/,MessageId=x] ---&PUBLISH--& Server收到后,存储Message,发布,删除,向Client回发PUBACK
Client收到PUBACK后,删除Message;如果未收到PUBACK,设置DUP++,重新发送,Server端重新发布,所以有可能重复发送消息。
QoS=2:只有一次,确保消息只到达一次(用于比较严格的计费系统)。
Clean Session
如果为false(flag=0),Client断开连接后,Server应该保存Client的订阅信息。
如果为true(flag=1),表示Server应该立刻丢弃任何会话状态信息。
阅读(...) 评论()

我要回帖

更多关于 php mqtt server 的文章

 

随机推荐