#!/usr/bin/env python3 """ ADB工具封装模块 该模块提供了ADB工具的封装,用于与Android设备进行通信。 """ import os import sys import time import subprocess import logging from typing import List, Optional from dataclasses import dataclass from config import Config # 初始化日志 logger = logging.getLogger("auto_clean") @dataclass class DeviceInfo: """设备信息数据类""" serial: str model: str product: str device: str class ADBTool: """ADB工具封装类""" def __init__(self, adb_path: str = Config.ADB_PATH): self.adb_path = adb_path logger.debug(f"初始化ADBTool,ADB路径: {self.adb_path}") self._check_adb_exists() def _check_adb_exists(self) -> None: """检查ADB工具是否存在""" logger.debug("开始检查ADB工具是否存在") try: result = subprocess.run([self.adb_path, "version"], capture_output=True, text=True, check=True) logger.debug(f"ADB版本信息: {result.stdout.strip()}") logger.info("ADB工具检测成功") except (subprocess.CalledProcessError, FileNotFoundError): logger.error("ADB工具未找到,请确保ADB已安装并添加到系统PATH") sys.exit(1) def run_command(self, command: List[str], timeout: int = Config.ADB_TIMEOUT) -> str: """运行ADB命令""" full_command = [self.adb_path] + command logger.debug(f"执行命令: {' '.join(full_command)}, 超时时间: {timeout}秒") try: result = subprocess.run( full_command, capture_output=True, text=True, timeout=timeout, check=True ) logger.debug(f"命令执行成功,输出: {result.stdout.strip()}") return result.stdout.strip() except subprocess.CalledProcessError as e: logger.error(f"命令执行失败: {e.stderr}") logger.debug(f"失败命令: {' '.join(full_command)}") raise except subprocess.TimeoutExpired: logger.error(f"命令执行超时({timeout}秒)") logger.debug(f"超时命令: {' '.join(full_command)}") raise def get_connected_devices(self) -> List[DeviceInfo]: """获取已连接的设备列表""" logger.debug("开始获取已连接的设备列表") try: output = self.run_command(["devices", "-l"]) logger.debug(f"设备列表原始输出: {output}") devices = [] for line in output.split('\n')[1:]: if line.strip(): parts = line.split() serial = parts[0] device_info = {} for part in parts[1:]: if ':' in part: key, value = part.split(':', 1) device_info[key] = value device = DeviceInfo( serial=serial, model=device_info.get('model', 'unknown'), product=device_info.get('product', 'unknown'), device=device_info.get('device', 'unknown') ) devices.append(device) logger.debug(f"发现设备: {serial} - {device.model}") logger.info(f"共检测到 {len(devices)} 台设备") return devices except Exception as e: logger.error(f"获取设备列表失败: {e}") logger.debug(f"获取设备列表失败详情: {str(e)}") return [] def reboot_to_recovery(self, serial: Optional[str] = None) -> None: """重启设备到Recovery模式""" logger.debug(f"重启设备到Recovery模式,设备序列号: {serial}") command = ["reboot", "recovery"] if serial: command = ["-s", serial] + command self.run_command(command) logger.info("设备正在重启到Recovery模式...") def wait_for_recovery_mode(self, serial: Optional[str] = None, timeout: int = Config.RECOVERY_WAIT_TIMEOUT) -> bool: """等待设备进入Recovery模式""" logger.debug(f"等待设备进入Recovery模式,超时时间: {timeout}秒,设备序列号: {serial}") start_time = time.time() while time.time() - start_time < timeout: try: command = ["shell", "getprop", "ro.bootmode"] if serial: command = ["-s", serial] + command output = self.run_command(command, timeout=5) logger.debug(f"Bootmode检查输出: {output}") if "recovery" in output.lower(): logger.info("设备已进入Recovery模式") return True except Exception as e: logger.debug(f"检查Recovery模式时出现异常: {str(e)}") pass time.sleep(2) elapsed = int(time.time() - start_time) logger.debug(f"等待Recovery模式中... 已等待 {elapsed}/{timeout}秒") logger.error(f"等待Recovery模式超时({timeout}秒)") return False