Files
yusheng-android/BaseModule/src/main/java/com/xscm/moduleutil/service/MqttConnect.java
梁小江 4d2c1a5ace 1:取消所有的切换后台、im重连给服务段发送接口
2:修改1058,所有房间添加展示离线
2025-12-02 14:11:40 +08:00

214 lines
6.7 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package com.xscm.moduleutil.service;
import android.content.Context;
import android.os.Handler;
import android.os.Looper;
import android.util.Log;
import android.widget.Toast;
import com.blankj.utilcode.util.ActivityUtils;
import com.blankj.utilcode.util.LogUtils;
import com.google.android.gms.common.api.Api;
import com.hjq.toast.ToastUtils;
import com.xscm.moduleutil.utils.logger.DataLogger;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.ArrayList;
/**
 * Wrapper around a single, process-wide Eclipse Paho {@link MqttClient}.
 *
 * <p>The class connects to the broker on a background thread, subscribes to a fixed set of
 * topics, retries the connection every 3 seconds after a failure, and exposes small
 * publish/subscribe helpers.
 *
 * <p>NOTE(review): the singleton returned by {@link #getInstance} keeps the supplied
 * {@link Context} for as long as the process lives. Callers should presumably pass an
 * application context; confirm what {@code MqttInitCallback} needs before changing this.
 */
public class MqttConnect {
    private static final String TAG = "MQTT";

    /** Shared client; written by the connect worker thread and read by publish/subscribe callers. */
    private static volatile MqttClient mqttClient = null;

    private static MqttConnect instance;

    // Subscribed topics. They are (re)assigned by the constructor.
    public static String shutdown = "";
    public static String update_app = "";
    public static String qx_hour_ranking = "";
    public static String qx_redpacket_arrive = ""; // topic for the red-packet floating banner

    private final String host;
    private final String clientId;
    private final Context context;

    /** Posts work to the main (UI) thread. */
    Handler handler = new Handler(Looper.getMainLooper());

    /** All topics this client is interested in, in subscription order. */
    String[] topic;

    /** Requested QoS for each entry of {@link #topic}; always the same length as {@link #topic}. */
    int[] qos;

    /**
     * Creates a connector and initialises the topic names.
     *
     * @param context  context handed to {@code MqttInitCallback}
     * @param host     broker URI passed to the Paho client
     * @param clientId MQTT client identifier
     */
    public MqttConnect(Context context, String host, String clientId) {
        this.host = host;
        this.context = context;
        this.clientId = clientId;

        // Topics this client subscribes to.
        shutdown = "qx_room_topic"; // room floating banner
        update_app = "qx_xunlehui"; // "xunlehui" floating banner
        qx_hour_ranking = "qx_hour_ranking"; // hourly-ranking floating banner
        qx_redpacket_arrive = "qx_redpacket_arrive"; // red-packet floating banner

        topic = new String[] {shutdown, update_app, qx_hour_ranking, qx_redpacket_arrive};

        // One QoS entry per topic. QoS 1 matches what the single-topic subscribe uses by default;
        // MQTT only defines levels 0, 1 and 2, and Paho requires both arrays to have equal length.
        qos = new int[topic.length];
        for (int i = 0; i < qos.length; i++) {
            qos[i] = 1;
        }
    }

    /**
     * Returns the shared instance, creating it on first use.
     *
     * <p>The arguments are only used by the call that creates the instance; later calls return
     * the existing instance and ignore them.
     *
     * @param context  context for the first instantiation
     * @param host     broker URI for the first instantiation
     * @param clientId MQTT client identifier for the first instantiation
     * @return the single shared {@code MqttConnect}
     */
    public static synchronized MqttConnect getInstance(Context context, String host, String clientId) {
        if (instance == null) {
            instance = new MqttConnect(context, host, clientId);
        }
        return instance;
    }

    /**
     * Connects to the MQTT broker and subscribes to the default topics.
     *
     * <p>All network work, including tearing down a previous client, runs on a new background
     * thread so the caller (possibly the main thread) is never blocked. On any failure the
     * attempt is scheduled again after 3 seconds.
     */
    public void mqttClient() {
        new Thread(() -> {
            try {
                // Release the previous client first. This is done here, not on the caller's
                // thread, because disconnecting blocks until the broker exchange completes.
                close();

                MqttConnectOptions options = mqttConnectOptions();
                mqttClient.setCallback(new MqttInitCallback(context, host, clientId));
                mqttClient.connect(options);

                // sub(topic, qos);
                sub(shutdown);
                sub(update_app);
                sub(qx_hour_ranking);
                sub(qx_redpacket_arrive);

                // Report on the main thread via the handler; this does not depend on an
                // activity being available.
                handler.post(() -> uiTip("MQTT连接成功"));
            } catch (Exception e) {
                handler.post(() -> uiTip("MQTT连接失败,准备重连: " + e.getMessage()));
                // Delayed reconnect.
                handler.postDelayed(() -> mqttClient(), 3000);
            }
        }).start();
    }

