import yaml import requests import zipfile from pathlib import Path from utils.config_utils import load_config, save_config class UpdateManager: UPDATE_URL = "https://o.nmgjg.com.cn/EZProxy/update.yaml" CONFIG_URL = "https://o.nmgjg.com.cn/EZProxy/v2ray.json" APP_URL = "https://o.nmgjg.com.cn/EZProxy/main.zip" def __init__(self, app_dir): self.app_dir = Path(app_dir) self.local_update_yaml = self.app_dir / "conf" / "update.yaml" self.config = load_config() def check_updates(self): """检查更新""" try: response = requests.get(self.UPDATE_URL, timeout=10) response.raise_for_status() remote = yaml.safe_load(response.text) # 检查配置更新 config_update = self._check_config_update(remote) # 检查应用更新 app_update = self._check_app_update(remote) return { 'config': config_update, 'app': app_update, 'changelog_url': remote.get('changelog_url', '') } except Exception as e: logging.error(f"检查更新失败: {str(e)}") return None def _check_config_update(self, remote): """检查配置模板更新""" local_version = self.config.get('config_version', '0.0') remote_version = remote.get('config_version', '0.0') if remote_version != local_version: return { 'available': True, 'version': remote_version, 'force': remote.get('force_config_update', False) } return {'available': False} def update_config(self): """更新配置模板""" try: response = requests.get(self.CONFIG_URL, timeout=15) response.raise_for_status() template_path = self.app_dir / "conf" / "v2ray_template.json" with open(template_path, 'w') as f: f.write(response.text) # 更新本地版本号 self.config['config_version'] = self._get_remote_version()['config_version'] save_config(self.config) return True except Exception as e: logging.error(f"更新配置失败: {str(e)}") return False def _prepare_update_helper(self): """准备更新助手""" helper_path = self.app_dir / "update_helper.py" temp_dir = get_temp_dir() temp_dir.mkdir(parents=True, exist_ok=True) # 复制更新助手到临时目录 temp_helper = temp_dir / "update_helper.py" with open(helper_path, 'r') as src, open(temp_helper, 'w') as dst: dst.write(src.read()) # 设置执行权限 temp_helper.chmod(0o755) return temp_helper def start_app_update(self): """启动应用更新流程""" temp_dir = get_temp_dir() zip_path = temp_dir / "main.zip" try: # 下载更新包 response = requests.get(self.APP_URL, stream=True, timeout=30) response.raise_for_status() with open(zip_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) # 准备更新助手 helper = self._prepare_update_helper() # 启动更新助手(在新进程中) import subprocess subprocess.Popen([ sys.executable, str(helper), str(zip_path), str(self.app_dir), str(temp_dir) ]) return True except Exception as e: logging.error(f"更新准备失败: {str(e)}") return False