# Released under the MIT and Apache License. See LICENSE for details. # """placeholder :clown:""" # ba_meta require api 8 #!"Made to you by @brostos & @Dliwk" # TODO # - Update to the latest libs and test then release and add the module in the mod file as bytes from __future__ import annotations from urllib.request import Request, urlopen, urlretrieve from pathlib import Path from os import getcwd, remove from bauiv1lib.popup import PopupWindow from babase._mgen.enums import TimeType import asyncio import http.client import ast import uuid import json import time import threading import shutil import hashlib import babase import _babase import bascenev1 as bs import bascenev1lib import bauiv1 as bui from baenv import TARGET_BALLISTICA_BUILD as build_number from typing import TYPE_CHECKING if TYPE_CHECKING: from typing import Any, Tuple MAPNAME_ID = { "bombsquadicon": "963448129900908595", "zigzagpreview": "963448133130522624", "tiptoppreview": "963448133168279582", "towerdpreview": "963448135886200912", "thepadpreview": "963448137916248084", "steprightuppreview": "963448141728862248", "roundaboutpreview": "963448143997972550", "rampagepreview": "963448146422296676", "monkeyfacepreview": "963448151182831626", "footballstadiumpreview": "963448158719983646", "doomshroompreview": "963448160993292368", "cragcastlepreview": "963448163048513536", "courtyardpreview": "963448166127120504", "bridgitpreview": "963448169180565654", "biggpreview": "963448172905127996", "alwayslandpreview": "963448174163423252", "bigg": "1013013392455376977", "bridgit": "1013013400139333632", "courtyard": "1013013410776096788", "cragcastle": "1013013423132528700", "doomshroom": "1013013438223622224", "footballstadium": "1013013452517810226", "hockeystadium": "1013013464060547112", "monkeyface": "1013013477721383023", "rampage": "1013013484830728273", "roundabout": "1013013508323037264", "steprightup": "1013013567768907826", "thepad": "1013013577197699163", "tiptop": "1013013593089904721", "towerd": "1013013604531970131", "zigzag": "1013013618188619816", "bombsquadlogo2": "1013016083701190726", "windows": "1084050785488338984", "linux": "1084078945944739920", "lobby": "1084180821973418226", "ranking": "1084224689272004719", "rampagelevelcolor": "1086989941541703741", "landmine": "1087000404866371766", "rgbstripes": "1087000416492990474", "shrapnel1color": "1087151233225195590", "bonescolor": "1087151164077899928", "bridgitlevelcolor": "1087151178674094182", "crossout": "1087151197963681902", "naturebackgroundcolor": "1087151209896476782", "zigzaglevelcolor": "1087151253206876241", "zoeicon": "1087151266989363240", "bg": "1087564057890000906", "alwayslandlevelcolor": "1087564765406167080", "hockeystadiumpreview": "1087574349285961768", "mac": "1087584375287336992", "flyer": "1087584543147561051", "replay": "1087592122393301102", "coop": "1097697042891018311", "ffa": "1097697050214269008", "lobbysmall": "1097697055926923335", "replaysmall": "1097697062746853386", "teams": "1097697068727935036", "bacongreece": "1097700754623565894", "basketballstadium": "1097700771501441167", "flapland": "1097700783622979664", "alwaysland": "1097700794213613610", "hoveringwood": "1097700802321199224", "jrmponslaught": "1097700810479124520", "jrmprunaround": "1097700817194205286", "lakefrigid": "1097700828023898203", "mushfeud": "1097700836920000594", "pillar_bases": "1097700846340407427", "powerup_factory": "1097700854422851656", "snowballpit": "1097700869673341009", "stoneishfort": "1097700887826272308", "toiletdonut": "1097700898584666193", "whereeaglesdare": "1097700904972587109", "android": "1097728392280932453", } ANDROID = babase.app.classic.platform == "android" APP_VERSION = _babase.app.version if build_number < 21282 else _babase.app.env.version if ANDROID: # !can add ios in future # Installing websocket def get_module(): install_path = Path( f"{getcwd()}/ba_data/python" ) # For the guys like me on windows path = Path(f"{install_path}/websocket.tar.gz") file_path = Path(f"{install_path}/websocket") source_dir = Path(f"{install_path}/websocket-client-1.6.1/websocket") if not Path(f"{file_path}/__init__.py").exists(): # YouKnowDev url = "https://files.pythonhosted.org/packages/b1/34/3a5cae1e07d9566ad073fa6d169bf22c03a3ba7b31b3c3422ec88d039108/websocket-client-1.6.1.tar.gz" try: # fix issue where the file delete themselves try: shutil.rmtree(file_path) except: pass filename, headers = urlretrieve(url, filename=path) with open(filename, "rb") as f: content = f.read() assert ( hashlib.md5(content).hexdigest() == "86bc69b61947943627afc1b351c0b5db" ) shutil.unpack_archive(filename, install_path) remove(path) shutil.copytree(source_dir, file_path) shutil.rmtree(Path(f"{install_path}/websocket-client-1.6.1")) except Exception as e: if type(e) == shutil.Error: shutil.rmtree(Path(f"{install_path}/websocket-client-1.6.1")) else: pass get_module() from websocket import WebSocketConnectionClosedException import websocket start_time = time.time() class PresenceUpdate: def __init__(self): self.ws = websocket.WebSocketApp( "wss://gateway.discord.gg/?encoding=json&v=10", on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close, ) self.heartbeat_interval = int(41250) self.resume_gateway_url: str | None = None self.session_id: str | None = None self.stop_heartbeat_thread = threading.Event() self.do_once: bool = True self.state: str | None = "In Game" self.details: str | None = "Main Menu" self.start_timestamp = time.time() self.large_image_key: str | None = "bombsquadicon" self.large_image_text: str | None = "BombSquad Icon" self.small_image_key: str | None = None self.small_image_text: str | None = ( f"{_babase.app.classic.platform.capitalize()}({APP_VERSION})" ) self.media_proxy = "mp:/app-assets/963434684669382696/{}.png" self.identify: bool = False self.party_id: str = str(uuid.uuid4()) self.party_size = 1 self.party_max = 8 def presence(self): largetxt = MAPNAME_ID[self.large_image_key] smalltxt = MAPNAME_ID[self.small_image_key] presencepayload = { "op": 3, "d": { "since": None, # used to show how long the user went idle will add afk to work with this and then set the status to idle "status": "online", "afk": "false", "activities": [ { "name": "BombSquad", "type": 0, "application_id": "963434684669382696", "state": self.state, "details": self.details, "timestamps": {"start": start_time}, "party": { "id": self.party_id, "size": [self.party_size, self.party_max], }, "assets": { "large_image": self.media_proxy.format(largetxt), "large_text": self.large_image_text, "small_image": self.media_proxy.format(smalltxt), "small_text": self.small_image_text, }, "client_info": { "version": 0, "os": "android", "client": "mobile", }, "buttons": ["Discord Server", "Download BombSquad"], "metadata": { "button_urls": [ "https://discord.gg/bombsquad-ballistica-official-1001896771347304639", "https://bombsquad-community.web.app/download", ] }, } ], }, } try: self.ws.send(json.dumps(presencepayload)) except WebSocketConnectionClosedException: pass def on_message(self, ws, message): message = json.loads(message) try: self.heartbeat_interval = message["d"]["heartbeat_interval"] except: pass try: self.resume_gateway_url = message["d"]["resume_gateway_url"] self.session_id = message["d"]["session_id"] except: pass def on_error(self, ws, error): babase.print_exception(error) def on_close(self, ws, close_status_code, close_msg): ( print("Closed Discord Connection Successfully") if close_status_code == 1000 else print( f"Closed Discord Connection with code {close_status_code} and message {close_msg}" ) ) def on_open(self, ws): print("Connected to Discord Websocket") def heartbeats(): """Sending heartbeats to keep the connection alive""" if self.do_once: heartbeat_payload = { "op": 1, "d": 251, } # step two keeping connection alive by sending heart beats and receiving opcode 11 self.ws.send(json.dumps(heartbeat_payload)) self.do_once = False def identify(): """Identifying to the gateway and enable by using user token and the intents we will be using e.g 256->For Presence""" byt_tkn = babase.app.config.get("token") token = bytes.fromhex(byt_tkn).decode("utf-8") identify_payload = { "op": 2, "d": { "token": token, "properties": { "os": "linux", "browser": "Discord Android", "device": "android", }, "intents": 256, }, } # step 3 send an identify self.ws.send(json.dumps(identify_payload)) identify() while True: heartbeat_payload = {"op": 1, "d": self.heartbeat_interval} try: self.ws.send(json.dumps(heartbeat_payload)) time.sleep(self.heartbeat_interval / 1000) except: pass if self.stop_heartbeat_thread.is_set(): self.stop_heartbeat_thread.clear() break threading.Thread(target=heartbeats, daemon=True, name="heartbeat").start() def start(self): if ( Path( f"{_babase.app.env.python_directory_user}/__pycache__/token.txt" ).exists() or Path(f"{getcwd()}/token.txt").exists() ): try: with open(f"{getcwd()}/token.txt", 'r') as f: token = bytes.fromhex(f.read()).decode('utf-8') except FileNotFoundError: with open(f"{_babase.app.env.python_directory_user}/__pycache__/token.txt", 'r') as f: token = bytes.fromhex(f.read()).decode('utf-8') babase.app.config["token"] = token babase.app.config.commit() if babase.app.config.get("token"): try: while True: urlopen('http://www.google.com', timeout=5) threading.Thread( target=self.ws.run_forever, daemon=True, name="websocket" ).start() return except Exception: return def close(self): self.stop_heartbeat_thread.set() self.do_once = True self.ws.close() if not ANDROID: # installing pypresence def get_module(): install_path = Path(f"{getcwd()}/ba_data/python") path = Path(f"{install_path}/pypresence.tar.gz") file_path = Path(f"{install_path}/pypresence") source_dir = Path(f"{install_path}/pypresence-4.3.0/pypresence") if not file_path.exists(): url = "https://files.pythonhosted.org/packages/f4/2e/d110f862720b5e3ba1b0b719657385fc4151929befa2c6981f48360aa480/pypresence-4.3.0.tar.gz" try: filename, headers = urlretrieve(url, filename=path) with open(filename, "rb") as f: content = f.read() assert ( hashlib.md5(content).hexdigest() == "f7c163cdd001af2456c09e241b90bad7" ) shutil.unpack_archive(filename, install_path) shutil.copytree(source_dir, file_path) shutil.rmtree(Path(f"{install_path}/pypresence-4.3.0")) remove(path) except: pass get_module() from pypresence import ( DiscordError, DiscordNotFound, ) from pypresence.utils import get_event_loop import pypresence import socket DEBUG = True def print_error(err: str, include_exception: bool = False) -> None: if DEBUG: if include_exception: babase.print_exception(err) else: babase.print_error(err) else: print(f"ERROR in discordrp.py: {err}") def log(msg: str) -> None: if DEBUG: print(f"LOG in discordrp.py: {msg}") def _run_overrides() -> None: old_init = bs.Activity.__init__ def new_init(self, *args: Any, **kwargs: Any) -> None: # type: ignore old_init(self, *args, **kwargs) self._discordrp_start_time = time.mktime(time.localtime()) bs.Activity.__init__ = new_init # type: ignore old_connect = bs.connect_to_party def new_connect(*args, **kwargs) -> None: # type: ignore global _last_server_addr global _last_server_port old_connect(*args, **kwargs) _last_server_addr = kwargs.get("address") or args[0] _last_server_port = kwargs.get("port") or args[1] bs.connect_to_party = new_connect start_time = time.time() class RpcThread(threading.Thread): def __init__(self): super().__init__(name="RpcThread") self.rpc = pypresence.Presence(963434684669382696) self.state: str | None = "In Game" self.details: str | None = "Main Menu" self.start_timestamp = time.mktime(time.localtime()) self.large_image_key: str | None = "bombsquadicon" self.large_image_text: str | None = "BombSquad Icon" self.small_image_key: str | None = None self.small_image_text: str | None = None self.party_id: str = str(uuid.uuid4()) self.party_size = 1 self.party_max = 8 self.join_secret: str | None = None self._last_update_time: float = 0 self._last_secret_update_time: float = 0 self._last_connect_time: float = 0 self.should_close = False @staticmethod def is_discord_running(): for i in range(6463, 6473): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(0.01) try: conn = s.connect_ex(("localhost", i)) s.close() if conn == 0: s.close() return True except Exception as e: raise (e) s.close() return False def _generate_join_secret(self): # resp = requests.get('https://legacy.ballistica.net/bsAccessCheck').text try: connection_info = bs.get_connection_to_host_info if build_number < 21727 else bs.get_connection_to_host_info_2 if connection_info.__call__(): addr = _last_server_addr port = _last_server_port else: with urlopen("https://legacy.ballistica.net/bsAccessCheck") as resp: resp = resp.read().decode() resp = ast.literal_eval(resp) addr = resp["address"] port = resp["port"] addr, port = addr, port secret_dict = { "format_version": 1, "hostname": addr, "port": port, } self.join_secret = json.dumps(secret_dict) print(self.join_secret) except Exception as e: print(e) pass def _update_secret(self): #! use in game thread threading.Thread(target=self._generate_join_secret, daemon=True).start() self._last_secret_update_time = time.time() def run(self) -> None: asyncio.set_event_loop(get_event_loop()) while not self.should_close: if time.time() - self._last_update_time > 0.1: self._do_update_presence() if time.time() - self._last_secret_update_time > 15: self._update_secret() # if time.time() - self._last_connect_time > 120 and is_discord_running(): #!Eric please add module manager(pip) # self._reconnect() time.sleep(0.03) def _subscribe(self, event: str, **args): self.rpc.send_data( 1, { "nonce": f"{time.time():.20f}", "cmd": "SUBSCRIBE", "evt": event, "args": args, }, ) data = self.rpc.loop.run_until_complete(self.rpc.read_output()) self.handle_event(data) def _subscribe_events(self): self._subscribe("ACTIVITY_JOIN") self._subscribe("ACTIVITY_JOIN_REQUEST") # def _update_presence(self) -> None: # self._last_update_time = time.time() # try: # self._do_update_presence() # except (AttributeError, AssertionError): # try: # self._reconnect() # except Exception: # print_error("failed to update presence", include_exception= True) def _reconnect(self) -> None: self.rpc.connect() self._subscribe_events() self._do_update_presence() self._last_connect_time = time.time() def _do_update_presence(self) -> None: if RpcThread.is_discord_running(): self._last_update_time = time.time() try: data = self.rpc.update( state=self.state or " ", details=self.details, start=start_time, large_image=self.large_image_key, large_text=self.large_image_text, small_image=self.small_image_key, small_text=self.small_image_text, party_id=self.party_id, party_size=[self.party_size, self.party_max], join=self.join_secret) # buttons = [ #!cant use buttons together with join # { # "label": "Discord Server", # "url": "https://ballistica.net/discord" # }, # { # "label": "Download Bombsquad", # "url": "https://bombsquad.ga/download"} # ] self.handle_event(data) except Exception: try: self._reconnect() except (DiscordNotFound, DiscordError): pass def handle_event(self, data): evt = data["evt"] if evt is None: return data = data.get("data", {}) if evt == "ACTIVITY_JOIN": secret = data.get("secret") try: server = json.loads(secret) format_version = server["format_version"] except Exception: babase.print_exception("discordrp: unknown activity join format") else: try: if format_version == 1: hostname = server["hostname"] port = server["port"] self._connect_to_party(hostname, port) except Exception: babase.print_exception( f"discordrp: incorrect activity join data, {format_version=}" ) elif evt == "ACTIVITY_JOIN_REQUEST": user = data.get("user", {}) uid = user.get("id") username = user.get("username") discriminator = user.get("discriminator", None) avatar = user.get("avatar") self.on_join_request(username, uid, discriminator, avatar) def _connect_to_party(self, hostname, port) -> None: babase.pushcall( babase.Call(bs.connect_to_party, hostname, port), from_other_thread=True ) def on_join_request(self, username, uid, discriminator, avatar) -> None: del uid # unused del avatar # unused babase.pushcall( babase.Call( bui.screenmessage, "Discord: {}{} wants to join!".format( username, discriminator if discriminator != "#0" else "" ), color=(0.0, 1.0, 0.0), ), from_other_thread=True, ) babase.pushcall( lambda: bui.getsound("bellMed").play(), from_other_thread=True ) class Discordlogin(PopupWindow): def __init__(self): # pylint: disable=too-many-locals _uiscale = bui.app.ui_v1.uiscale self._transitioning_out = False s = ( 1.25 if _uiscale is babase.UIScale.SMALL else 1.27 if _uiscale is babase.UIScale.MEDIUM else 1.3 ) self._width = 380 * s self._height = 150 + 150 * s bg_color = (0.5, 0.4, 0.6) log_btn_colour = ( (0.10, 0.95, 0.10) if not babase.app.config.get("token") else (1.00, 0.15, 0.15) ) log_txt = "LOG IN" if not babase.app.config.get("token") else "LOG OUT" self.code = False self.resp = "Placeholder" self.headers = { "user-agent": "Mozilla/5.0", "content-type": "application/json", } # creates our _root_widget PopupWindow.__init__( self, position=(0.0, 0.0), size=(self._width, self._height), scale=( 2.1 if _uiscale is babase.UIScale.SMALL else 1.5 if _uiscale is babase.UIScale.MEDIUM else 1.0 ), bg_color=bg_color, ) self._cancel_button = bui.buttonwidget( parent=self.root_widget, position=(25, self._height - 40), size=(50, 50), scale=0.58, label="", color=bg_color, on_activate_call=self._on_cancel_press, autoselect=True, icon=bui.gettexture("crossOut"), iconscale=1.2, ) bui.imagewidget( parent=self.root_widget, position=(180, self._height - 55), size=(32 * s, 32 * s), texture=bui.gettexture("discordLogo"), color=(10 - 0.32, 10 - 0.39, 10 - 0.96), ) self.email_widget = bui.textwidget( parent=self.root_widget, text="Email/Phone Number", size=(400, 70), position=(50, 180), h_align="left", v_align="center", editable=True, scale=0.8, autoselect=True, maxwidth=220, ) self.password_widget = bui.textwidget( parent=self.root_widget, text="Password", size=(400, 70), position=(50, 120), h_align="left", v_align="center", editable=True, scale=0.8, autoselect=True, maxwidth=220, ) bui.containerwidget(edit=self.root_widget, cancel_button=self._cancel_button) bui.textwidget( parent=self.root_widget, position=(265, self._height - 37), size=(0, 0), h_align="center", v_align="center", scale=1.0, text="Discord", maxwidth=200, color=(0.80, 0.80, 0.80), ) bui.textwidget( parent=self.root_widget, position=(265, self._height - 78), size=(0, 0), h_align="center", v_align="center", scale=1.0, text="💀Use at your own risk💀\n ⚠️discord account might get terminated⚠️", maxwidth=200, color=(1.00, 0.15, 0.15), ) self._login_button = bui.buttonwidget( parent=self.root_widget, position=(120, 65), size=(400, 80), scale=0.58, label=log_txt, color=log_btn_colour, on_activate_call=self.login, autoselect=True, ) def _on_cancel_press(self) -> None: self._transition_out() def _transition_out(self) -> None: if not self._transitioning_out: self._transitioning_out = True bui.containerwidget(edit=self.root_widget, transition="out_scale") def on_bascenev1libup_cancel(self) -> None: bui.getsound("swish").play() self._transition_out() def backup_2fa_code(self, tickt): if babase.do_once(): self.email_widget.delete() self.password_widget.delete() self.backup_2fa_widget = bui.textwidget( parent=self.root_widget, text="2FA/Discord Backup code", size=(400, 70), position=(50, 120), h_align="left", v_align="center", editable=True, scale=0.8, autoselect=True, maxwidth=220, ) json_data_2FA = { "code": bui.textwidget(query=self.backup_2fa_widget), "gift_code_sku_id": None, "ticket": tickt, } if json_data_2FA["code"] != "2FA/Discord Backup code": try: payload_2FA = json.dumps(json_data_2FA) conn_2FA = http.client.HTTPSConnection("discord.com") conn_2FA.request( "POST", "/api/v9/auth/mfa/totp", payload_2FA, self.headers ) res_2FA = conn_2FA.getresponse().read() token = json.loads(res_2FA)["token"].encode().hex().encode() babase.app.config["token"] = token babase.app.config.commit() bui.screenmessage("Successfully logged in", (0.21, 1.0, 0.20)) bui.getsound("shieldUp").play() self.on_bascenev1libup_cancel() PresenceUpdate().start() except: self.code = True bui.screenmessage("Incorrect code", (1.00, 0.15, 0.15)) bui.getsound("error").play() def login(self): if not babase.app.config.get("token") and self.code == False: try: json_data = { "login": bui.textwidget(query=self.email_widget), "password": bui.textwidget(query=self.password_widget), "undelete": False, "captcha_key": None, "login_source": None, "gift_code_sku_id": None, } conn = http.client.HTTPSConnection("discord.com") payload = json.dumps(json_data) # conn.request("POST", "/api/v9/auth/login", payload, headers) # res = conn.getresponse().read() conn.request("POST", "/api/v9/auth/login", payload, self.headers) res = conn.getresponse().read() try: token = json.loads(res)["token"].encode().hex().encode() babase.app.config["token"] = token babase.app.config.commit() bui.screenmessage("Successfully logged in", (0.21, 1.0, 0.20)) bui.getsound("shieldUp").play() self.on_bascenev1libup_cancel() PresenceUpdate().start() except KeyError: try: ticket = json.loads(res)["ticket"] bui.screenmessage( "Input your 2FA or Discord Backup code", (0.21, 1.0, 0.20) ) bui.getsound("error").play() self.resp = ticket self.backup_2fa_code(tickt=ticket) self.code = True except KeyError: bui.screenmessage("Incorrect credentials", (1.00, 0.15, 0.15)) bui.getsound("error").play() except: bui.screenmessage("Connect to the internet", (1.00, 0.15, 0.15)) bui.getsound("error").play() conn.close() elif self.code == True: self.backup_2fa_code(tickt=self.resp) else: self.email_widget.delete() self.password_widget.delete() del babase.app.config["token"] babase.app.config.commit() bui.getsound("shieldDown").play() bui.screenmessage("Account successfully removed!!", (0.10, 0.10, 1.00)) self.on_bascenev1libup_cancel() PresenceUpdate().close() def get_class(): if ANDROID: return PresenceUpdate() elif not ANDROID: return RpcThread() # ba_meta export babase.Plugin class DiscordRP(babase.Plugin): def __init__(self) -> None: self.update_timer: bs.Timer | None = None self.rpc_thread = get_class() self._last_server_info: str | None = None if not ANDROID: _run_overrides() def on_app_running(self) -> None: if not ANDROID: threading.Thread( target=self.rpc_thread.start, daemon=True, name="start_rpc_android" ).start() self.update_timer = bs.AppTimer( 1, bs.WeakCall(self.update_status), repeat=True ) if ANDROID: self.rpc_thread.start() self.update_timer = bs.AppTimer( 4, bs.WeakCall(self.update_status), repeat=True ) def has_settings_ui(self): return True def show_settings_ui(self, button): if not ANDROID: bui.screenmessage( "Nothing here achievement.Only for mobile users!!!", (0.26, 0.65, 0.94) ) bui.getsound("achievement").play() if ANDROID: Discordlogin() def on_app_shutdown(self) -> None: if not ANDROID and self.rpc_thread.is_discord_running(): self.rpc_thread.rpc.close() self.rpc_thread.should_close = True def on_app_pause(self) -> None: self.rpc_thread.close() def on_app_resume(self) -> None: global start_time start_time = time.time() self.rpc_thread.start() def _get_current_activity_name(self) -> str | None: act = bs.get_foreground_host_activity() if isinstance(act, bs.GameActivity): return act.name this = "Lobby" name: str | None = ( act.__class__.__name__.replace("Activity", "") .replace("ScoreScreen", "Ranking") .replace("Coop", "") .replace("MultiTeam", "") .replace("Victory", "") .replace("EndSession", "") .replace("Transition", "") .replace("Draw", "") .replace("FreeForAll", "") .replace("Join", this) .replace("Team", "") .replace("Series", "") .replace("CustomSession", "Custom Session(mod)") ) if name == "MainMenu": name = "Main Menu" if name == this: self.rpc_thread.large_image_key = "lobby" self.rpc_thread.large_image_text = "Bombing up" self.rpc_thread.small_image_key = "lobbysmall" if name == "Ranking": self.rpc_thread.large_image_key = "ranking" self.rpc_thread.large_image_text = "Viewing Results" return name def _get_current_map_name(self) -> Tuple[str | None, str | None]: act = bs.get_foreground_host_activity() if isinstance(act, bs.GameActivity): texname = act.map.get_preview_texture_name() if texname: return act.map.name, texname.lower().removesuffix("preview") return None, None def update_status(self) -> None: roster = bs.get_game_roster() try: connection_info = ( bs.get_connection_to_host_info() if build_number < 21727 else bs.get_connection_to_host_info_2() ) except (RuntimeError, TypeError): pass self.rpc_thread.large_image_key = "bombsquadicon" self.rpc_thread.large_image_text = "BombSquad" self.rpc_thread.small_image_key = _babase.app.classic.platform self.rpc_thread.small_image_text = ( f"{_babase.app.classic.platform.capitalize()}({APP_VERSION})" ) try: if not ANDROID: svinfo = str(connection_info) if self._last_server_info != svinfo: self._last_server_info = svinfo self.rpc_thread.party_id = str(uuid.uuid4()) self.rpc_thread._update_secret() if connection_info: servername = connection_info.name self.rpc_thread.details = "Online" self.rpc_thread.party_size = max( 1, sum(len(client["players"]) for client in roster) ) self.rpc_thread.party_max = max(8, self.rpc_thread.party_size) if len(servername) == 19 and "Private Party" in servername: self.rpc_thread.state = "Private Party" elif servername == "": # A local game joinable from the internet try: offlinename = json.loads( bs.get_game_roster()[0]["spec_string"] )["n"] if len(offlinename) > 19: # Thanks Rikko self.rpc_thread.state = offlinename[slice(19)] + "..." else: self.rpc_thread.state = offlinename except IndexError: pass else: if len(servername) > 19: self.rpc_thread.state = servername[slice(19)] + ".." else: self.rpc_thread.state = servername[slice(19)] if not connection_info: self.rpc_thread.details = ( "Local" # ! replace with something like ballistica github cause ) self.rpc_thread.state = self._get_current_activity_name() self.rpc_thread.party_size = max(1, len(roster)) self.rpc_thread.party_max = max(1, bs.get_public_party_max_size()) if ( bs.get_foreground_host_session() is not None and self.rpc_thread.details == "Local" ): session = ( bs.get_foreground_host_session() .__class__.__name__.replace("MainMenuSession", "") .replace("EndSession", "") .replace("FreeForAllSession", ": FFA") .replace("DualTeamSession", ": Teams") .replace("CoopSession", ": Coop") ) if len(session) > 1: self.rpc_thread.small_image_key = session.replace( ": ", "" ).lower() self.rpc_thread.small_image_text = session.replace(": ", "") self.rpc_thread.details = self.rpc_thread.details if ( self.rpc_thread.state == "NoneType" ): # sometimes the game just breaks which means its not really watching replay FIXME self.rpc_thread.state = "Watching Replay" self.rpc_thread.large_image_key = "replay" self.rpc_thread.large_image_text = "Viewing Awesomeness" self.rpc_thread.small_image_key = "replaysmall" except UnboundLocalError: pass act = bs.get_foreground_host_activity() session = bs.get_foreground_host_session() if act: from bascenev1lib.game.elimination import EliminationGame from bascenev1lib.game.thelaststand import TheLastStandGame from bascenev1lib.game.meteorshower import MeteorShowerGame from bascenev1lib.game.football import FootballCoopGame from bascenev1lib.game.easteregghunt import EasterEggHuntGame # noinspection PyUnresolvedReferences,PyProtectedMember try: self.rpc_thread.start_timestamp = act._discordrp_start_time # type: ignore except AttributeError: # This can be the case if plugin launched AFTER activity # has been created; in that case let's assume it was # created just now. self.rpc_thread.start_timestamp = act._discordrp_start_time = time.mktime( # type: ignore time.localtime() ) if isinstance(act, EliminationGame): alive_count = len([p for p in act.players if p.lives > 0]) self.rpc_thread.details += f" ({alive_count} players left)" elif isinstance(act, TheLastStandGame): # noinspection PyProtectedMember points = act._score self.rpc_thread.details += f" ({points} points)" elif isinstance(act, MeteorShowerGame): with act.context: sec = bs.time() - act._timer.getstarttime() secfmt = "" if sec < 60: secfmt = f"{sec:.2f}" else: secfmt = f"{int(sec) // 60:02}:{sec:.2f}" self.rpc_thread.details += f" ({secfmt})" # elif isinstance(act, OnslaughtGame): # score = act._score # level = act._wavenum # # self. elif isinstance(act, FootballCoopGame): # try: # score = f"{act.teams[0].score} : {act.teams[1].score}" # except IndexError: score = f"{act.teams[0].score} : {act._bot_team.score}" self.rpc_thread.details = score # elif isinstance(act, RunaroundGame) # score = act._score # level = act._wavenum # lives = act._lives elif isinstance(act, EasterEggHuntGame): eggs_collected = len(act._eggs) - 1 self.rpc_thread.details = f"{eggs_collected} eggs collected" # elif isinstance(act, TargetPracticeGame): # #for FFA # scoere = bs.get_foreground_host_activity().players[0].score # if isinstance(session, ba.DualTeamSession): # scores = ':'.join([ # str(t.customdata['score']) # for t in session.sessionteams # ]) # self.rpc_thread.details += f' ({scores})' mapname, short_map_name = self._get_current_map_name() if mapname: asset_keys = MAPNAME_ID.keys() if short_map_name in asset_keys: self.rpc_thread.large_image_text = mapname self.rpc_thread.large_image_key = short_map_name # self.rpc_thread.small_image_key = 'bombsquadlogo2' # self.rpc_thread.small_image_text = 'BombSquad' if _babase.get_idle_time() / (1000 * 60) % 60 >= 0.4: self.rpc_thread.details = f"AFK in {self.rpc_thread.details}" if not ANDROID: self.rpc_thread.large_image_key = ( "https://media.tenor.com/uAqNn6fv7x4AAAAM/bombsquad-spaz.gif" ) if babase.app.config.get("token"): #! This function might cause some errors self.rpc_thread.presence()