Files
Class-Widgets/network_thread.py
2025-06-11 15:48:20 +08:00

484 lines
19 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import shutil
import zipfile # 解压插件zip
from datetime import datetime
import requests
from PyQt5.QtCore import QThread, pyqtSignal, QEventLoop
from loguru import logger
from packaging.version import Version
import conf
import utils
import weather_db as db
from conf import base_directory
from file import config_center
headers = {"User-Agent": "Mozilla/5.0", "Cache-Control": "no-cache"}  # HTTP request headers
# proxies = {"http": "http://127.0.0.1:10809", "https": "http://127.0.0.1:10809"}  # speed up access
proxies = {"http": None, "https": None}  # passed to requests as ``proxies=`` (call sites note: proxy disabled)
MIRROR_PATH = f"{base_directory}/config/mirror.json"  # JSON file holding the 'gh_mirror' mapping
PLAZA_REPO_URL = "https://raw.githubusercontent.com/Class-Widgets/plugin-plaza/"
PLAZA_REPO_DIR = "https://api.github.com/repos/Class-Widgets/plugin-plaza/contents/"
threads = []  # thread objects tracked by check_update() / check_version()
# Load the mirror configuration.
# ``mirror_dict`` maps a mirror name to the URL prefix that the worker threads
# below prepend to their download URLs; ``mirror_list`` holds the mirror names.
# Both names are always bound (possibly empty) so that a missing or malformed
# mirror.json cannot raise NameError/TypeError/IndexError at import time.
mirror_dict = {}
try:
    with open(MIRROR_PATH, 'r', encoding='utf-8') as file:
        # ``or {}`` covers a missing/null 'gh_mirror' entry
        mirror_dict = json.load(file).get('gh_mirror') or {}
except Exception as e:
    logger.error(f"读取镜像配置失败: {e}")
mirror_list = list(mirror_dict)
if not mirror_list:
    # Nothing to fall back to; leave the stored setting untouched
    logger.error("读取镜像配置失败: 镜像列表为空")
elif config_center.read_conf('Plugin', 'mirror') not in mirror_list:
    # The configured mirror is not in the mirror list: fall back to the first entry
    logger.warning(f"当前配置不在镜像列表中,设置为默认镜像: {mirror_list[0]}")
    config_center.write_conf('Plugin', 'mirror', mirror_list[0])
class getRepoFileList(QThread):  # Fetch the repository file listing
    """Worker thread that downloads a JSON document through the configured mirror.

    The default URL points at the plaza ``banner.json``. The parsed JSON (or an
    ``{"error": ...}`` dict on failure) is emitted on ``repo_signal``.
    """

    repo_signal = pyqtSignal(dict)

    def __init__(
            self, url='https://raw.githubusercontent.com/Class-Widgets/plugin-plaza/main/Banner/banner.json'
    ):
        super().__init__()
        self.download_url = url

    def run(self):
        """Fetch the document and emit it; emission failures are only logged."""
        try:
            plugin_info_data = self.get_plugin_info()
            self.repo_signal.emit(plugin_info_data)
        except Exception as e:
            logger.error(f"触发banner信息失败: {e}")

    def get_plugin_info(self):
        """Return the parsed JSON, or ``{"error": <status code or exception>}``."""
        try:
            mirror_url = mirror_dict[config_center.read_conf('Plugin', 'mirror')]
            url = f"{mirror_url}{self.download_url}"
            # A timeout keeps this thread from blocking forever on a stalled
            # connection (30 s matches VersionThread).
            response = requests.get(url, proxies=proxies, headers=headers, timeout=30)  # proxy disabled
            if response.status_code == 200:
                return response.json()
            logger.error(f"获取banner信息失败{response.status_code}")
            return {"error": response.status_code}
        except Exception as e:
            logger.error(f"获取banner信息失败{e}")
            return {"error": e}
