Seperating Main Plugin Manager class from UI classes

This commit is contained in:
Vishal 2025-08-09 21:01:42 +05:30 committed by GitHub
parent 6b17ba080f
commit 9228ed92e9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -850,6 +850,189 @@ class Plugin:
bui.getsound('error').play()
class PluginManager:
def __init__(self):
self.request_headers = HEADERS
self._index = _CACHE.get("index", {})
self._changelog = _CACHE.get("changelog", {})
self.categories = {}
self.module_path = sys.modules[__name__].__file__
self._index_setup_in_progress = False
self._changelog_setup_in_progress = False
async def get_index(self):
if not self._index:
request = urllib.request.Request(
INDEX_META.format(
repository_url=REPOSITORY_URL,
content_type="raw",
tag=CURRENT_TAG
),
headers=self.request_headers,
)
response = await async_send_network_request(request)
index = json.loads(response.read())
self.set_index_global_cache(index)
self._index = index
return self._index
async def setup_index(self):
while self._index_setup_in_progress:
# Avoid making multiple network calls to the same resource in parallel.
# Rather wait for the previous network call to complete.
await asyncio.sleep(0.1)
self._index_setup_in_progress = not bool(self._index)
index = await self.get_index()
await self.setup_plugin_categories(index)
self._index_setup_in_progress = False
async def get_changelog(self) -> list[str, bool]:
requested = False
if not self._changelog:
request = urllib.request.Request(CHANGELOG_META.format(
repository_url=REPOSITORY_URL,
content_type="raw",
tag=CURRENT_TAG
),
headers=self.request_headers)
response = await async_send_network_request(request)
self._changelog = response.read().decode()
requested = True
return [self._changelog, requested]
async def setup_changelog(self, version=None) -> None:
if version is None:
version = PLUGIN_MANAGER_VERSION
while self._changelog_setup_in_progress:
# Avoid making multiple network calls to the same resource in parallel.
# Rather wait for the previous network call to complete.
await asyncio.sleep(0.1)
self._changelog_setup_in_progress = not bool(self._changelog)
try:
full_changelog = await self.get_changelog()
# check if the changelog was requested
if full_changelog[1]:
pattern = rf"### {version} \(\d\d-\d\d-\d{{4}}\)\n(.*?)(?=### \d+\.\d+\.\d+|\Z)"
if (len(full_changelog[0].split(version)) > 1):
print(len(full_changelog[0].split(version)))
released_on = full_changelog[0].split(version)[1].split('\n')[0]
matches = re.findall(pattern, full_changelog[0], re.DOTALL)
else:
matches = None
if matches:
changelog = {
'released_on': released_on,
'info': matches[0].strip()
}
else:
changelog = {'released_on': ' (Not Provided)',
'info': f"Changelog entry for version {version} not found."}
else:
changelog = full_changelog[0]
except urllib.error.URLError:
changelog = {'released_on': ' (Not Provided)',
'info': 'Could not get ChangeLog due to Internet Issues.'}
self.set_changelog_global_cache(changelog)
self._changelog_setup_in_progress = False
async def setup_plugin_categories(self, plugin_index):
# A hack to have the "All" category show at the top.
self.categories["All"] = None
requests = []
for meta_url in plugin_index["categories"]:
category = Category(meta_url)
request = category.fetch_metadata()
requests.append(request)
for source in babase.app.config["Community Plugin Manager"]["Custom Sources"]:
source_splits = source.split("@", maxsplit=1)
if len(source_splits) == 1:
# Fallack to `main` if `@branchname` isn't specified in an external source URI.
source_repo, source_tag = source_splits[0], "main"
else:
source_repo, source_tag = source_splits
meta_url = partial_format(
plugin_index["external_source_url"],
repository=source_repo,
)
category = Category(meta_url, tag=source_tag)
request = category.fetch_metadata()
requests.append(request)
categories = await asyncio.gather(*requests)
all_plugins = []
for category in categories:
self.categories[await category.get_name()] = category
all_plugins.extend(await category.get_plugins())
self.categories["All"] = CategoryAll(plugins=all_plugins)
def cleanup(self):
for category in self.categories.values():
if category is not None:
category.cleanup()
self.categories.clear()
self._index.clear()
self._changelog = None
self.unset_index_global_cache()
async def refresh(self):
self.cleanup()
await self.setup_index()
def set_index_global_cache(self, index):
_CACHE["index"] = index
def set_changelog_global_cache(self, changelog):
_CACHE["changelog"] = changelog
def unset_index_global_cache(self):
try:
del _CACHE["index"]
del _CACHE["changelog"]
except KeyError:
pass
async def get_update_details(self):
index = await self.get_index()
for version, info in index["versions"].items():
if info["api_version"] != _app_api_version:
# No point checking a version of the API game doesn't support.
continue
if version == PLUGIN_MANAGER_VERSION:
# We're already on the latest version for the current API.
return
else:
if next(iter(index["versions"])) == version:
# Version on the top is the latest, so no need to specify
# the commit SHA explicitly to GitHub to access the latest file.
commit_sha = None
else:
commit_sha = info["commit_sha"]
return version, commit_sha
async def update(self, to_version=None, commit_sha=None):
index = await self.get_index()
if to_version is None:
to_version, commit_sha = await self.get_update_details()
to_version_info = index["versions"][to_version]
tag = commit_sha or CURRENT_TAG
download_url = index["plugin_manager_url"].format(
content_type="raw",
tag=tag,
)
response = await async_send_network_request(download_url)
content = response.read()
if hashlib.md5(content).hexdigest() != to_version_info["md5sum"]:
raise MD5CheckSumFailed("MD5 checksum failed during plugin manager update.")
with open(self.module_path, "wb") as fout:
fout.write(content)
return to_version_info
async def soft_refresh(self):
pass
class ChangelogWindow(popup.PopupWindow):
def __init__(self, origin_widget):
self.scale_origin = origin_widget.get_screen_space_center()
@ -1445,189 +1628,6 @@ class PluginWindow(popup.PopupWindow):
bui.getsound('shieldUp').play()
class PluginManager:
def __init__(self):
self.request_headers = HEADERS
self._index = _CACHE.get("index", {})
self._changelog = _CACHE.get("changelog", {})
self.categories = {}
self.module_path = sys.modules[__name__].__file__
self._index_setup_in_progress = False
self._changelog_setup_in_progress = False
async def get_index(self):
if not self._index:
request = urllib.request.Request(
INDEX_META.format(
repository_url=REPOSITORY_URL,
content_type="raw",
tag=CURRENT_TAG
),
headers=self.request_headers,
)
response = await async_send_network_request(request)
index = json.loads(response.read())
self.set_index_global_cache(index)
self._index = index
return self._index
async def setup_index(self):
while self._index_setup_in_progress:
# Avoid making multiple network calls to the same resource in parallel.
# Rather wait for the previous network call to complete.
await asyncio.sleep(0.1)
self._index_setup_in_progress = not bool(self._index)
index = await self.get_index()
await self.setup_plugin_categories(index)
self._index_setup_in_progress = False
async def get_changelog(self) -> list[str, bool]:
requested = False
if not self._changelog:
request = urllib.request.Request(CHANGELOG_META.format(
repository_url=REPOSITORY_URL,
content_type="raw",
tag=CURRENT_TAG
),
headers=self.request_headers)
response = await async_send_network_request(request)
self._changelog = response.read().decode()
requested = True
return [self._changelog, requested]
async def setup_changelog(self, version=None) -> None:
if version is None:
version = PLUGIN_MANAGER_VERSION
while self._changelog_setup_in_progress:
# Avoid making multiple network calls to the same resource in parallel.
# Rather wait for the previous network call to complete.
await asyncio.sleep(0.1)
self._changelog_setup_in_progress = not bool(self._changelog)
try:
full_changelog = await self.get_changelog()
# check if the changelog was requested
if full_changelog[1]:
pattern = rf"### {version} \(\d\d-\d\d-\d{{4}}\)\n(.*?)(?=### \d+\.\d+\.\d+|\Z)"
if (len(full_changelog[0].split(version)) > 1):
print(len(full_changelog[0].split(version)))
released_on = full_changelog[0].split(version)[1].split('\n')[0]
matches = re.findall(pattern, full_changelog[0], re.DOTALL)
else:
matches = None
if matches:
changelog = {
'released_on': released_on,
'info': matches[0].strip()
}
else:
changelog = {'released_on': ' (Not Provided)',
'info': f"Changelog entry for version {version} not found."}
else:
changelog = full_changelog[0]
except urllib.error.URLError:
changelog = {'released_on': ' (Not Provided)',
'info': 'Could not get ChangeLog due to Internet Issues.'}
self.set_changelog_global_cache(changelog)
self._changelog_setup_in_progress = False
async def setup_plugin_categories(self, plugin_index):
# A hack to have the "All" category show at the top.
self.categories["All"] = None
requests = []
for meta_url in plugin_index["categories"]:
category = Category(meta_url)
request = category.fetch_metadata()
requests.append(request)
for source in babase.app.config["Community Plugin Manager"]["Custom Sources"]:
source_splits = source.split("@", maxsplit=1)
if len(source_splits) == 1:
# Fallack to `main` if `@branchname` isn't specified in an external source URI.
source_repo, source_tag = source_splits[0], "main"
else:
source_repo, source_tag = source_splits
meta_url = partial_format(
plugin_index["external_source_url"],
repository=source_repo,
)
category = Category(meta_url, tag=source_tag)
request = category.fetch_metadata()
requests.append(request)
categories = await asyncio.gather(*requests)
all_plugins = []
for category in categories:
self.categories[await category.get_name()] = category
all_plugins.extend(await category.get_plugins())
self.categories["All"] = CategoryAll(plugins=all_plugins)
def cleanup(self):
for category in self.categories.values():
if category is not None:
category.cleanup()
self.categories.clear()
self._index.clear()
self._changelog = None
self.unset_index_global_cache()
async def refresh(self):
self.cleanup()
await self.setup_index()
def set_index_global_cache(self, index):
_CACHE["index"] = index
def set_changelog_global_cache(self, changelog):
_CACHE["changelog"] = changelog
def unset_index_global_cache(self):
try:
del _CACHE["index"]
del _CACHE["changelog"]
except KeyError:
pass
async def get_update_details(self):
index = await self.get_index()
for version, info in index["versions"].items():
if info["api_version"] != _app_api_version:
# No point checking a version of the API game doesn't support.
continue
if version == PLUGIN_MANAGER_VERSION:
# We're already on the latest version for the current API.
return
else:
if next(iter(index["versions"])) == version:
# Version on the top is the latest, so no need to specify
# the commit SHA explicitly to GitHub to access the latest file.
commit_sha = None
else:
commit_sha = info["commit_sha"]
return version, commit_sha
async def update(self, to_version=None, commit_sha=None):
index = await self.get_index()
if to_version is None:
to_version, commit_sha = await self.get_update_details()
to_version_info = index["versions"][to_version]
tag = commit_sha or CURRENT_TAG
download_url = index["plugin_manager_url"].format(
content_type="raw",
tag=tag,
)
response = await async_send_network_request(download_url)
content = response.read()
if hashlib.md5(content).hexdigest() != to_version_info["md5sum"]:
raise MD5CheckSumFailed("MD5 checksum failed during plugin manager update.")
with open(self.module_path, "wb") as fout:
fout.write(content)
return to_version_info
async def soft_refresh(self):
pass
class PluginCustomSourcesWindow(popup.PopupWindow):
def __init__(self, origin_widget):
self.selected_source = None