    /**
     * Reports a status message; currently this only writes it to the debug log.
     *
     * @param msg message to report
     */
    private void uiTip(String msg) {
        LogUtils.d(TAG, msg);
    }

    /**
     * Creates the shared client and returns the options used to connect it.
     *
     * @return connection options (user name, clean session, timeout and keep-alive)
     * @throws MqttException if the Paho client cannot be created
     */
    private MqttConnectOptions mqttConnectOptions() throws MqttException {
        mqttClient = new MqttClient(host, clientId, new MemoryPersistence());

        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("public");
        options.setCleanSession(true);
        options.setConnectionTimeout(10);
        options.setKeepAliveInterval(10);
        return options;
    }

    /**
     * Disconnects (if connected) and closes the shared client, then forgets it.
     *
     * <p>The order matters: Paho refuses to close a client that is still connected, so the
     * client is disconnected first. Errors are logged and never thrown.
     */
    public void close() {
        MqttClient client = mqttClient;
        if (client == null) {
            LogUtils.d(TAG, "Mqtt已关闭");
            return;
        }
        // Drop the shared reference up front so the client is never reused, even if the
        // teardown below fails part-way.
        mqttClient = null;

        try {
            if (client.isConnected()) {
                client.disconnect();
            }
        } catch (MqttException e) {
            LogUtils.e(TAG, "关闭MQTT连接报错" + e.getMessage());
        }

        try {
            client.close();
        } catch (MqttException e) {
            LogUtils.e(TAG, "关闭MQTT连接报错" + e.getMessage());
        }
    }

    /**
     * Publishes a message to a topic with the message's default QoS and waits for completion.
     *
     * @param topic topic to publish to
     * @param msg   message text
     * @throws MqttException if there is no client or publishing fails
     */
    public static void pub(String topic, String msg) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(msg.getBytes());
        publish(topic, mqttMessage);
    }

    /**
     * Publishes a message to a topic with an explicit QoS and waits for completion.
     *
     * @param topic topic to publish to
     * @param msg   message text
     * @param qos   quality of service: 0, 1 or 2
     * @throws MqttException if there is no client or publishing fails
     */
    public void pub(String topic, String msg, int qos) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setPayload(msg.getBytes());
        publish(topic, mqttMessage);
    }

    /** Sends a prepared message through the shared client and blocks until it is delivered. */
    private static void publish(String topic, MqttMessage message) throws MqttException {
        MqttClient client = mqttClient;
        if (client == null) {
            // Surface the missing client as the checked exception callers already handle.
            throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
        }
        MqttTopic mqttTopic = client.getTopic(topic);
        MqttDeliveryToken token = mqttTopic.publish(message);
        token.waitForCompletion();
    }

    /**
     * Subscribes to a single topic using the client's default QoS.
     *
     * <p>Failures are logged and not propagated.
     *
     * @param topic topic filter to subscribe to
     */
    public void sub(String topic) {
        MqttClient client = mqttClient;
        if (client == null) {
            LogUtils.e(TAG, "MQTT主题订阅失败" + "client is not initialised");
            return;
        }
        try {
            client.subscribe(topic);
        } catch (MqttException e) {
            LogUtils.e(TAG, "MQTT主题订阅失败" + e.getMessage());
        }
    }

    /**
     * Subscribes to several topics, each with its own QoS.
     *
     * <p>Failures are logged and not propagated.
     *
     * @param topic topic filters to subscribe to
     * @param qos   QoS per topic, same length as {@code topic}:
     *              0 = at most once (no delivery guarantee, no retransmission),
     *              1 = at least once (delivery guaranteed, duplicates possible),
     *              2 = exactly once
     */
    public void sub(String[] topic, int[] qos) {
        MqttClient client = mqttClient;
        if (client == null) {
            LogUtils.e(TAG, "订阅主题失败:" + "client is not initialised");
            return;
        }
        if (topic == null || qos == null || topic.length != qos.length) {
            LogUtils.e(TAG, "订阅主题失败:" + "topic and qos arrays must be non-null and of equal length");
            return;
        }
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            LogUtils.e(TAG, "订阅主题失败:" + e.getMessage());
        }
    }
    // Original article: https://blog.csdn.net/Fyx1987496919/article/details/140516525
}