mqttandroid mqtt clientclient.connect 失败了怎么办

采用MQTT协议实现Android消息推送
我的图书馆
采用MQTT协议实现Android消息推送
[原]采用MQTT协议实现Android消息推送
对于消息推送,一开始还真不知道什么方式比较好,一头雾水,现在回顾总结下资料。
http://zheye.org/asks/4d99a1aafd503c41d700000a通过上面者也里面的回复,得到一些信息。1.官方的C2DM,但是只支持android2.2及以上平台的,而且使用的google的服务器。对于google服务器的问题,网友应该都清楚,天朝的大中华区局域网总是让它不时的给你断一下。
2.第三方的androidpn,和C2DM一样,都是基于XMPP扩展的,是一个开源的项目,据说不错。http://sourceforge.net/projects/androidpn/但是是基于长连接的,如果客户端数量大,特别像手机这种都是长期在线的设备,会有两个问题,(1)服务器压力,(2)手机的电池不够用啊,电量卡卡卡的被你耗光了(需要优化网络机制)。
3.使用IBM 的MQTT协议实现push消息地址:/2010/how-to-implement-push-notifications-for-android/这是一个非常理想的解决方案,是基于tcp协议的,低带宽通信,而且国外友人已经测试,耗电量很多哦~
都是E文的,不习惯看E文的,也没关系,有一前辈给整理了一个中文的:《Android推送通知指南》http://blog.csdn.net/joshua_yu/article/details/6563587看了上面这些文章的内容,完成上面的例子,然后看看源码,应该明白一些了。
========================================MQTT是一项消息传递技术,由IBM再2001年发布。
总结一下,机制就是使用一个代理服务器message broker,客户端client连接上这个服务器,然后告诉服务器说,我可以接收哪些类型的消息,同时,client也可以发布自己的消息,这些消息根据协议的内容,可以被其他client获取。
只要手机客户端,连上服务器,然后就可以接收和发布消息了,不用自己写socket什么了,
低带宽,低耗电量,代码量也少,很简单吧。
package com.pig.test.
import com.ibm.mqtt.MqttCimport com.ibm.mqtt.MqttEimport com.ibm.mqtt.MqttSimpleC
public class SubscribeClient {&private final static String CONNECTION_STRING = "tcp://192.168.1.60:1883";&private final static boolean CLEAN_START =&private final static short KEEP_ALIVE = 30;//低耗网络,但是又需要及时获取数据,心跳30s&private final static String CLIENT_ID = "client1";&private final static String[] TOPICS = {&&"Test/TestTopics/Topic1",&&"Test/TestTopics/Topic2",&&"Test/TestTopics/Topic3",&&"tokudu/client1"&};&private final static int[] QOS_VALUES = {0, 0, 2, 0};&&//////////////////&private MqttClient mqttClient =&&public SubscribeClient(String i){&&try {&&&mqttClient = new MqttClient(CONNECTION_STRING);&&&SimpleCallbackHandler simpleCallbackHandler = new SimpleCallbackHandler();&&&mqttClient.registerSimpleHandler(simpleCallbackHandler);//注册接收消息方法&&&mqttClient.connect(CLIENT_ID+i, CLEAN_START, KEEP_ALIVE);&&&mqttClient.subscribe(TOPICS, QOS_VALUES);//订阅接主题&&&&&&/**&&& * 完成订阅后,可以增加心跳,保持网络通畅,也可以发布自己的消息&&& */
&&&mqttClient.publish(PUBLISH_TOPICS, "keepalive".getBytes(), QOS_VALUES[0], true);&&&&&} catch (MqttException e) {&&&// TODO Auto-generated catch block&&&e.printStackTrace();&&}&}
&/**& * 简单回调函数,处理client接收到的主题消息& * @author pig& *& */&class SimpleCallbackHandler implements MqttSimpleCallback{
&&/**&& * 当客户机和broker意外断开时触发&& * 可以再此处理重新订阅&& */&&@Override&&public void connectionLost() throws Exception {&&&// TODO Auto-generated method stub&&&System.out.println("客户机和broker已经断开");&&}
&&/**&& * 客户端订阅消息后,该方法负责回调接收处理消息&& */&&@Override&&public void publishArrived(String topicName, byte[] payload, int Qos, boolean retained) throws Exception {&&&// TODO Auto-generated method stub&&&System.out.println("订阅主题: " + topicName);&&&System.out.println("消息数据: " + new String(payload));&&&System.out.println("消息级别(0,1,2): " + Qos);&&&System.out.println("是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): " + retained);&&}&&&}&&/**& * 高级回调& * @author pig& *& */&class AdvancedCallbackHandler implements MqttSimpleCallback{
&&@Override&&public void connectionLost() throws Exception {&&&// TODO Auto-generated method stub&&&&&}
&&@Override&&public void publishArrived(String arg0, byte[] arg1, int arg2,&&&&boolean arg3) throws Exception {&&&// TODO Auto-generated method stub&&&&&}&&&}&&/**& * @param args& */&public static void main(String[] args) {&&// TODO Auto-generated method stub&&&new SubscribeClient("" + i);
broker服务器,MQTT的jar包,记得下载啊,没有就消息我咯~
========================================到这里,如果完成IBM 的MQTT协议实现push消息的实例的,都会有个问题,好像没考虑到安全问题,如果客户端连上来作乱怎么办呢?
上面用的broker时rsmb的,mqtt的简单服务器。IBM已经推出了MQTT V3.1版本,已经加入了安全验证机制,不要怕啦。据国外网友说,facebook在2011年8月就是用的mqtt v3.1做的应用哦。
到这里MQTT学习资料整理差不多了,下篇,mqtt v3.1版本服务器的测试使用。
绿色通道:
(请您对文章做出评价)
阅读(25353) 评论(11) &
正要做这方面东西,学习了,谢楼主分享!
楼主,能发jar包到我的邮箱吗?谢谢了,
十分感谢楼主分享。可以把jar包发给我吗?十分感谢。
谢谢楼主,可以发jar包给我吗?
谢谢楼主,可以发jar包给我吗?
在android中server进程被kill后,如果推送消息,客户端将会无法收到消息即使在发送后重新打开应用也无法收到,请问楼主是怎么解决的?
jar包就没官方的获取地址么?楼主先发我下吧 谢谢
楼主 有完整的demo吗 发我一份哈
可以了解一下开源的DDPush推送服务器,单机千万在线,www.ddpush.net
连接上 怎么发消息啊,具体的该怎么用啊?我下载的手机端的程序,也安装了rsmb代理服务器,运行了broker.exe,运行手机程序后,看见Connection attempt to listener 1883 received from client tokudu/bda9ca7c on address 10.0.0.14:36791 的字样,然后该怎么用啊?刚刚接触mqtt 不是很明白,能给我一份完成的demo吗?谢谢了
发表评论:
馆藏&21404
TA的最新馆藏MQTT客户端实现 - 简书
MQTT客户端实现
我们使用ibm提供的com.ibm.micro.client.mqttv3.jar来实现。官方教程链接如下 鉴于英语水平有限,我还是算了吧。
来说干货。
1、首先导入com.ibm.micro.client.mqttv3.jar包
com.ibm.micro.client.mqttv3.jar
2、我们来实现MqttV3Service的类,其中里面实现了mqtt的连接、断开、发布消息的方法。
import android.os.H
import com.ibm.micro.client.mqttv3.MqttC
import com.ibm.micro.client.mqttv3.MqttConnectO
import com.ibm.micro.client.mqttv3.MqttDeliveryT
import com.ibm.micro.client.mqttv3.MqttE
import com.ibm.micro.client.mqttv3.MqttM
import com.ibm.micro.client.mqttv3.MqttPersistenceE
import com.ibm.micro.client.mqttv3.MqttT
import java.util.ArrayL
public class MqttV3Service {
String addr = "";
String port = "";
private static MqttClient client =
private static MqttTopic topic =
static ArrayList&MqttTopic& topicList = new ArrayList&MqttTopic&();
public static boolean connectionMqttServer(Handler handler, String ServAddress, String ServPort, String userID, ArrayList&String& Topics) {
String connUrl = "tcp://" + ServAddress + ":" + ServP
client = new MqttClient(connUrl, userID, null);
for (int i = 0; i & Topics.size(); i++) {
topic = client.getTopic(Topics.get(i));
topicList.add((MqttTopic) topic);
CallBack callback = new CallBack(userID, handler);
client.setCallback(callback);
MqttConnectOptions conOptions = new MqttConnectOptions();
conOptions.setUserName(MyApplication.gUserName);
conOptions.setPassword(Pssword.toCharArray());
conOptions.setCleanSession(false);
char[] ddd = conOptions.getPassword();
System.out.println(ddd);
client.connect(conOptions);
for (int i = 0; i & Topics.size(); i++) {
client.subscribe(Topics.get(i), 1);
} catch (MqttException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
public static boolean closeMqtt() {
client.disconnect();
} catch (MqttException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
public static boolean publishMsg(String msg, int Qos, int position) {
MqttMessage message = new MqttMessage(msg.getBytes());
message.setQos(Qos);
MqttDeliveryT
token = topicList.get(position).publish(message);
while (!token.isComplete()) {
token.waitForCompletion(1000);
} catch (MqttPersistenceException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
} catch (MqttException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
在里面我们用到了用户端的一个回调CallBack(userID, handler)我们先来看看MqttCallback接口的方法
public interface MqttCallback {
void connectionLost(Throwable var1);
void messageArrived(MqttTopic var1, MqttMessage var2) throws E
void deliveryComplete(MqttDeliveryToken var1);
由此我们可以看到,我们可以使用回调来实现消息到达的处理、连接断开、发送完毕的处理。
import android.os.*;
import com.ibm.micro.client.mqttv3.*;
public class CallBack implements MqttCallback {
private String instanceData = "";
public CallBack(String instance, Handler handler) {
instanceData =
this.handler =
public void messageArrived(MqttTopic topic, MqttMessage message) {
Message msg = Message.obtain();
Bundle bundle = new Bundle();
bundle.putString("content", message.toString());
msg.what = 2;
msg.setData(bundle);
handler.sendMessage(msg);
} catch (Exception e) {
e.printStackTrace();
//下面两个方法交给你们自己去实现吧。?
public void connectionLost(Throwable throwable) {
public void deliveryComplete(MqttDeliveryToken token) {
至此我们已经完成了mqtt功能驱动。下一步我们便可以结合具体使用情境来做了
3、在Activity中结合具体方法来使用。
其实我们使用也不外乎就这么几种,连接、订阅话题、发布消息,断线处理。我们可以建立一个线程,在这个线程中配置好mqtt的连接,因为我们已经在连接的方法中封装了回调函数。那么当我们连接成功后如果有消息到达等都会接受到。我们可以使用一个handler来负责接受各种消息。思路有了,代码就有了。
public void onNothingSelected(AdapterView&?& parent) {
public class MqttProcThread implements Runnable
int randomid
=(int)Math.floor(10000+Math.random()*90000);
public void run() {
Message msg = new Message();
boolean ret = MqttV3Service.connectionMqttServer(myHandler,ADDRESS,PORT,"lexin"+randomid,topicList);
msg.what = 1;
msg.what = 0;
msg.obj = "strresult";
myHandler.sendMessage(msg);
@SuppressWarnings("HandlerLeak")
private Handler myHandler = new Handler() {
public void handleMessage(Message msg) {
super.handleMessage(msg);
if (msg.what == 1){
Toast.makeText(context,"连接成功",Toast.LENGTH_SHORT).show();
}else if (msg.what == 0){
Toast.makeText(context,"连接失败",Toast.LENGTH_SHORT).show();
}else if(msg.what == 2) {
String strContent = "";
strContent += msg.getData().getString("content");
System.out.println("strcontent:"+strContent);
else if (msg.what == 3){
if (MqttV3Service.closeMqtt()){
Toast.makeText(context,"断开连接",Toast.LENGTH_SHORT).show();
如此OK。 在我们使用的地方只需新建这个线程就可以了
new Thread(new MqttProcThread()).start();
至此我们已经完成了mqtt客户端的功能了。???????????????????android(11)
& CommsCallback --&ClientComms( ClientComms class) --&MqttAsyncClient(&MqttAsyncClient class)--&MqttClient(&MqttClient &class)-&&
================
mqttandroidclient &connect &调用序列:
mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener()
& * IMqttActionListener listener=token.getActionCallback & & &(); 就是传入的listener
--&MqttAndroidClient.this.doConnect();
---& this.mqttService.connect(this.clientHandle, this.connectOptions, (String)null, activityToken);
--& MqttConnection client &; client.connect(connectOptions, invocationContext, activityToken);
& &in MqttConnect.class 中, &this.myClient = new MqttAsyncClient(this.serverURI, this.clientId, this.persistence, new AlarmPingSender(this.service));
& &new MqttAsyncCcreate connectcomm
--& & &connectActionListener.connect();
--& &ms.connect(this.options, token);
& & & &MqttConnect connect = new MqttConnect
& & &ClientComms.ConnectBG conbg = new ClientComms.ConnectBG(this, token, connect);
& & conbg.start();
==========================
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:3606次
排名:千里之外
转载:11篇
(2)(1)(8)(1)(3)mqtt向android推送消息(一)——发送端使用.net - 推酷
mqtt向android推送消息(一)——发送端使用.net
使用.net进行mqtt协议通讯,主要是为了开发居于mqtt协议向android推送消息,使用.net开发可以参考mqtt.org上面的库,我用的是M2MQTT,网上资料很少,具体例子如下:
一、消息发布:
if (this.sender == null)
this.sender = new MqttClient(IPAddress.Parse(host), 1883);
//var mqttClient = new MqttClient(&localhost&);
this.sender.Connect(&sender&);
this.sender.Publish(&mobileGKTopic&, System.Text.Encoding.Default.GetBytes(textBox2.Text));
&二、消息接收
mqttClient = new MqttClient(IPAddress.Parse(host), 1883);
//mqttClient = new MqttClient(&localhost&);
mqttClient.Connect(&Receiver&);
mqttClient.MqttMsgPublishReceived += new MqttClient.MqttMsgPublishEventHandler(mqttClient_MqttMsgPublishReceived);
mqttClient.Subscribe(new string[] { &testTopic& }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE });
private void mqttClient_MqttMsgPublishReceived(object sender, uPLibrary.Networking.M2Mqtt.Messages.MqttMsgPublishEventArgs e)
//throw new NotImplementedException();
var msg = System.Text.Encoding.Default.GetString(e.Message);
if (!String.IsNullOrEmpty(msg))
textBox1.AppendText(Environment.NewLine);
textBox1.AppendText(msg);
&需要注意的是,编译mqtt库时要把条件编译SSL关闭,SSL的编译我还没有高清。
已发表评论数()
请填写推刊名
描述不能大于100个字符!
权限设置: 公开
仅自己可见
正文不准确
标题不准确
排版有问题
主题不准确
没有分页内容
图片无法显示
视频无法显示
与原文不一致写了一个mqttconnector 半残品(可能也算不上) - Pomelo Club
写了一个mqttconnector 半残品(可能也算不上)
17:37重新编辑
把mqttconnector.js、mqttsocket.js 放在 /usr/local/node/lib/node_modules/pomelo/lib/connectors 下
在/usr/local/node/lib/node_modules/pomelo/lib/pomelo.js 56行添加
Pomelo.connectors.__defineGetter__(&mqttconnector&, function() {
return require(&./connectors/mqttconnector&);
先安装mqtt
npm install mqtt -gd
等待官方的mqttconnector 让我们这些菜鸟学习下
我想知道官方的mqtt broker是自己实现的还是第三方的 第二个是官方的消息队列是不是用到了rabbitmq的mqtt插件第三个是官方的推送服务到底实现了哪些功能
1.broker是自己实现的2.消息队列用的就是rabbitmq,在rabbitmq上面封装了一层路由实现高可用而已。3.实现的功能有私信、广播消息推送,支持平台、版本号的过滤;离线消息,链路复用等功能。
@ pomelo消息队列是在后台服务器里操作的吗 但是队列有个问题就是一个消费者取了之后 其他消费者连上来就没有了 怎么才能实现每个用户上线之后能收到消息队列存的信息 每个用户一个消息队列吗 你们的是怎么和rabbitmq整在一起的
@ 我们的rabbitmq和消费者之间还有个redis做为消息存储的!每个用户一个队列是终极解决方案,非常完美!不过目前没有这方面的中间件,rabbitmq支撑不了这么多队列,redis的队列又不完善!以后或许会搞出这么一个中间件吧……
@ 刚才在rabbitmq(兔子没钱)群里讨论 知道有个rocketmq() 貌似很厉害的样子
@ 为什么支撑不了这么多队列 不是分布式的吗
mqttconnector.js
var util = require(&util&);
var EventEmitter = require(&events&).EventE
var mqtt = require(&mqtt&);
var MqttSocket = require(&./mqttsocket&);
var Connector = function(port, host, opts) {
if (!(this instanceof Connector)) {
return new Connector(port, host, opts);
EventEmitter.call(this);
this.port =
this.host =
this.opts =
util.inherits(Connector, EventEmitter);
module.exports = C
Connector.prototype.start = function(cb) {
var self =
//开启mqtt服务
mqtt.createServer(function(socket) {
socket.on(&connect&, function(packet) {
socket.connack({returnCode: 0});
//构造MqttSocket
var mqttsocket = new MqttSocket(packet.clientId, socket);
//当有连接事件发生时,能将连接事件抛出,并传入mqttsocket
self.emit(&connection&, mqttsocket);
}).listen(this.port);
process.nextTick(cb);
Connector.prototype.stop = function(force, cb) {
process.nextTick(cb);
var composeResponse = function(msgId, route, msgBody) {
id: msgId,
route: route,
body: msgBody
var composePush = function(route, msgBody) {
return JSON.stringify({route: route, body: msgBody});
Connector.encode = Connector.prototype.encode = function(reqId, route, msg) {
if(reqId) {
return composeResponse(reqId, route, msg);
return composePush(route, msg);
Connector.decode = Connector.prototype.decode = function(msg) {
if(typeof msg == &string&){
msg = JSON.parse(msg);
id: msg.id,
route: msg.route,
body: msg.body
mqttsocket.js
var util = require(&util&);
var EventEmitter = require(&events&).EventE
var ST_INITED = 0;
var ST_CLOSED = 1;
var Socket = function(id, socket) {
EventEmitter.call(this);
this.socket =
var self =
socket.on(&publish&, function(packet) {
self.emit(&message&, JSON.stringify({id: Date.now(), route: packet.topic, body: packet.payload}));
socket.on(&subscribe&, function(packet) {
var granted = [];
for (var i = 0; i & packet.subscriptions. i++) {
granted.push(packet.subscriptions[i].qos);
socket.suback({granted: granted, messageId: packet.messageId});
socket.on(&close&, function(err) {
self.emit(&close&);
socket.on(&disconnect&, function(){
socket.stream.end();
self.emit(&disconnect&);
socket.on(&error&, function(err) {
socket.stream.end();
self.emit(&error&);
socket.on(&pingreq&, function(packet) {
socket.pingresp();
this.state = ST_INITED;
util.inherits(Socket, EventEmitter);
//用于当服务器想把某个用户kick掉的时候调用
Socket.prototype.disconnect = function() {
if(this.state === ST_CLOSED) {
this.state = ST_CLOSED;
this.socket.stream.end();
this.emit(&disconnect&);
Socket.prototype.send = function(msg) {
if(this.state !== ST_INITED) {
if(typeof msg == &string&){
msg = JSON.parse(msg);
this.socket.publish({topic: msg.route, payload: msg.body});
Socket.prototype.sendBatch = function(msgs) {
this.send(encodeBatch(msgs));
var encodeBatch = function(msgs){
var res = &[&,
for(var i=0, l=msgs. i&l; i++) {
if(i & 0) {
res += &,&;
msg = msgs[i];
if(typeof msg === &string&) {
res += JSON.stringify(msg);
res += &]&;
module.exports = S
这个是干嘛用的?
手机上连接这个connector节省流量的
这是把mqtt 服务器端直接集成在pomelo中么?自己写的服务器socket?这里面又qos这些feature么?
我本以为应该是pomelo与另外一个mqtt服务器交互。比如Mosquitto之类的。
没有qos 我也是打算用第三方的broker 不知道官方的是什么思路 发出来就想听听他们的
@ 目前我想到的,可以用一个第三方broker,pomelo作为一个特殊设备注册进去。客户端向上就向这个设备push消息。但是这样又不好模拟request-response了。有点麻烦。其实不一定要官方开源mqtt connector,希望给个思路讨论一下,看怎么和pomelo结合最好。
可能我对mqtt了解的还不够深入。目前看来,我觉得mqtt最有意义的就是它的qos功能了。至于其他的流量消耗少之类的,我觉得倒不重要。因为业务层的数据量是固定的,协议层多几个字节少几个字节倒意义不大。同一个数据接口,用socket.io, mqtt,pomelo-hybirdsocket其实数据量上差不了多少。
mqtt可以带来一种新的交互接口架构设计:http+mqtt, 传统接口使用http+json,这样可以复用很多现有资源和框架。 mqtt负责push推送。对于实时性要求不是很高的话,这样架构相对简单很多
不过,用http+mqtt,貌似就没有必要用pomelo了。nginx+mqtt就感觉很强悍了,呵呵。
@ 主要我是要和网页端整合在一起 实现推送 兼容ie6+
移动端最重要的就是流量耗电量了,协议层少几个字节影响还是蛮大的,除过业务层数据其实更多的是心跳数据……
请问一下 mqtt 协议 他的心跳是如何发的?
检测到IP,只对针对? 还是说sub了几条,每一条都会发心跳
mqtt这个包是默认每隔30秒发一个
@ 不太了解你什么意思;心跳就是维护长连接而且,定期检测对方是否在线。心跳时间客户端可以设置……
@ mqtt怎么获取客户端ip port啊 愁死了 网上搜不到都 代码里也找不到
@ 你们客户端和后端服务之间会有lvs或者其他什么负载均衡吧,可以获取的到的。我们用的是lvs,可以搞定client.stream.remoteAddress,client.stream.remotePort
@ 这个我在昨天刚看到 在源码里找到了 client.stream是Net.Socket的子类 我现在的mqttconnector用在chat demo上可以双向通信了
@ You are right!确实是node的socket类

我要回帖

更多关于 paho.mqtt.client 的文章

 

随机推荐