From 5174e7fc57ede63c971a1ff33b9cab490de3eb4a Mon Sep 17 00:00:00 2001 From: brostos <67740566+brostosjoined@users.noreply.github.com> Date: Sat, 29 Jul 2023 02:29:34 +0300 Subject: [PATCH] Delete discord_richpresence.py --- plugins/utilities/discord_richpresence.py | 918 ---------------------- 1 file changed, 918 deletions(-) delete mode 100644 plugins/utilities/discord_richpresence.py diff --git a/plugins/utilities/discord_richpresence.py b/plugins/utilities/discord_richpresence.py deleted file mode 100644 index 285aa40..0000000 --- a/plugins/utilities/discord_richpresence.py +++ /dev/null @@ -1,918 +0,0 @@ -# Released under the MIT and Apache License. See LICENSE for details. -# -"""placeholder :clown:""" - -# ba_meta require api 8 -#!"Made to you by @brostos & @Dliwk" - - -from __future__ import annotations -from urllib.request import Request, urlopen, urlretrieve -from pathlib import Path -from os import getcwd, remove -from bauiv1lib.popup import PopupWindow -from babase._mgen.enums import TimeType - -import asyncio -import http.client -import ast -import uuid -import json -import time -import threading -import shutil -import hashlib -import babase -import _babase -import bascenev1 as bs -import bascenev1lib -import bauiv1 as bui - -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from typing import Any, Tuple - - -ANDROID = babase.app.classic.platform == "android" -DIRPATH = Path(f"{_babase.app.python_directory_user}/image_id.json") - -if ANDROID: # !can add ios in future - - # Installing websocket - def get_module(): - install_path = Path(f"{getcwd()}/ba_data/python") # For the guys like me on windows - path = Path(f"{install_path}/websocket.tar.gz") - file_path = Path(f"{install_path}/websocket") - source_dir = Path(f"{install_path}/websocket-client-1.6.1/websocket") - if not file_path.exists(): - url = "https://files.pythonhosted.org/packages/b1/34/3a5cae1e07d9566ad073fa6d169bf22c03a3ba7b31b3c3422ec88d039108/websocket-client-1.6.1.tar.gz" - try: - filename, headers = urlretrieve(url, filename=path) - with open(filename, "rb") as f: - content = f.read() - assert hashlib.md5(content).hexdigest() == "86bc69b61947943627afc1b351c0b5db" - shutil.unpack_archive(filename, install_path) - remove(path) - shutil.copytree(source_dir, file_path) - shutil.rmtree(Path(f"{install_path}/websocket-client-1.6.1")) - except Exception as e: - if type(e) == shutil.Error: - shutil.rmtree(Path(f"{install_path}/websocket-client-1.6.1")) - else: - pass - get_module() - - from websocket import WebSocketConnectionClosedException - import websocket - - start_time = time.time() - - class PresenceUpdate: - def __init__(self): - self.ws = websocket.WebSocketApp("wss://gateway.discord.gg/?encoding=json&v=10", - on_open=self.on_open, - on_message=self.on_message, - on_error=self.on_error, - on_close=self.on_close) - self.heartbeat_interval = int(41250) - self.resume_gateway_url: str | None = None - self.session_id: str | None = None - self.stop_heartbeat_thread = threading.Event() - self.do_once = True - self.state: str | None = "In Game" - self.details: str | None = "Main Menu" - self.start_timestamp = time.time() - self.large_image_key: str | None = "bombsquadicon" - self.large_image_text: str | None = "BombSquad Icon" - self.small_image_key: str | None = None - self.small_image_text: str | None = ( - f"{_babase.app.classic.platform.capitalize()}({_babase.app.version})") - self.media_proxy = "mp:/app-assets/963434684669382696/{}.png" - self.identify: bool = False - self.party_id: str = str(uuid.uuid4()) - self.party_size = 1 - self.party_max = 8 - - def presence(self): - with open(DIRPATH, "r") as maptxt: - largetxt = json.load(maptxt)[self.large_image_key] - with open(DIRPATH, "r") as maptxt: - smalltxt = json.load(maptxt)[self.small_image_key] - - presencepayload = { - "op": 3, - "d": { - "since": None, # used to show how long the user went idle will add afk to work with this and then set the status to idle - "status": "online", - "afk": "false", - "activities": [ - { - "name": "BombSquad", - "type": 0, - "application_id": "963434684669382696", - "state": self.state, - "details": self.details, - "timestamps": { - "start": start_time - }, - "party": { - "id": self.party_id, - "size": [self.party_size, self.party_max] - }, - "assets": { - "large_image": self.media_proxy.format(largetxt), - "large_text": self.large_image_text, - "small_image": self.media_proxy.format(smalltxt), - "small_text": self.small_image_text, - }, - "client_info": { - "version": 0, - "os": "android", - "client": "mobile", - }, - "buttons": ["Discord Server", "Download BombSquad"], - "metadata": { - "button_urls": [ - "https://discord.gg/bombsquad-ballistica-official-1001896771347304639", - "https://bombsquad-community.web.app/download", - ] - }, - } - ], - }, - } - try: - self.ws.send(json.dumps(presencepayload)) - except WebSocketConnectionClosedException: - pass - - def on_message(self, ws, message): - message = json.loads(message) - try: - self.heartbeat_interval = message["d"]["heartbeat_interval"] - except: - pass - try: - self.resume_gateway_url = message["d"]["resume_gateway_url"] - self.session_id = message["d"]["session_id"] - except: - pass - - def on_error(self, ws, error): - babase.print_exception(error) - - def on_close(self, ws, close_status_code, close_msg): - print("Closed Discord Connection Successfully") - - def on_open(self, ws): - print("Connected to Discord Websocket") - - def heartbeats(): - """Sending heartbeats to keep the connection alive""" - if self.do_once: - heartbeat_payload = { - "op": 1, - "d": 251, - } # step two keeping connection alive by sending heart beats and receiving opcode 11 - self.ws.send(json.dumps(heartbeat_payload)) - self.do_once = False - - def identify(): - """Identifying to the gateway and enable by using user token and the intents we will be using e.g 256->For Presence""" - with open(f"{getcwd()}/token.txt", 'r') as f: - token = bytes.fromhex(f.read()).decode('utf-8') - identify_payload = { - "op": 2, - "d": { - "token": token, - "properties": { - "os": "linux", - "browser": "Discord Android", - "device": "android", - }, - "intents": 256, - }, - } # step 3 send an identify - self.ws.send(json.dumps(identify_payload)) - identify() - while True: - heartbeat_payload = {"op": 1, "d": self.heartbeat_interval} - - try: - self.ws.send(json.dumps(heartbeat_payload)) - time.sleep(self.heartbeat_interval / 1000) - except: - pass - - if self.stop_heartbeat_thread.is_set(): - self.stop_heartbeat_thread.clear() - break - - threading.Thread(target=heartbeats, daemon=True, name="heartbeat").start() - - def start(self): - if Path(f"{getcwd()}/token.txt").exists(): - threading.Thread(target=self.ws.run_forever, daemon=True, name="websocket").start() - - def close(self): - self.stop_heartbeat_thread.set() - self.do_once = True - self.ws.close() - - -if not ANDROID: - # installing pypresence - def get_module(): - install_path = Path(f"{getcwd()}/ba_data/python") - path = Path(f"{install_path}/pypresence.tar.gz") - file_path = Path(f"{install_path}/pypresence") - source_dir = Path(f"{install_path}/pypresence-4.3.0/pypresence") - if not file_path.exists(): - url = "https://files.pythonhosted.org/packages/f4/2e/d110f862720b5e3ba1b0b719657385fc4151929befa2c6981f48360aa480/pypresence-4.3.0.tar.gz" - try: - filename, headers = urlretrieve(url, filename=path) - with open(filename, "rb") as f: - content = f.read() - assert hashlib.md5(content).hexdigest() == "f7c163cdd001af2456c09e241b90bad7" - shutil.unpack_archive(filename, install_path) - shutil.copytree(source_dir, file_path) - shutil.rmtree(Path(f"{install_path}/pypresence-4.3.0")) - remove(path) - except: - pass - - # Make modifications for it to work on windows - if babase.app.classic.platform == "windows": - with open(Path(f"{getcwd()}/ba_data/python/pypresence/utils.py"), "r") as file: - data = file.readlines() - data[45] = """ -def get_event_loop(force_fresh=False): - loop = asyncio.ProactorEventLoop() if sys.platform == 'win32' else asyncio.new_event_loop() - if force_fresh: - return loop - try: - running = asyncio.get_running_loop() - except RuntimeError: - return loop - if running.is_closed(): - return loop - else: - if sys.platform in ('linux', 'darwin'): - return running - if sys.platform == 'win32': - if isinstance(running, asyncio.ProactorEventLoop): - return running - else: - return loop""" - - with open(Path(f"{getcwd()}/ba_data/python/pypresence/utils.py"), "w") as file: - for number, line in enumerate(data): - if number not in range(46, 56): - file.write(line) - get_module() - - from pypresence import PipeClosed, DiscordError, DiscordNotFound - from pypresence.utils import get_event_loop - import pypresence - import socket - - DEBUG = True - - _last_server_addr = 'localhost' - _last_server_port = 43210 - - def print_error(err: str, include_exception: bool = False) -> None: - if DEBUG: - if include_exception: - babase.print_exception(err) - else: - babase.print_error(err) - else: - print(f"ERROR in discordrp.py: {err}") - - def log(msg: str) -> None: - if DEBUG: - print(f"LOG in discordrp.py: {msg}") - - def _run_overrides() -> None: - old_init = bs.Activity.__init__ - - def new_init(self, *args: Any, **kwargs: Any) -> None: # type: ignore - old_init(self, *args, **kwargs) - self._discordrp_start_time = time.mktime(time.localtime()) - - bs.Activity.__init__ = new_init # type: ignore - - old_connect = bs.connect_to_party - - def new_connect(*args, **kwargs) -> None: # type: ignore - global _last_server_addr - global _last_server_port - old_connect(*args, **kwargs) - c = kwargs.get("address") or args[0] - _last_server_port = kwargs.get("port") or args[1] - - bs.connect_to_party = new_connect - - start_time = time.time() - - class RpcThread(threading.Thread): - def __init__(self): - super().__init__(name="RpcThread") - self.rpc = pypresence.Presence(963434684669382696) - self.state: str | None = "In Game" - self.details: str | None = "Main Menu" - self.start_timestamp = time.mktime(time.localtime()) - self.large_image_key: str | None = "bombsquadicon" - self.large_image_text: str | None = "BombSquad Icon" - self.small_image_key: str | None = None - self.small_image_text: str | None = None - self.party_id: str = str(uuid.uuid4()) - self.party_size = 1 - self.party_max = 8 - self.join_secret: str | None = None - self._last_update_time: float = 0 - self._last_secret_update_time: float = 0 - self._last_connect_time: float = 0 - self.should_close = False - - @staticmethod - def is_discord_running(): - for i in range(6463, 6473): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.settimeout(0.01) - try: - conn = s.connect_ex(('localhost', i)) - s.close() - if (conn == 0): - s.close() - return (True) - except: - s.close() - return (False) - - def _generate_join_secret(self): - # resp = requests.get('https://legacy.ballistica.net/bsAccessCheck').text - connection_info = bs.get_connection_to_host_info() - if connection_info: - addr = _last_server_addr - port = _last_server_port - else: - try: - with urlopen( - "https://legacy.ballistica.net/bsAccessCheck" - ) as resp: - resp = resp.read().decode() - resp = ast.literal_eval(resp) - addr = resp["address"] - port = 43210 - secret_dict = { - "format_version": 1, - "hostname": addr, - "port": port, - } - self.join_secret = json.dumps(secret_dict) - except: - pass - - def _update_secret(self): - threading.Thread(target=self._generate_join_secret, daemon=True).start() - self._last_secret_update_time = time.time() - - def run(self) -> None: - asyncio.set_event_loop(get_event_loop()) - while not self.should_close: - if time.time() - self._last_update_time > 0.1: - self._do_update_presence() - if time.time() - self._last_secret_update_time > 15: - self._update_secret() - # if time.time() - self._last_connect_time > 120 and is_discord_running(): #!Eric please add module manager(pip) - # self._reconnect() - time.sleep(0.03) - - def _subscribe(self, event: str, **args): - self.rpc.send_data( - 1, - { - "nonce": f"{time.time():.20f}", - "cmd": "SUBSCRIBE", - "evt": event, - "args": args, - }, - ) - data = self.rpc.loop.run_until_complete(self.rpc.read_output()) - self.handle_event(data) - - def _subscribe_events(self): - self._subscribe("ACTIVITY_JOIN") - self._subscribe("ACTIVITY_JOIN_REQUEST") - - # def _update_presence(self) -> None: - # self._last_update_time = time.time() - # try: - # self._do_update_presence() - # except (AttributeError, AssertionError): - # try: - # self._reconnect() - # except Exception: - # print_error("failed to update presence", include_exception= True) - - def _reconnect(self) -> None: - self.rpc.connect() - self._subscribe_events() - self._do_update_presence() - self._last_connect_time = time.time() - - def _do_update_presence(self) -> None: - if RpcThread.is_discord_running(): - self._last_update_time = time.time() - try: - data = self.rpc.update( - state=self.state or " ", - details=self.details, - start=start_time, - large_image=self.large_image_key, - large_text=self.large_image_text, - small_image=self.small_image_key, - small_text=self.small_image_text, - party_id=self.party_id, - party_size=[self.party_size, self.party_max], - join=self.join_secret, - # buttons = [ #!cant use buttons together with join - # { - # "label": "Discord Server", - # "url": "https://ballistica.net/discord" - # }, - # { - # "label": "Download Bombsquad", - # "url": "https://bombsquad.ga/download"} - # ] - ) - - self.handle_event(data) - except (PipeClosed, DiscordError, AssertionError, AttributeError): - try: - self._reconnect() - except (DiscordNotFound, DiscordError): - pass - - def handle_event(self, data): - evt = data["evt"] - if evt is None: - return - - data = data.get("data", {}) - - if evt == "ACTIVITY_JOIN": - secret = data.get("secret") - try: - server = json.loads(secret) - format_version = server["format_version"] - except Exception: - babase.print_exception("discordrp: unknown activity join format") - else: - try: - if format_version == 1: - hostname = server["hostname"] - port = server["port"] - self._connect_to_party(hostname, port) - except Exception: - babase.print_exception( - f"discordrp: incorrect activity join data, {format_version=}" - ) - - elif evt == "ACTIVITY_JOIN_REQUEST": - user = data.get("user", {}) - uid = user.get("id") - username = user.get("username") - discriminator = user.get("discriminator", None) - avatar = user.get("avatar") - self.on_join_request(username, uid, discriminator, avatar) - - def _connect_to_party(self, hostname, port) -> None: - babase.pushcall( - babase.Call(bs.connect_to_party, hostname, port), from_other_thread=True - ) - - def on_join_request(self, username, uid, discriminator, avatar) -> None: - del uid # unused - del avatar # unused - babase.pushcall( - babase.Call( - bui.screenmessage, - "Discord: {} wants to join!".format(username), - color=(0.0, 1.0, 0.0), - ), - from_other_thread=True, - ) - babase.pushcall(lambda: bui.getsound('bellMed').play(), from_other_thread=True) - - -class Discordlogin(PopupWindow): - - def __init__(self): - # pylint: disable=too-many-locals - _uiscale = bui.app.ui_v1.uiscale - self._transitioning_out = False - s = 1.25 if _uiscale is babase.UIScale.SMALL else 1.27 if _uiscale is babase.UIScale.MEDIUM else 1.3 - self._width = 380 * s - self._height = 150 + 150 * s - self.path = Path(f"{getcwd()}/token.txt") - bg_color = (0.5, 0.4, 0.6) - log_btn_colour = (0.10, 0.95, 0.10) if not self.path.exists() else (1.00, 0.15, 0.15) - log_txt = "LOG IN" if not self.path.exists() else "LOG OUT" - - # creates our _root_widget - PopupWindow.__init__(self, - position=(0.0, 0.0), - size=(self._width, self._height), - scale=(2.1 if _uiscale is babase.UIScale.SMALL else 1.5 - if _uiscale is babase.UIScale.MEDIUM else 1.0), - bg_color=bg_color) - - self._cancel_button = bui.buttonwidget( - parent=self.root_widget, - position=(25, self._height - 40), - size=(50, 50), - scale=0.58, - label='', - color=bg_color, - on_activate_call=self._on_cancel_press, - autoselect=True, - icon=bui.gettexture('crossOut'), - iconscale=1.2) - - bui.imagewidget(parent=self.root_widget, - position=(180, self._height - 55), - size=(32 * s, 32 * s), - texture=bui.gettexture("discordLogo"), - color=(10 - 0.32, 10 - 0.39, 10 - 0.96)) - - self.email_widget = bui.textwidget(parent=self.root_widget, - text="Email/Phone Number", - size=(400, 70), - position=(50, 180), - h_align='left', - v_align='center', - editable=True, - scale=0.8, - autoselect=True, - maxwidth=220) - - self.password_widget = bui.textwidget(parent=self.root_widget, - text="Password", - size=(400, 70), - position=(50, 120), - h_align='left', - v_align='center', - editable=True, - scale=0.8, - autoselect=True, - maxwidth=220) - - bui.containerwidget(edit=self.root_widget, - cancel_button=self._cancel_button) - - bui.textwidget( - parent=self.root_widget, - position=(265, self._height - 37), - size=(0, 0), - h_align='center', - v_align='center', - scale=1.0, - text="Discord", - maxwidth=200, - color=(0.80, 0.80, 0.80)) - - bui.textwidget( - parent=self.root_widget, - position=(265, self._height - 78), - size=(0, 0), - h_align='center', - v_align='center', - scale=1.0, - text="💀Use at your own risk💀\n ⚠️discord account might get terminated⚠️", - maxwidth=200, - color=(1.00, 0.15, 0.15)) - - self._login_button = bui.buttonwidget( - parent=self.root_widget, - position=(120, 65), - size=(400, 80), - scale=0.58, - label=log_txt, - color=log_btn_colour, - on_activate_call=self.login, - autoselect=True) - - def _on_cancel_press(self) -> None: - self._transition_out() - - def _transition_out(self) -> None: - if not self._transitioning_out: - self._transitioning_out = True - bui.containerwidget(edit=self.root_widget, transition='out_scale') - - def on_bascenev1libup_cancel(self) -> None: - bui.getsound('swish').play() - self._transition_out() - - def login(self): - if not self.path.exists(): - json_data = { - 'login': bui.textwidget(query=self.email_widget), - 'password': bui.textwidget(query=self.password_widget), - 'undelete': False, - 'captcha_key': None, - 'login_source': None, - 'gift_code_sku_id': None, - } - headers = { - 'user-agent': "Mozilla/5.0", - 'content-type': "application/json", - } - - conn = http.client.HTTPSConnection("discord.com") - - payload = json.dumps(json_data) - # conn.request("POST", "/api/v9/auth/login", payload, headers) - # res = conn.getresponse().read() - - try: - conn.request("POST", "/api/v9/auth/login", payload, headers) - res = conn.getresponse().read() - token = json.loads(res)['token'].encode().hex().encode() - with open(self.path, 'wb') as f: - f.write(token) - bui.screenmessage("Successfully logged in", (0.21, 1.0, 0.20)) - bui.getsound('shieldUp').play() - self.on_bascenev1libup_cancel() - except: - bui.screenmessage("Incorrect credentials", (1.00, 0.15, 0.15)) - bui.getsound('error').play() - - conn.close() - else: - remove(self.path) - bui.getsound('shieldDown').play() - bui.screenmessage("Account successfully removed!!", (0.10, 0.10, 1.00)) - self.on_bascenev1libup_cancel() - PresenceUpdate().ws.close() - - -run_once = False - - -def get_once_asset(): - global run_once - if run_once: - return - response = Request( - "https://discordapp.com/api/oauth2/applications/963434684669382696/assets", - headers={"User-Agent": "Mozilla/5.0"}, - ) - try: - with urlopen(response) as assets: - assets = json.loads(assets.read()) - asset = [] - asset_id = [] - for x in assets: - dem = x["name"] - don = x["id"] - asset_id.append(don) - asset.append(dem) - asset_id_dict = dict(zip(asset, asset_id)) - - with open(DIRPATH, "w") as imagesets: - jsonfile = json.dumps(asset_id_dict) - json.dump(asset_id_dict, imagesets, indent=4) - except: - pass - run_once = True - - -def get_class(): - if ANDROID: - return PresenceUpdate() - elif not ANDROID: - return RpcThread() - - -# ba_meta export babase.Plugin -class DiscordRP(babase.Plugin): - def __init__(self) -> None: - self.update_timer: bs.Timer | None = None - self.rpc_thread = get_class() - self._last_server_info: str | None = None - - if not ANDROID: - _run_overrides() - get_once_asset() - - def on_app_running(self) -> None: - if not ANDROID: - self.rpc_thread.start() - self.update_timer = bs.AppTimer( - 1, bs.WeakCall(self.update_status), repeat=True - ) - if ANDROID: - self.rpc_thread.start() - self.update_timer = bs.AppTimer( - 4, bs.WeakCall(self.update_status), repeat=True - ) - - def has_settings_ui(self): - return True - - def show_settings_ui(self, button): - if not ANDROID: - bui.screenmessage("Nothing here achievement!!!", (0.26, 0.65, 0.94)) - bui.getsound('achievement').play() - if ANDROID: - Discordlogin() - - def on_app_shutdown(self) -> None: - if not ANDROID and self.rpc_thread.is_discord_running(): - self.rpc_thread.rpc.close() - self.rpc_thread.should_close = True - - def on_app_pause(self) -> None: - self.rpc_thread.close() - - def on_app_resume(self) -> None: - global start_time - start_time = time.time() - self.rpc_thread.start() - - def _get_current_activity_name(self) -> str | None: - act = bs.get_foreground_host_activity() - if isinstance(act, bs.GameActivity): - return act.name - - this = "Lobby" - name: str | None = ( - act.__class__.__name__.replace("Activity", "") - .replace("ScoreScreen", "Ranking") - .replace("Coop", "") - .replace("MultiTeam", "") - .replace("Victory", "") - .replace("EndSession", "") - .replace("Transition", "") - .replace("Draw", "") - .replace("FreeForAll", "") - .replace("Join", this) - .replace("Team", "") - .replace("Series", "") - .replace("CustomSession", "Custom Session(mod)") - ) - - if name == "MainMenu": - name = "Main Menu" - if name == this: - self.rpc_thread.large_image_key = "lobby" - self.rpc_thread.large_image_text = "Bombing up" - #self.rpc_thread.small_image_key = "lobbysmall" - if name == "Ranking": - self.rpc_thread.large_image_key = "ranking" - self.rpc_thread.large_image_text = "Viewing Results" - return name - - def _get_current_map_name(self) -> Tuple[str | None, str | None]: - act = bs.get_foreground_host_activity() - if isinstance(act, bs.GameActivity): - texname = act.map.get_preview_texture_name() - if texname: - return act.map.name, texname.lower().removesuffix("preview") - return None, None - - def update_status(self) -> None: - roster = bs.get_game_roster() - connection_info = bs.get_connection_to_host_info() - - self.rpc_thread.large_image_key = "bombsquadicon" - self.rpc_thread.large_image_text = "BombSquad" - self.rpc_thread.small_image_key = _babase.app.classic.platform - self.rpc_thread.small_image_text = ( - f"{_babase.app.classic.platform.capitalize()}({_babase.app.version})" - ) - connection_info = bs.get_connection_to_host_info() - if not ANDROID: - svinfo = str(connection_info) - if self._last_server_info != svinfo: - self._last_server_info = svinfo - self.rpc_thread.party_id = str(uuid.uuid4()) - self.rpc_thread._update_secret() - if connection_info != {}: - servername = connection_info["name"] - self.rpc_thread.details = "Online" - self.rpc_thread.party_size = max( - 1, sum(len(client["players"]) for client in roster) - ) - self.rpc_thread.party_max = max(8, self.rpc_thread.party_size) - if len(servername) == 19 and "Private Party" in servername: - self.rpc_thread.state = "Private Party" - elif servername == "": # A local game joinable from the internet - try: - offlinename = json.loads(bs.get_game_roster()[0]["spec_string"])[ - "n" - ] - if len(offlinename) > 19: # Thanks Rikko - self.rpc_thread.state = offlinename[slice(19)] + "..." - else: - self.rpc_thread.state = offlinename - except IndexError: - pass - else: - if len(servername) > 19: - self.rpc_thread.state = servername[slice(19)] + ".." - else: - self.rpc_thread.state = servername[slice(19)] - - if connection_info == {}: - self.rpc_thread.details = "Local" # ! replace with something like ballistica github cause - self.rpc_thread.state = self._get_current_activity_name() - self.rpc_thread.party_size = max(1, len(roster)) - self.rpc_thread.party_max = max(1, bs.get_public_party_max_size()) - - if ( - bs.get_foreground_host_session() is not None - and self.rpc_thread.details == "Local" - ): - session = ( - bs.get_foreground_host_session() - .__class__.__name__.replace("MainMenuSession", "") - .replace("EndSession", "") - .replace("FreeForAllSession", ": FFA") # ! for session use small image key - .replace("DualTeamSession", ": Teams") - .replace("CoopSession", ": Coop") - ) - #! self.rpc_thread.small_image_key = session.lower() - self.rpc_thread.details = f"{self.rpc_thread.details} {session}" - if ( - self.rpc_thread.state == "NoneType" - ): # sometimes the game just breaks which means its not really watching replay FIXME - self.rpc_thread.state = "Watching Replay" - self.rpc_thread.large_image_key = "replay" - self.rpc_thread.large_image_text = "Viewing Awesomeness" - #!self.rpc_thread.small_image_key = "replaysmall" - - act = bs.get_foreground_host_activity() - session = bs.get_foreground_host_session() - if act: - from bascenev1lib.game.elimination import EliminationGame - from bascenev1lib.game.thelaststand import TheLastStandGame - from bascenev1lib.game.meteorshower import MeteorShowerGame - - # noinspection PyUnresolvedReferences,PyProtectedMember - try: - self.rpc_thread.start_timestamp = act._discordrp_start_time # type: ignore - except AttributeError: - # This can be the case if plugin launched AFTER activity - # has been created; in that case let's assume it was - # created just now. - self.rpc_thread.start_timestamp = act._discordrp_start_time = time.mktime( # type: ignore - time.localtime() - ) - if isinstance(act, EliminationGame): - alive_count = len([p for p in act.players if p.lives > 0]) - self.rpc_thread.details += f" ({alive_count} players left)" - elif isinstance(act, TheLastStandGame): - # noinspection PyProtectedMember - points = act._score - self.rpc_thread.details += f" ({points} points)" - elif isinstance(act, MeteorShowerGame): - with bs.ContextRef(act): - sec = bs.time() - act._timer.getstarttime() - secfmt = "" - if sec < 60: - secfmt = f"{sec:.2f}" - else: - secfmt = f"{int(sec) // 60:02}:{sec:.2f}" - self.rpc_thread.details += f" ({secfmt})" - - # if isinstance(session, ba.DualTeamSession): - # scores = ':'.join([ - # str(t.customdata['score']) - # for t in session.sessionteams - # ]) - # self.rpc_thread.details += f' ({scores})' - - mapname, short_map_name = self._get_current_map_name() - if mapname: - with open(DIRPATH, 'r') as asset_dict: - asset_keys = json.load(asset_dict).keys() - if short_map_name in asset_keys: - self.rpc_thread.large_image_text = mapname - self.rpc_thread.large_image_key = short_map_name - self.rpc_thread.small_image_key = 'bombsquadlogo2' - self.rpc_thread.small_image_text = 'BombSquad' - - if _babase.get_idle_time() / (1000 * 60) % 60 >= 0.4: - self.rpc_thread.details = f"AFK in {self.rpc_thread.details}" - if not ANDROID: - self.rpc_thread.large_image_key = ( - "https://media.tenor.com/uAqNn6fv7x4AAAAM/bombsquad-spaz.gif" - ) - if ANDROID and Path(f"{getcwd()}/token.txt").exists(): - self.rpc_thread.presence()