class getPluginInfo(QThread):  # Fetch plugin information (JSON)
    """Worker thread that downloads the plugin list JSON through the configured mirror.

    The parsed JSON is emitted on ``repo_signal``; an empty dict is emitted
    when the download fails.
    """

    repo_signal = pyqtSignal(dict)

    def __init__(
            self, url='https://raw.githubusercontent.com/Class-Widgets/plugin-plaza/main/Plugins/plugin_list.json'
    ):
        super().__init__()
        self.download_url = url

    def run(self):
        """Fetch the document and emit it; emission failures are only logged."""
        try:
            plugin_info_data = self.get_plugin_info()
            self.repo_signal.emit(plugin_info_data)
        except Exception as e:
            logger.error(f"触发插件信息失败: {e}")

    def get_plugin_info(self):
        """Return the parsed JSON, or ``{}`` on any HTTP or network failure."""
        try:
            mirror_url = mirror_dict[config_center.read_conf('Plugin', 'mirror')]
            url = f"{mirror_url}{self.download_url}"
            # A timeout keeps this thread from blocking forever on a stalled
            # connection (30 s matches VersionThread).
            response = requests.get(url, proxies=proxies, headers=headers, timeout=30)  # proxy disabled
            if response.status_code == 200:
                return response.json()
            logger.error(f"获取插件信息失败:{response.status_code}")
            return {}
        except Exception as e:
            logger.error(f"获取插件信息失败:{e}")
            return {}
class getTags(QThread):  # Fetch plugin tags (JSON)
    """Worker thread that downloads ``plaza_detail.json`` through the configured mirror.

    The parsed JSON is emitted on ``repo_signal``; an empty dict is emitted
    when the download fails.
    """

    repo_signal = pyqtSignal(dict)

    def __init__(
            self, url='https://raw.githubusercontent.com/Class-Widgets/plugin-plaza/main/Plugins/plaza_detail.json'
    ):
        super().__init__()
        self.download_url = url

    def run(self):
        """Fetch the document and emit it; emission failures are only logged."""
        try:
            plugin_info_data = self.get_plugin_info()
            self.repo_signal.emit(plugin_info_data)
        except Exception as e:
            logger.error(f"触发Tag信息失败: {e}")

    def get_plugin_info(self):
        """Return the parsed JSON, or ``{}`` on any HTTP or network failure."""
        try:
            mirror_url = mirror_dict[config_center.read_conf('Plugin', 'mirror')]
            url = f"{mirror_url}{self.download_url}"
            # A timeout keeps this thread from blocking forever on a stalled
            # connection (30 s matches VersionThread).
            response = requests.get(url, proxies=proxies, headers=headers, timeout=30)  # proxy disabled
            if response.status_code == 200:
                return response.json()
            logger.error(f"获取Tag信息失败{response.status_code}")
            return {}
        except Exception as e:
            logger.error(f"获取Tag信息失败{e}")
            return {}
class getImg(QThread):  # Fetch an image
    """Worker thread that downloads an image through the configured mirror.

    The raw image bytes are emitted on ``repo_signal``. When the download
    fails, the bundled ``img/plaza/banner_pre.png`` is emitted instead.
    """

    repo_signal = pyqtSignal(bytes)

    def __init__(self, url='https://raw.githubusercontent.com/Class-Widgets/plugin-plaza/main/Banner/banner_1.png'):
        super().__init__()
        self.download_url = url

    def run(self):
        """Emit the downloaded bytes, or the default image when the download failed."""
        try:
            banner_data = self.get_banner()
            if banner_data is not None:
                self.repo_signal.emit(banner_data)
            else:
                # Read the default image
                with open(f"{base_directory}/img/plaza/banner_pre.png", 'rb') as default_img:
                    self.repo_signal.emit(default_img.read())
        except Exception as e:
            logger.error(f"触发图片失败: {e}")

    def get_banner(self):
        """Return the response body as bytes, or ``None`` on any failure."""
        try:
            mirror_url = mirror_dict[config_center.read_conf('Plugin', 'mirror')]
            url = f"{mirror_url}{self.download_url}"
            # A timeout keeps this thread from blocking forever on a stalled
            # connection (30 s matches VersionThread).
            response = requests.get(url, proxies=proxies, headers=headers, timeout=30)
            if response.status_code == 200:
                return response.content
            logger.error(f"获取图片失败:{response.status_code}")
            return None
        except Exception as e:
            logger.error(f"获取图片失败:{e}")
            return None
