Files
EZProxy/child/core/update_manager.py
2025-11-10 17:29:11 +08:00

115 lines
3.9 KiB
Python

import yaml
import requests
import zipfile
from pathlib import Path
from utils.config_utils import load_config, save_config
class UpdateManager:
UPDATE_URL = "https://o.nmgjg.com.cn/EZProxy/update.yaml"
CONFIG_URL = "https://o.nmgjg.com.cn/EZProxy/v2ray.json"
APP_URL = "https://o.nmgjg.com.cn/EZProxy/main.zip"
def __init__(self, app_dir):
self.app_dir = Path(app_dir)
self.local_update_yaml = self.app_dir / "conf" / "update.yaml"
self.config = load_config()
def check_updates(self):
"""检查更新"""
try:
response = requests.get(self.UPDATE_URL, timeout=10)
response.raise_for_status()
remote = yaml.safe_load(response.text)
# 检查配置更新
config_update = self._check_config_update(remote)
# 检查应用更新
app_update = self._check_app_update(remote)
return {
'config': config_update,
'app': app_update,
'changelog_url': remote.get('changelog_url', '')
}
except Exception as e:
logging.error(f"检查更新失败: {str(e)}")
return None
def _check_config_update(self, remote):
"""检查配置模板更新"""
local_version = self.config.get('config_version', '0.0')
remote_version = remote.get('config_version', '0.0')
if remote_version != local_version:
return {
'available': True,
'version': remote_version,
'force': remote.get('force_config_update', False)
}
return {'available': False}
def update_config(self):
"""更新配置模板"""
try:
response = requests.get(self.CONFIG_URL, timeout=15)
response.raise_for_status()
template_path = self.app_dir / "conf" / "v2ray_template.json"
with open(template_path, 'w') as f:
f.write(response.text)
# 更新本地版本号
self.config['config_version'] = self._get_remote_version()['config_version']
save_config(self.config)
return True
except Exception as e:
logging.error(f"更新配置失败: {str(e)}")
return False
def _prepare_update_helper(self):
"""准备更新助手"""
helper_path = self.app_dir / "update_helper.py"
temp_dir = get_temp_dir()
temp_dir.mkdir(parents=True, exist_ok=True)
# 复制更新助手到临时目录
temp_helper = temp_dir / "update_helper.py"
with open(helper_path, 'r') as src, open(temp_helper, 'w') as dst:
dst.write(src.read())
# 设置执行权限
temp_helper.chmod(0o755)
return temp_helper
def start_app_update(self):
"""启动应用更新流程"""
temp_dir = get_temp_dir()
zip_path = temp_dir / "main.zip"
try:
# 下载更新包
response = requests.get(self.APP_URL, stream=True, timeout=30)
response.raise_for_status()
with open(zip_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
# 准备更新助手
helper = self._prepare_update_helper()
# 启动更新助手(在新进程中)
import subprocess
subprocess.Popen([
sys.executable,
str(helper),
str(zip_path),
str(self.app_dir),
str(temp_dir)
])
return True
except Exception as e:
logging.error(f"更新准备失败: {str(e)}")
return False