146 lines
5.4 KiB
Python
146 lines
5.4 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""
|
|||
|
|
ADB工具封装模块
|
|||
|
|
|
|||
|
|
该模块提供了ADB工具的封装,用于与Android设备进行通信。
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import os
|
|||
|
|
import sys
|
|||
|
|
import time
|
|||
|
|
import subprocess
|
|||
|
|
import logging
|
|||
|
|
from typing import List, Optional
|
|||
|
|
from dataclasses import dataclass
|
|||
|
|
|
|||
|
|
from config import Config
|
|||
|
|
|
|||
|
|
# 初始化日志
|
|||
|
|
logger = logging.getLogger("auto_clean")
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
|
|||
|
|
class DeviceInfo:
|
|||
|
|
"""设备信息数据类"""
|
|||
|
|
serial: str
|
|||
|
|
model: str
|
|||
|
|
product: str
|
|||
|
|
device: str
|
|||
|
|
|
|||
|
|
|
|||
|
|
class ADBTool:
|
|||
|
|
"""ADB工具封装类"""
|
|||
|
|
|
|||
|
|
def __init__(self, adb_path: str = Config.ADB_PATH):
|
|||
|
|
self.adb_path = adb_path
|
|||
|
|
logger.debug(f"初始化ADBTool,ADB路径: {self.adb_path}")
|
|||
|
|
self._check_adb_exists()
|
|||
|
|
|
|||
|
|
def _check_adb_exists(self) -> None:
|
|||
|
|
"""检查ADB工具是否存在"""
|
|||
|
|
logger.debug("开始检查ADB工具是否存在")
|
|||
|
|
try:
|
|||
|
|
result = subprocess.run([self.adb_path, "version"], capture_output=True, text=True, check=True)
|
|||
|
|
logger.debug(f"ADB版本信息: {result.stdout.strip()}")
|
|||
|
|
logger.info("ADB工具检测成功")
|
|||
|
|
except (subprocess.CalledProcessError, FileNotFoundError):
|
|||
|
|
logger.error("ADB工具未找到,请确保ADB已安装并添加到系统PATH")
|
|||
|
|
sys.exit(1)
|
|||
|
|
|
|||
|
|
def run_command(self, command: List[str], timeout: int = Config.ADB_TIMEOUT) -> str:
|
|||
|
|
"""运行ADB命令"""
|
|||
|
|
full_command = [self.adb_path] + command
|
|||
|
|
logger.debug(f"执行命令: {' '.join(full_command)}, 超时时间: {timeout}秒")
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
result = subprocess.run(
|
|||
|
|
full_command,
|
|||
|
|
capture_output=True,
|
|||
|
|
text=True,
|
|||
|
|
timeout=timeout,
|
|||
|
|
check=True
|
|||
|
|
)
|
|||
|
|
logger.debug(f"命令执行成功,输出: {result.stdout.strip()}")
|
|||
|
|
return result.stdout.strip()
|
|||
|
|
except subprocess.CalledProcessError as e:
|
|||
|
|
logger.error(f"命令执行失败: {e.stderr}")
|
|||
|
|
logger.debug(f"失败命令: {' '.join(full_command)}")
|
|||
|
|
raise
|
|||
|
|
except subprocess.TimeoutExpired:
|
|||
|
|
logger.error(f"命令执行超时({timeout}秒)")
|
|||
|
|
logger.debug(f"超时命令: {' '.join(full_command)}")
|
|||
|
|
raise
|
|||
|
|
|
|||
|
|
def get_connected_devices(self) -> List[DeviceInfo]:
|
|||
|
|
"""获取已连接的设备列表"""
|
|||
|
|
logger.debug("开始获取已连接的设备列表")
|
|||
|
|
try:
|
|||
|
|
output = self.run_command(["devices", "-l"])
|
|||
|
|
logger.debug(f"设备列表原始输出: {output}")
|
|||
|
|
|
|||
|
|
devices = []
|
|||
|
|
|
|||
|
|
for line in output.split('\n')[1:]:
|
|||
|
|
if line.strip():
|
|||
|
|
parts = line.split()
|
|||
|
|
serial = parts[0]
|
|||
|
|
device_info = {}
|
|||
|
|
|
|||
|
|
for part in parts[1:]:
|
|||
|
|
if ':' in part:
|
|||
|
|
key, value = part.split(':', 1)
|
|||
|
|
device_info[key] = value
|
|||
|
|
|
|||
|
|
device = DeviceInfo(
|
|||
|
|
serial=serial,
|
|||
|
|
model=device_info.get('model', 'unknown'),
|
|||
|
|
product=device_info.get('product', 'unknown'),
|
|||
|
|
device=device_info.get('device', 'unknown')
|
|||
|
|
)
|
|||
|
|
devices.append(device)
|
|||
|
|
logger.debug(f"发现设备: {serial} - {device.model}")
|
|||
|
|
|
|||
|
|
logger.info(f"共检测到 {len(devices)} 台设备")
|
|||
|
|
return devices
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"获取设备列表失败: {e}")
|
|||
|
|
logger.debug(f"获取设备列表失败详情: {str(e)}")
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
def reboot_to_recovery(self, serial: Optional[str] = None) -> None:
|
|||
|
|
"""重启设备到Recovery模式"""
|
|||
|
|
logger.debug(f"重启设备到Recovery模式,设备序列号: {serial}")
|
|||
|
|
command = ["reboot", "recovery"]
|
|||
|
|
if serial:
|
|||
|
|
command = ["-s", serial] + command
|
|||
|
|
|
|||
|
|
self.run_command(command)
|
|||
|
|
logger.info("设备正在重启到Recovery模式...")
|
|||
|
|
|
|||
|
|
def wait_for_recovery_mode(self, serial: Optional[str] = None, timeout: int = Config.RECOVERY_WAIT_TIMEOUT) -> bool:
|
|||
|
|
"""等待设备进入Recovery模式"""
|
|||
|
|
logger.debug(f"等待设备进入Recovery模式,超时时间: {timeout}秒,设备序列号: {serial}")
|
|||
|
|
start_time = time.time()
|
|||
|
|
|
|||
|
|
while time.time() - start_time < timeout:
|
|||
|
|
try:
|
|||
|
|
command = ["shell", "getprop", "ro.bootmode"]
|
|||
|
|
if serial:
|
|||
|
|
command = ["-s", serial] + command
|
|||
|
|
|
|||
|
|
output = self.run_command(command, timeout=5)
|
|||
|
|
logger.debug(f"Bootmode检查输出: {output}")
|
|||
|
|
|
|||
|
|
if "recovery" in output.lower():
|
|||
|
|
logger.info("设备已进入Recovery模式")
|
|||
|
|
return True
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.debug(f"检查Recovery模式时出现异常: {str(e)}")
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
time.sleep(2)
|
|||
|
|
elapsed = int(time.time() - start_time)
|
|||
|
|
logger.debug(f"等待Recovery模式中... 已等待 {elapsed}/{timeout}秒")
|
|||
|
|
|
|||
|
|
logger.error(f"等待Recovery模式超时({timeout}秒)")
|
|||
|
|
return False
|