More CLeaning Up

This commit is contained in:
Vishal 2025-08-05 06:27:53 +05:30 committed by GitHub
parent 81db33ccce
commit 1e47a0dd6b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -23,7 +23,7 @@ import pathlib
import hashlib import hashlib
import contextlib import contextlib
from typing import cast from typing import cast, override
from datetime import datetime from datetime import datetime
# Modules used for overriding AllSettingsWindow # Modules used for overriding AllSettingsWindow
@ -838,6 +838,183 @@ class Plugin:
bui.getsound('error').play() bui.getsound('error').play()
class PluginManager:
def __init__(self):
self.request_headers = HEADERS
self._index = _CACHE.get("index", {})
self._changelog = _CACHE.get("changelog", {})
self.categories = {}
self.module_path = sys.modules[__name__].__file__
self._index_setup_in_progress = False
self._changelog_setup_in_progress = False
async def get_index(self):
if not self._index:
request = urllib.request.Request(
INDEX_META.format(
repository_url=REPOSITORY_URL,
content_type="raw",
tag=CURRENT_TAG
),
headers=self.request_headers,
)
response = await async_send_network_request(request)
index = json.loads(response.read())
self.set_index_global_cache(index)
self._index = index
return self._index
async def setup_index(self):
while self._index_setup_in_progress:
# Avoid making multiple network calls to the same resource in parallel.
# Rather wait for the previous network call to complete.
await asyncio.sleep(0.1)
self._index_setup_in_progress = not bool(self._index)
index = await self.get_index()
await self.setup_plugin_categories(index)
self._index_setup_in_progress = False
async def get_changelog(self) -> str:
requested = False
if not self._changelog:
request = urllib.request.Request(CHANGELOG_META.format(
repository_url=REPOSITORY_URL,
content_type="raw",
tag=CURRENT_TAG
),
headers=self.request_headers)
response = await async_send_network_request(request)
self._changelog = response.read().decode()
requested = True
return [self._changelog, requested]
async def setup_changelog(self, version=None) -> None:
if version is None:
version = PLUGIN_MANAGER_VERSION
while self._changelog_setup_in_progress:
# Avoid making multiple network calls to the same resource in parallel.
# Rather wait for the previous network call to complete.
await asyncio.sleep(0.1)
self._changelog_setup_in_progress = not bool(self._changelog)
try:
full_changelog = await self.get_changelog()
if full_changelog[1]:
pattern = rf"### {version} \(\d\d-\d\d-\d{{4}}\)\n(.*?)(?=### \d+\.\d+\.\d+|\Z)"
released_on = full_changelog[0].split(version)[1].split('\n')[0]
matches = re.findall(pattern, full_changelog[0], re.DOTALL)
if matches:
changelog = {
'released_on': released_on,
'info': matches[0].strip()
}
else:
changelog = {'released_on': ' (Not Provided)',
'info': f"Changelog entry for version {version} not found."}
else:
changelog = full_changelog[0]
except urllib.error.URLError:
changelog = {'released_on': ' (Not Provided)',
'info': 'Could not get ChangeLog due to Internet Issues.'}
self.set_changelog_global_cache(changelog)
self._changelog_setup_in_progress = False
async def setup_plugin_categories(self, plugin_index):
# A hack to have the "All" category show at the top.
self.categories["All"] = None
requests = []
for meta_url in plugin_index["categories"]:
category = Category(meta_url)
request = category.fetch_metadata()
requests.append(request)
for source in babase.app.config["Community Plugin Manager"]["Custom Sources"]:
source_splits = source.split("@", maxsplit=1)
if len(source_splits) == 1:
# Fallack to `main` if `@branchname` isn't specified in an external source URI.
source_repo, source_tag = source_splits[0], "main"
else:
source_repo, source_tag = source_splits
meta_url = partial_format(
plugin_index["external_source_url"],
repository=source_repo,
)
category = Category(meta_url, tag=source_tag)
request = category.fetch_metadata()
requests.append(request)
categories = await asyncio.gather(*requests)
all_plugins = []
for category in categories:
self.categories[await category.get_name()] = category
all_plugins.extend(await category.get_plugins())
self.categories["All"] = CategoryAll(plugins=all_plugins)
def cleanup(self):
for category in self.categories.values():
if category is not None:
category.cleanup()
self.categories.clear()
self._index.clear()
self._changelog = None
self.unset_index_global_cache()
async def refresh(self):
self.cleanup()
await self.setup_index()
def set_index_global_cache(self, index):
_CACHE["index"] = index
def set_changelog_global_cache(self, changelog):
_CACHE["changelog"] = changelog
def unset_index_global_cache(self):
try:
del _CACHE["index"]
del _CACHE["changelog"]
except KeyError:
pass
async def get_update_details(self):
index = await self.get_index()
for version, info in index["versions"].items():
if info["api_version"] != _app_api_version:
# No point checking a version of the API game doesn't support.
continue
if version == PLUGIN_MANAGER_VERSION:
# We're already on the latest version for the current API.
return
else:
if next(iter(index["versions"])) == version:
# Version on the top is the latest, so no need to specify
# the commit SHA explicitly to GitHub to access the latest file.
commit_sha = None
else:
commit_sha = info["commit_sha"]
return version, commit_sha
async def update(self, to_version=None, commit_sha=None):
index = await self.get_index()
if to_version is None:
to_version, commit_sha = await self.get_update_details()
to_version_info = index["versions"][to_version]
tag = commit_sha or CURRENT_TAG
download_url = index["plugin_manager_url"].format(
content_type="raw",
tag=tag,
)
response = await async_send_network_request(download_url)
content = response.read()
if hashlib.md5(content).hexdigest() != to_version_info["md5sum"]:
raise MD5CheckSumFailed("MD5 checksum failed during plugin manager update.")
with open(self.module_path, "wb") as fout:
fout.write(content)
return to_version_info
async def soft_refresh(self):
pass
class ChangelogWindow(popup.PopupWindow): class ChangelogWindow(popup.PopupWindow):
def __init__(self, origin_widget): def __init__(self, origin_widget):
self.scale_origin = origin_widget.get_screen_space_center() self.scale_origin = origin_widget.get_screen_space_center()
@ -1362,181 +1539,7 @@ class PluginWindow(popup.PopupWindow):
bui.getsound('shieldUp').play() bui.getsound('shieldUp').play()
class PluginManager:
def __init__(self):
self.request_headers = HEADERS
self._index = _CACHE.get("index", {})
self._changelog = _CACHE.get("changelog", {})
self.categories = {}
self.module_path = sys.modules[__name__].__file__
self._index_setup_in_progress = False
self._changelog_setup_in_progress = False
async def get_index(self):
if not self._index:
request = urllib.request.Request(
INDEX_META.format(
repository_url=REPOSITORY_URL,
content_type="raw",
tag=CURRENT_TAG
),
headers=self.request_headers,
)
response = await async_send_network_request(request)
index = json.loads(response.read())
self.set_index_global_cache(index)
self._index = index
return self._index
async def setup_index(self):
while self._index_setup_in_progress:
# Avoid making multiple network calls to the same resource in parallel.
# Rather wait for the previous network call to complete.
await asyncio.sleep(0.1)
self._index_setup_in_progress = not bool(self._index)
index = await self.get_index()
await self.setup_plugin_categories(index)
self._index_setup_in_progress = False
async def get_changelog(self) -> str:
requested = False
if not self._changelog:
request = urllib.request.Request(CHANGELOG_META.format(
repository_url=REPOSITORY_URL,
content_type="raw",
tag=CURRENT_TAG
),
headers=self.request_headers)
response = await async_send_network_request(request)
self._changelog = response.read().decode()
requested = True
return [self._changelog, requested]
async def setup_changelog(self, version=None) -> None:
if version is None:
version = PLUGIN_MANAGER_VERSION
while self._changelog_setup_in_progress:
# Avoid making multiple network calls to the same resource in parallel.
# Rather wait for the previous network call to complete.
await asyncio.sleep(0.1)
self._changelog_setup_in_progress = not bool(self._changelog)
try:
full_changelog = await self.get_changelog()
if full_changelog[1]:
pattern = rf"### {version} \(\d\d-\d\d-\d{{4}}\)\n(.*?)(?=### \d+\.\d+\.\d+|\Z)"
released_on = full_changelog[0].split(version)[1].split('\n')[0]
matches = re.findall(pattern, full_changelog[0], re.DOTALL)
if matches:
changelog = {
'released_on': released_on,
'info': matches[0].strip()
}
else:
changelog = {'released_on': ' (Not Provided)',
'info': f"Changelog entry for version {version} not found."}
else:
changelog = full_changelog[0]
except urllib.error.URLError:
changelog = {'released_on': ' (Not Provided)',
'info': 'Could not get ChangeLog due to Internet Issues.'}
self.set_changelog_global_cache(changelog)
self._changelog_setup_in_progress = False
async def setup_plugin_categories(self, plugin_index):
# A hack to have the "All" category show at the top.
self.categories["All"] = None
requests = []
for meta_url in plugin_index["categories"]:
category = Category(meta_url)
request = category.fetch_metadata()
requests.append(request)
for source in babase.app.config["Community Plugin Manager"]["Custom Sources"]:
source_splits = source.split("@", maxsplit=1)
if len(source_splits) == 1:
# Fallack to `main` if `@branchname` isn't specified in an external source URI.
source_repo, source_tag = source_splits[0], "main"
else:
source_repo, source_tag = source_splits
meta_url = partial_format(
plugin_index["external_source_url"],
repository=source_repo,
)
category = Category(meta_url, tag=source_tag)
request = category.fetch_metadata()
requests.append(request)
categories = await asyncio.gather(*requests)
all_plugins = []
for category in categories:
self.categories[await category.get_name()] = category
all_plugins.extend(await category.get_plugins())
self.categories["All"] = CategoryAll(plugins=all_plugins)
def cleanup(self):
for category in self.categories.values():
if category is not None:
category.cleanup()
self.categories.clear()
self._index.clear()
self._changelog = None
self.unset_index_global_cache()
async def refresh(self):
self.cleanup()
await self.setup_index()
def set_index_global_cache(self, index):
_CACHE["index"] = index
def set_changelog_global_cache(self, changelog):
_CACHE["changelog"] = changelog
def unset_index_global_cache(self):
try:
del _CACHE["index"]
del _CACHE["changelog"]
except KeyError:
pass
async def get_update_details(self):
index = await self.get_index()
for version, info in index["versions"].items():
if info["api_version"] != _app_api_version:
# No point checking a version of the API game doesn't support.
continue
if version == PLUGIN_MANAGER_VERSION:
# We're already on the latest version for the current API.
return
else:
if next(iter(index["versions"])) == version:
# Version on the top is the latest, so no need to specify
# the commit SHA explicitly to GitHub to access the latest file.
commit_sha = None
else:
commit_sha = info["commit_sha"]
return version, commit_sha
async def update(self, to_version=None, commit_sha=None):
index = await self.get_index()
if to_version is None:
to_version, commit_sha = await self.get_update_details()
to_version_info = index["versions"][to_version]
tag = commit_sha or CURRENT_TAG
download_url = index["plugin_manager_url"].format(
content_type="raw",
tag=tag,
)
response = await async_send_network_request(download_url)
content = response.read()
if hashlib.md5(content).hexdigest() != to_version_info["md5sum"]:
raise MD5CheckSumFailed("MD5 checksum failed during plugin manager update.")
with open(self.module_path, "wb") as fout:
fout.write(content)
return to_version_info
async def soft_refresh(self):
pass
class PluginSourcesWindow(popup.PopupWindow): class PluginSourcesWindow(popup.PopupWindow):
@ -1763,7 +1766,11 @@ class PluginCategoryWindow(popup.PopupMenuWindow):
class PluginManagerWindow(bui.MainWindow): class PluginManagerWindow(bui.MainWindow):
def __init__(self, transition: str = "in_right", origin_widget: bui.Widget = None): def __init__(
self,
transition: str = "in_right",
origin_widget: bui.Widget = None
):
self.plugin_manager = PluginManager() self.plugin_manager = PluginManager()
self.category_selection_button = None self.category_selection_button = None
self.selected_category = 'All' self.selected_category = 'All'
@ -1857,6 +1864,16 @@ class PluginManagerWindow(bui.MainWindow):
size=48, size=48,
) )
@override
def get_main_window_state(self) -> bui.MainWindowState:
# Support recreating our window for back/refresh purposes.
cls = type(self)
return bui.BasicMainWindowState(
create_call=lambda transition, origin_widget: cls(
transition=transition, origin_widget=origin_widget
)
)
def spin(self, show=False): def spin(self, show=False):
w = self._loading_spinner w = self._loading_spinner
p = self._root_widget p = self._root_widget