# Released under the MIT and Apache License. See LICENSE for details. # """placeholder :clown:""" # ba_meta require api 9 #!"Made to you by @brostos & @Dliwk" # TODO # - Update to the latest libs # - Use account id to hash the tkn from __future__ import annotations from urllib.request import Request, urlopen, urlretrieve from pathlib import Path from os import getcwd, remove from bauiv1lib.popup import PopupWindow import asyncio import sys import http.client import ast import uuid import json import socket import time import threading import shutil import hashlib import babase import _babase import bascenev1 as bs import bascenev1lib import bauiv1 as bui from baenv import TARGET_BALLISTICA_BUILD as build_number from typing import TYPE_CHECKING if TYPE_CHECKING: from typing import Any, Tuple MAPNAME_ID = { "bombsquadicon": "963448129900908595", "zigzagpreview": "963448133130522624", "tiptoppreview": "963448133168279582", "towerdpreview": "963448135886200912", "thepadpreview": "963448137916248084", "steprightuppreview": "963448141728862248", "roundaboutpreview": "963448143997972550", "rampagepreview": "963448146422296676", "monkeyfacepreview": "963448151182831626", "footballstadiumpreview": "963448158719983646", "doomshroompreview": "963448160993292368", "cragcastlepreview": "963448163048513536", "courtyardpreview": "963448166127120504", "bridgitpreview": "963448169180565654", "biggpreview": "963448172905127996", "alwayslandpreview": "963448174163423252", "bigg": "1013013392455376977", "bridgit": "1013013400139333632", "courtyard": "1013013410776096788", "cragcastle": "1013013423132528700", "doomshroom": "1013013438223622224", "footballstadium": "1013013452517810226", "hockeystadium": "1013013464060547112", "monkeyface": "1013013477721383023", "rampage": "1013013484830728273", "roundabout": "1013013508323037264", "steprightup": "1013013567768907826", "thepad": "1013013577197699163", "tiptop": "1013013593089904721", "towerd": "1013013604531970131", "zigzag": "1013013618188619816", "bombsquadlogo2": "1013016083701190726", "windows": "1084050785488338984", "linux": "1084078945944739920", "lobby": "1084180821973418226", "ranking": "1084224689272004719", "rampagelevelcolor": "1086989941541703741", "landmine": "1087000404866371766", "rgbstripes": "1087000416492990474", "shrapnel1color": "1087151233225195590", "bonescolor": "1087151164077899928", "bridgitlevelcolor": "1087151178674094182", "crossout": "1087151197963681902", "naturebackgroundcolor": "1087151209896476782", "zigzaglevelcolor": "1087151253206876241", "zoeicon": "1087151266989363240", "bg": "1087564057890000906", "alwayslandlevelcolor": "1087564765406167080", "hockeystadiumpreview": "1087574349285961768", "mac": "1087584375287336992", "flyer": "1087584543147561051", "replay": "1087592122393301102", "coop": "1097697042891018311", "ffa": "1097697050214269008", "lobbysmall": "1097697055926923335", "replaysmall": "1097697062746853386", "teams": "1097697068727935036", "bacongreece": "1097700754623565894", "basketballstadium": "1097700771501441167", "flapland": "1097700783622979664", "alwaysland": "1097700794213613610", "hoveringwood": "1097700802321199224", "jrmponslaught": "1097700810479124520", "jrmprunaround": "1097700817194205286", "lakefrigid": "1097700828023898203", "mushfeud": "1097700836920000594", "pillar_bases": "1097700846340407427", "powerup_factory": "1097700854422851656", "snowballpit": "1097700869673341009", "stoneishfort": "1097700887826272308", "toiletdonut": "1097700898584666193", "whereeaglesdare": "1097700904972587109", "android": "1097728392280932453", } ANDROID = babase.app.classic.platform == "android" APP_VERSION = _babase.app.version if build_number < 21282 else ( _babase.app.env.engine_version if build_number > 21823 else _babase.app.env.version) if ANDROID: # !can add ios in future # Installing websocket def get_module(): install_path = Path( f"{getcwd()}/ba_data/python" ) # For the guys like me on windows path = Path(f"{install_path}/websocket.tar.gz") file_path = Path(f"{install_path}/websocket") source_dir = Path(f"{install_path}/websocket-client-1.6.1/websocket") if not Path(f"{file_path}/__init__.py").exists(): # YouKnowDev url = "https://files.pythonhosted.org/packages/b1/34/3a5cae1e07d9566ad073fa6d169bf22c03a3ba7b31b3c3422ec88d039108/websocket-client-1.6.1.tar.gz" try: # fix issue where the file delete themselves try: shutil.rmtree(file_path) except: pass filename, headers = urlretrieve(url, filename=path) with open(filename, "rb") as f: content = f.read() assert ( hashlib.md5(content).hexdigest() == "86bc69b61947943627afc1b351c0b5db" ) shutil.unpack_archive(filename, install_path, format='gztar') remove(path) shutil.copytree(source_dir, file_path) shutil.rmtree(Path(f"{install_path}/websocket-client-1.6.1")) except Exception as e: if type(e) == shutil.Error: shutil.rmtree(Path(f"{install_path}/websocket-client-1.6.1")) else: pass get_module() from websocket import WebSocketConnectionClosedException import websocket start_time = time.time() class PresenceUpdate: def __init__(self): self.ws = websocket.WebSocketApp( "wss://gateway.discord.gg/?encoding=json&v=10", on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close, ) self.heartbeat_interval = int(41250) self.resume_gateway_url: str | None = None self.session_id: str | None = None self.stop_heartbeat_thread = threading.Event() self.do_once: bool = True self.state: str | None = "In Game" self.details: str | None = "Main Menu" self.start_timestamp = time.time() self.large_image_key: str | None = "bombsquadicon" self.large_image_text: str | None = "BombSquad Icon" self.small_image_key: str | None = None self.small_image_text: str | None = ( f"{_babase.app.classic.platform.capitalize()}({APP_VERSION})" ) self.media_proxy = "mp:/app-assets/963434684669382696/{}.png" self.identify: bool = False self.party_id: str = str(uuid.uuid4()) self.party_size = 1 self.party_max = 8 def presence(self): largetxt = MAPNAME_ID[self.large_image_key] smalltxt = MAPNAME_ID[self.small_image_key] presencepayload = { "op": 3, "d": { "since": start_time, # Fixed the unlimited time bug "status": "online", "afk": "false", "activities": [ { "name": "BombSquad", "type": 0, "application_id": "963434684669382696", "state": self.state, "details": self.details, # "timestamps": {"start": start_time}, "party": { "id": self.party_id, "size": [self.party_size, self.party_max], }, "assets": { "large_image": self.media_proxy.format(largetxt), "large_text": self.large_image_text, "small_image": self.media_proxy.format(smalltxt), "small_text": self.small_image_text, }, "client_info": { "version": 0, "os": "android", "client": "mobile", }, "buttons": ["Discord Server", "Download BombSquad"], "metadata": { "button_urls": [ "https://discord.gg/bombsquad-ballistica-official-1001896771347304639", "https://bombsquad-community.web.app/download", ] }, } ], }, } try: self.ws.send(json.dumps(presencepayload)) except WebSocketConnectionClosedException: pass def on_message(self, ws, message): message = json.loads(message) try: self.heartbeat_interval = message["d"]["heartbeat_interval"] except: pass try: self.resume_gateway_url = message["d"]["resume_gateway_url"] self.session_id = message["d"]["session_id"] except: pass def on_error(self, ws, error): babase.print_exception(error) def on_close(self, ws, close_status_code, close_msg): ( print("Closed Discord Connection Successfully") if close_status_code == 1000 else print( f"Closed Discord Connection with code {close_status_code} and message {close_msg}" ) ) def on_open(self, ws): print("Connected to Discord Websocket") def heartbeats(): """Sending heartbeats to keep the connection alive""" if self.do_once: heartbeat_payload = { "op": 1, "d": 251, } # step two keeping connection alive by sending heart beats and receiving opcode 11 self.ws.send(json.dumps(heartbeat_payload)) self.do_once = False def identify(): """Identifying to the gateway and enable by using user token and the intents we will be using e.g 256->For Presence""" byt_tkn = babase.app.config.get("token") token = bytes.fromhex(byt_tkn).decode("utf-8") identify_payload = { "op": 2, "d": { "token": token, "properties": { "os": "linux", "browser": "Discord Android", "device": "android", }, "intents": 256, }, } # step 3 send an identify self.ws.send(json.dumps(identify_payload)) identify() while True: heartbeat_payload = {"op": 1, "d": self.heartbeat_interval} try: self.ws.send(json.dumps(heartbeat_payload)) time.sleep(self.heartbeat_interval / 1000) except: pass if self.stop_heartbeat_thread.is_set(): self.stop_heartbeat_thread.clear() break threading.Thread(target=heartbeats, daemon=True, name="heartbeat").start() def start(self): if ( Path( f"{_babase.app.env.python_directory_user}/__pycache__/token.txt" ).exists() or Path(f"{getcwd()}/token.txt").exists() ): try: with open(f"{getcwd()}/token.txt", "r") as f: token = bytes.fromhex(f.read()).decode("utf-8") except FileNotFoundError: with open( f"{_babase.app.env.python_directory_user}/__pycache__/token.txt", "r", ) as f: token = bytes.fromhex(f.read()).decode("utf-8") babase.app.config["token"] = token babase.app.config.commit() if babase.app.config.get("token"): try: while True: urlopen("http://www.google.com", timeout=5) threading.Thread( target=self.ws.run_forever, daemon=True, name="websocket" ).start() return except Exception: return def close(self): self.stop_heartbeat_thread.set() self.do_once = True self.ws.close() if not ANDROID: # installing pypresence def get_module(): install_path = Path(f"{getcwd()}/ba_data/python") path = Path(f"{install_path}/pypresence.tar.gz") file_path = Path(f"{install_path}/pypresence") source_dir = Path(f"{install_path}/pypresence-4.3.0/pypresence") if not file_path.exists(): url = "https://files.pythonhosted.org/packages/f4/2e/d110f862720b5e3ba1b0b719657385fc4151929befa2c6981f48360aa480/pypresence-4.3.0.tar.gz" try: filename, headers = urlretrieve(url, filename=path) with open(filename, "rb") as f: content = f.read() assert ( hashlib.md5(content).hexdigest() == "f7c163cdd001af2456c09e241b90bad7" ) shutil.unpack_archive(filename, install_path, format='gztar') shutil.copytree(source_dir, file_path) shutil.rmtree(Path(f"{install_path}/pypresence-4.3.0")) remove(path) except: pass get_module() from pypresence import PipeClosed, DiscordError, DiscordNotFound from pypresence.utils import get_event_loop import pypresence DEBUG = True def print_error(err: str, include_exception: bool = False) -> None: if DEBUG: if include_exception: babase.print_exception(err) else: babase.print_error(err) else: print(f"ERROR in discordrp.py: {err}") def log(msg: str) -> None: if DEBUG: print(f"LOG in discordrp.py: {msg}") def _run_overrides() -> None: old_init = bs.Activity.__init__ def new_init(self, *args: Any, **kwargs: Any) -> None: # type: ignore old_init(self, *args, **kwargs) self._discordrp_start_time = time.mktime(time.localtime()) bs.Activity.__init__ = new_init # type: ignore old_connect = bs.connect_to_party def new_connect(*args, **kwargs) -> None: # type: ignore global _last_server_addr global _last_server_port old_connect(*args, **kwargs) _last_server_addr = kwargs.get("address") or args[0] # ! Joining a game on same device as host NB check what happens if host is port forwarded you join it and check joining a server port forwarded or not _last_server_port = kwargs.get("port") or args[1] if len(args) > 1 else 43210 bs.connect_to_party = new_connect start_time = time.time() class RpcThread(threading.Thread): def __init__(self): super().__init__(name="RpcThread") self.rpc = pypresence.Presence(963434684669382696) self.state: str | None = "In Game" self.details: str | None = "Main Menu" self.start_timestamp = time.mktime(time.localtime()) self.large_image_key: str | None = "bombsquadicon" self.large_image_text: str | None = "BombSquad Icon" self.small_image_key: str | None = None self.small_image_text: str | None = None self.party_id: str = str(uuid.uuid4()) self.party_size = 1 self.party_max = 8 self.join_secret: str | None = None self._last_update_time: float = 0 self._last_secret_update_time: float = 0 self._last_connect_time: float = 0 self.should_close = False self.connection_to_host_info = None @staticmethod def is_discord_running(): for i in range(6463, 6473): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(0.01) try: conn = s.connect_ex(("localhost", i)) s.close() if conn == 0: s.close() return True except: s.close() return False def _generate_join_secret(self): # resp = requests.get('https://legacy.ballistica.net/bsAccessCheck').text try: connection_info = self.connection_to_host_info if connection_info: addr = _last_server_addr port = _last_server_port else: with urlopen("https://legacy.ballistica.net/bsAccessCheck") as resp: resp = resp.read().decode() resp = ast.literal_eval(resp) addr = resp["address"] port = resp["port"] addr, port = addr, port secret_dict = { "format_version": 1, "hostname": addr, "port": port, } self.join_secret = json.dumps(secret_dict) except Exception as _: pass def _update_secret(self): #! use in game thread threading.Thread(target=self._generate_join_secret, daemon=True).start() self._last_secret_update_time = time.time() def run(self) -> None: if sys.platform == "win32": asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) asyncio.set_event_loop(get_event_loop()) while not self.should_close: if time.time() - self._last_update_time > 0.1: self._do_update_presence() if time.time() - self._last_secret_update_time > 15: self._update_secret() # if time.time() - self._last_connect_time > 120 and is_discord_running(): #!Eric please add module manager(pip) # self._reconnect() time.sleep(0.03) def _subscribe(self, event: str, **args): self.rpc.send_data( 1, { "nonce": f"{time.time():.20f}", "cmd": "SUBSCRIBE", "evt": event, "args": args, }, ) data = self.rpc.loop.run_until_complete(self.rpc.read_output()) self.handle_event(data) def _subscribe_events(self): self._subscribe("ACTIVITY_JOIN") self._subscribe("ACTIVITY_JOIN_REQUEST") # def _update_presence(self) -> None: # self._last_update_time = time.time() # try: # self._do_update_presence() # except (AttributeError, AssertionError): # try: # self._reconnect() # except Exception: # print_error("failed to update presence", include_exception= True) def _reconnect(self) -> None: self.rpc.connect() self._subscribe_events() self._do_update_presence() self._last_connect_time = time.time() def _do_update_presence(self) -> None: if RpcThread.is_discord_running(): self._last_update_time = time.time() try: data = self.rpc.update( state=self.state or " ", details=self.details, start=start_time, large_image=self.large_image_key, large_text=self.large_image_text, small_image=self.small_image_key, small_text=self.small_image_text, party_id=self.party_id, party_size=[self.party_size, self.party_max], join=self.join_secret, ) # buttons = [ #!cant use buttons together with join # { # "label": "Discord Server", # "url": "https://ballistica.net/discord" # }, # { # "label": "Download Bombsquad", # "url": "https://bombsquad.ga/download"} # ] self.handle_event(data) except (PipeClosed, DiscordError, AssertionError, AttributeError): try: self._reconnect() except (DiscordNotFound, DiscordError): pass def handle_event(self, data): evt = data["evt"] if evt is None: return data = data.get("data", {}) if evt == "ACTIVITY_JOIN": secret = data.get("secret") try: server = json.loads(secret) format_version = server["format_version"] except Exception: babase.print_exception("discordrp: unknown activity join format") else: try: if format_version == 1: hostname = server["hostname"] port = server["port"] self._connect_to_party(hostname, port) except Exception: babase.print_exception( f"discordrp: incorrect activity join data, {format_version=}" ) elif evt == "ACTIVITY_JOIN_REQUEST": user = data.get("user", {}) uid = user.get("id") username = user.get("username") avatar = user.get("avatar") self.on_join_request(username, uid, avatar) def _connect_to_party(self, hostname, port) -> None: babase.pushcall( babase.Call(bs.connect_to_party, hostname, port), from_other_thread=True ) def on_join_request(self, username, uid, avatar) -> None: del uid # unused del avatar # unused babase.pushcall( babase.Call( bui.screenmessage, "Discord: {} wants to join!".format( username ), color=(0.0, 1.0, 0.0), ), from_other_thread=True, ) #! check this function for sound creation error babase.pushcall( lambda: bui.getsound("bellMed").play(), from_other_thread=True ) class Discordlogin(PopupWindow): def __init__(self): # pylint: disable=too-many-locals _uiscale = bui.app.ui_v1.uiscale self._transitioning_out = False s = ( 1.25 if _uiscale is babase.UIScale.SMALL else 1.27 if _uiscale is babase.UIScale.MEDIUM else 1.3 ) self._width = 380 * s self._height = 150 + 150 * s bg_color = (0.5, 0.4, 0.6) log_btn_colour = ( (0.10, 0.95, 0.10) if not babase.app.config.get("token") else (1.00, 0.15, 0.15) ) log_txt = "LOG IN" if not babase.app.config.get("token") else "LOG OUT" self.code = False self.resp = "Placeholder" self.headers = { "user-agent": "Mozilla/5.0", "content-type": "application/json", } # creates our _root_widget PopupWindow.__init__( self, position=(0.0, 0.0), size=(self._width, self._height), scale=( 2.1 if _uiscale is babase.UIScale.SMALL else 1.5 if _uiscale is babase.UIScale.MEDIUM else 1.0 ), bg_color=bg_color, ) self._cancel_button = bui.buttonwidget( parent=self.root_widget, position=(25, self._height - 40), size=(50, 50), scale=0.58, label="", color=bg_color, on_activate_call=self._on_cancel_press, autoselect=True, icon=bui.gettexture("crossOut"), iconscale=1.2, ) bui.imagewidget( parent=self.root_widget, position=(180, self._height - 55), size=(32 * s, 32 * s), texture=bui.gettexture("discordLogo"), color=(10 - 0.32, 10 - 0.39, 10 - 0.96), ) self.email_widget = bui.textwidget( parent=self.root_widget, text="Email/Phone Number", size=(400, 70), position=(50, 180), h_align="left", v_align="center", editable=True, scale=0.8, autoselect=True, maxwidth=220, ) self.password_widget = bui.textwidget( parent=self.root_widget, text="Password", size=(400, 70), position=(50, 120), h_align="left", v_align="center", editable=True, scale=0.8, autoselect=True, maxwidth=220, ) bui.containerwidget(edit=self.root_widget, cancel_button=self._cancel_button) bui.textwidget( parent=self.root_widget, position=(265, self._height - 37), size=(0, 0), h_align="center", v_align="center", scale=1.0, text="Discord", maxwidth=200, color=(0.80, 0.80, 0.80), ) bui.textwidget( parent=self.root_widget, position=(265, self._height - 78), size=(0, 0), h_align="center", v_align="center", scale=1.0, text="💀Use at your own risk💀\n ⚠︝discord account might get terminated⚠︝", maxwidth=200, color=(1.00, 0.15, 0.15), ) self._login_button = bui.buttonwidget( parent=self.root_widget, position=(120, 65), size=(400, 80), scale=0.58, label=log_txt, color=log_btn_colour, on_activate_call=self.login, autoselect=True, ) def _on_cancel_press(self) -> None: self._transition_out() def _transition_out(self) -> None: if not self._transitioning_out: self._transitioning_out = True bui.containerwidget(edit=self.root_widget, transition="out_scale") def on_bascenev1libup_cancel(self) -> None: bui.getsound("swish").play() self._transition_out() def backup_2fa_code(self, tickt): if babase.do_once(): self.email_widget.delete() self.password_widget.delete() self.backup_2fa_widget = bui.textwidget( parent=self.root_widget, text="2FA/Discord Backup code", size=(400, 70), position=(50, 120), h_align="left", v_align="center", editable=True, scale=0.8, autoselect=True, maxwidth=220, ) json_data_2FA = { "code": bui.textwidget(query=self.backup_2fa_widget), "gift_code_sku_id": None, "ticket": tickt, } if json_data_2FA["code"] != "2FA/Discord Backup code": try: payload_2FA = json.dumps(json_data_2FA) conn_2FA = http.client.HTTPSConnection("discord.com") conn_2FA.request( "POST", "/api/v9/auth/mfa/totp", payload_2FA, self.headers ) res_2FA = conn_2FA.getresponse().read() token = json.loads(res_2FA)["token"].encode().hex().encode() babase.app.config["token"] = token babase.app.config.commit() bui.screenmessage("Successfully logged in", (0.21, 1.0, 0.20)) bui.getsound("shieldUp").play() self.on_bascenev1libup_cancel() PresenceUpdate().start() except: self.code = True bui.screenmessage("Incorrect code", (1.00, 0.15, 0.15)) bui.getsound("error").play() def login(self): if not babase.app.config.get("token") and self.code == False: try: json_data = { "login": bui.textwidget(query=self.email_widget), "password": bui.textwidget(query=self.password_widget), "undelete": False, "captcha_key": None, "login_source": None, "gift_code_sku_id": None, } conn = http.client.HTTPSConnection("discord.com") payload = json.dumps(json_data) conn.request("POST", "/api/v9/auth/login", payload, self.headers) res = conn.getresponse().read() try: token = json.loads(res)["token"].encode().hex().encode() babase.app.config["token"] = token babase.app.config.commit() bui.screenmessage("Successfully logged in", (0.21, 1.0, 0.20)) bui.getsound("shieldUp").play() self.on_bascenev1libup_cancel() PresenceUpdate().start() except KeyError: try: ticket = json.loads(res)["ticket"] bui.screenmessage( "Input your 2FA or Discord Backup code", (0.21, 1.0, 0.20) ) bui.getsound("error").play() self.resp = ticket self.backup_2fa_code(tickt=ticket) self.code = True except KeyError: bui.screenmessage("Incorrect credentials", (1.00, 0.15, 0.15)) bui.getsound("error").play() except: bui.screenmessage("Connect to the internet", (1.00, 0.15, 0.15)) bui.getsound("error").play() conn.close() elif self.code == True: self.backup_2fa_code(tickt=self.resp) else: self.email_widget.delete() self.password_widget.delete() del babase.app.config["token"] babase.app.config.commit() bui.getsound("shieldDown").play() bui.screenmessage("Account successfully removed!!", (0.10, 0.10, 1.00)) self.on_bascenev1libup_cancel() PresenceUpdate().close() def get_class(): if ANDROID: return PresenceUpdate() elif not ANDROID: return RpcThread() # ba_meta export babase.Plugin class DiscordRP(babase.Plugin): def __init__(self) -> None: self.update_timer: bs.Timer | None = None self.rpc_thread = get_class() self._last_server_info: str | None = None if not ANDROID: _run_overrides() def on_app_running(self) -> None: if not ANDROID: threading.Thread( target=self.rpc_thread.start, daemon=True, name="start_rpc" ).start() self.update_timer = bs.AppTimer( 1, bs.WeakCall(self.update_status), repeat=True ) if ANDROID: self.rpc_thread.start() self.update_timer = bs.AppTimer( 4, bs.WeakCall(self.update_status), repeat=True ) def has_settings_ui(self): if ANDROID: return True else: return False def show_settings_ui(self, button): Discordlogin() def on_app_shutdown(self) -> None: if not ANDROID and self.rpc_thread.is_discord_running(): self.rpc_thread.rpc.close() self.rpc_thread.should_close = True def on_app_pause(self) -> None: self.rpc_thread.close() def on_app_resume(self) -> None: global start_time start_time = time.time() self.rpc_thread.start() def _get_current_activity_name(self) -> str | None: act = bs.get_foreground_host_activity() if isinstance(act, bs.GameActivity): return act.name this = "Lobby" name: str | None = ( act.__class__.__name__.replace("Activity", "") .replace("ScoreScreen", "Ranking") .replace("Coop", "") .replace("MultiTeam", "") .replace("Victory", "") .replace("EndSession", "") .replace("Transition", "") .replace("Draw", "") .replace("FreeForAll", "") .replace("Join", this) .replace("Team", "") .replace("Series", "") .replace("CustomSession", "Custom Session(mod)") ) if name == "MainMenu": name = "Main Menu" if name == this: self.rpc_thread.large_image_key = "lobby" self.rpc_thread.large_image_text = "Bombing up" self.rpc_thread.small_image_key = "lobbysmall" if name == "Ranking": self.rpc_thread.large_image_key = "ranking" self.rpc_thread.large_image_text = "Viewing Results" return name def _get_current_map_name(self) -> Tuple[str | None, str | None]: act = bs.get_foreground_host_activity() if isinstance(act, bs.GameActivity): texname = act.map.get_preview_texture_name() if texname: return act.map.name, texname.lower().removesuffix("preview") return None, None def update_status(self) -> None: roster = bs.get_game_roster() try: connection_info = ( bs.get_connection_to_host_info() if build_number < 21727 else bs.get_connection_to_host_info_2() ) self.rpc_thread.connection_to_host_info = connection_info except (RuntimeError, TypeError): pass self.rpc_thread.large_image_key = "bombsquadicon" self.rpc_thread.large_image_text = "BombSquad" self.rpc_thread.small_image_key = _babase.app.classic.platform self.rpc_thread.small_image_text = ( f"{_babase.app.classic.platform.capitalize()}({APP_VERSION})" ) try: if not ANDROID: svinfo = str(connection_info) if self._last_server_info != svinfo: self._last_server_info = svinfo self.rpc_thread.party_id = str(uuid.uuid4()) self.rpc_thread._update_secret() if connection_info: hostname = socket.gethostname() local_ip = socket.gethostbyname(hostname) if bs.get_connection_to_host_info_2().address == local_ip: self.rpc_thread.details = "Local Server" else: self.rpc_thread.details = "Online" servername = connection_info.name self.rpc_thread.party_size = max( 1, sum(len(client["players"]) for client in roster) ) self.rpc_thread.party_max = max(8, self.rpc_thread.party_size) if len(servername) == 19 and "Private Party" in servername: self.rpc_thread.state = "Private Party" elif servername == "": # A local game joinable from the internet try: offlinename = json.loads( bs.get_game_roster()[0]["spec_string"] )["n"] if len(offlinename) > 19: # Thanks Rikko self.rpc_thread.state = offlinename[slice(19)] + "..." else: self.rpc_thread.state = offlinename except IndexError: pass else: if len(servername) > 19: self.rpc_thread.state = servername[slice(19)] + ".." else: self.rpc_thread.state = servername[slice(19)] if not connection_info: self.rpc_thread.details = ( "Local" # ! replace with something like ballistica github cause ) self.rpc_thread.state = self._get_current_activity_name() self.rpc_thread.party_size = max(1, len(roster)) self.rpc_thread.party_max = max(1, bs.get_public_party_max_size()) if ( bs.get_foreground_host_session() is not None and self.rpc_thread.details == "Local" ): session = ( bs.get_foreground_host_session() .__class__.__name__.replace("MainMenuSession", "") .replace("EndSession", "") .replace("FreeForAllSession", ": FFA") .replace("DualTeamSession", ": Teams") .replace("CoopSession", ": Coop") ) if len(session) > 1: self.rpc_thread.small_image_key = session.replace( ": ", "" ).lower() self.rpc_thread.small_image_text = session.replace(": ", "") self.rpc_thread.details = self.rpc_thread.details if ( self.rpc_thread.state == "NoneType" ): # sometimes the game just breaks which means its not really watching replay FIXME self.rpc_thread.state = "Watching Replay" self.rpc_thread.large_image_key = "replay" self.rpc_thread.large_image_text = "Viewing Awesomeness" self.rpc_thread.small_image_key = "replaysmall" except UnboundLocalError: pass act = bs.get_foreground_host_activity() session = bs.get_foreground_host_session() if act: from bascenev1lib.game.elimination import EliminationGame from bascenev1lib.game.thelaststand import TheLastStandGame from bascenev1lib.game.meteorshower import MeteorShowerGame from bascenev1lib.game.football import FootballCoopGame from bascenev1lib.game.easteregghunt import EasterEggHuntGame # noinspection PyUnresolvedReferences,PyProtectedMember try: self.rpc_thread.start_timestamp = act._discordrp_start_time # type: ignore except AttributeError: # This can be the case if plugin launched AFTER activity # has been created; in that case let's assume it was # created just now. self.rpc_thread.start_timestamp = act._discordrp_start_time = time.mktime( # type: ignore time.localtime() ) if isinstance(act, EliminationGame): alive_count = len([p for p in act.players if p.lives > 0]) self.rpc_thread.details += f" ({alive_count} players left)" elif isinstance(act, TheLastStandGame): # noinspection PyProtectedMember points = act._score self.rpc_thread.details += f" ({points} points)" elif isinstance(act, MeteorShowerGame): with act.context: sec = bs.time() - act._timer.getstarttime() secfmt = "" if sec < 60: secfmt = f"{sec:.2f}" else: secfmt = f"{int(sec) // 60:02}:{sec:.2f}" self.rpc_thread.details += f" ({secfmt})" # elif isinstance(act, OnslaughtGame): # score = act._score # level = act._wavenum # # self. elif isinstance(act, FootballCoopGame): # try: # score = f"{act.teams[0].score} : {act.teams[1].score}" # except IndexError: score = f"{act.teams[0].score} : {act._bot_team.score}" self.rpc_thread.details = score # elif isinstance(act, RunaroundGame) # score = act._score # level = act._wavenum # lives = act._lives elif isinstance(act, EasterEggHuntGame): eggs_collected = len(act._eggs) - 1 self.rpc_thread.details = f"{eggs_collected} eggs collected" # elif isinstance(act, TargetPracticeGame): # #for FFA # scoere = bs.get_foreground_host_activity().players[0].score # if isinstance(session, ba.DualTeamSession): # scores = ':'.join([ # str(t.customdata['score']) # for t in session.sessionteams # ]) # self.rpc_thread.details += f' ({scores})' mapname, short_map_name = self._get_current_map_name() if mapname: asset_keys = MAPNAME_ID.keys() if short_map_name in asset_keys: self.rpc_thread.large_image_text = mapname self.rpc_thread.large_image_key = short_map_name # self.rpc_thread.small_image_key = 'bombsquadlogo2' # self.rpc_thread.small_image_text = 'BombSquad' if _babase.get_idle_time() / (1000 * 60) % 60 >= 0.4: self.rpc_thread.details = f"AFK in {self.rpc_thread.details}" if not ANDROID: self.rpc_thread.large_image_key = ( "https://media.tenor.com/uAqNn6fv7x4AAAAM/bombsquad-spaz.gif" ) if babase.app.config.get("token"): #! This function might cause some errors # self.rpc_thread.presence() pass