bombsquad-plugin-manager/plugins/utilities/discord_richpresence.py
2024-08-12 16:32:10 +00:00

1002 lines
41 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Released under the MIT and Apache License. See LICENSE for details.
#
"""placeholder :clown:"""
# ba_meta require api 8
#!"Made to you by @brostos & @Dliwk"
from __future__ import annotations
from urllib.request import Request, urlopen, urlretrieve
from pathlib import Path
from os import getcwd, remove
from bauiv1lib.popup import PopupWindow
import asyncio
import http.client
import ast
import uuid
import json
import time
import threading
import shutil
import hashlib
import babase
import _babase
import bascenev1 as bs
import bascenev1lib
import bauiv1 as bui
from baenv import TARGET_BALLISTICA_BUILD as build_number
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Any, Tuple
ANDROID = babase.app.classic.platform == "android"
DIRPATH = Path(
f"{_babase.app.python_directory_user if build_number < 21282 else _babase.app.env.python_directory_user}/image_id.json")
APP_VERSION = _babase.app.version if build_number < 21282 else (
_babase.app.env.engine_version if build_number > 21823 else _babase.app.env.version)
if ANDROID: # !can add ios in future
# Installing websocket
def get_module():
install_path = Path(f"{getcwd()}/ba_data/python") # For the guys like me on windows
path = Path(f"{install_path}/websocket.tar.gz")
file_path = Path(f"{install_path}/websocket")
source_dir = Path(f"{install_path}/websocket-client-1.6.1/websocket")
if not Path(f"{file_path}/__init__.py").exists(): # YouKnowDev
url = "https://files.pythonhosted.org/packages/b1/34/3a5cae1e07d9566ad073fa6d169bf22c03a3ba7b31b3c3422ec88d039108/websocket-client-1.6.1.tar.gz"
try:
# fix issue where the file delete themselves
try:
shutil.rmtree(file_path)
except:
pass
filename, headers = urlretrieve(url, filename=path)
with open(filename, "rb") as f:
content = f.read()
assert hashlib.md5(content).hexdigest() == "86bc69b61947943627afc1b351c0b5db"
shutil.unpack_archive(filename, install_path)
remove(path)
shutil.copytree(source_dir, file_path)
shutil.rmtree(Path(f"{install_path}/websocket-client-1.6.1"))
except Exception as e:
if type(e) == shutil.Error:
shutil.rmtree(Path(f"{install_path}/websocket-client-1.6.1"))
else:
pass
get_module()
from websocket import WebSocketConnectionClosedException
import websocket
start_time = time.time()
class PresenceUpdate:
def __init__(self):
self.ws = websocket.WebSocketApp("wss://gateway.discord.gg/?encoding=json&v=10",
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)
self.heartbeat_interval = int(41250)
self.resume_gateway_url: str | None = None
self.session_id: str | None = None
self.stop_heartbeat_thread = threading.Event()
self.do_once = True
self.state: str | None = "In Game"
self.details: str | None = "Main Menu"
self.start_timestamp = time.time()
self.large_image_key: str | None = "bombsquadicon"
self.large_image_text: str | None = "BombSquad Icon"
self.small_image_key: str | None = None
self.small_image_text: str | None = (
f"{_babase.app.classic.platform.capitalize()}({APP_VERSION})")
self.media_proxy = "mp:/app-assets/963434684669382696/{}.png"
self.identify: bool = False
self.party_id: str = str(uuid.uuid4())
self.party_size = 1
self.party_max = 8
def presence(self):
with open(DIRPATH, "r") as maptxt:
largetxt = json.load(maptxt)[self.large_image_key]
with open(DIRPATH, "r") as maptxt:
smalltxt = json.load(maptxt)[self.small_image_key]
presencepayload = {
"op": 3,
"d": {
"since": None, # used to show how long the user went idle will add afk to work with this and then set the status to idle
"status": "online",
"afk": "false",
"activities": [
{
"name": "BombSquad",
"type": 0,
"application_id": "963434684669382696",
"state": self.state,
"details": self.details,
"timestamps": {
"start": start_time
},
"party": {
"id": self.party_id,
"size": [self.party_size, self.party_max]
},
"assets": {
"large_image": self.media_proxy.format(largetxt),
"large_text": self.large_image_text,
"small_image": self.media_proxy.format(smalltxt),
"small_text": self.small_image_text,
},
"client_info": {
"version": 0,
"os": "android",
"client": "mobile",
},
"buttons": ["Discord Server", "Download BombSquad"],
"metadata": {
"button_urls": [
"https://discord.gg/bombsquad-ballistica-official-1001896771347304639",
"https://bombsquad-community.web.app/download",
]
},
}
],
},
}
try:
self.ws.send(json.dumps(presencepayload))
except WebSocketConnectionClosedException:
pass
def on_message(self, ws, message):
message = json.loads(message)
try:
self.heartbeat_interval = message["d"]["heartbeat_interval"]
except:
pass
try:
self.resume_gateway_url = message["d"]["resume_gateway_url"]
self.session_id = message["d"]["session_id"]
except:
pass
def on_error(self, ws, error):
babase.print_exception(error)
def on_close(self, ws, close_status_code, close_msg):
print("Closed Discord Connection Successfully")
def on_open(self, ws):
print("Connected to Discord Websocket")
def heartbeats():
"""Sending heartbeats to keep the connection alive"""
if self.do_once:
heartbeat_payload = {
"op": 1,
"d": 251,
} # step two keeping connection alive by sending heart beats and receiving opcode 11
self.ws.send(json.dumps(heartbeat_payload))
self.do_once = False
def identify():
"""Identifying to the gateway and enable by using user token and the intents we will be using e.g 256->For Presence"""
with open(f"{_babase.app.env.python_directory_user}/__pycache__/token.txt", 'r') as f:
token = bytes.fromhex(f.read()).decode('utf-8')
identify_payload = {
"op": 2,
"d": {
"token": token,
"properties": {
"os": "linux",
"browser": "Discord Android",
"device": "android",
},
"intents": 256,
},
} # step 3 send an identify
self.ws.send(json.dumps(identify_payload))
identify()
while True:
heartbeat_payload = {"op": 1, "d": self.heartbeat_interval}
try:
self.ws.send(json.dumps(heartbeat_payload))
time.sleep(self.heartbeat_interval / 1000)
except:
pass
if self.stop_heartbeat_thread.is_set():
self.stop_heartbeat_thread.clear()
break
threading.Thread(target=heartbeats, daemon=True, name="heartbeat").start()
def start(self):
if Path(f"{_babase.app.env.python_directory_user}/__pycache__/token.txt").exists():
threading.Thread(target=self.ws.run_forever, daemon=True, name="websocket").start()
def close(self):
self.stop_heartbeat_thread.set()
self.do_once = True
self.ws.close()
if not ANDROID:
# installing pypresence
def get_module():
install_path = Path(f"{getcwd()}/ba_data/python")
path = Path(f"{install_path}/pypresence.tar.gz")
file_path = Path(f"{install_path}/pypresence")
source_dir = Path(f"{install_path}/pypresence-4.3.0/pypresence")
if not file_path.exists():
url = "https://files.pythonhosted.org/packages/f4/2e/d110f862720b5e3ba1b0b719657385fc4151929befa2c6981f48360aa480/pypresence-4.3.0.tar.gz"
try:
filename, headers = urlretrieve(url, filename=path)
with open(filename, "rb") as f:
content = f.read()
assert hashlib.md5(content).hexdigest() == "f7c163cdd001af2456c09e241b90bad7"
shutil.unpack_archive(filename, install_path)
shutil.copytree(source_dir, file_path)
shutil.rmtree(Path(f"{install_path}/pypresence-4.3.0"))
remove(path)
except:
pass
# Make modifications for it to work on windows
if babase.app.classic.platform == "windows":
with open(Path(f"{getcwd()}/ba_data/python/pypresence/utils.py"), "r") as file:
data = file.readlines()
data[45] = """
def get_event_loop(force_fresh=False):
loop = asyncio.ProactorEventLoop() if sys.platform == 'win32' else asyncio.new_event_loop()
if force_fresh:
return loop
try:
running = asyncio.get_running_loop()
except RuntimeError:
return loop
if running.is_closed():
return loop
else:
if sys.platform in ('linux', 'darwin'):
return running
if sys.platform == 'win32':
if isinstance(running, asyncio.ProactorEventLoop):
return running
else:
return loop"""
# Thanks Loup
with open(Path(f"{getcwd()}/ba_data/python/pypresence/utils.py"), "w") as file:
for number, line in enumerate(data):
if number not in range(46, 56):
file.write(line)
# fix the mess i did with the previous
elif file_path.exists():
with open(Path(f"{getcwd()}/ba_data/python/pypresence/utils.py"), "r") as file:
data = file.readlines()
first_line = data[0].rstrip("\n")
if not first_line == '"""Util functions that are needed but messy."""':
shutil.rmtree(file_path)
get_module()
get_module()
from pypresence import PipeClosed, DiscordError, DiscordNotFound
from pypresence.utils import get_event_loop
import pypresence
import socket
DEBUG = True
_last_server_addr = 'localhost'
_last_server_port = 43210
def print_error(err: str, include_exception: bool = False) -> None:
if DEBUG:
if include_exception:
babase.print_exception(err)
else:
babase.print_error(err)
else:
print(f"ERROR in discordrp.py: {err}")
def log(msg: str) -> None:
if DEBUG:
print(f"LOG in discordrp.py: {msg}")
def _run_overrides() -> None:
old_init = bs.Activity.__init__
def new_init(self, *args: Any, **kwargs: Any) -> None: # type: ignore
old_init(self, *args, **kwargs)
self._discordrp_start_time = time.mktime(time.localtime())
bs.Activity.__init__ = new_init # type: ignore
old_connect = bs.connect_to_party
def new_connect(*args, **kwargs) -> None: # type: ignore
global _last_server_addr
global _last_server_port
old_connect(*args, **kwargs)
c = kwargs.get("address") or args[0]
_last_server_port = kwargs.get("port") or args[1]
bs.connect_to_party = new_connect
start_time = time.time()
class RpcThread(threading.Thread):
def __init__(self):
super().__init__(name="RpcThread")
self.rpc = pypresence.Presence(963434684669382696)
self.state: str | None = "In Game"
self.details: str | None = "Main Menu"
self.start_timestamp = time.mktime(time.localtime())
self.large_image_key: str | None = "bombsquadicon"
self.large_image_text: str | None = "BombSquad Icon"
self.small_image_key: str | None = None
self.small_image_text: str | None = None
self.party_id: str = str(uuid.uuid4())
self.party_size = 1
self.party_max = 8
self.join_secret: str | None = None
self._last_update_time: float = 0
self._last_secret_update_time: float = 0
self._last_connect_time: float = 0
self.should_close = False
@staticmethod
def is_discord_running():
for i in range(6463, 6473):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(0.01)
try:
conn = s.connect_ex(('localhost', i))
s.close()
if (conn == 0):
s.close()
return (True)
except:
s.close()
return (False)
def _generate_join_secret(self):
# resp = requests.get('https://legacy.ballistica.net/bsAccessCheck').text
try:
connection_info = bs.get_connection_to_host_info(
) if build_number < 21727 else bs.get_connection_to_host_info_2()
if connection_info:
addr = _last_server_addr
port = _last_server_port
else:
with urlopen(
"https://legacy.ballistica.net/bsAccessCheck"
) as resp:
resp = resp.read().decode()
resp = ast.literal_eval(resp)
addr = resp["address"]
port = 43210
secret_dict = {
"format_version": 1,
"hostname": addr,
"port": port,
}
self.join_secret = json.dumps(secret_dict)
except:
pass
def _update_secret(self):
threading.Thread(target=self._generate_join_secret, daemon=True).start()
self._last_secret_update_time = time.time()
def run(self) -> None:
asyncio.set_event_loop(get_event_loop())
while not self.should_close:
if time.time() - self._last_update_time > 0.1:
self._do_update_presence()
if time.time() - self._last_secret_update_time > 15:
self._update_secret()
# if time.time() - self._last_connect_time > 120 and is_discord_running(): #!Eric please add module manager(pip)
# self._reconnect()
time.sleep(0.03)
def _subscribe(self, event: str, **args):
self.rpc.send_data(
1,
{
"nonce": f"{time.time():.20f}",
"cmd": "SUBSCRIBE",
"evt": event,
"args": args,
},
)
data = self.rpc.loop.run_until_complete(self.rpc.read_output())
self.handle_event(data)
def _subscribe_events(self):
self._subscribe("ACTIVITY_JOIN")
self._subscribe("ACTIVITY_JOIN_REQUEST")
# def _update_presence(self) -> None:
# self._last_update_time = time.time()
# try:
# self._do_update_presence()
# except (AttributeError, AssertionError):
# try:
# self._reconnect()
# except Exception:
# print_error("failed to update presence", include_exception= True)
def _reconnect(self) -> None:
self.rpc.connect()
self._subscribe_events()
self._do_update_presence()
self._last_connect_time = time.time()
def _do_update_presence(self) -> None:
if RpcThread.is_discord_running():
self._last_update_time = time.time()
try:
data = self.rpc.update(
state=self.state or " ",
details=self.details,
start=start_time,
large_image=self.large_image_key,
large_text=self.large_image_text,
small_image=self.small_image_key,
small_text=self.small_image_text,
party_id=self.party_id,
party_size=[self.party_size, self.party_max],
join=self.join_secret,
# buttons = [ #!cant use buttons together with join
# {
# "label": "Discord Server",
# "url": "https://ballistica.net/discord"
# },
# {
# "label": "Download Bombsquad",
# "url": "https://bombsquad.ga/download"}
# ]
)
self.handle_event(data)
except (PipeClosed, DiscordError, AssertionError, AttributeError):
try:
self._reconnect()
except (DiscordNotFound, DiscordError):
pass
def handle_event(self, data):
evt = data["evt"]
if evt is None:
return
data = data.get("data", {})
if evt == "ACTIVITY_JOIN":
secret = data.get("secret")
try:
server = json.loads(secret)
format_version = server["format_version"]
except Exception:
babase.print_exception("discordrp: unknown activity join format")
else:
try:
if format_version == 1:
hostname = server["hostname"]
port = server["port"]
self._connect_to_party(hostname, port)
except Exception:
babase.print_exception(
f"discordrp: incorrect activity join data, {format_version=}"
)
elif evt == "ACTIVITY_JOIN_REQUEST":
user = data.get("user", {})
uid = user.get("id")
username = user.get("username")
discriminator = user.get("discriminator", None)
avatar = user.get("avatar")
self.on_join_request(username, uid, discriminator, avatar)
def _connect_to_party(self, hostname, port) -> None:
babase.pushcall(
babase.Call(bs.connect_to_party, hostname, port), from_other_thread=True
)
def on_join_request(self, username, uid, discriminator, avatar) -> None:
del uid # unused
del avatar # unused
babase.pushcall(
babase.Call(
bui.screenmessage,
"Discord: {}{} wants to join!".format(
username, discriminator if discriminator != "#0" else ""),
color=(0.0, 1.0, 0.0),
),
from_other_thread=True,
)
babase.pushcall(lambda: bui.getsound('bellMed').play(), from_other_thread=True)
class Discordlogin(PopupWindow):
def __init__(self):
# pylint: disable=too-many-locals
_uiscale = bui.app.ui_v1.uiscale
self._transitioning_out = False
s = 1.25 if _uiscale is babase.UIScale.SMALL else 1.27 if _uiscale is babase.UIScale.MEDIUM else 1.3
self._width = 380 * s
self._height = 150 + 150 * s
self.path = Path(f"{_babase.app.env.python_directory_user}/__pycache__/token.txt")
bg_color = (0.5, 0.4, 0.6)
log_btn_colour = (0.10, 0.95, 0.10) if not self.path.exists() else (1.00, 0.15, 0.15)
log_txt = "LOG IN" if not self.path.exists() else "LOG OUT"
self.code = False
self.resp = "Placeholder"
self.headers = {
'user-agent': "Mozilla/5.0",
'content-type': "application/json",
}
# creates our _root_widget
PopupWindow.__init__(self,
position=(0.0, 0.0),
size=(self._width, self._height),
scale=(2.1 if _uiscale is babase.UIScale.SMALL else 1.5
if _uiscale is babase.UIScale.MEDIUM else 1.0),
bg_color=bg_color)
self._cancel_button = bui.buttonwidget(
parent=self.root_widget,
position=(25, self._height - 40),
size=(50, 50),
scale=0.58,
label='',
color=bg_color,
on_activate_call=self._on_cancel_press,
autoselect=True,
icon=bui.gettexture('crossOut'),
iconscale=1.2)
bui.imagewidget(parent=self.root_widget,
position=(180, self._height - 55),
size=(32 * s, 32 * s),
texture=bui.gettexture("discordLogo"),
color=(10 - 0.32, 10 - 0.39, 10 - 0.96))
self.email_widget = bui.textwidget(parent=self.root_widget,
text="Email/Phone Number",
size=(400, 70),
position=(50, 180),
h_align='left',
v_align='center',
editable=True,
scale=0.8,
autoselect=True,
maxwidth=220)
self.password_widget = bui.textwidget(parent=self.root_widget,
text="Password",
size=(400, 70),
position=(50, 120),
h_align='left',
v_align='center',
editable=True,
scale=0.8,
autoselect=True,
maxwidth=220)
bui.containerwidget(edit=self.root_widget,
cancel_button=self._cancel_button)
bui.textwidget(
parent=self.root_widget,
position=(265, self._height - 37),
size=(0, 0),
h_align='center',
v_align='center',
scale=1.0,
text="Discord",
maxwidth=200,
color=(0.80, 0.80, 0.80))
bui.textwidget(
parent=self.root_widget,
position=(265, self._height - 78),
size=(0, 0),
h_align='center',
v_align='center',
scale=1.0,
text="💀Use at your own risk💀\ndiscord account might get terminated⚠",
maxwidth=200,
color=(1.00, 0.15, 0.15))
self._login_button = bui.buttonwidget(
parent=self.root_widget,
position=(120, 65),
size=(400, 80),
scale=0.58,
label=log_txt,
color=log_btn_colour,
on_activate_call=self.login,
autoselect=True)
def _on_cancel_press(self) -> None:
self._transition_out()
def _transition_out(self) -> None:
if not self._transitioning_out:
self._transitioning_out = True
bui.containerwidget(edit=self.root_widget, transition='out_scale')
def on_bascenev1libup_cancel(self) -> None:
bui.getsound('swish').play()
self._transition_out()
def backup_2fa_code(self, tickt):
if babase.do_once():
self.email_widget.delete()
self.password_widget.delete()
self.backup_2fa_widget = bui.textwidget(parent=self.root_widget,
text="2FA/Discord Backup code",
size=(400, 70),
position=(50, 120),
h_align='left',
v_align='center',
editable=True,
scale=0.8,
autoselect=True,
maxwidth=220)
json_data_2FA = {
"code": bui.textwidget(query=self.backup_2fa_widget),
"gift_code_sku_id": None,
"ticket": tickt,
}
if json_data_2FA['code'] != "2FA/Discord Backup code":
try:
payload_2FA = json.dumps(json_data_2FA)
conn_2FA = http.client.HTTPSConnection("discord.com")
conn_2FA.request("POST", "/api/v9/auth/mfa/totp", payload_2FA, self.headers)
res_2FA = conn_2FA.getresponse().read()
token = json.loads(res_2FA)['token'].encode().hex().encode()
with open(self.path, 'wb') as f:
f.write(token)
bui.screenmessage("Successfully logged in", (0.21, 1.0, 0.20))
bui.getsound('shieldUp').play()
self.on_bascenev1libup_cancel()
PresenceUpdate().start()
except:
self.code = True
bui.screenmessage("Incorrect code", (1.00, 0.15, 0.15))
bui.getsound('error').play()
def login(self):
if not self.path.exists() and self.code == False:
try:
json_data = {
'login': bui.textwidget(query=self.email_widget),
'password': bui.textwidget(query=self.password_widget),
'undelete': False,
'captcha_key': None,
'login_source': None,
'gift_code_sku_id': None,
}
conn = http.client.HTTPSConnection("discord.com")
payload = json.dumps(json_data)
# conn.request("POST", "/api/v9/auth/login", payload, headers)
# res = conn.getresponse().read()
conn.request("POST", "/api/v9/auth/login", payload, self.headers)
res = conn.getresponse().read()
try:
token = json.loads(res)['token'].encode().hex().encode()
with open(self.path, 'wb') as f:
f.write(token)
bui.screenmessage("Successfully logged in", (0.21, 1.0, 0.20))
bui.getsound('shieldUp').play()
self.on_bascenev1libup_cancel()
PresenceUpdate().start()
except KeyError:
try:
ticket = json.loads(res)['ticket']
bui.screenmessage("Input your 2FA or Discord Backup code",
(0.21, 1.0, 0.20))
bui.getsound('error').play()
self.resp = ticket
self.backup_2fa_code(tickt=ticket)
self.code = True
except KeyError:
bui.screenmessage("Incorrect credentials", (1.00, 0.15, 0.15))
bui.getsound('error').play()
except:
bui.screenmessage("Connect to the internet", (1.00, 0.15, 0.15))
bui.getsound('error').play()
conn.close()
elif self.code == True:
self.backup_2fa_code(tickt=self.resp)
else:
self.email_widget.delete()
self.password_widget.delete()
remove(self.path)
bui.getsound('shieldDown').play()
bui.screenmessage("Account successfully removed!!", (0.10, 0.10, 1.00))
self.on_bascenev1libup_cancel()
PresenceUpdate().close()
run_once = False
def get_once_asset():
global run_once
if run_once:
return
response = Request(
"https://discordapp.com/api/oauth2/applications/963434684669382696/assets",
headers={"User-Agent": "Mozilla/5.0"},
)
try:
with urlopen(response) as assets:
assets = json.loads(assets.read())
asset = []
asset_id = []
for x in assets:
dem = x["name"]
don = x["id"]
asset_id.append(don)
asset.append(dem)
asset_id_dict = dict(zip(asset, asset_id))
with open(DIRPATH, "w") as imagesets:
jsonfile = json.dumps(asset_id_dict)
json.dump(asset_id_dict, imagesets, indent=4)
except:
pass
run_once = True
def get_class():
if ANDROID:
return PresenceUpdate()
elif not ANDROID:
return RpcThread()
# ba_meta export babase.Plugin
class DiscordRP(babase.Plugin):
def __init__(self) -> None:
self.update_timer: bs.Timer | None = None
self.rpc_thread = get_class()
self._last_server_info: str | None = None
if not ANDROID:
_run_overrides()
get_once_asset()
def on_app_running(self) -> None:
if not ANDROID:
self.rpc_thread.start()
self.update_timer = bs.AppTimer(
1, bs.WeakCall(self.update_status), repeat=True
)
if ANDROID:
self.rpc_thread.start()
self.update_timer = bs.AppTimer(
4, bs.WeakCall(self.update_status), repeat=True
)
def has_settings_ui(self):
return True
def show_settings_ui(self, button):
if not ANDROID:
bui.screenmessage("Nothing here achievement!!!", (0.26, 0.65, 0.94))
bui.getsound('achievement').play()
if ANDROID:
Discordlogin()
def on_app_shutdown(self) -> None:
if not ANDROID and self.rpc_thread.is_discord_running():
self.rpc_thread.rpc.close()
self.rpc_thread.should_close = True
def on_app_pause(self) -> None:
self.rpc_thread.close()
def on_app_resume(self) -> None:
global start_time
start_time = time.time()
self.rpc_thread.start()
def _get_current_activity_name(self) -> str | None:
act = bs.get_foreground_host_activity()
if isinstance(act, bs.GameActivity):
return act.name
this = "Lobby"
name: str | None = (
act.__class__.__name__.replace("Activity", "")
.replace("ScoreScreen", "Ranking")
.replace("Coop", "")
.replace("MultiTeam", "")
.replace("Victory", "")
.replace("EndSession", "")
.replace("Transition", "")
.replace("Draw", "")
.replace("FreeForAll", "")
.replace("Join", this)
.replace("Team", "")
.replace("Series", "")
.replace("CustomSession", "Custom Session(mod)")
)
if name == "MainMenu":
name = "Main Menu"
if name == this:
self.rpc_thread.large_image_key = "lobby"
self.rpc_thread.large_image_text = "Bombing up"
# self.rpc_thread.small_image_key = "lobbysmall"
if name == "Ranking":
self.rpc_thread.large_image_key = "ranking"
self.rpc_thread.large_image_text = "Viewing Results"
return name
def _get_current_map_name(self) -> Tuple[str | None, str | None]:
act = bs.get_foreground_host_activity()
if isinstance(act, bs.GameActivity):
texname = act.map.get_preview_texture_name()
if texname:
return act.map.name, texname.lower().removesuffix("preview")
return None, None
def update_status(self) -> None:
roster = bs.get_game_roster()
connection_info = bs.get_connection_to_host_info(
) if build_number < 21727 else bs.get_connection_to_host_info_2()
self.rpc_thread.large_image_key = "bombsquadicon"
self.rpc_thread.large_image_text = "BombSquad"
self.rpc_thread.small_image_key = _babase.app.classic.platform
self.rpc_thread.small_image_text = (
f"{_babase.app.classic.platform.capitalize()}({APP_VERSION})"
)
if not ANDROID:
svinfo = str(connection_info)
if self._last_server_info != svinfo:
self._last_server_info = svinfo
self.rpc_thread.party_id = str(uuid.uuid4())
self.rpc_thread._update_secret()
if connection_info:
servername = connection_info.name
self.rpc_thread.details = "Online"
self.rpc_thread.party_size = max(
1, sum(len(client["players"]) for client in roster)
)
self.rpc_thread.party_max = max(8, self.rpc_thread.party_size)
if len(servername) == 19 and "Private Party" in servername:
self.rpc_thread.state = "Private Party"
elif servername == "": # A local game joinable from the internet
try:
offlinename = json.loads(bs.get_game_roster()[0]["spec_string"])[
"n"
]
if len(offlinename) > 19: # Thanks Rikko
self.rpc_thread.state = offlinename[slice(19)] + "..."
else:
self.rpc_thread.state = offlinename
except IndexError:
pass
else:
if len(servername) > 19:
self.rpc_thread.state = servername[slice(19)] + ".."
else:
self.rpc_thread.state = servername[slice(19)]
if not connection_info:
self.rpc_thread.details = "Local" # ! replace with something like ballistica github cause
self.rpc_thread.state = self._get_current_activity_name()
self.rpc_thread.party_size = max(1, len(roster))
self.rpc_thread.party_max = max(1, bs.get_public_party_max_size())
if (
bs.get_foreground_host_session() is not None
and self.rpc_thread.details == "Local"
):
session = (
bs.get_foreground_host_session()
.__class__.__name__.replace("MainMenuSession", "")
.replace("EndSession", "")
.replace("FreeForAllSession", ": FFA") # ! for session use small image key
.replace("DualTeamSession", ": Teams")
.replace("CoopSession", ": Coop")
)
#! self.rpc_thread.small_image_key = session.lower()
self.rpc_thread.details = f"{self.rpc_thread.details} {session}"
if (
self.rpc_thread.state == "NoneType"
): # sometimes the game just breaks which means its not really watching replay FIXME
self.rpc_thread.state = "Watching Replay"
self.rpc_thread.large_image_key = "replay"
self.rpc_thread.large_image_text = "Viewing Awesomeness"
#!self.rpc_thread.small_image_key = "replaysmall"
act = bs.get_foreground_host_activity()
session = bs.get_foreground_host_session()
if act:
from bascenev1lib.game.elimination import EliminationGame
from bascenev1lib.game.thelaststand import TheLastStandGame
from bascenev1lib.game.meteorshower import MeteorShowerGame
# noinspection PyUnresolvedReferences,PyProtectedMember
try:
self.rpc_thread.start_timestamp = act._discordrp_start_time # type: ignore
except AttributeError:
# This can be the case if plugin launched AFTER activity
# has been created; in that case let's assume it was
# created just now.
self.rpc_thread.start_timestamp = act._discordrp_start_time = time.mktime( # type: ignore
time.localtime()
)
if isinstance(act, EliminationGame):
alive_count = len([p for p in act.players if p.lives > 0])
self.rpc_thread.details += f" ({alive_count} players left)"
elif isinstance(act, TheLastStandGame):
# noinspection PyProtectedMember
points = act._score
self.rpc_thread.details += f" ({points} points)"
elif isinstance(act, MeteorShowerGame):
with act.context:
sec = bs.time() - act._timer.getstarttime()
secfmt = ""
if sec < 60:
secfmt = f"{sec:.2f}"
else:
secfmt = f"{int(sec) // 60:02}:{sec:.2f}"
self.rpc_thread.details += f" ({secfmt})"
# if isinstance(session, ba.DualTeamSession):
# scores = ':'.join([
# str(t.customdata['score'])
# for t in session.sessionteams
# ])
# self.rpc_thread.details += f' ({scores})'
mapname, short_map_name = self._get_current_map_name()
if mapname:
with open(DIRPATH, 'r') as asset_dict:
asset_keys = json.load(asset_dict).keys()
if short_map_name in asset_keys:
self.rpc_thread.large_image_text = mapname
self.rpc_thread.large_image_key = short_map_name
self.rpc_thread.small_image_key = 'bombsquadlogo2'
self.rpc_thread.small_image_text = 'BombSquad'
if _babase.get_idle_time() / (1000 * 60) % 60 >= 0.4:
self.rpc_thread.details = f"AFK in {self.rpc_thread.details}"
if not ANDROID:
self.rpc_thread.large_image_key = (
"https://media.tenor.com/uAqNn6fv7x4AAAAM/bombsquad-spaz.gif"
)
if ANDROID and Path(f"{_babase.app.env.python_directory_user}/__pycache__/token.txt").exists():
self.rpc_thread.presence()