class getReadme(QThread):  # Fetch a README
    """Worker thread that downloads a README through the configured mirror.

    The response text is emitted on ``html_signal``; an empty string is
    emitted when the download fails.
    """

    html_signal = pyqtSignal(str)

    def __init__(self, url='https://raw.githubusercontent.com/Class-Widgets/Class-Widgets/main/README.md'):
        super().__init__()
        self.download_url = url

    def run(self):
        """Fetch the README and emit it; emission failures are only logged."""
        try:
            readme_data = self.get_readme()
            self.html_signal.emit(readme_data)
        except Exception as e:
            logger.error(f"触发README失败: {e}")

    def get_readme(self):
        """Return the response text, or ``''`` on any HTTP or network failure."""
        try:
            mirror_url = mirror_dict[config_center.read_conf('Plugin', 'mirror')]
            url = f"{mirror_url}{self.download_url}"
            # A timeout keeps this thread from blocking forever on a stalled
            # connection (30 s matches VersionThread).
            response = requests.get(url, proxies=proxies, timeout=30)
            if response.status_code == 200:
                return response.text
            logger.error(f"获取README失败{response.status_code}")
            return ''
        except Exception as e:
            logger.error(f"获取README失败{e}")
            return ''
class getCity(QThread):
    """Worker thread that looks up the current city via an IP geolocation API.

    The ``(city, district)`` pair returned by the API is passed to
    ``db.search_code_by_name`` and the result is written to the
    ``Weather/city`` configuration entry.
    """

    def __init__(self, url='https://qifu-api.baidubce.com/ip/local/geo/v1/district'):
        super().__init__()
        self.download_url = url

    def run(self):
        """Resolve the city and store its code in the configuration."""
        try:
            city_data = self.get_city()
            config_center.write_conf('Weather', 'city', db.search_code_by_name(city_data))
        except Exception as e:
            logger.error(f"获取城市失败: {e}")

    def get_city(self):
        """Return ``(city, district)`` from the API, or ``('', '')`` on failure."""
        try:
            # A timeout keeps this thread (and anything waiting on it) from
            # blocking forever on a stalled connection (30 s matches VersionThread).
            req = requests.get(self.download_url, proxies=proxies, timeout=30)
            if req.status_code != 200:
                logger.error(f"获取城市失败:{req.status_code}")
                return ('', '')
            data = req.json()
            # Sample payload:
            # {"code":"Success","data":{"continent":"","country":"中国","zipcode":"","owner":"","isp":"","adcode":"","prov":"","city":"","district":""},"ip":"45.192.96.246"}
            if data['code'] != 'Success':
                logger.error(f"获取城市失败:{data['message']}")
                return ('', '')
            data = data['data']
            logger.info(f"获取城市成功:{data['city']}, {data['district']}")
            return (data['city'], data['district'])
        except Exception as e:
            logger.error(f"获取城市失败:{e}")
            return ('', '')
class VersionThread(QThread):  # Fetch the latest version number
    """Worker thread that downloads ``version.json`` and emits it on ``version_signal``.

    On failure an ``{"error": <message>}`` dict is emitted instead.
    ``is_running()`` reports whether a fetch is currently in progress.
    """

    version_signal = pyqtSignal(dict)
    # True while some instance is inside the fetch in run()
    _instance_running = False

    def __init__(self):
        super().__init__()

    def run(self):
        """Fetch the version info and emit it."""
        # Previously the flag was never set, so is_running() always returned False.
        VersionThread._instance_running = True
        try:
            version = self.get_latest_version()
        finally:
            # Reset BEFORE emitting: a slot connected to version_signal may
            # terminate this thread, which must not leave the flag stuck at True.
            VersionThread._instance_running = False
        self.version_signal.emit(version)

    @classmethod
    def is_running(cls):
        """Return True while a version fetch is in progress."""
        return cls._instance_running

    @staticmethod
    def get_latest_version():
        """Return the parsed ``version.json`` dict, or ``{"error": <message>}``."""
        url = "https://classwidgets.rinlit.cn/version.json"
        try:
            logger.info(f"正在获取版本信息")
            response = requests.get(url, proxies=proxies, timeout=30)
            logger.debug(f"更新请求响应: {response.status_code}")
            if response.status_code == 200:
                return response.json()
            logger.error(f"无法获取版本信息 错误代码:{response.status_code},响应内容: {response.text}")
            return {'error': f"请求失败,错误代码:{response.status_code}"}
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON decode error is not a RequestException subclass.
            logger.error(f"请求失败,错误详情:{str(e)}")
            return {"error": f"请求失败\n{str(e)}"}
