Hi大家好,今天给大家分享下MQTT的入门教学,能帮助大家快速上手使用。
首先在学习MQTT协议前,让我们先了解下MQTT是个什么东西,以下链接简单的介绍了MQTT协议的含义,以及它的特性。没了解过的同学先花几分钟熟悉下,看不懂没关系,在接下来的步骤中会慢慢解释。
https://www.runoob.com/w3cnote/mqtt-intro.htmlhttps://www.runoob.com/w3cnote/mqtt-intro.html
我们先需要下载MQTT的服务器:emqx-broker (MQTT的服务器不止这一种,本次主要以为emqx来讲解)
下载地址为:https://www.emqx.io/
点击下载按钮,选择EMQX开源版,选择需要下载的版本号和操作系统windows(如果你是使用linux系统也可以下载相应的centos)
下载完毕之后解压,我们会得到一个EMQX的文件夹,我们进入emqx中,在进入bin中,并用cmd打开这个路径,然后使用emqx start启动这个emqx的服务器。
此时代表emqx启动成功,(如果要关闭emqx则需要使用命令cd bin,先进入bin目录下,再使用emqx stop关闭服务器)
我们现在需要判断emqx是否启动成功,先随便打开一个浏览器输入:
http://127.0.0.1:18083
127.0.0.1代表的是你的本机的ip地址,18083代表的是端口号,进入登录界面的端口号。
登录账号为:admin 密码为public
此时登录成功,可以在设置中开启中文,此时也可以点击客户端,查看此时连接emqx服务器的客户端。(此时并没有客户端连接
我们接着打开cmd 输入命令ipconfig,可以看到适配器的ipv4的ip为:192.168.15.1
我们也可以使用http://192.168.15.1:18083 进入登录页面,账号密码相同,其实本质上来使用192.168.15.1还是127.0.0.1用谁打开服务器都没影响,因为他们都是属于你本机ip地址,但是在后面的客户端连接服务器时,需要使用192.168.15.1这个ip,不然的话可能到时候会连接不上。
接下来我们来考虑客户端连接服务端的问题:
我使用的是android studio来进行安卓开发,首先我们先新建一个项目。
第一步我们先导入我们所需要的mqtt的jar包:org.eclipse.paho.client.mqttv3-1.2.5.jar(因为mqtt不上java自动的库文件,我们需要自己下载导入)
mqtt jar 下载地址:
https://repo.eclipse.org/content/repositories/paho-releases/org/eclipse/paho/
将下载的jar包复制至libs目录下,并右击mqtt jar包 ADD As Libray.. ,将mqtt jar包导入库文件中。
首先在MainActivity的java文件中确定编写mqtt的客户端代码如下:
package com.example.mqtt_test1; import androidx.appcompat.app.AppCompatActivity; import android.annotation.SuppressLint; import android.content.DialogInterface; import android.os.Bundle; import android.os.Handler; import android.os.Message; import android.view.View; import android.widget.TextView; import android.widget.Toast; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.json.JSONObject; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class MainActivity extends AppCompatActivity { private String host = "tcp://192.168.15.1:1883"; private String userName = "admin"; private String passWord = "public"; private String mqtt_id="111111"; private int i = 1; private Handler handler; private MqttClient client; private String mqtt_sub_topic = "second"; //为了保证你不受到别人的消息 哈哈 private String mqtt_pub_topic ="first"; private MqttConnectOptions options; private ScheduledExecutorService scheduler; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); TextView text1 = findViewById(R.id.test1); init(); startReconnect(); handler = new Handler() { @SuppressLint("SetTextIl8n") public void handleMessage(Message msg) { super.handleMessage(msg); switch (msg.what) { case 1: //开机校验更新回传 break; case 2: //反馈回转 break; case 3: //MQTT收到消息回传 text1.setText(msg.obj.toString()); break; case 30: //连接失败 Toast.makeText(MainActivity.this,"连接失败",Toast.LENGTH_SHORT).show(); break; case 31: //连接成功 Toast.makeText(MainActivity.this,"连接成功",Toast.LENGTH_SHORT).show(); try { client.subscribe(mqtt_sub_topic,2); } catch (MqttException e) { e.printStackTrace(); } publishmessageplus(mqtt_pub_topic,"第一个客户端发送的信息"); break; default: break; } } }; } private void init() { try { //host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存 client = new MqttClient(host, mqtt_id, new MemoryPersistence()); //MQTT的连接设置 options = new MqttConnectOptions(); //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(true); //设置连接的用户名 options.setUserName(userName); //设置连接的密码 options.setPassword(passWord.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(20); //设置回调 client.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { //连接丢失后,一般在这里面进行重连 System.out.println("connectionLost----------"); } @Override public void deliveryComplete(IMqttDeliveryToken token) { //publish后会执行到这里 System.out.println("deliveryComplete---------" + token.isComplete()); } @Override public void messageArrived(String topicName, MqttMessage message) throws Exception { //subscribe后得到的消息会执行到这里面 System.out.println("messageArrived----------"); Message msg = new Message(); msg.what = 3; msg.obj = topicName + "---" + message.toString(); handler.sendMessage(msg); } }); } catch (Exception e) { e.printStackTrace(); } } private void Mqtt_connect() { new Thread(new Runnable() { @Override public void run() { try { if (!(client.isConnected())){ client.connect(options); Message msg = new Message(); msg.what=31; handler.sendMessage(msg); } } catch (Exception e) { e.printStackTrace(); Message msg = new Message(); msg.what = 30; handler.sendMessage(msg); } } }).start(); } private void startReconnect() { scheduler = Executors.newSingleThreadScheduledExecutor(); scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { if (!client.isConnected()) { Mqtt_connect(); } } }, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS); } private void publishmessageplus(String topic,String message2) { if (client == null || !client.isConnected()) { return; } MqttMessage message = new MqttMessage(); message.setPayload(message2.getBytes()); try { client.publish(topic,message); } catch (MqttException e) { e.printStackTrace(); } } }
在AndroidManifest.xml文件中添加网络状态,确保连接mqtt服务器!
运行启动,此时打开网页的192.168.15.1:18083可以发现有一个客户端连接成功!
两个客户端之间传递信息:首先保证两客户端的id不相同,这样才能同时连接上mqtt的服务器。
在连接上mqtt成功的位置确定要订阅的主题mqtt_sub_topic(为了测试,这个订阅的主题为另一个客户端发布的主题)
try { client.subscribe(mqtt_sub_topic,1); } catch (MqttException e) { e.printStackTrace(); }
发布信息使用的方法:
publishmessageplus(mqtt_pub_topic,"第一个客户端发送的信息");
接着再创建一个安卓项目,与第一个步骤相同,只要保证第二个项目的订阅主题是第一个项目的发布主题,发布主题是第一个的订阅主题就行!
我们查看启动两台虚拟机查看信息结果: