Files
midi-android/moduleUtil/src/main/java/com/xscm/moduleutil/service/MyMqttService.java

695 lines
25 KiB
Java

package com.xscm.moduleutil.service;
import android.app.Notification;
import android.app.NotificationChannel;
import android.app.NotificationManager;
import android.app.Service;
import android.content.Context;
import android.content.Intent;
import android.content.pm.ServiceInfo;
import android.net.ConnectivityManager;
import android.net.NetworkInfo;
import android.os.Build;
import android.os.Handler;
import android.os.HandlerThread;
import android.os.IBinder;
import android.os.Looper;
import android.util.Log;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.blankj.utilcode.util.LogUtils;
import com.blankj.utilcode.util.ServiceUtils;
import com.orhanobut.logger.Logger;
import com.xscm.moduleutil.R;
import com.xscm.moduleutil.event.RoomGiftRunable;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class MyMqttService extends Service implements MyEmqttConnectListener, MyEmqttMesgListener, MyEmqttSubscribeListener {
private final static String TAG = "lxj";
private static int qos = 2;
// private static String HOST ="tcp://81.70.45.221";//正式
private static String HOST = "tcp://1.13.181.248";//测试
// private static String HOST = "tcp://47.120.21.132";//测试
private static MqttAndroidClient mqttAndroidClient;
private MqttConnectOptions mMqttConnectOptions;
private static boolean b = true;
private static MyEmqttConnectListener mMyEmqttConnectListener;
private static MyEmqttMesgListener mMyEmqttMesgListener;
private static MyEmqttSubscribeListener mMyEmqttSubscribeListener;
private static final String TOPIC_BOSS = "qx_room_topic";
private static final int NOTIFICATION_ID = 1;
// 添加后台线程处理
private HandlerThread mqttHandlerThread;
private Handler mqttHandler;
private ExecutorService messageExecutorService;
// 添加连接重试限制
private static final int MAX_RETRY_ATTEMPTS = 5;
private int retryCount = 0;
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.S) { // API 31+
// 设置前台服务类型为 "connectedDevice" 或其他合适类型
setForegroundServiceBehavior(ServiceInfo.FOREGROUND_SERVICE_TYPE_MANIFEST);
}
// ⚠️ 必须在这个方法开始就调用 startForeground()
Notification notification = createNotification();
startForeground(NOTIFICATION_ID, notification);
return START_STICKY;
}
private void setForegroundServiceBehavior(int foregroundServiceTypeManifest) {
// 空实现,仅用于兼容性处理
}
@Override
public void onCreate() {
super.onCreate();
// 创建专用的HandlerThread处理MQTT操作
mqttHandlerThread = new HandlerThread("MqttServiceThread");
mqttHandlerThread.start();
mqttHandler = new Handler(mqttHandlerThread.getLooper());
// 创建线程池处理消息
messageExecutorService = Executors.newCachedThreadPool();
try {
init();
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
private Notification createNotification() {
// 创建你的前台通知
// Notification.Builder builder = null;
// if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
// builder = new Notification.Builder(this, "channel_id")
// .setContentTitle("MQTT Service")
// .setSmallIcon(R.mipmap.default_avatar);
// }
// if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
// NotificationChannel channel = new NotificationChannel(
// "channel_id", "MQTT Channel",
// NotificationManager.IMPORTANCE_LOW);
// NotificationManager manager = getSystemService(NotificationManager.class);
// manager.createNotificationChannel(channel);
// }
// return builder.build();
// 创建你的前台通知
Notification.Builder builder = null;
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
builder = new Notification.Builder(this, "mqtt_channel")
.setContentTitle("消息服务")
.setContentText("正在接收实时消息")
.setSmallIcon(R.mipmap.default_avatar);
}
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
NotificationChannel channel = new NotificationChannel(
"mqtt_channel", "MQTT Channel",
NotificationManager.IMPORTANCE_LOW);
NotificationManager manager = getSystemService(NotificationManager.class);
if (manager != null) {
manager.createNotificationChannel(channel);
}
}
return builder != null ? builder.build() : null;
}
/**
* 开启服务
*/
public static void startService(Context mContext) {
b = true;
boolean serviceRunning = ServiceUtils.isServiceRunning(MyMqttService.class.getCanonicalName());
if (!serviceRunning) {
mContext.startService(new Intent(mContext, MyMqttService.class));
}
}
public static void stopService(Context context) {
b = false;
boolean serviceRunning = ServiceUtils.isServiceRunning(MyMqttService.class.getCanonicalName());
if (serviceRunning) {
try {
context.stopService(new Intent(context, MyMqttService.class));
} catch (Exception e) {
Log.e(TAG, "Failed to stop MQTT service", e);
}
}
}
@Nullable
@Override
public IBinder onBind(Intent intent) {
return null;
}
/**
* 发布 (模拟其他客户端发布消息)
*
* @param message 消息
*/
public static void publish(String topic, String message) {
if (mqttAndroidClient == null) {
Logger.e(TAG, "mqttAndroidClient is null", "发送失败");
return;
}
// 在后台线程执行发布操作
Handler handler = new Handler(Looper.getMainLooper());
handler.post(() -> {
try {
//参数分别为:主题、消息的字节数组、服务质量、是否在服务器保留断开连接后的最后一条消息
IMqttDeliveryToken publish = mqttAndroidClient.publish(topic, message.getBytes(), qos, false);
publish.setActionCallback(new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Logger.e(TAG, "发送消息", "发送成功");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Logger.e(TAG, "发送消息", "发送失败: " + exception.getMessage());
}
});
} catch (Exception e) {
Logger.e(TAG, "发送消息", "发送异常: " + e.getMessage());
}
});
}
public static void subscribe(String topic) {
// 在后台线程执行订阅操作
Handler handler = new Handler(Looper.getMainLooper());
handler.post(() -> {
try {
if (mqttAndroidClient != null && mqttAndroidClient.isConnected()) {
IMqttToken subToken = mqttAndroidClient.subscribe(topic, qos);
subToken.setActionCallback(new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
if (mMyEmqttSubscribeListener != null) {
mMyEmqttSubscribeListener.onSubscribeSuccess(topic);
}
Logger.e(TAG, "订阅成功:" + topic);
}
@Override
public void onFailure(IMqttToken asyncActionToken,
Throwable exception) {
if (!TOPIC_BOSS.equals(topic) && mMyEmqttSubscribeListener != null) {
mMyEmqttSubscribeListener.onSubscribeFailure();
}
Logger.e(TAG, "订阅失败:" + topic + ", error: " + exception.getMessage());
}
});
}
} catch (Exception e) {
Log.e(TAG, "订阅异常:" + topic, e);
}
});
}
public static void cleanSubscribe(String topic) {
// 在后台线程执行取消订阅操作
Handler handler = new Handler(Looper.getMainLooper());
handler.post(() -> {
try {
if (mqttAndroidClient != null && mqttAndroidClient.isConnected()) {
IMqttToken subToken = mqttAndroidClient.unsubscribe(topic);
subToken.setActionCallback(new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Logger.e(TAG, "取消成功" + topic);
}
@Override
public void onFailure(IMqttToken asyncActionToken,
Throwable exception) {
Logger.e(TAG, "取消失败" + topic + ", error: " + exception.getMessage());
}
});
}
} catch (Exception e) {
Log.e(TAG, "取消订阅异常:" + topic, e);
}
});
}
/**
* 初始化
*/
private void init() throws MqttException {
String CLIENTID = "android-" + MqttClient.generateClientId();
mqttAndroidClient = new MqttAndroidClient(this, HOST, CLIENTID);
mqttAndroidClient.setCallback(mqttCallback); //设置监听订阅消息的回调
mMqttConnectOptions = new MqttConnectOptions();
mMqttConnectOptions.setCleanSession(true); //设置是否清除缓存
mMqttConnectOptions.setConnectionTimeout(10); //设置超时时间,单位:秒
mMqttConnectOptions.setKeepAliveInterval(10); //设置心跳包发送间隔,单位:秒
mMqttConnectOptions.setUserName("public"); //设置用户名
// mMqttConnectOptions.setPassword(new char[0]); //设置密码
mMqttConnectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);
if (mqttAndroidClient != null && !mqttAndroidClient.isConnected()) {
doClientConnection();
}
}
private void doClientConnection() throws MqttException {
// 在后台线程执行连接操作
mqttHandler.post(() -> {
if (mqttAndroidClient != null && !mqttAndroidClient.isConnected() && isConnectIsNomarl() && b) {
try {
mqttAndroidClient.connect(mMqttConnectOptions, null, iMqttActionListener);
} catch (Exception e) {
Log.e(TAG, "MQTT连接异常", e);
handleConnectionFailure();
}
}
});
}
private void handleConnectionFailure() {
retryCount++;
if (retryCount < MAX_RETRY_ATTEMPTS) {
// 延迟重试
mqttHandler.postDelayed(() -> {
if (b) {
try {
doClientConnection();
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
}, 5000); // 5秒后重试
} else {
Log.w(TAG, "达到最大重试次数,停止重试");
retryCount = 0;
}
}
public static void closeConnection() {
Handler handler = new Handler(Looper.getMainLooper());
handler.post(() -> {
try {
if (mqttAndroidClient != null && mqttAndroidClient.isConnected()) {
IMqttToken disconnect = mqttAndroidClient.disconnect();
disconnect.setActionCallback(new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Logger.e(TAG, "断开链接", "断开链接成功");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Logger.e(TAG, "断开链接", "断开链接失败" + exception.getMessage());
}
});
}
} catch (Exception e) {
Log.e(TAG, "关闭连接异常", e);
}
});
}
/**
* 判断网络是否连接
*/
private boolean isConnectIsNomarl() {
try {
ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE);
if (connectivityManager != null) {
NetworkInfo info = connectivityManager.getActiveNetworkInfo();
if (info != null && info.isAvailable()) {
String name = info.getTypeName();
Log.i(TAG, "当前网络名称:" + name);
return true;
} else {
Log.i(TAG, "没有可用网络");
return false;
}
}
} catch (Exception e) {
Log.e(TAG, "检查网络状态异常", e);
}
return false;
}
//MQTT是否连接成功的监听
private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken arg0) {
retryCount = 0; // 重置重试计数
if (mMyEmqttConnectListener != null) {
mMyEmqttConnectListener.onConnectSuccess();
}
Logger.e(TAG, "链接状态:", "链接成功");
subscribe(TOPIC_BOSS);
}
@Override
public void onFailure(IMqttToken arg0, Throwable arg1) {
if (mMyEmqttConnectListener != null) {
mMyEmqttConnectListener.onConnectFailure();
}
// if (arg0 instanceof MqttException) {
//// Logger.e(TAG, "链接状态:", "链接失败" + ((MqttException) arg1).getMessage());
// }
// 在后台线程处理重连
mqttHandler.postDelayed(() -> {
if (b) {
handleConnectionFailure();
}
}, 1000);
}
};
//订阅主题的回调
private MqttCallback mqttCallback = new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// 将消息处理放到后台线程执行
messageExecutorService.execute(() -> {
try {
String messageStr = message.toString();
Logger.e(TAG, "收到的消息", "主题:" + topic + " 收到的消息:" + messageStr);
// 处理消息
receiveMessage(topic, messageStr);
// 通知监听器
if (mMyEmqttMesgListener != null) {
// 切换到主线程通知
new Handler(Looper.getMainLooper()).post(() -> {
mMyEmqttMesgListener.messageArrived(topic, messageStr);
});
}
} catch (Exception e) {
Log.e(TAG, "处理MQTT消息异常", e);
}
});
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
LogUtils.e("deliveryComplete---------");
}
@Override
public void connectionLost(Throwable arg0) {
if (mMyEmqttConnectListener != null) {
mMyEmqttConnectListener.onConnectFailure();
}
Logger.e(TAG, "链接状态:", "链接断开: " + arg0.getMessage());
// 在后台线程处理重连
mqttHandler.postDelayed(() -> {
if (b) {
try {
doClientConnection();
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
}, 1000);
}
};
private void receiveMessage(String topic, String data) {
try {
String newdata = data;//TextLengthUtil.decode(data);
JSONObject jsonObject = JSON.parseObject(newdata);
int type = jsonObject.getIntValue("type");
String message = jsonObject.getString("msg");
// 将事件处理放到主线程执行
new Handler(Looper.getMainLooper()).post(() -> {
processMessageType(type, message);
});
} catch (Exception e) {
Log.e(TAG, "解析MQTT消息异常", e);
}
}
private void processMessageType(int type, String message) {
switch (type) {
case 3001://抢糖果游戏
break;
case 5001://延时一秒推送房间-人气变化
break;
case 5003://延时一秒推送房间-坐骑进场特效
break;
case 5004://延时一秒推送房间-爵位用户进场特效
break;
case 5005://推送房间-上麦申请人数变化
Logger.e("环信5005", message);
break;
case 5007://推送房间-用户是否禁言 1禁言2解禁
break;
case 5011://推送房间-是否封麦 1封麦2解封
break;
case 5013://推送房间-清空单个麦位心动值
case 5014://推送房间-清空所有麦位心动值
break;
case 5015://推送房间-设置房间管理员
break;
case 5016://推送房间-删除房间管理员
break;
case 5017://用户开关麦
break;
case 5019://推送所有人-横幅礼物通知
new RoomGiftRunable(message).run();
break;
case 5020://推送房间-聊天室礼物通知
Logger.e("环信5020", message);
break;
case 5030:
case 5021://推送所有人-小猫钓鱼钓到大礼物时通知
break;
case 5022://推送房间-房间密码变化通知 0取消密码1设置或修改密码
break;
case 5023://推送房间-房间心动值开关变化通知 1开2关
break;
case 5024://推送房间-上麦模式变化通知 1自由2排麦
break;
case 5025://推送房间-修改房间名称
break;
case 5027://推送房间-周星用户进场特效
break;
case 5028://推送房间-修改房间背景
break;
case 5029://推送房间-修改房间公告
break;
case 5032://推送房间-上麦
Logger.e("环信5032", message);
break;
case 5033://推送房间-下麦
Logger.e("环信5033", message);
break;
case 5034://踢出房间
Logger.e("环信5034", message);
break;
case 5035://推送单独用户-定向推向给上麦的用户
break;
case 5036://推送房间-用户禁麦 1禁麦2解禁
break;
case 5037://推送房间-用户进入房间
break;
case 5038://麦位倒计时
break;
case 5039://扔骰子
break;
case 5040://开通守护推送
break;
case 5041://发送表情
break;
case 5042://上传即构日志
break;
case 5043://公屏状态
break;
case 5044://开球
break;
case 5045://弃球
break;
case 5046://亮球
break;
case 5047://调音
break;
case 5050://推送
break;
case 5051://需求变化
break;
case 5054://房主模式切换
break;
case 5055://离开房间
Logger.e("环信5055", message);
break;
case 5056://房主加入
break;
case 5057://房间浇水礼物推送
break;
case 5058://切换相亲房状态
break;
case 5059://相亲房礼物动画
break;
case 5060://房间玫瑰爱神礼物推送
break;
case 5061://交友房心动值变化
Logger.e("环信5061", message);
break;
case 5062://交友房换麦
Logger.e("环信5062", message);
break;
case 5063://进入小黑屋
Logger.e("环信5063", message);
break;
case 5064://退出小黑屋
Logger.e("环信5064", message);
break;
case 5065://点击开始后进行提示弹框
Logger.e("环信5065", message);
break;
case 5066://cp对数
Logger.e("环信5066", message);
break;
case 5067://延迟时间
Logger.e("环信5067", message);
break;
case 5068://房间内广播
Logger.e("环信5068", message);
break;
case 5069://房间内换麦
Logger.e("环信5069", message);
break;
case 10001: //房间红包
break;
case 10002: //雨开始
break;
case 10003: //打开红包
break;
case 7001: //奖池进度更新
break;
case 7002://cp时间到
Logger.e("环信7002", message);
break;
case 5070:
Logger.e("环信5070", message);
break;
}
}
@Override
public void onDestroy() {
b = false;
try {
// 清理资源
if (messageExecutorService != null) {
messageExecutorService.shutdown();
try {
if (!messageExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
messageExecutorService.shutdownNow();
}
} catch (InterruptedException e) {
messageExecutorService.shutdownNow();
}
}
if (mqttHandlerThread != null) {
mqttHandlerThread.quitSafely();
}
cleanSubscribe(TOPIC_BOSS);
if (mqttAndroidClient != null) {
mqttAndroidClient.disconnect(); //断开连接
mqttAndroidClient.unregisterResources();
mqttAndroidClient = null;
}
stopForeground(true); // 停止前台服务
Logger.e(TAG, "服务关闭", "资源释放成功");
} catch (Exception e) {
Log.e(TAG, "服务关闭异常", e);
}
super.onDestroy();
}
public static void addMyEmqttMesgListener(MyEmqttMesgListener myEmqttMesgListener) {
mMyEmqttMesgListener = myEmqttMesgListener;
}
public static void addMyEmqttConnectListener(MyEmqttConnectListener myEmqttConnectListener) {
mMyEmqttConnectListener = myEmqttConnectListener;
}
public static void addMyEmqttSubscribeListener(MyEmqttSubscribeListener myEmqttSubscribeListener) {
mMyEmqttSubscribeListener = myEmqttSubscribeListener;
}
@Override
public void onConnectSuccess() {
}
@Override
public void onConnectFailure() {
}
@Override
public void messageArrived(String topic, String mesg) {
}
@Override
public void onSubscribeSuccess(String topic) {
}
@Override
public void onSubscribeFailure() {
}
}