146 lines
5.4 KiB
Python
146 lines
5.4 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
ADB工具封装模块
|
||
|
||
该模块提供了ADB工具的封装,用于与Android设备进行通信。
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import time
|
||
import subprocess
|
||
import logging
|
||
from typing import List, Optional
|
||
from dataclasses import dataclass
|
||
|
||
from config import Config
|
||
|
||
# 初始化日志
|
||
logger = logging.getLogger("auto_clean")
|
||
|
||
|
||
@dataclass
|
||
class DeviceInfo:
|
||
"""设备信息数据类"""
|
||
serial: str
|
||
model: str
|
||
product: str
|
||
device: str
|
||
|
||
|
||
class ADBTool:
|
||
"""ADB工具封装类"""
|
||
|
||
def __init__(self, adb_path: str = Config.ADB_PATH):
|
||
self.adb_path = adb_path
|
||
logger.debug(f"初始化ADBTool,ADB路径: {self.adb_path}")
|
||
self._check_adb_exists()
|
||
|
||
def _check_adb_exists(self) -> None:
|
||
"""检查ADB工具是否存在"""
|
||
logger.debug("开始检查ADB工具是否存在")
|
||
try:
|
||
result = subprocess.run([self.adb_path, "version"], capture_output=True, text=True, check=True)
|
||
logger.debug(f"ADB版本信息: {result.stdout.strip()}")
|
||
logger.info("ADB工具检测成功")
|
||
except (subprocess.CalledProcessError, FileNotFoundError):
|
||
logger.error("ADB工具未找到,请确保ADB已安装并添加到系统PATH")
|
||
sys.exit(1)
|
||
|
||
def run_command(self, command: List[str], timeout: int = Config.ADB_TIMEOUT) -> str:
|
||
"""运行ADB命令"""
|
||
full_command = [self.adb_path] + command
|
||
logger.debug(f"执行命令: {' '.join(full_command)}, 超时时间: {timeout}秒")
|
||
|
||
try:
|
||
result = subprocess.run(
|
||
full_command,
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=timeout,
|
||
check=True
|
||
)
|
||
logger.debug(f"命令执行成功,输出: {result.stdout.strip()}")
|
||
return result.stdout.strip()
|
||
except subprocess.CalledProcessError as e:
|
||
logger.error(f"命令执行失败: {e.stderr}")
|
||
logger.debug(f"失败命令: {' '.join(full_command)}")
|
||
raise
|
||
except subprocess.TimeoutExpired:
|
||
logger.error(f"命令执行超时({timeout}秒)")
|
||
logger.debug(f"超时命令: {' '.join(full_command)}")
|
||
raise
|
||
|
||
def get_connected_devices(self) -> List[DeviceInfo]:
|
||
"""获取已连接的设备列表"""
|
||
logger.debug("开始获取已连接的设备列表")
|
||
try:
|
||
output = self.run_command(["devices", "-l"])
|
||
logger.debug(f"设备列表原始输出: {output}")
|
||
|
||
devices = []
|
||
|
||
for line in output.split('\n')[1:]:
|
||
if line.strip():
|
||
parts = line.split()
|
||
serial = parts[0]
|
||
device_info = {}
|
||
|
||
for part in parts[1:]:
|
||
if ':' in part:
|
||
key, value = part.split(':', 1)
|
||
device_info[key] = value
|
||
|
||
device = DeviceInfo(
|
||
serial=serial,
|
||
model=device_info.get('model', 'unknown'),
|
||
product=device_info.get('product', 'unknown'),
|
||
device=device_info.get('device', 'unknown')
|
||
)
|
||
devices.append(device)
|
||
logger.debug(f"发现设备: {serial} - {device.model}")
|
||
|
||
logger.info(f"共检测到 {len(devices)} 台设备")
|
||
return devices
|
||
except Exception as e:
|
||
logger.error(f"获取设备列表失败: {e}")
|
||
logger.debug(f"获取设备列表失败详情: {str(e)}")
|
||
return []
|
||
|
||
def reboot_to_recovery(self, serial: Optional[str] = None) -> None:
|
||
"""重启设备到Recovery模式"""
|
||
logger.debug(f"重启设备到Recovery模式,设备序列号: {serial}")
|
||
command = ["reboot", "recovery"]
|
||
if serial:
|
||
command = ["-s", serial] + command
|
||
|
||
self.run_command(command)
|
||
logger.info("设备正在重启到Recovery模式...")
|
||
|
||
def wait_for_recovery_mode(self, serial: Optional[str] = None, timeout: int = Config.RECOVERY_WAIT_TIMEOUT) -> bool:
|
||
"""等待设备进入Recovery模式"""
|
||
logger.debug(f"等待设备进入Recovery模式,超时时间: {timeout}秒,设备序列号: {serial}")
|
||
start_time = time.time()
|
||
|
||
while time.time() - start_time < timeout:
|
||
try:
|
||
command = ["shell", "getprop", "ro.bootmode"]
|
||
if serial:
|
||
command = ["-s", serial] + command
|
||
|
||
output = self.run_command(command, timeout=5)
|
||
logger.debug(f"Bootmode检查输出: {output}")
|
||
|
||
if "recovery" in output.lower():
|
||
logger.info("设备已进入Recovery模式")
|
||
return True
|
||
except Exception as e:
|
||
logger.debug(f"检查Recovery模式时出现异常: {str(e)}")
|
||
pass
|
||
|
||
time.sleep(2)
|
||
elapsed = int(time.time() - start_time)
|
||
logger.debug(f"等待Recovery模式中... 已等待 {elapsed}/{timeout}秒")
|
||
|
||
logger.error(f"等待Recovery模式超时({timeout}秒)")
|
||
return False |