class getDownloadUrl(QThread):
    """Worker thread that looks up the asset URLs of a repository's latest GitHub release.

    Each asset's ``browser_download_url`` is emitted on ``geturl_signal``.
    When the API answers 403, an ``ERROR: ...`` message is emitted instead;
    when an exception occurs, the exception message is emitted.
    """

    # Carries a download URL, or a message string on failure
    geturl_signal = pyqtSignal(str)

    def __init__(self, username, repo):
        super().__init__()
        self.username = username
        self.repo = repo

    def run(self):
        """Query the releases API and emit the result."""
        try:
            url = f"https://api.github.com/repos/{self.username}/{self.repo}/releases/latest"
            response = requests.get(url, proxies=proxies, timeout=30)
            if response.status_code == 200:
                data = response.json()
                for asset in data['assets']:  # walk the download links
                    if isinstance(asset, dict) and 'browser_download_url' in asset:
                        asset_url = asset['browser_download_url']
                        self.geturl_signal.emit(asset_url)
            elif response.status_code == 403:  # API rate limit hit
                logger.warning("到达Github API限制请稍后再试")
                minutes_left = self._minutes_until_reset(response)
                if minutes_left is None:
                    self.geturl_signal.emit("ERROR: 到达Github API限制请稍后再试")
                else:
                    self.geturl_signal.emit(
                        f"ERROR: 由于请求次数过多到达Github API限制请在{minutes_left}分钟后再试"
                    )
            else:
                # NOTE(review): no signal is emitted for other status codes,
                # so listeners are not told about this failure - confirm intent.
                logger.error(f"网络连接错误:{response.status_code}")
        except Exception as e:
            logger.error(f"获取下载链接错误: {e}")
            self.geturl_signal.emit(f"获取下载链接错误: {e}")

    @staticmethod
    def _minutes_until_reset(response):
        """Return whole minutes (rounded up) until the rate limit resets, or None if unknown.

        The original code reported ``datetime.minute`` of the reset time (the
        minute of the hour), which is not the remaining wait.
        """
        reset_header = response.headers.get('X-RateLimit-Reset')
        if reset_header is None:
            # Fall back to the probe request the original code used
            probe = requests.get('https://api.github.com/users/octocat', proxies=proxies, timeout=30)
            reset_header = probe.headers.get('X-RateLimit-Reset')
        if reset_header is None:
            return None
        # The header is an epoch timestamp in seconds
        seconds_left = max(0, int(reset_header) - int(datetime.now().timestamp()))
        return (seconds_left + 59) // 60
