mirror of
https://github.com/bombsquad-community/plugin-manager.git
synced 2025-10-08 14:54:36 +00:00
918 lines
37 KiB
Python
918 lines
37 KiB
Python
# Released under the MIT and Apache License. See LICENSE for details.
|
||
#
|
||
"""placeholder :clown:"""
|
||
|
||
# ba_meta require api 8
|
||
#!"Made to you by @brostos & @Dliwk"
|
||
|
||
|
||
from __future__ import annotations
|
||
from urllib.request import Request, urlopen, urlretrieve
|
||
from pathlib import Path
|
||
from os import getcwd, remove
|
||
from bauiv1lib.popup import PopupWindow
|
||
from babase._mgen.enums import TimeType
|
||
|
||
import asyncio
|
||
import http.client
|
||
import ast
|
||
import uuid
|
||
import json
|
||
import time
|
||
import threading
|
||
import shutil
|
||
import hashlib
|
||
import babase
|
||
import _babase
|
||
import bascenev1 as bs
|
||
import bascenev1lib
|
||
import bauiv1 as bui
|
||
|
||
from typing import TYPE_CHECKING
|
||
|
||
if TYPE_CHECKING:
|
||
from typing import Any, Tuple
|
||
|
||
|
||
ANDROID = babase.app.classic.platform == "android"
|
||
DIRPATH = Path(f"{_babase.app.python_directory_user}/image_id.json")
|
||
|
||
if ANDROID: # !can add ios in future
|
||
|
||
# Installing websocket
|
||
def get_module():
|
||
install_path = Path(f"{getcwd()}/ba_data/python") # For the guys like me on windows
|
||
path = Path(f"{install_path}/websocket.tar.gz")
|
||
file_path = Path(f"{install_path}/websocket")
|
||
source_dir = Path(f"{install_path}/websocket-client-1.6.1/websocket")
|
||
if not file_path.exists():
|
||
url = "https://files.pythonhosted.org/packages/b1/34/3a5cae1e07d9566ad073fa6d169bf22c03a3ba7b31b3c3422ec88d039108/websocket-client-1.6.1.tar.gz"
|
||
try:
|
||
filename, headers = urlretrieve(url, filename=path)
|
||
with open(filename, "rb") as f:
|
||
content = f.read()
|
||
assert hashlib.md5(content).hexdigest() == "86bc69b61947943627afc1b351c0b5db"
|
||
shutil.unpack_archive( filename, install_path)
|
||
shutil.copytree(source_dir, file_path)
|
||
shutil.rmtree(Path(f"{install_path}/websocket-client-1.6.1"))
|
||
remove(path)
|
||
except Exception as e:
|
||
if type(e) == shutil.Error:
|
||
shutil.rmtree(Path(f"{install_path}/websocket-client-1.6.1"))
|
||
else:
|
||
pass
|
||
get_module()
|
||
|
||
from websocket import WebSocketConnectionClosedException
|
||
import websocket
|
||
|
||
start_time = time.time()
|
||
|
||
class PresenceUpdate:
|
||
def __init__(self):
|
||
self.ws = websocket.WebSocketApp("wss://gateway.discord.gg/?encoding=json&v=10",
|
||
on_open=self.on_open,
|
||
on_message=self.on_message,
|
||
on_error=self.on_error,
|
||
on_close=self.on_close)
|
||
self.heartbeat_interval = int(41250)
|
||
self.resume_gateway_url: str | None = None
|
||
self.session_id: str | None = None
|
||
self.stop_heartbeat_thread = threading.Event()
|
||
self.do_once = True
|
||
self.state: str | None = "In Game"
|
||
self.details: str | None = "Main Menu"
|
||
self.start_timestamp = time.time()
|
||
self.large_image_key: str | None = "bombsquadicon"
|
||
self.large_image_text: str | None = "BombSquad Icon"
|
||
self.small_image_key: str | None = None
|
||
self.small_image_text: str | None = (
|
||
f"{_babase.app.classic.platform.capitalize()}({_babase.app.version})")
|
||
self.media_proxy = "mp:/app-assets/963434684669382696/{}.png"
|
||
self.identify: bool = False
|
||
self.party_id: str = str(uuid.uuid4())
|
||
self.party_size = 1
|
||
self.party_max = 8
|
||
|
||
def presence(self):
|
||
with open(DIRPATH, "r") as maptxt:
|
||
largetxt = json.load(maptxt)[self.large_image_key]
|
||
with open(DIRPATH, "r") as maptxt:
|
||
smalltxt = json.load(maptxt)[self.small_image_key]
|
||
|
||
presencepayload = {
|
||
"op": 3,
|
||
"d": {
|
||
"since": None, # used to show how long the user went idle will add afk to work with this and then set the status to idle
|
||
"status": "online",
|
||
"afk": "false",
|
||
"activities": [
|
||
{
|
||
"name": "BombSquad",
|
||
"type": 0,
|
||
"application_id": "963434684669382696",
|
||
"state": self.state,
|
||
"details": self.details,
|
||
"timestamps": {
|
||
"start": start_time
|
||
},
|
||
"party": {
|
||
"id": self.party_id,
|
||
"size": [self.party_size, self.party_max]
|
||
},
|
||
"assets": {
|
||
"large_image": self.media_proxy.format(largetxt),
|
||
"large_text": self.large_image_text,
|
||
"small_image": self.media_proxy.format(smalltxt),
|
||
"small_text": self.small_image_text,
|
||
},
|
||
"client_info": {
|
||
"version": 0,
|
||
"os": "android",
|
||
"client": "mobile",
|
||
},
|
||
"buttons": ["Discord Server", "Download BombSquad"],
|
||
"metadata": {
|
||
"button_urls": [
|
||
"https://discord.gg/bombsquad-ballistica-official-1001896771347304639",
|
||
"https://bombsquad-community.web.app/download",
|
||
]
|
||
},
|
||
}
|
||
],
|
||
},
|
||
}
|
||
try:
|
||
self.ws.send(json.dumps(presencepayload))
|
||
except WebSocketConnectionClosedException:
|
||
pass
|
||
|
||
def on_message(self, ws, message):
|
||
message = json.loads(message)
|
||
try:
|
||
self.heartbeat_interval = message["d"]["heartbeat_interval"]
|
||
except:
|
||
pass
|
||
try:
|
||
self.resume_gateway_url = message["d"]["resume_gateway_url"]
|
||
self.session_id = message["d"]["session_id"]
|
||
except:
|
||
pass
|
||
|
||
def on_error(self, ws, error):
|
||
babase.print_exception(error)
|
||
|
||
def on_close(self, ws, close_status_code, close_msg):
|
||
print("Closed Discord Connection Successfully")
|
||
|
||
def on_open(self, ws):
|
||
print("Connected to Discord Websocket")
|
||
|
||
def heartbeats():
|
||
"""Sending heartbeats to keep the connection alive"""
|
||
if self.do_once:
|
||
heartbeat_payload = {
|
||
"op": 1,
|
||
"d": 251,
|
||
} # step two keeping connection alive by sending heart beats and receiving opcode 11
|
||
self.ws.send(json.dumps(heartbeat_payload))
|
||
self.do_once = False
|
||
|
||
def identify():
|
||
"""Identifying to the gateway and enable by using user token and the intents we will be using e.g 256->For Presence"""
|
||
with open(f"{getcwd()}/token.txt", 'r') as f:
|
||
token = bytes.fromhex(f.read()).decode('utf-8')
|
||
identify_payload = {
|
||
"op": 2,
|
||
"d": {
|
||
"token": token,
|
||
"properties": {
|
||
"os": "linux",
|
||
"browser": "Discord Android",
|
||
"device": "android",
|
||
},
|
||
"intents": 256,
|
||
},
|
||
} # step 3 send an identify
|
||
self.ws.send(json.dumps(identify_payload))
|
||
identify()
|
||
while True:
|
||
heartbeat_payload = {"op": 1, "d": self.heartbeat_interval}
|
||
|
||
try:
|
||
self.ws.send(json.dumps(heartbeat_payload))
|
||
time.sleep(self.heartbeat_interval / 1000)
|
||
except:
|
||
pass
|
||
|
||
if self.stop_heartbeat_thread.is_set():
|
||
self.stop_heartbeat_thread.clear()
|
||
break
|
||
|
||
threading.Thread(target=heartbeats, daemon=True, name="heartbeat").start()
|
||
|
||
def start(self):
|
||
if Path(f"{getcwd()}/token.txt").exists():
|
||
threading.Thread(target=self.ws.run_forever, daemon=True, name="websocket").start()
|
||
|
||
def close(self):
|
||
self.stop_heartbeat_thread.set()
|
||
self.do_once = True
|
||
self.ws.close()
|
||
|
||
|
||
if not ANDROID:
|
||
# installing pypresence
|
||
def get_module():
|
||
install_path = Path(f"{getcwd()}/ba_data/python")
|
||
path = Path(f"{install_path}/pypresence.tar.gz")
|
||
file_path = Path(f"{install_path}/pypresence")
|
||
source_dir = Path(f"{install_path}/pypresence-4.3.0/pypresence")
|
||
if not file_path.exists():
|
||
url = "https://files.pythonhosted.org/packages/f4/2e/d110f862720b5e3ba1b0b719657385fc4151929befa2c6981f48360aa480/pypresence-4.3.0.tar.gz"
|
||
try:
|
||
filename, headers = urlretrieve(url, filename=path)
|
||
with open(filename, "rb") as f:
|
||
content = f.read()
|
||
assert hashlib.md5(content).hexdigest() == "f7c163cdd001af2456c09e241b90bad7"
|
||
shutil.unpack_archive( filename, install_path)
|
||
shutil.copytree(source_dir, file_path)
|
||
shutil.rmtree(Path(f"{install_path}/pypresence-4.3.0"))
|
||
remove(path)
|
||
except:
|
||
pass
|
||
|
||
# Make modifications for it to work on windows
|
||
if babase.app.classic.platform == "windows":
|
||
with open(Path(f"{getcwd()}/ba_data/python/pypresence/utils.py"), "r") as file:
|
||
data = file.readlines()
|
||
data[45] = """
|
||
def get_event_loop(force_fresh=False):
|
||
loop = asyncio.ProactorEventLoop() if sys.platform == 'win32' else asyncio.new_event_loop()
|
||
if force_fresh:
|
||
return loop
|
||
try:
|
||
running = asyncio.get_running_loop()
|
||
except RuntimeError:
|
||
return loop
|
||
if running.is_closed():
|
||
return loop
|
||
else:
|
||
if sys.platform in ('linux', 'darwin'):
|
||
return running
|
||
if sys.platform == 'win32':
|
||
if isinstance(running, asyncio.ProactorEventLoop):
|
||
return running
|
||
else:
|
||
return loop"""
|
||
|
||
with open(Path(f"{getcwd()}/ba_data/python/pypresence/utils.py"), "w") as file:
|
||
for number, line in enumerate(data):
|
||
if number not in range(46, 56):
|
||
file.write(line)
|
||
get_module()
|
||
|
||
from pypresence import PipeClosed, DiscordError, DiscordNotFound
|
||
from pypresence.utils import get_event_loop
|
||
import pypresence
|
||
import socket
|
||
|
||
DEBUG = True
|
||
|
||
_last_server_addr = 'localhost'
|
||
_last_server_port = 43210
|
||
|
||
def print_error(err: str, include_exception: bool = False) -> None:
|
||
if DEBUG:
|
||
if include_exception:
|
||
babase.print_exception(err)
|
||
else:
|
||
babase.print_error(err)
|
||
else:
|
||
print(f"ERROR in discordrp.py: {err}")
|
||
|
||
def log(msg: str) -> None:
|
||
if DEBUG:
|
||
print(f"LOG in discordrp.py: {msg}")
|
||
|
||
def _run_overrides() -> None:
|
||
old_init = bs.Activity.__init__
|
||
|
||
def new_init(self, *args: Any, **kwargs: Any) -> None: # type: ignore
|
||
old_init(self, *args, **kwargs)
|
||
self._discordrp_start_time = time.mktime(time.localtime())
|
||
|
||
bs.Activity.__init__ = new_init # type: ignore
|
||
|
||
old_connect = bs.connect_to_party
|
||
|
||
def new_connect(*args, **kwargs) -> None: # type: ignore
|
||
global _last_server_addr
|
||
global _last_server_port
|
||
old_connect(*args, **kwargs)
|
||
c = kwargs.get("address") or args[0]
|
||
_last_server_port = kwargs.get("port") or args[1]
|
||
|
||
bs.connect_to_party = new_connect
|
||
|
||
start_time = time.time()
|
||
|
||
class RpcThread(threading.Thread):
|
||
def __init__(self):
|
||
super().__init__(name="RpcThread")
|
||
self.rpc = pypresence.Presence(963434684669382696)
|
||
self.state: str | None = "In Game"
|
||
self.details: str | None = "Main Menu"
|
||
self.start_timestamp = time.mktime(time.localtime())
|
||
self.large_image_key: str | None = "bombsquadicon"
|
||
self.large_image_text: str | None = "BombSquad Icon"
|
||
self.small_image_key: str | None = None
|
||
self.small_image_text: str | None = None
|
||
self.party_id: str = str(uuid.uuid4())
|
||
self.party_size = 1
|
||
self.party_max = 8
|
||
self.join_secret: str | None = None
|
||
self._last_update_time: float = 0
|
||
self._last_secret_update_time: float = 0
|
||
self._last_connect_time: float = 0
|
||
self.should_close = False
|
||
|
||
@staticmethod
|
||
def is_discord_running():
|
||
for i in range(6463, 6473):
|
||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
s.settimeout(0.01)
|
||
try:
|
||
conn = s.connect_ex(('localhost', i))
|
||
s.close()
|
||
if (conn == 0):
|
||
s.close()
|
||
return (True)
|
||
except:
|
||
s.close()
|
||
return (False)
|
||
|
||
def _generate_join_secret(self):
|
||
# resp = requests.get('https://legacy.ballistica.net/bsAccessCheck').text
|
||
connection_info = bs.get_connection_to_host_info()
|
||
if connection_info:
|
||
addr = _last_server_addr
|
||
port = _last_server_port
|
||
else:
|
||
try:
|
||
with urlopen(
|
||
"https://legacy.ballistica.net/bsAccessCheck"
|
||
) as resp:
|
||
resp = resp.read().decode()
|
||
resp = ast.literal_eval(resp)
|
||
addr = resp["address"]
|
||
port = 43210
|
||
secret_dict = {
|
||
"format_version": 1,
|
||
"hostname": addr,
|
||
"port": port,
|
||
}
|
||
self.join_secret = json.dumps(secret_dict)
|
||
except:
|
||
pass
|
||
|
||
def _update_secret(self):
|
||
threading.Thread(target=self._generate_join_secret, daemon=True).start()
|
||
self._last_secret_update_time = time.time()
|
||
|
||
def run(self) -> None:
|
||
asyncio.set_event_loop(get_event_loop())
|
||
while not self.should_close:
|
||
if time.time() - self._last_update_time > 0.1:
|
||
self._do_update_presence()
|
||
if time.time() - self._last_secret_update_time > 15:
|
||
self._update_secret()
|
||
# if time.time() - self._last_connect_time > 120 and is_discord_running(): #!Eric please add module manager(pip)
|
||
# self._reconnect()
|
||
time.sleep(0.03)
|
||
|
||
def _subscribe(self, event: str, **args):
|
||
self.rpc.send_data(
|
||
1,
|
||
{
|
||
"nonce": f"{time.time():.20f}",
|
||
"cmd": "SUBSCRIBE",
|
||
"evt": event,
|
||
"args": args,
|
||
},
|
||
)
|
||
data = self.rpc.loop.run_until_complete(self.rpc.read_output())
|
||
self.handle_event(data)
|
||
|
||
def _subscribe_events(self):
|
||
self._subscribe("ACTIVITY_JOIN")
|
||
self._subscribe("ACTIVITY_JOIN_REQUEST")
|
||
|
||
# def _update_presence(self) -> None:
|
||
# self._last_update_time = time.time()
|
||
# try:
|
||
# self._do_update_presence()
|
||
# except (AttributeError, AssertionError):
|
||
# try:
|
||
# self._reconnect()
|
||
# except Exception:
|
||
# print_error("failed to update presence", include_exception= True)
|
||
|
||
def _reconnect(self) -> None:
|
||
self.rpc.connect()
|
||
self._subscribe_events()
|
||
self._do_update_presence()
|
||
self._last_connect_time = time.time()
|
||
|
||
def _do_update_presence(self) -> None:
|
||
if RpcThread.is_discord_running():
|
||
self._last_update_time = time.time()
|
||
try:
|
||
data = self.rpc.update(
|
||
state=self.state or " ",
|
||
details=self.details,
|
||
start=start_time,
|
||
large_image=self.large_image_key,
|
||
large_text=self.large_image_text,
|
||
small_image=self.small_image_key,
|
||
small_text=self.small_image_text,
|
||
party_id=self.party_id,
|
||
party_size=[self.party_size, self.party_max],
|
||
join=self.join_secret,
|
||
# buttons = [ #!cant use buttons together with join
|
||
# {
|
||
# "label": "Discord Server",
|
||
# "url": "https://ballistica.net/discord"
|
||
# },
|
||
# {
|
||
# "label": "Download Bombsquad",
|
||
# "url": "https://bombsquad.ga/download"}
|
||
# ]
|
||
)
|
||
|
||
self.handle_event(data)
|
||
except (PipeClosed, DiscordError, AssertionError, AttributeError):
|
||
try:
|
||
self._reconnect()
|
||
except (DiscordNotFound, DiscordError):
|
||
pass
|
||
|
||
def handle_event(self, data):
|
||
evt = data["evt"]
|
||
if evt is None:
|
||
return
|
||
|
||
data = data.get("data", {})
|
||
|
||
if evt == "ACTIVITY_JOIN":
|
||
secret = data.get("secret")
|
||
try:
|
||
server = json.loads(secret)
|
||
format_version = server["format_version"]
|
||
except Exception:
|
||
babase.print_exception("discordrp: unknown activity join format")
|
||
else:
|
||
try:
|
||
if format_version == 1:
|
||
hostname = server["hostname"]
|
||
port = server["port"]
|
||
self._connect_to_party(hostname, port)
|
||
except Exception:
|
||
babase.print_exception(
|
||
f"discordrp: incorrect activity join data, {format_version=}"
|
||
)
|
||
|
||
elif evt == "ACTIVITY_JOIN_REQUEST":
|
||
user = data.get("user", {})
|
||
uid = user.get("id")
|
||
username = user.get("username")
|
||
discriminator = user.get("discriminator", None)
|
||
avatar = user.get("avatar")
|
||
self.on_join_request(username, uid, discriminator, avatar)
|
||
|
||
def _connect_to_party(self, hostname, port) -> None:
|
||
babase.pushcall(
|
||
babase.Call(bs.connect_to_party, hostname, port), from_other_thread=True
|
||
)
|
||
|
||
def on_join_request(self, username, uid, discriminator, avatar) -> None:
|
||
del uid # unused
|
||
del avatar # unused
|
||
babase.pushcall(
|
||
babase.Call(
|
||
bui.screenmessage,
|
||
"Discord: {} wants to join!".format(username),
|
||
color=(0.0, 1.0, 0.0),
|
||
),
|
||
from_other_thread=True,
|
||
)
|
||
babase.pushcall(lambda: bui.getsound('bellMed').play(), from_other_thread=True)
|
||
|
||
|
||
class Discordlogin(PopupWindow):
|
||
|
||
def __init__(self):
|
||
# pylint: disable=too-many-locals
|
||
_uiscale = bui.app.ui_v1.uiscale
|
||
self._transitioning_out = False
|
||
s = 1.25 if _uiscale is babase.UIScale.SMALL else 1.27 if _uiscale is babase.UIScale.MEDIUM else 1.3
|
||
self._width = 380 * s
|
||
self._height = 150 + 150 * s
|
||
self.path = Path(f"{getcwd()}/token.txt")
|
||
bg_color = (0.5, 0.4, 0.6)
|
||
log_btn_colour = (0.10, 0.95, 0.10) if not self.path.exists() else (1.00, 0.15, 0.15)
|
||
log_txt = "LOG IN" if not self.path.exists() else "LOG OUT"
|
||
|
||
# creates our _root_widget
|
||
PopupWindow.__init__(self,
|
||
position=(0.0, 0.0),
|
||
size=(self._width, self._height),
|
||
scale=(2.1 if _uiscale is babase.UIScale.SMALL else 1.5
|
||
if _uiscale is babase.UIScale.MEDIUM else 1.0),
|
||
bg_color=bg_color)
|
||
|
||
self._cancel_button = bui.buttonwidget(
|
||
parent=self.root_widget,
|
||
position=(25, self._height - 40),
|
||
size=(50, 50),
|
||
scale=0.58,
|
||
label='',
|
||
color=bg_color,
|
||
on_activate_call=self._on_cancel_press,
|
||
autoselect=True,
|
||
icon=bui.gettexture('crossOut'),
|
||
iconscale=1.2)
|
||
|
||
bui.imagewidget(parent=self.root_widget,
|
||
position=(180, self._height - 55),
|
||
size=(32 * s, 32 * s),
|
||
texture=bui.gettexture("discordLogo"),
|
||
color=(10 - 0.32, 10 - 0.39, 10 - 0.96))
|
||
|
||
self.email_widget = bui.textwidget(parent=self.root_widget,
|
||
text="Email/Phone Number",
|
||
size=(400, 70),
|
||
position=(50, 180),
|
||
h_align='left',
|
||
v_align='center',
|
||
editable=True,
|
||
scale=0.8,
|
||
autoselect=True,
|
||
maxwidth=220)
|
||
|
||
self.password_widget = bui.textwidget(parent=self.root_widget,
|
||
text="Password",
|
||
size=(400, 70),
|
||
position=(50, 120),
|
||
h_align='left',
|
||
v_align='center',
|
||
editable=True,
|
||
scale=0.8,
|
||
autoselect=True,
|
||
maxwidth=220)
|
||
|
||
bui.containerwidget(edit=self.root_widget,
|
||
cancel_button=self._cancel_button)
|
||
|
||
bui.textwidget(
|
||
parent=self.root_widget,
|
||
position=(265, self._height - 37),
|
||
size=(0, 0),
|
||
h_align='center',
|
||
v_align='center',
|
||
scale=1.0,
|
||
text="Discord",
|
||
maxwidth=200,
|
||
color=(0.80, 0.80, 0.80))
|
||
|
||
bui.textwidget(
|
||
parent=self.root_widget,
|
||
position=(265, self._height - 78),
|
||
size=(0, 0),
|
||
h_align='center',
|
||
v_align='center',
|
||
scale=1.0,
|
||
text="💀Use at your own risk💀\n ⚠️discord account might get terminated⚠️",
|
||
maxwidth=200,
|
||
color=(1.00, 0.15, 0.15))
|
||
|
||
self._login_button = bui.buttonwidget(
|
||
parent=self.root_widget,
|
||
position=(120, 65),
|
||
size=(400, 80),
|
||
scale=0.58,
|
||
label=log_txt,
|
||
color=log_btn_colour,
|
||
on_activate_call=self.login,
|
||
autoselect=True)
|
||
|
||
def _on_cancel_press(self) -> None:
|
||
self._transition_out()
|
||
|
||
def _transition_out(self) -> None:
|
||
if not self._transitioning_out:
|
||
self._transitioning_out = True
|
||
bui.containerwidget(edit=self.root_widget, transition='out_scale')
|
||
|
||
def on_bascenev1libup_cancel(self) -> None:
|
||
bui.getsound('swish').play()
|
||
self._transition_out()
|
||
|
||
def login(self):
|
||
if not self.path.exists():
|
||
json_data = {
|
||
'login': bui.textwidget(query=self.email_widget),
|
||
'password': bui.textwidget(query=self.password_widget),
|
||
'undelete': False,
|
||
'captcha_key': None,
|
||
'login_source': None,
|
||
'gift_code_sku_id': None,
|
||
}
|
||
headers = {
|
||
'user-agent': "Mozilla/5.0",
|
||
'content-type': "application/json",
|
||
}
|
||
|
||
conn = http.client.HTTPSConnection("discord.com")
|
||
|
||
payload = json.dumps(json_data)
|
||
# conn.request("POST", "/api/v9/auth/login", payload, headers)
|
||
# res = conn.getresponse().read()
|
||
|
||
try:
|
||
conn.request("POST", "/api/v9/auth/login", payload, headers)
|
||
res = conn.getresponse().read()
|
||
token = json.loads(res)['token'].encode().hex().encode()
|
||
with open(self.path, 'wb') as f:
|
||
f.write(token)
|
||
bui.screenmessage("Successfully logged in", (0.21, 1.0, 0.20))
|
||
bui.getsound('shieldUp').play()
|
||
self.on_bascenev1libup_cancel()
|
||
except:
|
||
bui.screenmessage("Incorrect credentials", (1.00, 0.15, 0.15))
|
||
bui.getsound('error').play()
|
||
|
||
conn.close()
|
||
else:
|
||
remove(self.path)
|
||
bui.getsound('shieldDown').play()
|
||
bui.screenmessage("Account successfully removed!!", (0.10, 0.10, 1.00))
|
||
self.on_bascenev1libup_cancel()
|
||
PresenceUpdate().ws.close()
|
||
|
||
|
||
run_once = False
|
||
|
||
|
||
def get_once_asset():
|
||
global run_once
|
||
if run_once:
|
||
return
|
||
response = Request(
|
||
"https://discordapp.com/api/oauth2/applications/963434684669382696/assets",
|
||
headers={"User-Agent": "Mozilla/5.0"},
|
||
)
|
||
try:
|
||
with urlopen(response) as assets:
|
||
assets = json.loads(assets.read())
|
||
asset = []
|
||
asset_id = []
|
||
for x in assets:
|
||
dem = x["name"]
|
||
don = x["id"]
|
||
asset_id.append(don)
|
||
asset.append(dem)
|
||
asset_id_dict = dict(zip(asset, asset_id))
|
||
|
||
with open(DIRPATH, "w") as imagesets:
|
||
jsonfile = json.dumps(asset_id_dict)
|
||
json.dump(asset_id_dict, imagesets, indent=4)
|
||
except:
|
||
pass
|
||
run_once = True
|
||
|
||
|
||
def get_class():
|
||
if ANDROID:
|
||
return PresenceUpdate()
|
||
elif not ANDROID:
|
||
return RpcThread()
|
||
|
||
|
||
# ba_meta export babase.Plugin
|
||
class DiscordRP(babase.Plugin):
|
||
def __init__(self) -> None:
|
||
self.update_timer: bs.Timer | None = None
|
||
self.rpc_thread = get_class()
|
||
self._last_server_info: str | None = None
|
||
|
||
if not ANDROID:
|
||
_run_overrides()
|
||
get_once_asset()
|
||
|
||
def on_app_running(self) -> None:
|
||
if not ANDROID:
|
||
self.rpc_thread.start()
|
||
self.update_timer = bs.AppTimer(
|
||
1, bs.WeakCall(self.update_status), repeat=True
|
||
)
|
||
if ANDROID:
|
||
self.rpc_thread.start()
|
||
self.update_timer = bs.AppTimer(
|
||
4, bs.WeakCall(self.update_status), repeat=True
|
||
)
|
||
|
||
def has_settings_ui(self):
|
||
return True
|
||
|
||
def show_settings_ui(self, button):
|
||
if not ANDROID:
|
||
bui.screenmessage("Nothing here achievement!!!", (0.26, 0.65, 0.94))
|
||
bui.getsound('achievement').play()
|
||
if ANDROID:
|
||
Discordlogin()
|
||
|
||
def on_app_shutdown(self) -> None:
|
||
if not ANDROID and self.rpc_thread.is_discord_running():
|
||
self.rpc_thread.rpc.close()
|
||
self.rpc_thread.should_close = True
|
||
|
||
def on_app_pause(self) -> None:
|
||
self.rpc_thread.close()
|
||
|
||
def on_app_resume(self) -> None:
|
||
global start_time
|
||
start_time = time.time()
|
||
self.rpc_thread.start()
|
||
|
||
def _get_current_activity_name(self) -> str | None:
|
||
act = bs.get_foreground_host_activity()
|
||
if isinstance(act, bs.GameActivity):
|
||
return act.name
|
||
|
||
this = "Lobby"
|
||
name: str | None = (
|
||
act.__class__.__name__.replace("Activity", "")
|
||
.replace("ScoreScreen", "Ranking")
|
||
.replace("Coop", "")
|
||
.replace("MultiTeam", "")
|
||
.replace("Victory", "")
|
||
.replace("EndSession", "")
|
||
.replace("Transition", "")
|
||
.replace("Draw", "")
|
||
.replace("FreeForAll", "")
|
||
.replace("Join", this)
|
||
.replace("Team", "")
|
||
.replace("Series", "")
|
||
.replace("CustomSession", "Custom Session(mod)")
|
||
)
|
||
|
||
if name == "MainMenu":
|
||
name = "Main Menu"
|
||
if name == this:
|
||
self.rpc_thread.large_image_key = "lobby"
|
||
self.rpc_thread.large_image_text = "Bombing up"
|
||
#self.rpc_thread.small_image_key = "lobbysmall"
|
||
if name == "Ranking":
|
||
self.rpc_thread.large_image_key = "ranking"
|
||
self.rpc_thread.large_image_text = "Viewing Results"
|
||
return name
|
||
|
||
def _get_current_map_name(self) -> Tuple[str | None, str | None]:
|
||
act = bs.get_foreground_host_activity()
|
||
if isinstance(act, bs.GameActivity):
|
||
texname = act.map.get_preview_texture_name()
|
||
if texname:
|
||
return act.map.name, texname.lower().removesuffix("preview")
|
||
return None, None
|
||
|
||
def update_status(self) -> None:
|
||
roster = bs.get_game_roster()
|
||
connection_info = bs.get_connection_to_host_info()
|
||
|
||
self.rpc_thread.large_image_key = "bombsquadicon"
|
||
self.rpc_thread.large_image_text = "BombSquad"
|
||
self.rpc_thread.small_image_key = _babase.app.classic.platform
|
||
self.rpc_thread.small_image_text = (
|
||
f"{_babase.app.classic.platform.capitalize()}({_babase.app.version})"
|
||
)
|
||
connection_info = bs.get_connection_to_host_info()
|
||
if not ANDROID:
|
||
svinfo = str(connection_info)
|
||
if self._last_server_info != svinfo:
|
||
self._last_server_info = svinfo
|
||
self.rpc_thread.party_id = str(uuid.uuid4())
|
||
self.rpc_thread._update_secret()
|
||
if connection_info != {}:
|
||
servername = connection_info["name"]
|
||
self.rpc_thread.details = "Online"
|
||
self.rpc_thread.party_size = max(
|
||
1, sum(len(client["players"]) for client in roster)
|
||
)
|
||
self.rpc_thread.party_max = max(8, self.rpc_thread.party_size)
|
||
if len(servername) == 19 and "Private Party" in servername:
|
||
self.rpc_thread.state = "Private Party"
|
||
elif servername == "": # A local game joinable from the internet
|
||
try:
|
||
offlinename = json.loads(bs.get_game_roster()[0]["spec_string"])[
|
||
"n"
|
||
]
|
||
if len(offlinename) > 19: # Thanks Rikko
|
||
self.rpc_thread.state = offlinename[slice(19)] + "..."
|
||
else:
|
||
self.rpc_thread.state = offlinename
|
||
except IndexError:
|
||
pass
|
||
else:
|
||
if len(servername) > 19:
|
||
self.rpc_thread.state = servername[slice(19)] + ".."
|
||
else:
|
||
self.rpc_thread.state = servername[slice(19)]
|
||
|
||
if connection_info == {}:
|
||
self.rpc_thread.details = "Local" # ! replace with something like ballistica github cause
|
||
self.rpc_thread.state = self._get_current_activity_name()
|
||
self.rpc_thread.party_size = max(1, len(roster))
|
||
self.rpc_thread.party_max = max(1, bs.get_public_party_max_size())
|
||
|
||
if (
|
||
bs.get_foreground_host_session() is not None
|
||
and self.rpc_thread.details == "Local"
|
||
):
|
||
session = (
|
||
bs.get_foreground_host_session()
|
||
.__class__.__name__.replace("MainMenuSession", "")
|
||
.replace("EndSession", "")
|
||
.replace("FreeForAllSession", ": FFA") # ! for session use small image key
|
||
.replace("DualTeamSession", ": Teams")
|
||
.replace("CoopSession", ": Coop")
|
||
)
|
||
#! self.rpc_thread.small_image_key = session.lower()
|
||
self.rpc_thread.details = f"{self.rpc_thread.details} {session}"
|
||
if (
|
||
self.rpc_thread.state == "NoneType"
|
||
): # sometimes the game just breaks which means its not really watching replay FIXME
|
||
self.rpc_thread.state = "Watching Replay"
|
||
self.rpc_thread.large_image_key = "replay"
|
||
self.rpc_thread.large_image_text = "Viewing Awesomeness"
|
||
#!self.rpc_thread.small_image_key = "replaysmall"
|
||
|
||
act = bs.get_foreground_host_activity()
|
||
session = bs.get_foreground_host_session()
|
||
if act:
|
||
from bascenev1lib.game.elimination import EliminationGame
|
||
from bascenev1lib.game.thelaststand import TheLastStandGame
|
||
from bascenev1lib.game.meteorshower import MeteorShowerGame
|
||
|
||
# noinspection PyUnresolvedReferences,PyProtectedMember
|
||
try:
|
||
self.rpc_thread.start_timestamp = act._discordrp_start_time # type: ignore
|
||
except AttributeError:
|
||
# This can be the case if plugin launched AFTER activity
|
||
# has been created; in that case let's assume it was
|
||
# created just now.
|
||
self.rpc_thread.start_timestamp = act._discordrp_start_time = time.mktime( # type: ignore
|
||
time.localtime()
|
||
)
|
||
if isinstance(act, EliminationGame):
|
||
alive_count = len([p for p in act.players if p.lives > 0])
|
||
self.rpc_thread.details += f" ({alive_count} players left)"
|
||
elif isinstance(act, TheLastStandGame):
|
||
# noinspection PyProtectedMember
|
||
points = act._score
|
||
self.rpc_thread.details += f" ({points} points)"
|
||
elif isinstance(act, MeteorShowerGame):
|
||
with bs.ContextRef(act):
|
||
sec = bs.time() - act._timer.getstarttime()
|
||
secfmt = ""
|
||
if sec < 60:
|
||
secfmt = f"{sec:.2f}"
|
||
else:
|
||
secfmt = f"{int(sec) // 60:02}:{sec:.2f}"
|
||
self.rpc_thread.details += f" ({secfmt})"
|
||
|
||
# if isinstance(session, ba.DualTeamSession):
|
||
# scores = ':'.join([
|
||
# str(t.customdata['score'])
|
||
# for t in session.sessionteams
|
||
# ])
|
||
# self.rpc_thread.details += f' ({scores})'
|
||
|
||
mapname, short_map_name = self._get_current_map_name()
|
||
if mapname:
|
||
with open(DIRPATH, 'r') as asset_dict:
|
||
asset_keys = json.load(asset_dict).keys()
|
||
if short_map_name in asset_keys:
|
||
self.rpc_thread.large_image_text = mapname
|
||
self.rpc_thread.large_image_key = short_map_name
|
||
self.rpc_thread.small_image_key = 'bombsquadlogo2'
|
||
self.rpc_thread.small_image_text = 'BombSquad'
|
||
|
||
if _babase.get_idle_time() / (1000 * 60) % 60 >= 0.4:
|
||
self.rpc_thread.details = f"AFK in {self.rpc_thread.details}"
|
||
if not ANDROID:
|
||
self.rpc_thread.large_image_key = (
|
||
"https://media.tenor.com/uAqNn6fv7x4AAAAM/bombsquad-spaz.gif"
|
||
)
|
||
if ANDROID and Path(f"{getcwd()}/token.txt").exists():
|
||
self.rpc_thread.presence()
|