package com.xscm.moduleutil.service;

import android.app.Notification;
import android.app.NotificationChannel;
import android.app.NotificationManager;
import android.content.Context;
import android.net.ConnectivityManager;
import android.net.NetworkInfo;
import android.os.Build;
import android.os.Handler;
import android.os.HandlerThread;
import android.os.Looper;
import android.util.Log;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.blankj.utilcode.util.LogUtils;
import com.orhanobut.logger.Logger;
import com.xscm.moduleutil.R;
import com.xscm.moduleutil.event.RoomGiftRunable;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Process-wide MQTT client wrapper built on the Paho Android service.
 *
 * <p>The singleton owns one {@link MqttAndroidClient}, a dedicated {@link HandlerThread} on which
 * connection attempts and retries are scheduled, and an executor on which incoming payloads are
 * parsed. After a successful connect it subscribes to {@link #TOPIC_BOSS} and {@link #TOPIC_XLH};
 * payloads from those topics are parsed as JSON ({@code type} / {@code msg}) and handed to
 * {@link #processMessageType(int, String)} on the main thread.
 *
 * <p>Connect and subscribe listeners registered through the static {@code add...Listener} methods
 * are notified from the Paho callbacks. Message listeners are stored but are not invoked anywhere
 * in this class.
 */
public class MyMqttService
        implements MyEmqttConnectListener, MyEmqttMesgListener, MyEmqttSubscribeListener {

    private static final String TAG = "lxj";

    /** Quality of service used for every publish and subscribe. */
    private static final int QOS = 2;

    /** Broker URI (test environment). NOTE(review): hard-coded; consider moving to build config. */
    private static final String HOST = "tcp://1.13.181.248";

    private static final String TOPIC_BOSS = "qx_room_topic";
    private static final String TOPIC_XLH = "qx_xunlehui";

    /** Maximum number of consecutive reconnect attempts before giving up. */
    private static final int MAX_RETRY_ATTEMPTS = 5;

    /** Delay before a failed connect is retried. */
    private static final long RETRY_DELAY_MS = 5000;

    /** Delay between a failure/lost-connection callback and the reconnect handling. */
    private static final long RECONNECT_DELAY_MS = 1000;

    /** Handler bound to the main looper; static client operations are posted here. */
    private static final Handler MAIN_HANDLER = new Handler(Looper.getMainLooper());

    // Thread-safe listener registries (typed so that the for-each loops below compile).
    private static final CopyOnWriteArrayList<MyEmqttMesgListener> messageListeners =
            new CopyOnWriteArrayList<>();
    private static final CopyOnWriteArrayList<MyEmqttConnectListener> connectListeners =
            new CopyOnWriteArrayList<>();
    private static final CopyOnWriteArrayList<MyEmqttSubscribeListener> subscribeListeners =
            new CopyOnWriteArrayList<>();

    // Written and read from different threads (caller, main looper, MQTT handler thread),
    // hence volatile. "instance" must be volatile for the double-checked locking below.
    private static volatile MyMqttService instance;
    private static volatile MqttAndroidClient mqttAndroidClient;
    /** While false, no connect or reconnect attempt is made. */
    private static volatile boolean reconnectEnabled = true;
    private static volatile boolean isServiceRunning = false;

    private final Context mContext;
    private MqttConnectOptions mMqttConnectOptions;

    // Dedicated thread for connect / retry scheduling.
    private HandlerThread mqttHandlerThread;
    private Handler mqttHandler;
    // Pool on which incoming payloads are parsed.
    private ExecutorService messageExecutorService;

    private int retryCount = 0;

    /** Private: obtain the instance through {@link #getInstance(Context)}. */
    private MyMqttService(Context context) {
        this.mContext = context.getApplicationContext();
        initService();
    }

    /**
     * Returns the singleton, creating (and thereby starting) it on first use.
     *
     * @param context any context; only its application context is retained
     * @return the shared service instance
     */
    public static MyMqttService getInstance(Context context) {
        MyMqttService local = instance;
        if (local == null) {
            synchronized (MyMqttService.class) {
                local = instance;
                if (local == null) {
                    local = new MyMqttService(context);
                    instance = local;
                }
            }
        }
        return local;
    }

    /** Creates the worker thread and executor, then builds the client and starts connecting. */
    private void initService() {
        isServiceRunning = true;

        mqttHandlerThread = new HandlerThread("MqttServiceThread");
        mqttHandlerThread.start();
        mqttHandler = new Handler(mqttHandlerThread.getLooper());

        messageExecutorService = Executors.newCachedThreadPool();

        try {
            init();
        } catch (MqttException e) {
            Log.e(TAG, "MQTT初始化异常", e);
        }
    }

    /** Enables (re)connecting and re-initialises the service if it was stopped. */
    public void startService() {
        reconnectEnabled = true;
        if (!isServiceRunning) {
            initService();
        }
    }

    /** Disables reconnecting and releases all resources via {@link #cleanup()}. */
    public void stopService() {
        reconnectEnabled = false;
        isServiceRunning = false;
        cleanup();
    }

    /** Builds the Paho client and its connect options, then schedules the first connect. */
    private void init() throws MqttException {
        String clientId = "android-" + MqttClient.generateClientId();
        MqttAndroidClient client = new MqttAndroidClient(mContext, HOST, clientId);
        client.setCallback(mqttCallback);

        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setConnectionTimeout(10); // seconds
        options.setKeepAliveInterval(10); // seconds
        options.setUserName("public");
        options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);

        mMqttConnectOptions = options;
        mqttAndroidClient = client;

        if (!client.isConnected()) {
            doClientConnection();
        }
    }

    /**
     * Schedules a connect attempt on the MQTT handler thread. Nothing happens when reconnecting
     * is disabled or the client is already connected; when no network is available the attempt
     * is routed through the bounded retry logic instead of being dropped silently.
     */
    private void doClientConnection() {
        mqttHandler.post(() -> {
            MqttAndroidClient client = mqttAndroidClient;
            if (!reconnectEnabled || client == null || client.isConnected()) {
                return;
            }
            if (!isNetworkAvailable()) {
                handleConnectionFailure();
                return;
            }
            try {
                client.connect(mMqttConnectOptions, null, iMqttActionListener);
            } catch (Exception e) {
                Log.e(TAG, "MQTT连接异常", e);
                handleConnectionFailure();
            }
        });
    }

    /**
     * Counts a failed attempt and schedules another one after {@link #RETRY_DELAY_MS}, until
     * {@link #MAX_RETRY_ATTEMPTS} is reached; the counter is then reset and retrying stops.
     */
    private void handleConnectionFailure() {
        retryCount++;
        if (retryCount < MAX_RETRY_ATTEMPTS) {
            mqttHandler.postDelayed(() -> {
                if (reconnectEnabled) {
                    doClientConnection();
                }
            }, RETRY_DELAY_MS);
        } else {
            Log.w(TAG, "达到最大重试次数,停止重试");
            retryCount = 0;
        }
    }

    /**
     * Disconnects the client if it is connected. The call is posted to the main looper.
     * Reconnecting is not disabled here; use {@link #stopService()} to shut down for good.
     */
    public static void closeConnection() {
        MAIN_HANDLER.post(() -> {
            MqttAndroidClient client = mqttAndroidClient;
            try {
                if (client == null || !client.isConnected()) {
                    return;
                }
                IMqttToken token = client.disconnect();
                token.setActionCallback(new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        logE("断开链接 断开链接成功");
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        logE("断开链接 断开链接失败" + messageOf(exception));
                    }
                });
            } catch (Exception e) {
                Log.e(TAG, "关闭连接异常", e);
            }
        });
    }

    /**
     * Reports whether an active network is available.
     *
     * @return {@code true} when the active network reports itself available; {@code false}
     *     otherwise or when the check itself fails
     */
    private boolean isNetworkAvailable() {
        try {
            ConnectivityManager manager =
                    (ConnectivityManager) mContext.getSystemService(Context.CONNECTIVITY_SERVICE);
            if (manager == null) {
                return false;
            }
            NetworkInfo info = manager.getActiveNetworkInfo();
            if (info != null && info.isAvailable()) {
                Log.i(TAG, "当前网络名称:" + info.getTypeName());
                return true;
            }
            Log.i(TAG, "没有可用网络");
        } catch (Exception e) {
            Log.e(TAG, "检查网络状态异常", e);
        }
        return false;
    }

    /**
     * Publishes a non-retained message with {@link #QOS}. The call is posted to the main looper.
     *
     * @param topic topic to publish to
     * @param message payload text, sent as UTF-8
     */
    public static void publish(String topic, String message) {
        if (mqttAndroidClient == null) {
            logE("mqttAndroidClient is null 发送失败");
            return;
        }
        if (topic == null || message == null) {
            logE("发送消息 发送失败: topic or message is null");
            return;
        }

        MAIN_HANDLER.post(() -> {
            // Re-read: the client may have been released between the check above and now.
            MqttAndroidClient client = mqttAndroidClient;
            if (client == null) {
                logE("mqttAndroidClient is null 发送失败");
                return;
            }
            try {
                // Arguments: topic, payload bytes, QoS, retained flag.
                IMqttDeliveryToken token =
                        client.publish(topic, message.getBytes(StandardCharsets.UTF_8), QOS, false);
                token.setActionCallback(new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        logE("发送消息 发送成功");
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        logE("发送消息 发送失败: " + messageOf(exception));
                    }
                });
            } catch (Exception e) {
                logE("发送消息 发送异常: " + e.getMessage());
            }
        });
    }

    /**
     * Subscribes to {@code topic} with {@link #QOS} if the client is connected, and notifies the
     * registered subscribe listeners of the outcome. Failures for the two built-in topics are
     * logged but not forwarded to listeners. The call is posted to the main looper.
     *
     * @param topic topic filter to subscribe to
     */
    public static void subscribe(String topic) {
        MAIN_HANDLER.post(() -> {
            MqttAndroidClient client = mqttAndroidClient;
            try {
                if (client == null || !client.isConnected()) {
                    return;
                }
                IMqttToken token = client.subscribe(topic, QOS);
                token.setActionCallback(new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        for (MyEmqttSubscribeListener listener : subscribeListeners) {
                            if (listener != null) {
                                listener.onSubscribeSuccess(topic);
                            }
                        }
                        logE("订阅成功:" + topic);
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        boolean builtInTopic =
                                TOPIC_BOSS.equals(topic) || TOPIC_XLH.equals(topic);
                        if (!builtInTopic) {
                            for (MyEmqttSubscribeListener listener : subscribeListeners) {
                                if (listener != null) {
                                    listener.onSubscribeFailure();
                                }
                            }
                        }
                        logE("订阅失败:" + topic + ", error: " + messageOf(exception));
                    }
                });
            } catch (Exception e) {
                Log.e(TAG, "订阅异常:" + topic, e);
            }
        });
    }

    /**
     * Unsubscribes from {@code topic} if the client is connected. The call is posted to the main
     * looper.
     *
     * @param topic topic filter to unsubscribe from
     */
    public static void cleanSubscribe(String topic) {
        MAIN_HANDLER.post(() -> {
            MqttAndroidClient client = mqttAndroidClient;
            try {
                if (client == null || !client.isConnected()) {
                    return;
                }
                IMqttToken token = client.unsubscribe(topic);
                token.setActionCallback(new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        logE("取消成功" + topic);
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        logE("取消失败" + topic + ", error: " + messageOf(exception));
                    }
                });
            } catch (Exception e) {
                Log.e(TAG, "取消订阅异常:" + topic, e);
            }
        });
    }

    /** Outcome of a connect attempt: notifies listeners, then subscribes or schedules a retry. */
    private final IMqttActionListener iMqttActionListener = new IMqttActionListener() {
        @Override
        public void onSuccess(IMqttToken arg0) {
            retryCount = 0;
            for (MyEmqttConnectListener listener : connectListeners) {
                if (listener != null) {
                    listener.onConnectSuccess();
                }
            }
            logE("链接状态: 链接成功");
            subscribe(TOPIC_BOSS);
            subscribe(TOPIC_XLH);
        }

        @Override
        public void onFailure(IMqttToken arg0, Throwable arg1) {
            for (MyEmqttConnectListener listener : connectListeners) {
                if (listener != null) {
                    listener.onConnectFailure();
                }
            }
            logE("链接状态: 链接失败: " + messageOf(arg1));
            mqttHandler.postDelayed(() -> {
                if (reconnectEnabled) {
                    handleConnectionFailure();
                }
            }, RECONNECT_DELAY_MS);
        }
    };

    /** Client-level callback: incoming messages, delivery confirmations and connection loss. */
    private final MqttCallback mqttCallback = new MqttCallback() {
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            ExecutorService executor = messageExecutorService;
            if (executor == null || executor.isShutdown()) {
                // The service is being torn down; dropping is better than throwing into Paho.
                Log.w(TAG, "Dropping MQTT message, executor is shut down. topic=" + topic);
                return;
            }
            executor.execute(() -> {
                try {
                    String payload = message.toString();
                    logE("收到的消息 主题:" + topic + " 收到的消息:" + payload);
                    // Both built-in topics carry the same {type, msg} envelope.
                    if (TOPIC_BOSS.equals(topic) || TOPIC_XLH.equals(topic)) {
                        handlePayload(payload);
                    }
                } catch (Exception e) {
                    Log.e(TAG, "处理MQTT消息异常", e);
                }
            });
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken arg0) {
            LogUtils.e("deliveryComplete---------");
        }

        @Override
        public void connectionLost(Throwable arg0) {
            for (MyEmqttConnectListener listener : connectListeners) {
                if (listener != null) {
                    listener.onConnectFailure();
                }
            }
            // The cause may be null: the Paho Android client reports a client-initiated
            // disconnect through this callback without an exception.
            logE("链接状态: 链接断开: " + messageOf(arg0));
            mqttHandler.postDelayed(() -> {
                if (reconnectEnabled) {
                    doClientConnection();
                }
            }, RECONNECT_DELAY_MS);
        }
    };

    /**
     * Parses a JSON payload, extracts its {@code type} and {@code msg} fields and dispatches them
     * to {@link #processMessageType(int, String)} on the main thread. Malformed payloads are
     * logged and dropped.
     */
    private void handlePayload(String payload) {
        try {
            JSONObject json = JSON.parseObject(payload);
            if (json == null) {
                Log.w(TAG, "Ignoring empty MQTT payload");
                return;
            }
            final int type = json.getIntValue("type");
            final String body = json.getString("msg");
            MAIN_HANDLER.post(() -> processMessageType(type, body));
        } catch (Exception e) {
            Log.e(TAG, "解析MQTT消息异常", e);
        }
    }

    /**
     * Handles one decoded push message. Only type 5019 triggers an action in this class; a
     * number of types are logged, the rest are recognised but intentionally ignored here.
     *
     * @param type numeric message type from the payload
     * @param message value of the payload's {@code msg} field, may be {@code null}
     */
    private void processMessageType(int type, String message) {
        switch (type) {
            case 5019: // broadcast to everyone: banner gift notification
                new RoomGiftRunable(message).run();
                break;

            case 5005: // room: number of mic-seat applicants changed
            case 5020: // room: chat-room gift notification
            case 5032: // room: user took a mic seat
            case 5033: // room: user left a mic seat
            case 5034: // kicked out of the room
            case 5055: // left the room
            case 5061: // dating room: heartbeat value changed
            case 5062: // dating room: mic seat swapped
            case 5063: // entered the "little black room"
            case 5064: // left the "little black room"
            case 5065: // prompt dialog after "start" was tapped
            case 5066: // number of CP pairs
            case 5067: // delay time
            case 5068: // in-room broadcast
            case 5069: // in-room mic seat swap
            case 5070:
            case 7002: // CP time is up
                logE("环信" + type + " " + message);
                break;

            case 3001: // candy-grab game
            case 5001: // room (1 s delayed): popularity changed
            case 5003: // room (1 s delayed): mount entrance effect
            case 5004: // room (1 s delayed): titled-user entrance effect
            case 5007: // room: user chat mute, 1 = mute, 2 = unmute
            case 5011: // room: mic seat lock, 1 = lock, 2 = unlock
            case 5013: // room: clear heartbeat value of a single mic seat
            case 5014: // room: clear heartbeat value of all mic seats
            case 5015: // room: room admin set
            case 5016: // room: room admin removed
            case 5017: // user switched mic on/off
            case 5030:
            case 5021: // broadcast to everyone: big gift caught in the kitten-fishing game
            case 5022: // room: password changed, 0 = removed, 1 = set or modified
            case 5023: // room: heartbeat-value switch changed, 1 = on, 2 = off
            case 5024: // room: mic mode changed, 1 = free, 2 = queue
            case 5025: // room: name changed
            case 5027: // room: weekly-star user entrance effect
            case 5028: // room: background changed
            case 5029: // room: announcement changed
            case 5035: // single user: targeted push to the user taking a mic seat
            case 5036: // room: user mic ban, 1 = ban, 2 = unban
            case 5037: // room: user entered the room
            case 5038: // mic-seat countdown
            case 5039: // dice roll
            case 5040: // guardian activated
            case 5041: // emoji sent
            case 5042: // upload ZEGO logs
            case 5043: // public chat screen status
            case 5044: // ball game: open
            case 5045: // ball game: discard
            case 5046: // ball game: reveal
            case 5047: // audio tuning
            case 5050: // push
            case 5051: // demand changed
            case 5054: // host mode switched
            case 5056: // host joined
            case 5057: // room: watering gift
            case 5058: // matchmaking room state switched
            case 5059: // matchmaking room gift animation
            case 5060: // room: rose/cupid gift
            case 10001: // room red packet
            case 10002: // red-packet rain started
            case 10003: // red packet opened
            case 7001: // prize pool progress updated
                break;

            default:
                // Unknown types are ignored.
                break;
        }
    }

    /**
     * Releases the executor, the handler thread and the MQTT client.
     *
     * <p>NOTE(review): this waits up to 5 seconds for running message tasks; avoid calling it
     * from a thread that must not block.
     */
    public void cleanup() {
        try {
            ExecutorService executor = messageExecutorService;
            if (executor != null) {
                executor.shutdown();
                try {
                    if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                        executor.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    executor.shutdownNow();
                    Thread.currentThread().interrupt();
                }
            }

            if (mqttHandler != null) {
                // Drop pending reconnect attempts before the looper goes away.
                mqttHandler.removeCallbacksAndMessages(null);
            }
            if (mqttHandlerThread != null) {
                mqttHandlerThread.quitSafely();
            }

            MqttAndroidClient client = mqttAndroidClient;
            mqttAndroidClient = null;
            if (client != null) {
                releaseClient(client);
            }
            logE("服务关闭 资源释放成功");
        } catch (Exception e) {
            Log.e(TAG, "服务关闭异常", e);
        }
    }

    /**
     * Unsubscribes from the built-in topics, disconnects and unregisters {@code client}.
     * Unsubscribing is done directly rather than through {@link #cleanSubscribe(String)}, whose
     * posted work would only run after the client reference has been cleared. Resources are
     * unregistered even when unsubscribe or disconnect fails.
     */
    private static void releaseClient(MqttAndroidClient client) {
        try {
            if (client.isConnected()) {
                client.unsubscribe(TOPIC_BOSS);
                client.unsubscribe(TOPIC_XLH);
            }
        } catch (Exception e) {
            Log.e(TAG, "取消订阅异常", e);
        }
        try {
            client.disconnect();
        } catch (Exception e) {
            Log.e(TAG, "关闭连接异常", e);
        } finally {
            client.unregisterResources();
        }
    }

    /** Logs {@code message} at error level under {@link #TAG} without treating it as a format. */
    private static void logE(String message) {
        Logger.t(TAG).e(message);
    }

    /** Returns the throwable's message, or a placeholder when no throwable was supplied. */
    private static String messageOf(Throwable throwable) {
        return throwable == null ? "unknown" : throwable.getMessage();
    }

    // ---- Listener registration -------------------------------------------------------------

    /** Registers a message listener; {@code null} and duplicates are ignored. */
    public static void addMyEmqttMesgListener(MyEmqttMesgListener myEmqttMesgListener) {
        if (myEmqttMesgListener != null) {
            messageListeners.addIfAbsent(myEmqttMesgListener);
        }
    }

    /** Removes a previously registered message listener. */
    public static void removeMyEmqttMesgListener(MyEmqttMesgListener myEmqttMesgListener) {
        messageListeners.remove(myEmqttMesgListener);
    }

    /** Registers a connect listener; {@code null} and duplicates are ignored. */
    public static void addMyEmqttConnectListener(MyEmqttConnectListener myEmqttConnectListener) {
        if (myEmqttConnectListener != null) {
            connectListeners.addIfAbsent(myEmqttConnectListener);
        }
    }

    /** Removes a previously registered connect listener. */
    public static void removeMyEmqttConnectListener(
            MyEmqttConnectListener myEmqttConnectListener) {
        connectListeners.remove(myEmqttConnectListener);
    }

    /** Registers a subscribe listener; {@code null} and duplicates are ignored. */
    public static void addMyEmqttSubscribeListener(
            MyEmqttSubscribeListener myEmqttSubscribeListener) {
        if (myEmqttSubscribeListener != null) {
            subscribeListeners.addIfAbsent(myEmqttSubscribeListener);
        }
    }

    /** Removes a previously registered subscribe listener. */
    public static void removeMyEmqttSubscribeListener(
            MyEmqttSubscribeListener myEmqttSubscribeListener) {
        subscribeListeners.remove(myEmqttSubscribeListener);
    }

    // ---- Listener interface implementations (the service itself only logs) ------------------

    @Override
    public void onConnectSuccess() {
        // Intentionally empty.
    }

    @Override
    public void onConnectFailure() {
        // Intentionally empty.
    }

    @Override
    public void messageArrived(String topic, String mesg) {
        LogUtils.e("lxj", "messageArrived:" + mesg);
    }

    @Override
    public void onSubscribeSuccess(String topic) {
        LogUtils.e("lxj", "onSubscribeSuccess:" + topic);
    }

    @Override
    public void onSubscribeFailure() {
        LogUtils.e("lxj", "onSubscribeFailure");
    }
}