484 lines
19 KiB
Python
Executable File
484 lines
19 KiB
Python
Executable File
import json
|
||
import os
|
||
import shutil
|
||
import zipfile # 解压插件zip
|
||
from datetime import datetime
|
||
|
||
import requests
|
||
from PyQt5.QtCore import QThread, pyqtSignal, QEventLoop
|
||
from loguru import logger
|
||
from packaging.version import Version
|
||
|
||
import conf
|
||
import utils
|
||
import weather_db as db
|
||
from conf import base_directory
|
||
from file import config_center
|
||
|
||
headers = {"User-Agent": "Mozilla/5.0", "Cache-Control": "no-cache"} # 设置请求头
|
||
# proxies = {"http": "http://127.0.0.1:10809", "https": "http://127.0.0.1:10809"} # 加速访问
|
||
proxies = {"http": None, "https": None}
|
||
|
||
MIRROR_PATH = f"{base_directory}/config/mirror.json"
|
||
PLAZA_REPO_URL = "https://raw.githubusercontent.com/Class-Widgets/plugin-plaza/"
|
||
PLAZA_REPO_DIR = "https://api.github.com/repos/Class-Widgets/plugin-plaza/contents/"
|
||
threads = []
|
||
|
||
# 读取镜像配置
|
||
mirror_list = []
|
||
try:
|
||
with open(MIRROR_PATH, 'r', encoding='utf-8') as file:
|
||
mirror_dict = json.load(file).get('gh_mirror')
|
||
except Exception as e:
|
||
logger.error(f"读取镜像配置失败: {e}")
|
||
|
||
for name in mirror_dict:
|
||
mirror_list.append(name)
|
||
|
||
if config_center.read_conf('Plugin', 'mirror') not in mirror_list: # 如果当前配置不在镜像列表中,则设置为默认镜像
|
||
logger.warning(f"当前配置不在镜像列表中,设置为默认镜像: {mirror_list[0]}")
|
||
config_center.write_conf('Plugin', 'mirror', mirror_list[0])
|
||
|
||
|
||
class getRepoFileList(QThread): # 获取仓库文件目录
|
||
repo_signal = pyqtSignal(dict)
|
||
|
||
def __init__(
|
||
self, url='https://raw.githubusercontent.com/Class-Widgets/plugin-plaza/main/Banner/banner.json'
|
||
):
|
||
super().__init__()
|
||
self.download_url = url
|
||
|
||
def run(self):
|
||
try:
|
||
plugin_info_data = self.get_plugin_info()
|
||
self.repo_signal.emit(plugin_info_data)
|
||
except Exception as e:
|
||
logger.error(f"触发banner信息失败: {e}")
|
||
|
||
def get_plugin_info(self):
|
||
try:
|
||
mirror_url = mirror_dict[config_center.read_conf('Plugin', 'mirror')]
|
||
url = f"{mirror_url}{self.download_url}"
|
||
response = requests.get(url, proxies=proxies, headers=headers) # 禁用代理
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
return data
|
||
else:
|
||
logger.error(f"获取banner信息失败:{response.status_code}")
|
||
return {"error": response.status_code}
|
||
except Exception as e:
|
||
logger.error(f"获取banner信息失败:{e}")
|
||
return {"error": e}
|
||
|
||
|
||
class getPluginInfo(QThread): # 获取插件信息(json)
|
||
repo_signal = pyqtSignal(dict)
|
||
|
||
def __init__(
|
||
self, url='https://raw.githubusercontent.com/Class-Widgets/plugin-plaza/main/Plugins/plugin_list.json'
|
||
):
|
||
super().__init__()
|
||
self.download_url = url
|
||
|
||
def run(self):
|
||
try:
|
||
plugin_info_data = self.get_plugin_info()
|
||
self.repo_signal.emit(plugin_info_data)
|
||
except Exception as e:
|
||
logger.error(f"触发插件信息失败: {e}")
|
||
|
||
def get_plugin_info(self):
|
||
try:
|
||
mirror_url = mirror_dict[config_center.read_conf('Plugin', 'mirror')]
|
||
url = f"{mirror_url}{self.download_url}"
|
||
response = requests.get(url, proxies=proxies, headers=headers) # 禁用代理
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
return data
|
||
else:
|
||
logger.error(f"获取插件信息失败:{response.status_code}")
|
||
return {}
|
||
except Exception as e:
|
||
logger.error(f"获取插件信息失败:{e}")
|
||
return {}
|
||
|
||
|
||
class getTags(QThread): # 获取插件标签(json)
|
||
repo_signal = pyqtSignal(dict)
|
||
|
||
def __init__(
|
||
self, url='https://raw.githubusercontent.com/Class-Widgets/plugin-plaza/main/Plugins/plaza_detail.json'
|
||
):
|
||
super().__init__()
|
||
self.download_url = url
|
||
|
||
def run(self):
|
||
try:
|
||
plugin_info_data = self.get_plugin_info()
|
||
self.repo_signal.emit(plugin_info_data)
|
||
except Exception as e:
|
||
logger.error(f"触发Tag信息失败: {e}")
|
||
|
||
def get_plugin_info(self):
|
||
try:
|
||
mirror_url = mirror_dict[config_center.read_conf('Plugin', 'mirror')]
|
||
url = f"{mirror_url}{self.download_url}"
|
||
response = requests.get(url, proxies=proxies, headers=headers) # 禁用代理
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
return data
|
||
else:
|
||
logger.error(f"获取Tag信息失败:{response.status_code}")
|
||
return {}
|
||
except Exception as e:
|
||
logger.error(f"获取Tag信息失败:{e}")
|
||
return {}
|
||
|
||
|
||
class getImg(QThread): # 获取图片
|
||
repo_signal = pyqtSignal(bytes)
|
||
|
||
def __init__(self, url='https://raw.githubusercontent.com/Class-Widgets/plugin-plaza/main/Banner/banner_1.png'):
|
||
super().__init__()
|
||
self.download_url = url
|
||
|
||
def run(self):
|
||
try:
|
||
banner_data = self.get_banner()
|
||
if banner_data is not None:
|
||
self.repo_signal.emit(banner_data)
|
||
else:
|
||
with open(f"{base_directory}/img/plaza/banner_pre.png", 'rb') as default_img: # 读取默认图片
|
||
self.repo_signal.emit(default_img.read())
|
||
except Exception as e:
|
||
logger.error(f"触发图片失败: {e}")
|
||
|
||
def get_banner(self):
|
||
try:
|
||
mirror_url = mirror_dict[config_center.read_conf('Plugin', 'mirror')]
|
||
url = f"{mirror_url}{self.download_url}"
|
||
response = requests.get(url, proxies=proxies, headers=headers)
|
||
if response.status_code == 200:
|
||
return response.content
|
||
else:
|
||
logger.error(f"获取图片失败:{response.status_code}")
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"获取图片失败:{e}")
|
||
return None
|
||
|
||
|
||
class getReadme(QThread): # 获取README
|
||
html_signal = pyqtSignal(str)
|
||
|
||
def __init__(self, url='https://raw.githubusercontent.com/Class-Widgets/Class-Widgets/main/README.md'):
|
||
super().__init__()
|
||
self.download_url = url
|
||
|
||
def run(self):
|
||
try:
|
||
readme_data = self.get_readme()
|
||
self.html_signal.emit(readme_data)
|
||
except Exception as e:
|
||
logger.error(f"触发README失败: {e}")
|
||
|
||
def get_readme(self):
|
||
try:
|
||
mirror_url = mirror_dict[config_center.read_conf('Plugin', 'mirror')]
|
||
url = f"{mirror_url}{self.download_url}"
|
||
# print(url)
|
||
response = requests.get(url, proxies=proxies)
|
||
if response.status_code == 200:
|
||
return response.text
|
||
else:
|
||
logger.error(f"获取README失败:{response.status_code}")
|
||
return ''
|
||
except Exception as e:
|
||
logger.error(f"获取README失败:{e}")
|
||
return ''
|
||
|
||
class getCity(QThread):
|
||
|
||
def __init__(self, url='https://qifu-api.baidubce.com/ip/local/geo/v1/district'):
|
||
super().__init__()
|
||
self.download_url = url
|
||
|
||
def run(self):
|
||
try:
|
||
city_data = self.get_city()
|
||
config_center.write_conf('Weather', 'city', db.search_code_by_name(city_data))
|
||
except Exception as e:
|
||
logger.error(f"获取城市失败: {e}")
|
||
|
||
def get_city(self):
|
||
try:
|
||
req = requests.get(self.download_url, proxies=proxies)
|
||
if req.status_code == 200:
|
||
data = req.json()
|
||
# {"code":"Success","data":{"continent":"","country":"中国","zipcode":"","owner":"","isp":"","adcode":"","prov":"","city":"","district":""},"ip":"45.192.96.246"}
|
||
if data['code'] == 'Success':
|
||
data = data['data']
|
||
logger.info(f"获取城市成功:{data['city']}, {data['district']}")
|
||
return (data['city'], data['district'])
|
||
else:
|
||
logger.error(f"获取城市失败:{data['message']}")
|
||
return ('', '')
|
||
else:
|
||
logger.error(f"获取城市失败:{req.status_code}")
|
||
return ('', '')
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取城市失败:{e}")
|
||
return ('', '')
|
||
|
||
class VersionThread(QThread): # 获取最新版本号
|
||
version_signal = pyqtSignal(dict)
|
||
_instance_running = False
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
def run(self):
|
||
version = self.get_latest_version()
|
||
self.version_signal.emit(version)
|
||
|
||
@classmethod
|
||
def is_running(cls):
|
||
return cls._instance_running
|
||
|
||
@staticmethod
|
||
def get_latest_version():
|
||
url = "https://classwidgets.rinlit.cn/version.json"
|
||
try:
|
||
logger.info(f"正在获取版本信息")
|
||
response = requests.get(url, proxies=proxies, timeout=30)
|
||
logger.debug(f"更新请求响应: {response.status_code}")
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
return data
|
||
else:
|
||
logger.error(f"无法获取版本信息 错误代码:{response.status_code},响应内容: {response.text}")
|
||
return {'error': f"请求失败,错误代码:{response.status_code}"}
|
||
except requests.exceptions.RequestException as e:
|
||
logger.error(f"请求失败,错误详情:{str(e)}")
|
||
return {"error": f"请求失败\n{str(e)}"}
|
||
|
||
|
||
class getDownloadUrl(QThread):
|
||
# 定义信号,通知下载进度或完成
|
||
geturl_signal = pyqtSignal(str)
|
||
|
||
def __init__(self, username, repo):
|
||
super().__init__()
|
||
self.username = username
|
||
self.repo = repo
|
||
|
||
def run(self):
|
||
try:
|
||
url = f"https://api.github.com/repos/{self.username}/{self.repo}/releases/latest"
|
||
response = requests.get(url, proxies=proxies)
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
for asset in data['assets']: # 遍历下载链接
|
||
if isinstance(asset, dict) and 'browser_download_url' in asset:
|
||
asset_url = asset['browser_download_url']
|
||
self.geturl_signal.emit(asset_url)
|
||
elif response.status_code == 403: # 触发API限制
|
||
logger.warning("到达Github API限制,请稍后再试")
|
||
response = requests.get('https://api.github.com/users/octocat', proxies=proxies)
|
||
reset_time = response.headers.get('X-RateLimit-Reset')
|
||
reset_time = datetime.fromtimestamp(int(reset_time))
|
||
self.geturl_signal.emit(f"ERROR: 由于请求次数过多,到达Github API限制,请在{reset_time.minute}分钟后再试")
|
||
else:
|
||
logger.error(f"网络连接错误:{response.status_code}")
|
||
except Exception as e:
|
||
logger.error(f"获取下载链接错误: {e}")
|
||
self.geturl_signal.emit(f"获取下载链接错误: {e}")
|
||
|
||
|
||
class DownloadAndExtract(QThread): # 下载并解压插件
|
||
progress_signal = pyqtSignal(float) # 进度
|
||
status_signal = pyqtSignal(str) # 状态
|
||
|
||
def __init__(self, url, plugin_name='test_114'):
|
||
super().__init__()
|
||
self.download_url = url
|
||
print(self.download_url)
|
||
self.cache_dir = "cache"
|
||
self.plugin_name = plugin_name
|
||
self.extract_dir = conf.PLUGINS_DIR # 插件目录
|
||
|
||
def run(self):
|
||
try:
|
||
enabled_plugins = conf.load_plugin_config() # 加载启用的插件
|
||
|
||
os.makedirs(self.cache_dir, exist_ok=True)
|
||
os.makedirs(self.extract_dir, exist_ok=True)
|
||
|
||
zip_path = os.path.join(self.cache_dir, f'{self.plugin_name}.zip')
|
||
|
||
self.status_signal.emit("DOWNLOADING")
|
||
self.download_file(zip_path)
|
||
self.status_signal.emit("EXTRACTING")
|
||
self.extract_zip(zip_path)
|
||
os.remove(zip_path)
|
||
print(enabled_plugins)
|
||
|
||
if (
|
||
self.plugin_name not in enabled_plugins['enabled_plugins']
|
||
and config_center.read_conf('Plugin', 'auto_enable_plugin') == '1'
|
||
):
|
||
logger.info(f"自动启用插件: {self.plugin_name}")
|
||
enabled_plugins['enabled_plugins'].append(self.plugin_name)
|
||
conf.save_plugin_config(enabled_plugins)
|
||
|
||
self.status_signal.emit("DONE")
|
||
except Exception as e:
|
||
self.status_signal.emit(f"错误: {e}")
|
||
logger.error(f"插件下载/解压失败: {e}")
|
||
|
||
def stop(self):
|
||
self._running = False
|
||
self.terminate()
|
||
|
||
def download_file(self, file_path):
|
||
# time.sleep(555) # 模拟下载时间
|
||
try:
|
||
self.download_url = mirror_dict[config_center.read_conf('Plugin', 'mirror')] + self.download_url
|
||
print(self.download_url)
|
||
response = requests.get(self.download_url, stream=True, proxies=proxies)
|
||
if response.status_code != 200:
|
||
logger.error(f"插件下载失败,错误代码: {response.status_code}")
|
||
self.status_signal.emit(f'ERROR: 网络连接错误:{response.status_code}')
|
||
return
|
||
|
||
total_size = int(response.headers.get('content-length', 0))
|
||
downloaded_size = 0
|
||
|
||
with open(file_path, 'wb') as file:
|
||
for chunk in response.iter_content(1024):
|
||
file.write(chunk)
|
||
downloaded_size += len(chunk)
|
||
progress = (downloaded_size / total_size) * 100 if total_size > 0 else 0 # 计算进度
|
||
self.progress_signal.emit(progress)
|
||
except Exception as e:
|
||
self.status_signal.emit(f'ERROR: {e}')
|
||
logger.error(f"插件下载错误: {e}")
|
||
|
||
def extract_zip(self, zip_path):
|
||
try:
|
||
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
|
||
zip_ref.extractall(self.extract_dir)
|
||
|
||
for p_dir in os.listdir(self.extract_dir):
|
||
if p_dir.startswith(self.plugin_name) and len(p_dir) > len(self.plugin_name):
|
||
new_name = p_dir.rsplit('-', 1)[0]
|
||
if os.path.exists(os.path.join(self.extract_dir, new_name)):
|
||
shutil.copytree(
|
||
os.path.join(self.extract_dir, p_dir), os.path.join(self.extract_dir, new_name),
|
||
dirs_exist_ok=True)
|
||
shutil.rmtree(os.path.join(self.extract_dir, p_dir))
|
||
else:
|
||
os.rename(os.path.join(self.extract_dir, p_dir), os.path.join(self.extract_dir, new_name))
|
||
except Exception as e:
|
||
logger.error(f"解压失败: {e}")
|
||
|
||
|
||
def check_update():
|
||
global threads
|
||
|
||
if VersionThread.is_running():
|
||
logger.debug("已存在版本检查线程在运行,跳过本检查")
|
||
return
|
||
|
||
# 清理已终止的线程
|
||
threads = [t for t in threads if t.isRunning()]
|
||
|
||
# 创建新的版本检查线程
|
||
version_thread = VersionThread()
|
||
threads.append(version_thread)
|
||
version_thread.version_signal.connect(check_version)
|
||
version_thread.start()
|
||
|
||
|
||
def check_version(version): # 检查更新
|
||
global threads
|
||
for thread in threads:
|
||
thread.terminate()
|
||
threads = []
|
||
if 'error' in version:
|
||
utils.tray_icon.push_error_notification(
|
||
"检查更新失败!",
|
||
f"检查更新失败!\n{version['error']}"
|
||
)
|
||
return False
|
||
|
||
channel = int(config_center.read_conf("Other", "version_channel"))
|
||
server_version = version['version_release' if channel == 0 else 'version_beta']
|
||
local_version = config_center.read_conf("Other", "version")
|
||
logger.debug(f"服务端版本: {Version(server_version)},本地版本: {Version(local_version)}")
|
||
if Version(server_version) > Version(local_version):
|
||
utils.tray_icon.push_update_notification(f"新版本速递:{server_version}\n请在“设置”中了解更多。")
|
||
|
||
class weatherReportThread(QThread): # 获取最新天气信息
|
||
weather_signal = pyqtSignal(dict)
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
|
||
def run(self):
|
||
try:
|
||
weather_data = self.get_weather_data()
|
||
self.weather_signal.emit(weather_data)
|
||
except Exception as e:
|
||
logger.error(f"触发天气信息失败: {e}")
|
||
finally:
|
||
self.deleteLater()
|
||
|
||
@staticmethod
|
||
def get_weather_data():
|
||
location_key = config_center.read_conf('Weather', 'city')
|
||
if location_key == '0':
|
||
city_thread = getCity()
|
||
loop = QEventLoop()
|
||
city_thread.finished.connect(loop.quit)
|
||
city_thread.start()
|
||
loop.exec_() # 阻塞到完成
|
||
location_key = config_center.read_conf('Weather', 'city')
|
||
if location_key == '0' or not location_key:
|
||
location_key = 101010100
|
||
days = 1
|
||
key = config_center.read_conf('Weather', 'api_key')
|
||
url = db.get_weather_url().format(location_key=location_key, days=days, key=key)
|
||
alert_url = db.get_weather_alert_url()
|
||
try:
|
||
data_group = {'now': {}, 'alert': {}}
|
||
response_now = requests.get(url, proxies=proxies) # 禁用代理
|
||
if alert_url == 'NotSupported':
|
||
logger.warning(f"当前API不支持天气预警信息")
|
||
elif alert_url is None:
|
||
logger.warning(f"无单独天气预警信息API")
|
||
else:
|
||
alert_url = alert_url.format(location_key=location_key, key=key)
|
||
response_alert = requests.get(alert_url, proxies=proxies)
|
||
|
||
if response_alert.status_code == 200:
|
||
data_alert = response_alert.json()
|
||
data_group['alert'] = data_alert
|
||
else:
|
||
logger.error(f"获取天气预警信息失败:{response_alert.status_code}")
|
||
|
||
if response_now.status_code == 200:
|
||
data = response_now.json()
|
||
data_group['now'] = data
|
||
return data_group
|
||
else:
|
||
logger.error(f"获取天气信息失败:{response_now.status_code}")
|
||
return {'error': {'info': {'value': '错误', 'unit': response_now.status_code}}}
|
||
except requests.exceptions.RequestException as e: # 请求失败
|
||
logger.error(f"获取天气信息失败:{e}")
|
||
return {'error': {'info': {'value': '错误', 'unit': ''}}}
|
||
except Exception as e:
|
||
logger.error(f"获取天气信息失败:{e}")
|
||
return {'error': {'info': {'value': '错误', 'unit': ''}}}
|