class DownloadAndExtract(QThread):  # Download and extract a plugin
    """Worker thread that downloads a plugin zip through the mirror and unpacks it.

    ``status_signal`` emits "DOWNLOADING", "EXTRACTING" and finally "DONE";
    on failure it emits an error message instead and "DONE" is not sent.
    ``progress_signal`` emits the download progress in percent.
    """

    progress_signal = pyqtSignal(float)  # progress
    status_signal = pyqtSignal(str)  # status

    def __init__(self, url, plugin_name='test_114'):
        super().__init__()
        self.download_url = url
        logger.debug(f"plugin download url: {self.download_url}")
        self.cache_dir = "cache"
        self.plugin_name = plugin_name
        self.extract_dir = conf.PLUGINS_DIR  # plugin directory

    def run(self):
        """Download, extract, optionally auto-enable the plugin, then report "DONE".

        The sequence stops at the first failed step; the step itself has
        already emitted the error status in that case.
        """
        zip_path = os.path.join(self.cache_dir, f'{self.plugin_name}.zip')
        try:
            enabled_plugins = conf.load_plugin_config()  # load the enabled plugins
            os.makedirs(self.cache_dir, exist_ok=True)
            os.makedirs(self.extract_dir, exist_ok=True)

            self.status_signal.emit("DOWNLOADING")
            if not self.download_file(zip_path):
                return
            self.status_signal.emit("EXTRACTING")
            if not self.extract_zip(zip_path):
                return

            if (
                    self.plugin_name not in enabled_plugins['enabled_plugins']
                    and config_center.read_conf('Plugin', 'auto_enable_plugin') == '1'
            ):
                logger.info(f"自动启用插件: {self.plugin_name}")
                enabled_plugins['enabled_plugins'].append(self.plugin_name)
                conf.save_plugin_config(enabled_plugins)
            self.status_signal.emit("DONE")
        except Exception as e:
            self.status_signal.emit(f"错误: {e}")
            logger.error(f"插件下载/解压失败: {e}")
        finally:
            # Always drop the cached archive, whether or not the run succeeded
            try:
                if os.path.exists(zip_path):
                    os.remove(zip_path)
            except OSError as e:
                logger.warning(f"failed to remove cached archive {zip_path}: {e}")

    def stop(self):
        """Forcefully terminate the thread."""
        self._running = False
        self.terminate()

    def download_file(self, file_path):
        """Stream the archive to ``file_path``, emitting progress along the way.

        Returns True on success. On failure an ``ERROR: ...`` status is
        emitted and False is returned.
        """
        try:
            # Build the mirrored URL locally; storing it back on
            # self.download_url would prepend the mirror again on every call.
            url = mirror_dict[config_center.read_conf('Plugin', 'mirror')] + self.download_url
            logger.debug(f"plugin download url (mirrored): {url}")
            with requests.get(url, stream=True, proxies=proxies, timeout=30) as response:
                if response.status_code != 200:
                    logger.error(f"插件下载失败,错误代码: {response.status_code}")
                    self.status_signal.emit(f'ERROR: 网络连接错误:{response.status_code}')
                    return False
                total_size = int(response.headers.get('content-length', 0))
                downloaded_size = 0
                with open(file_path, 'wb') as file:
                    for chunk in response.iter_content(1024):
                        file.write(chunk)
                        downloaded_size += len(chunk)
                        # Progress stays at 0 when the server sent no content-length
                        progress = (downloaded_size / total_size) * 100 if total_size > 0 else 0
                        self.progress_signal.emit(progress)
            return True
        except Exception as e:
            self.status_signal.emit(f'ERROR: {e}')
            logger.error(f"插件下载错误: {e}")
            return False

    def extract_zip(self, zip_path):
        """Extract the archive into the plugin directory and strip the ``-suffix`` from its folder.

        A top-level entry of the archive whose name starts with the plugin
        name (e.g. ``<plugin_name>-main``) is renamed to the part before the
        last ``-``; if that target already exists the contents are merged
        into it. Returns True on success; on failure an ``ERROR: ...``
        status is emitted and False is returned.
        """
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                # Only touch entries that came from this archive, so that other
                # installed plugins sharing the name prefix are left alone.
                extracted_roots = {
                    member.replace('\\', '/').split('/', 1)[0] for member in zip_ref.namelist()
                }
                zip_ref.extractall(self.extract_dir)
            for p_dir in sorted(extracted_roots):
                if not (p_dir.startswith(self.plugin_name) and len(p_dir) > len(self.plugin_name)):
                    continue
                new_name = p_dir.rsplit('-', 1)[0]
                if new_name == p_dir:
                    continue  # no '-suffix' to strip
                source_path = os.path.join(self.extract_dir, p_dir)
                target_path = os.path.join(self.extract_dir, new_name)
                if not os.path.exists(source_path):
                    continue
                if os.path.exists(target_path):
                    shutil.copytree(source_path, target_path, dirs_exist_ok=True)
                    shutil.rmtree(source_path)
                else:
                    os.rename(source_path, target_path)
            return True
        except Exception as e:
            self.status_signal.emit(f'ERROR: 解压失败: {e}')
            logger.error(f"解压失败: {e}")
            return False
def check_update():
    """Start a ``VersionThread`` whose result is delivered to ``check_version``.

    Does nothing when ``VersionThread.is_running()`` returns True. Threads in
    the module-level ``threads`` list that are no longer running are dropped
    before the new thread is added.
    """
    global threads
    if VersionThread.is_running():
        logger.debug("已存在版本检查线程在运行,跳过本检查")
        return

    # Drop threads that have already stopped
    threads = list(filter(lambda worker: worker.isRunning(), threads))

    # Create and start a new version-check thread
    checker = VersionThread()
    threads.append(checker)
    checker.version_signal.connect(check_version)
    checker.start()
