Files
yusheng-android/BaseModule/src/main/java/com/xscm/moduleutil/service/MqttConnect.java

222 lines
7.1 KiB
Java
Raw Normal View History

2025-10-24 17:55:15 +08:00
package com.xscm.moduleutil.service;
import android.content.Context;
import android.os.Handler;
import android.os.Looper;
import android.util.Log;
import android.widget.Toast;
import com.blankj.utilcode.util.LogUtils;
import com.google.android.gms.common.api.Api;
import com.hjq.toast.ToastUtils;
import com.xscm.moduleutil.utils.logger.DataLogger;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.ArrayList;
public class MqttConnect {
private String HOST;
private String Tag = "MQTT";
private String clientId = "";
private static MqttClient mqttClient = null;
private Context context;
// 订阅主题
public static String shutdown = "";
public static String update_app = "";
public static String qx_hour_ranking = "";
public static String qx_redpacket_arrive="";//红包飘屏的主题
Handler handler = new Handler(Looper.getMainLooper());
String[] topic;
int[] qos = {1,2,3,0,0,0,0,0,0,0,0,0,0}; // 消息质量
private static MqttConnect instance;
public MqttConnect(Context context, String host, String clientId) {
this.HOST = host;
this.context = context;
this.clientId = clientId;
// 这里是你自己需要订阅的主题
shutdown = "qx_room_topic"; // 关机
update_app = "qx_xunlehui"; // 发送更新APP
// qx_hour_ranking = "qx_hour_ranking";
qx_hour_ranking = "qx_hour_ranking";
qx_redpacket_arrive="qx_redpacket_arrive";
ArrayList<String> topicList = new ArrayList<>();
topicList.add(shutdown);
topicList.add(update_app);
topicList.add(qx_hour_ranking);
topicList.add(qx_redpacket_arrive);
topic = topicList.toArray(new String[0]);
}
/**
* 单列模式,只能实例化一次
* @param context
* @param host
* @param clientId
* @return
*/
public static synchronized MqttConnect getInstance(Context context, String host, String clientId) {
if (instance == null) {
instance = new MqttConnect(context, host, clientId);
}
return instance;
}
/**
* 客户端connect连接mqtt服务器
**/
public void mqttClient()
{
// close();
// handler.postDelayed(new Runnable() {
// @Override
// public void run() {
try {
// uiTip("MQTT开始连接");
MqttConnectOptions options = mqttConnectOptions();
mqttClient.setCallback(new MqttInitCallback(context, HOST, clientId));
mqttClient.connect(options);
// sub(topic,qos);
sub(shutdown);
sub(update_app);
sub(qx_hour_ranking);
sub(qx_redpacket_arrive);
// uiTip("MQTT连接成功");
}catch (MqttException e){
// uiTip("MQTT连接失败,准备重连。。。:"+e.getMessage());
handler.postDelayed(new Runnable() {
@Override
public void run() {
2025-11-11 15:07:14 +08:00
LogUtils.e(Tag,"开始重连。。。");
2025-10-24 17:55:15 +08:00
mqttClient();
}
},3000);
}
// }
// },200);
}
/**
* 在主线程弹出消息
* @param msg
*/
private void uiTip(String msg){
2025-11-11 15:07:14 +08:00
LogUtils.d(Tag,msg);
2025-10-24 17:55:15 +08:00
handler.post(new Runnable() {
@Override
public void run() {
// Toast.makeText(context.getApplicationContext(), msg, Toast.LENGTH_SHORT).show();
LogUtils.e("mqtt","连接成功");
ToastUtils.show(msg);
}
});
}
/**
* MQTT连接参数设置
*/
private MqttConnectOptions mqttConnectOptions()
throws MqttException {
mqttClient = new MqttClient(HOST, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("public");
options.setConnectionTimeout(10);
options.setCleanSession(true);
options.setConnectionTimeout(10);
options.setKeepAliveInterval(10);
return options;
}
/**
* 关闭MQTT连接
*/
public void close(){
if(mqttClient != null && mqttClient.isConnected()){
try {
mqttClient.close();
mqttClient.disconnect();
mqttClient = null;
} catch (MqttException e) {
2025-11-11 15:07:14 +08:00
LogUtils.e(Tag,"关闭MQTT连接报错"+e.getMessage());
2025-10-24 17:55:15 +08:00
// ToastUtils.show("关闭MQTT连接报错");
}
}else {
2025-11-11 15:07:14 +08:00
LogUtils.d(Tag,"Mqtt已关闭");
2025-10-24 17:55:15 +08:00
}
}
/**
* 向某个主题发布消息 默认qos1
*/
public static void pub(String topic, String msg) throws MqttException {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(msg.getBytes());
MqttTopic mqttTopic = mqttClient.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}
/**
* 向某个主题发布消息
*
* @param topic: 发布的主题
* @param msg: 发布的消息
* @param qos: 消息质量 Qos012
*/
public void pub(String topic, String msg, int qos) throws MqttException {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setPayload(msg.getBytes());
MqttTopic mqttTopic = mqttClient.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}
/**
* 订阅某一个主题 此方法默认的的Qos等级为1
*
* @param topic 主题
*/
public void sub(String topic){
try {
mqttClient.subscribe(topic);
} catch (MqttException e) {
2025-11-11 15:07:14 +08:00
LogUtils.e(Tag,"MQTT主题订阅失败" + e.getMessage());
2025-10-24 17:55:15 +08:00
// uiTip("MQTT主题订阅失败");
}
}
/**
* 订阅某一个主题可携带Qos
*
* @param topic 所要订阅的主题
* @param qos
* 消息质量0最多发送一次不保证消息能够到达接收端也不负责重发
* 1至少发送一次确保消息能够到达接收端但可能会导致消息重复
* 2确保消息恰好被接收一次
*/
public void sub(String[] topic, int[] qos){
try {
mqttClient.subscribe(topic, qos);
}catch (MqttException e){
2025-11-11 15:07:14 +08:00
LogUtils.e(Tag,"订阅主题失败:"+e.getMessage());
2025-10-24 17:55:15 +08:00
}
}
// 原文链接https://blog.csdn.net/Fyx1987496919/article/details/140516525
}