package com.xscm.moduleutil.service;

import android.app.Notification;
import android.app.NotificationChannel;
import android.app.NotificationManager;
import android.app.Service;
import android.content.Context;
import android.content.Intent;
import android.content.pm.ServiceInfo;
import android.net.ConnectivityManager;
import android.net.NetworkInfo;
import android.os.Build;
import android.os.Handler;
import android.os.HandlerThread;
import android.os.IBinder;
import android.os.Looper;
import android.util.Log;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.blankj.utilcode.util.LogUtils;
import com.blankj.utilcode.util.ServiceUtils;
import com.orhanobut.logger.Logger;
import com.xscm.moduleutil.R;
import com.xscm.moduleutil.event.RoomGiftRunable;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.jetbrains.annotations.Nullable;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Foreground service that owns a single MQTT connection (Eclipse Paho {@link MqttAndroidClient}).
 *
 * <p>The service connects on creation, subscribes to {@link #TOPIC_BOSS} once connected, forwards
 * every incoming message to the registered {@link MyEmqttMesgListener} on the main thread, and
 * retries a failed connection up to {@link #MAX_RETRY_ATTEMPTS} times.
 *
 * <p>Threading: connection attempts and retries are scheduled on a private {@link HandlerThread};
 * incoming payloads are parsed on a single worker thread (so arrival order is preserved); the
 * static {@code publish}/{@code subscribe}/{@code cleanSubscribe}/{@code closeConnection} helpers
 * post their client calls to the main looper.
 */
public class MyMqttService extends Service
        implements MyEmqttConnectListener, MyEmqttMesgListener, MyEmqttSubscribeListener {

    private static final String TAG = "lxj";
    private static final int QOS = 2;
    // Production broker: tcp://81.70.45.221
    private static final String HOST = "tcp://47.120.21.132"; // test broker
    private static final String TOPIC_BOSS = "qx_room_topic";
    private static final String CHANNEL_ID = "mqtt_channel";
    private static final int NOTIFICATION_ID = 1;

    /** Maximum number of consecutive reconnect attempts before giving up. */
    private static final int MAX_RETRY_ATTEMPTS = 5;
    /** Delay between two reconnect attempts scheduled by {@link #handleConnectionFailure()}. */
    private static final long RETRY_DELAY_MS = 5000L;
    /** Delay before reacting to a lost connection or a failed connect callback. */
    private static final long RECONNECT_DELAY_MS = 1000L;

    /** Shared main-looper handler; avoids allocating a new Handler for every call/message. */
    private static final Handler MAIN_HANDLER = new Handler(Looper.getMainLooper());

    // Written on the main thread, read on the MQTT handler thread -> volatile for visibility.
    private static volatile MqttAndroidClient mqttAndroidClient;
    /** False once {@link #stopService(Context)} or {@link #onDestroy()} ran; stops reconnecting. */
    private static volatile boolean serviceEnabled = true;

    private static MyEmqttConnectListener mMyEmqttConnectListener;
    private static MyEmqttMesgListener mMyEmqttMesgListener;
    private static MyEmqttSubscribeListener mMyEmqttSubscribeListener;

    private MqttConnectOptions mMqttConnectOptions;

    // Dedicated thread for connect / reconnect scheduling.
    private HandlerThread mqttHandlerThread;
    private Handler mqttHandler;
    // Worker that parses incoming payloads off the main thread.
    private ExecutorService messageExecutorService;

    // Incremented on the handler thread, reset from the connect-success callback.
    private final AtomicInteger retryCount = new AtomicInteger();

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.S) { // API 31+
            setForegroundServiceBehavior(ServiceInfo.FOREGROUND_SERVICE_TYPE_MANIFEST);
        }
        // startForeground() must be called right away once the service has been started.
        startForeground(NOTIFICATION_ID, createNotification());
        return START_STICKY;
    }

    /** Intentionally empty; kept only as a compatibility hook for API 31+. */
    private void setForegroundServiceBehavior(int foregroundServiceTypeManifest) {
        // no-op
    }

    @Override
    public void onCreate() {
        super.onCreate();
        mqttHandlerThread = new HandlerThread("MqttServiceThread");
        mqttHandlerThread.start();
        mqttHandler = new Handler(mqttHandlerThread.getLooper());

        // A single worker keeps messages in arrival order; a cached pool could process two
        // consecutive messages concurrently and deliver them to listeners out of order.
        messageExecutorService = Executors.newSingleThreadExecutor();

        init();
    }

    /**
     * Builds the notification shown while the service runs in the foreground.
     *
     * @return a non-null notification on every API level; on API 26+ the notification channel is
     *     created (idempotently) first, below API 26 the channel-less builder is used instead of
     *     returning {@code null}, which would crash {@code startForeground}.
     */
    private Notification createNotification() {
        Notification.Builder builder;
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
            NotificationManager manager = getSystemService(NotificationManager.class);
            if (manager != null) {
                manager.createNotificationChannel(new NotificationChannel(
                        CHANNEL_ID, "MQTT Channel", NotificationManager.IMPORTANCE_LOW));
            }
            builder = new Notification.Builder(this, CHANNEL_ID);
        } else {
            builder = new Notification.Builder(this);
        }
        return builder
                .setContentTitle("消息服务")
                .setContentText("正在接收实时消息")
                .setSmallIcon(R.mipmap.default_avatar)
                .build();
    }

    /**
     * Starts the service unless it is already running, and re-enables reconnecting.
     *
     * @param mContext context used to start the service
     */
    public static void startService(Context mContext) {
        serviceEnabled = true;
        boolean serviceRunning =
                ServiceUtils.isServiceRunning(MyMqttService.class.getCanonicalName());
        if (!serviceRunning) {
            mContext.startService(new Intent(mContext, MyMqttService.class));
        }
    }

    /**
     * Stops the service if it is running and disables reconnecting.
     *
     * @param context context used to stop the service
     */
    public static void stopService(Context context) {
        serviceEnabled = false;
        boolean serviceRunning =
                ServiceUtils.isServiceRunning(MyMqttService.class.getCanonicalName());
        if (serviceRunning) {
            try {
                context.stopService(new Intent(context, MyMqttService.class));
            } catch (Exception e) {
                Log.e(TAG, "Failed to stop MQTT service", e);
            }
        }
    }

    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }

    /**
     * Publishes {@code message} to {@code topic} with QoS {@value #QOS}, not retained.
     * The client call is posted to the main looper; failures are logged, never thrown.
     *
     * @param topic   destination topic
     * @param message payload, sent as UTF-8 bytes
     */
    public static void publish(String topic, String message) {
        if (mqttAndroidClient == null) {
            logEvent("mqttAndroidClient is null 发送失败");
            return;
        }
        MAIN_HANDLER.post(() -> {
            // Re-read the client: the service may have been destroyed since the check above.
            MqttAndroidClient client = mqttAndroidClient;
            if (client == null) {
                logEvent("mqttAndroidClient is null 发送失败");
                return;
            }
            try {
                // Arguments: topic, payload bytes, QoS, retained flag.
                IMqttDeliveryToken token =
                        client.publish(topic, message.getBytes(StandardCharsets.UTF_8), QOS, false);
                token.setActionCallback(new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        logEvent("发送消息 发送成功");
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        logEvent("发送消息 发送失败: " + describe(exception));
                    }
                });
            } catch (Exception e) {
                Log.e(TAG, "发送消息 发送异常", e);
            }
        });
    }

    /**
     * Subscribes to {@code topic} if the client is connected. The client call is posted to the
     * main looper. The registered {@link MyEmqttSubscribeListener} is told about success; it is
     * told about failure for every topic except {@link #TOPIC_BOSS}.
     *
     * @param topic topic to subscribe to
     */
    public static void subscribe(String topic) {
        MAIN_HANDLER.post(() -> {
            try {
                MqttAndroidClient client = mqttAndroidClient;
                if (client == null || !client.isConnected()) {
                    return;
                }
                IMqttToken subToken = client.subscribe(topic, QOS);
                subToken.setActionCallback(new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        MyEmqttSubscribeListener listener = mMyEmqttSubscribeListener;
                        if (listener != null) {
                            listener.onSubscribeSuccess(topic);
                        }
                        logEvent("订阅成功:" + topic);
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        MyEmqttSubscribeListener listener = mMyEmqttSubscribeListener;
                        if (!TOPIC_BOSS.equals(topic) && listener != null) {
                            listener.onSubscribeFailure();
                        }
                        logEvent("订阅失败:" + topic + ", error: " + describe(exception));
                    }
                });
            } catch (Exception e) {
                Log.e(TAG, "订阅异常:" + topic, e);
            }
        });
    }

    /**
     * Unsubscribes from {@code topic} if the client is connected. The client call is posted to
     * the main looper; failures are logged, never thrown.
     *
     * @param topic topic to unsubscribe from
     */
    public static void cleanSubscribe(String topic) {
        MAIN_HANDLER.post(() -> {
            try {
                MqttAndroidClient client = mqttAndroidClient;
                if (client == null || !client.isConnected()) {
                    return;
                }
                IMqttToken subToken = client.unsubscribe(topic);
                subToken.setActionCallback(new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        logEvent("取消成功" + topic);
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        logEvent("取消失败" + topic + ", error: " + describe(exception));
                    }
                });
            } catch (Exception e) {
                Log.e(TAG, "取消订阅异常:" + topic, e);
            }
        });
    }

    /** Creates the client and connect options, then kicks off the first connection attempt. */
    private void init() {
        String clientId = "android-" + MqttClient.generateClientId();
        MqttAndroidClient client = new MqttAndroidClient(this, HOST, clientId);
        client.setCallback(mqttCallback); // receives messages and connection-lost events
        mqttAndroidClient = client;

        mMqttConnectOptions = new MqttConnectOptions();
        mMqttConnectOptions.setCleanSession(true);
        mMqttConnectOptions.setConnectionTimeout(10); // seconds
        mMqttConnectOptions.setKeepAliveInterval(10); // seconds
        mMqttConnectOptions.setUserName("public");
        mMqttConnectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);

        if (!client.isConnected()) {
            doClientConnection();
        }
    }

    /**
     * Schedules one connection attempt on the MQTT handler thread. Nothing happens when the
     * service is disabled or the client is already connected. A missing network or an exception
     * from {@code connect} counts as a failed attempt and goes through the retry logic, so a
     * temporary network drop does not end reconnecting silently.
     */
    private void doClientConnection() {
        mqttHandler.post(() -> {
            MqttAndroidClient client = mqttAndroidClient;
            if (!serviceEnabled || client == null || client.isConnected()) {
                return;
            }
            if (!isNetworkAvailable()) {
                handleConnectionFailure();
                return;
            }
            try {
                client.connect(mMqttConnectOptions, null, iMqttActionListener);
            } catch (Exception e) {
                Log.e(TAG, "MQTT连接异常", e);
                handleConnectionFailure();
            }
        });
    }

    /**
     * Records a failed attempt and schedules the next one after {@link #RETRY_DELAY_MS}, or stops
     * (and resets the counter) once {@link #MAX_RETRY_ATTEMPTS} is reached.
     */
    private void handleConnectionFailure() {
        int attempts = retryCount.incrementAndGet();
        if (attempts < MAX_RETRY_ATTEMPTS) {
            mqttHandler.postDelayed(() -> {
                if (serviceEnabled) {
                    doClientConnection();
                }
            }, RETRY_DELAY_MS);
        } else {
            Log.w(TAG, "达到最大重试次数,停止重试");
            retryCount.set(0);
        }
    }

    /**
     * Disconnects the client if it is connected. The client call is posted to the main looper;
     * failures are logged, never thrown.
     */
    public static void closeConnection() {
        MAIN_HANDLER.post(() -> {
            try {
                MqttAndroidClient client = mqttAndroidClient;
                if (client == null || !client.isConnected()) {
                    return;
                }
                IMqttToken disconnectToken = client.disconnect();
                disconnectToken.setActionCallback(new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        logEvent("断开链接 断开链接成功");
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        logEvent("断开链接 断开链接失败" + describe(exception));
                    }
                });
            } catch (Exception e) {
                Log.e(TAG, "关闭连接异常", e);
            }
        });
    }

    /**
     * Reports whether an active network is available.
     *
     * @return {@code true} when the active {@link NetworkInfo} exists and is available;
     *     {@code false} otherwise or when the check itself fails
     */
    private boolean isNetworkAvailable() {
        try {
            ConnectivityManager connectivityManager = (ConnectivityManager)
                    getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE);
            if (connectivityManager == null) {
                return false;
            }
            NetworkInfo info = connectivityManager.getActiveNetworkInfo();
            if (info != null && info.isAvailable()) {
                Log.i(TAG, "当前网络名称:" + info.getTypeName());
                return true;
            }
            Log.i(TAG, "没有可用网络");
            return false;
        } catch (Exception e) {
            Log.e(TAG, "检查网络状态异常", e);
            return false;
        }
    }

    /** Receives the outcome of a connect attempt. */
    private final IMqttActionListener iMqttActionListener = new IMqttActionListener() {
        @Override
        public void onSuccess(IMqttToken token) {
            retryCount.set(0);
            MyEmqttConnectListener listener = mMyEmqttConnectListener;
            if (listener != null) {
                listener.onConnectSuccess();
            }
            logEvent("链接状态: 链接成功");
            subscribe(TOPIC_BOSS);
        }

        @Override
        public void onFailure(IMqttToken token, Throwable cause) {
            MyEmqttConnectListener listener = mMyEmqttConnectListener;
            if (listener != null) {
                listener.onConnectFailure();
            }
            logEvent("链接状态: 链接失败 " + describe(cause));
            // Hand the retry decision to the handler thread.
            mqttHandler.postDelayed(() -> {
                if (serviceEnabled) {
                    handleConnectionFailure();
                }
            }, RECONNECT_DELAY_MS);
        }
    };

    /** Receives incoming messages, delivery confirmations and connection-lost events. */
    private final MqttCallback mqttCallback = new MqttCallback() {
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            ExecutorService executor = messageExecutorService;
            if (executor == null || executor.isShutdown()) {
                return; // service is shutting down; drop instead of throwing from the callback
            }
            try {
                executor.execute(() -> handleIncomingMessage(topic, message));
            } catch (RejectedExecutionException e) {
                // Lost the race with onDestroy(): executor was shut down after the check above.
                Log.w(TAG, "MQTT message dropped, executor already shut down", e);
            }
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            LogUtils.e("deliveryComplete---------");
        }

        @Override
        public void connectionLost(Throwable cause) {
            MyEmqttConnectListener listener = mMyEmqttConnectListener;
            if (listener != null) {
                listener.onConnectFailure();
            }
            // cause is null-checked via describe(): dereferencing it directly here would crash
            // the main thread if the client ever reports a lost connection without a cause.
            logEvent("链接状态: 链接断开: " + describe(cause));
            mqttHandler.postDelayed(() -> {
                if (serviceEnabled) {
                    doClientConnection();
                }
            }, RECONNECT_DELAY_MS);
        }
    };

    /**
     * Worker-thread part of message handling: logs the payload, dispatches it by type and
     * notifies the registered message listener on the main thread.
     */
    private void handleIncomingMessage(String topic, MqttMessage message) {
        try {
            String payload = message.toString();
            logEvent("收到的消息 主题:" + topic + " 收到的消息:" + payload);

            receiveMessage(topic, payload);

            // Capture the listener now so the main-thread runnable cannot hit a null field.
            MyEmqttMesgListener listener = mMyEmqttMesgListener;
            if (listener != null) {
                MAIN_HANDLER.post(() -> listener.messageArrived(topic, payload));
            }
        } catch (Exception e) {
            Log.e(TAG, "处理MQTT消息异常", e);
        }
    }

    /**
     * Parses {@code data} as JSON, reads its {@code type} and {@code msg} fields and dispatches
     * them to {@link #processMessageType(int, String)} on the main thread. Parse errors are
     * logged and the message is dropped.
     */
    private void receiveMessage(String topic, String data) {
        try {
            JSONObject jsonObject = JSON.parseObject(data);
            if (jsonObject == null) {
                Log.w(TAG, "解析MQTT消息异常: empty payload, topic=" + topic);
                return;
            }
            int type = jsonObject.getIntValue("type");
            String message = jsonObject.getString("msg");
            MAIN_HANDLER.post(() -> processMessageType(type, message));
        } catch (Exception e) {
            Log.e(TAG, "解析MQTT消息异常", e);
        }
    }

    /**
     * Dispatches one message by its numeric type. Only type 5019 triggers an action
     * ({@link RoomGiftRunable}); a set of types is logged; every other known type is listed for
     * reference and ignored.
     */
    private void processMessageType(int type, String message) {
        switch (type) {
            case 5019: // all users: banner gift notification
                new RoomGiftRunable(message).run();
                break;

            // Types that are only logged.
            case 5005: // room: number of mic-seat applicants changed
            case 5020: // room: chat-room gift notification
            case 5032: // room: user took a mic seat
            case 5033: // room: user left a mic seat
            case 5034: // kicked out of the room
            case 5055: // left the room
            case 5061: // dating room: heartbeat value changed
            case 5062: // dating room: mic seat swapped
            case 5063: // entered the "dark room"
            case 5064: // left the "dark room"
            case 5065: // prompt dialog after tapping start
            case 5066: // number of CP pairs
            case 5067: // delay time
            case 5068: // in-room broadcast
            case 5069: // in-room mic seat swap
            case 5070:
            case 7002: // CP time is up
                logEvent("环信" + type + " " + message);
                break;

            // Known types that currently need no handling here.
            case 3001: // candy-grab game
            case 5001: // room (1 s delayed): popularity changed
            case 5003: // room (1 s delayed): mount entrance effect
            case 5004: // room (1 s delayed): noble user entrance effect
            case 5007: // room: user muted in chat (1 mute, 2 unmute)
            case 5011: // room: mic seat locked (1 lock, 2 unlock)
            case 5013: // room: clear heartbeat value of one mic seat
            case 5014: // room: clear heartbeat value of all mic seats
            case 5015: // room: administrator added
            case 5016: // room: administrator removed
            case 5017: // user toggled microphone
            case 5021: // all users: big gift caught in the cat-fishing game
            case 5030:
            case 5022: // room: password changed (0 removed, 1 set or modified)
            case 5023: // room: heartbeat switch changed (1 on, 2 off)
            case 5024: // room: mic mode changed (1 free, 2 queue)
            case 5025: // room: name changed
            case 5027: // room: weekly-star user entrance effect
            case 5028: // room: background changed
            case 5029: // room: announcement changed
            case 5035: // single user: targeted push to the user on the mic
            case 5036: // room: user mic banned (1 ban, 2 unban)
            case 5037: // room: user entered
            case 5038: // mic seat countdown
            case 5039: // dice roll
            case 5040: // guardian purchased
            case 5041: // emoji sent
            case 5042: // upload Zego logs
            case 5043: // public chat screen state
            case 5044: // ball game: start
            case 5045: // ball game: fold
            case 5046: // ball game: reveal
            case 5047: // audio tuning
            case 5050: // push
            case 5051: // demand changed
            case 5054: // room-owner mode switched
            case 5056: // room owner joined
            case 5057: // room watering gift
            case 5058: // blind-date room state switched
            case 5059: // blind-date room gift animation
            case 5060: // room rose-cupid gift
            case 7001: // prize pool progress updated
            case 10001: // room red packet
            case 10002: // red-packet rain started
            case 10003: // red packet opened
            default:
                break;
        }
    }

    /**
     * Releases everything the service owns. Each step is isolated so that a failure in one (for
     * example disconnecting a client that never finished binding) cannot skip the others, and
     * nothing here blocks the main thread.
     */
    @Override
    public void onDestroy() {
        serviceEnabled = false;

        if (messageExecutorService != null) {
            // Non-blocking: already queued messages still finish, new ones are rejected.
            messageExecutorService.shutdown();
        }
        if (mqttHandler != null) {
            mqttHandler.removeCallbacksAndMessages(null); // drop pending reconnect attempts
        }
        if (mqttHandlerThread != null) {
            mqttHandlerThread.quitSafely();
        }

        MqttAndroidClient client = mqttAndroidClient;
        mqttAndroidClient = null;
        if (client != null) {
            try {
                // Unsubscribe directly: the posted cleanSubscribe() would only run after the
                // client reference has been cleared and therefore never unsubscribe.
                if (client.isConnected()) {
                    client.unsubscribe(TOPIC_BOSS);
                }
                client.disconnect();
            } catch (Exception e) {
                Log.e(TAG, "服务关闭异常", e);
            }
            try {
                client.unregisterResources();
            } catch (Exception e) {
                Log.e(TAG, "服务关闭异常", e);
            }
        }

        stopForeground(true);
        logEvent("服务关闭 资源释放成功");
        super.onDestroy();
    }

    /** Registers the listener notified about incoming messages; replaces any previous one. */
    public static void addMyEmqttMesgListener(MyEmqttMesgListener myEmqttMesgListener) {
        mMyEmqttMesgListener = myEmqttMesgListener;
    }

    /** Registers the listener notified about connection state; replaces any previous one. */
    public static void addMyEmqttConnectListener(MyEmqttConnectListener myEmqttConnectListener) {
        mMyEmqttConnectListener = myEmqttConnectListener;
    }

    /** Registers the listener notified about subscribe results; replaces any previous one. */
    public static void addMyEmqttSubscribeListener(
            MyEmqttSubscribeListener myEmqttSubscribeListener) {
        mMyEmqttSubscribeListener = myEmqttSubscribeListener;
    }

    /**
     * Logs {@code message} at error level under {@link #TAG}. The text is passed as the message
     * itself; passing the tag as the first argument of {@code Logger.e} would make the tag the
     * format string and drop the actual text.
     */
    private static void logEvent(String message) {
        Logger.t(TAG).e(message);
    }

    /** Null-safe description of a failure cause for log output. */
    private static String describe(Throwable cause) {
        return cause == null ? "unknown" : String.valueOf(cause.getMessage());
    }

    @Override
    public void onConnectSuccess() {
    }

    @Override
    public void onConnectFailure() {
    }

    @Override
    public void messageArrived(String topic, String mesg) {
    }

    @Override
    public void onSubscribeSuccess(String topic) {
    }

    @Override
    public void onSubscribeFailure() {
    }
}