def check_version(version):  # Check for updates
    """Compare the server version info with the local version and notify via the tray icon.

    Every thread in the module-level ``threads`` list is terminated and the
    list is cleared first. If ``version`` carries an ``'error'`` entry, an
    error notification is pushed and False is returned. Otherwise the release
    or beta version (selected by the ``Other/version_channel`` setting) is
    compared with ``Other/version`` and an update notification is pushed when
    the server version is newer; the function then returns None.
    """
    global threads
    for worker in threads:
        worker.terminate()
    threads = []

    if 'error' in version:
        title = "检查更新失败!"
        utils.tray_icon.push_error_notification(title, f"检查更新失败!\n{version['error']}")
        return False

    channel = int(config_center.read_conf("Other", "version_channel"))
    if channel == 0:
        server_version = version['version_release']
    else:
        server_version = version['version_beta']
    local_version = config_center.read_conf("Other", "version")

    server_parsed = Version(server_version)
    local_parsed = Version(local_version)
    logger.debug(f"服务端版本: {server_parsed},本地版本: {local_parsed}")
    if server_parsed > local_parsed:
        utils.tray_icon.push_update_notification(f"新版本速递:{server_version}\n请在“设置”中了解更多。")
class weatherReportThread(QThread):  # Fetch the latest weather information
    """Worker thread that downloads current weather (and alerts, if available).

    Emits ``{'now': <json>, 'alert': <json or {}>}`` on ``weather_signal``,
    or ``{'error': {'info': {...}}}`` when the weather request fails. The
    thread schedules its own deletion when ``run`` finishes.
    """

    weather_signal = pyqtSignal(dict)

    def __init__(self):
        super().__init__()

    def run(self):
        """Fetch the weather data, emit it, then schedule this object for deletion."""
        try:
            weather_data = self.get_weather_data()
            self.weather_signal.emit(weather_data)
        except Exception as e:
            logger.error(f"触发天气信息失败: {e}")
        finally:
            self.deleteLater()

    @staticmethod
    def get_weather_data():
        """Return the weather data dict, or an ``{'error': ...}`` dict on failure."""
        location_key = config_center.read_conf('Weather', 'city')
        if location_key == '0':
            # '0' triggers a city lookup: run getCity and wait for it to finish
            city_thread = getCity()
            loop = QEventLoop()
            city_thread.finished.connect(loop.quit)
            city_thread.start()
            loop.exec_()  # block until finished
            location_key = config_center.read_conf('Weather', 'city')
        if location_key == '0' or not location_key:
            # Still no usable location: use the hard-coded default location code
            location_key = 101010100

        days = 1
        key = config_center.read_conf('Weather', 'api_key')
        url = db.get_weather_url().format(location_key=location_key, days=days, key=key)
        alert_url = db.get_weather_alert_url()
        try:
            data_group = {'now': {}, 'alert': {}}
            # Timeouts keep this thread from blocking forever on a stalled connection
            response_now = requests.get(url, proxies=proxies, timeout=30)  # proxy disabled

            if alert_url == 'NotSupported':
                logger.warning(f"当前API不支持天气预警信息")
            elif alert_url is None:
                logger.warning(f"无单独天气预警信息API")
            else:
                # Alerts are optional: a failed alert request must not discard
                # the weather data, just as a non-200 alert response does not.
                try:
                    alert_url = alert_url.format(location_key=location_key, key=key)
                    response_alert = requests.get(alert_url, proxies=proxies, timeout=30)
                    if response_alert.status_code == 200:
                        data_group['alert'] = response_alert.json()
                    else:
                        logger.error(f"获取天气预警信息失败:{response_alert.status_code}")
                except (requests.exceptions.RequestException, ValueError) as e:
                    logger.error(f"获取天气预警信息失败:{e}")

            if response_now.status_code == 200:
                data_group['now'] = response_now.json()
                return data_group
            logger.error(f"获取天气信息失败:{response_now.status_code}")
            return {'error': {'info': {'value': '错误', 'unit': response_now.status_code}}}
        except Exception as e:  # request failures and anything else are reported the same way
            logger.error(f"获取天气信息失败:{e}")
            return {'error': {'info': {'value': '错误', 'unit': ''}}}