[ci] auto-format

This commit is contained in:
brostosjoined 2023-08-01 00:36:34 +00:00 committed by github-actions[bot]
parent 037ee6e25c
commit fbe8a1afb3

View file

@ -24,7 +24,7 @@ import shutil
import hashlib import hashlib
import babase import babase
import _babase import _babase
import bascenev1 as bs import bascenev1 as bs
import bascenev1lib import bascenev1lib
import bauiv1 as bui import bauiv1 as bui
@ -71,16 +71,15 @@ if ANDROID: # !can add ios in future
from websocket import WebSocketConnectionClosedException from websocket import WebSocketConnectionClosedException
import websocket import websocket
start_time = time.time() start_time = time.time()
class PresenceUpdate: class PresenceUpdate:
def __init__(self): def __init__(self):
self.ws = websocket.WebSocketApp("wss://gateway.discord.gg/?encoding=json&v=10", self.ws = websocket.WebSocketApp("wss://gateway.discord.gg/?encoding=json&v=10",
on_open=self.on_open, on_open=self.on_open,
on_message=self.on_message, on_message=self.on_message,
on_error=self.on_error, on_error=self.on_error,
on_close=self.on_close) on_close=self.on_close)
self.heartbeat_interval = int(41250) self.heartbeat_interval = int(41250)
self.resume_gateway_url: str | None = None self.resume_gateway_url: str | None = None
self.session_id: str | None = None self.session_id: str | None = None
@ -204,30 +203,27 @@ if ANDROID: # !can add ios in future
identify() identify()
while True: while True:
heartbeat_payload = {"op": 1, "d": self.heartbeat_interval} heartbeat_payload = {"op": 1, "d": self.heartbeat_interval}
try: try:
self.ws.send(json.dumps(heartbeat_payload)) self.ws.send(json.dumps(heartbeat_payload))
time.sleep(self.heartbeat_interval / 1000) time.sleep(self.heartbeat_interval / 1000)
except: except:
pass pass
if self.stop_heartbeat_thread.is_set(): if self.stop_heartbeat_thread.is_set():
self.stop_heartbeat_thread.clear() self.stop_heartbeat_thread.clear()
break break
threading.Thread(target=heartbeats, daemon=True, name="heartbeat").start() threading.Thread(target=heartbeats, daemon=True, name="heartbeat").start()
def start(self): def start(self):
if Path(f"{getcwd()}/token.txt").exists(): if Path(f"{getcwd()}/token.txt").exists():
threading.Thread(target=self.ws.run_forever, daemon=True, name="websocket").start() threading.Thread(target=self.ws.run_forever, daemon=True, name="websocket").start()
def close(self): def close(self):
self.stop_heartbeat_thread.set() self.stop_heartbeat_thread.set()
self.do_once = True self.do_once = True
self.ws.close() self.ws.close()
if not ANDROID: if not ANDROID:
@ -244,13 +240,13 @@ if not ANDROID:
with open(filename, "rb") as f: with open(filename, "rb") as f:
content = f.read() content = f.read()
assert hashlib.md5(content).hexdigest() == "f7c163cdd001af2456c09e241b90bad7" assert hashlib.md5(content).hexdigest() == "f7c163cdd001af2456c09e241b90bad7"
shutil.unpack_archive( filename, install_path) shutil.unpack_archive(filename, install_path)
shutil.copytree(source_dir, file_path) shutil.copytree(source_dir, file_path)
shutil.rmtree(Path(f"{install_path}/pypresence-4.3.0")) shutil.rmtree(Path(f"{install_path}/pypresence-4.3.0"))
remove(path) remove(path)
except: except:
pass pass
# Make modifications for it to work on windows # Make modifications for it to work on windows
if babase.app.classic.platform == "windows": if babase.app.classic.platform == "windows":
with open(Path(f"{getcwd()}/ba_data/python/pypresence/utils.py"), "r") as file: with open(Path(f"{getcwd()}/ba_data/python/pypresence/utils.py"), "r") as file:
@ -274,12 +270,12 @@ def get_event_loop(force_fresh=False):
return running return running
else: else:
return loop""" return loop"""
#Thanks Loup # Thanks Loup
with open(Path(f"{getcwd()}/ba_data/python/pypresence/utils.py"), "w") as file: with open(Path(f"{getcwd()}/ba_data/python/pypresence/utils.py"), "w") as file:
for number, line in enumerate(data): for number, line in enumerate(data):
if number not in range(46,56): if number not in range(46, 56):
file.write(line) file.write(line)
#fix the mess i did with the previous # fix the mess i did with the previous
elif file_path.exists(): elif file_path.exists():
with open(Path(f"{getcwd()}/ba_data/python/pypresence/utils.py"), "r") as file: with open(Path(f"{getcwd()}/ba_data/python/pypresence/utils.py"), "r") as file:
data = file.readlines() data = file.readlines()
@ -289,17 +285,16 @@ def get_event_loop(force_fresh=False):
get_module() get_module()
get_module() get_module()
from pypresence import PipeClosed, DiscordError, DiscordNotFound from pypresence import PipeClosed, DiscordError, DiscordNotFound
from pypresence.utils import get_event_loop from pypresence.utils import get_event_loop
import pypresence import pypresence
import socket import socket
DEBUG = True DEBUG = True
_last_server_addr = 'localhost' _last_server_addr = 'localhost'
_last_server_port = 43210 _last_server_port = 43210
def print_error(err: str, include_exception: bool = False) -> None: def print_error(err: str, include_exception: bool = False) -> None:
if DEBUG: if DEBUG:
if include_exception: if include_exception:
@ -330,8 +325,8 @@ def get_event_loop(force_fresh=False):
old_connect(*args, **kwargs) old_connect(*args, **kwargs)
c = kwargs.get("address") or args[0] c = kwargs.get("address") or args[0]
_last_server_port = kwargs.get("port") or args[1] _last_server_port = kwargs.get("port") or args[1]
bs.connect_to_party = new_connect bs.connect_to_party = new_connect
start_time = time.time() start_time = time.time()
@ -354,10 +349,10 @@ def get_event_loop(force_fresh=False):
self._last_secret_update_time: float = 0 self._last_secret_update_time: float = 0
self._last_connect_time: float = 0 self._last_connect_time: float = 0
self.should_close = False self.should_close = False
@staticmethod @staticmethod
def is_discord_running(): def is_discord_running():
for i in range(6463,6473): for i in range(6463, 6473):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(0.01) s.settimeout(0.01)
try: try:
@ -365,11 +360,11 @@ def get_event_loop(force_fresh=False):
s.close() s.close()
if (conn == 0): if (conn == 0):
s.close() s.close()
return(True) return (True)
except: except:
s.close() s.close()
return(False) return (False)
def _generate_join_secret(self): def _generate_join_secret(self):
# resp = requests.get('https://legacy.ballistica.net/bsAccessCheck').text # resp = requests.get('https://legacy.ballistica.net/bsAccessCheck').text
connection_info = bs.get_connection_to_host_info() connection_info = bs.get_connection_to_host_info()
@ -426,7 +421,7 @@ def get_event_loop(force_fresh=False):
self._subscribe("ACTIVITY_JOIN") self._subscribe("ACTIVITY_JOIN")
self._subscribe("ACTIVITY_JOIN_REQUEST") self._subscribe("ACTIVITY_JOIN_REQUEST")
# def _update_presence(self) -> None: # def _update_presence(self) -> None:
# self._last_update_time = time.time() # self._last_update_time = time.time()
# try: # try:
# self._do_update_presence() # self._do_update_presence()
@ -435,7 +430,6 @@ def get_event_loop(force_fresh=False):
# self._reconnect() # self._reconnect()
# except Exception: # except Exception:
# print_error("failed to update presence", include_exception= True) # print_error("failed to update presence", include_exception= True)
def _reconnect(self) -> None: def _reconnect(self) -> None:
self.rpc.connect() self.rpc.connect()
@ -512,7 +506,7 @@ def get_event_loop(force_fresh=False):
def _connect_to_party(self, hostname, port) -> None: def _connect_to_party(self, hostname, port) -> None:
babase.pushcall( babase.pushcall(
babase.Call(bs.connect_to_party, hostname, port), from_other_thread=True babase.Call(bs.connect_to_party, hostname, port), from_other_thread=True
) )
def on_join_request(self, username, uid, discriminator, avatar) -> None: def on_join_request(self, username, uid, discriminator, avatar) -> None:
del uid # unused del uid # unused
@ -520,7 +514,8 @@ def get_event_loop(force_fresh=False):
babase.pushcall( babase.pushcall(
babase.Call( babase.Call(
bui.screenmessage, bui.screenmessage,
"Discord: {}{} wants to join!".format(username, discriminator if discriminator != "#0" else ""), "Discord: {}{} wants to join!".format(
username, discriminator if discriminator != "#0" else ""),
color=(0.0, 1.0, 0.0), color=(0.0, 1.0, 0.0),
), ),
from_other_thread=True, from_other_thread=True,
@ -540,15 +535,13 @@ class Discordlogin(PopupWindow):
self.path = Path(f"{getcwd()}/token.txt") self.path = Path(f"{getcwd()}/token.txt")
bg_color = (0.5, 0.4, 0.6) bg_color = (0.5, 0.4, 0.6)
log_btn_colour = (0.10, 0.95, 0.10) if not self.path.exists() else (1.00, 0.15, 0.15) log_btn_colour = (0.10, 0.95, 0.10) if not self.path.exists() else (1.00, 0.15, 0.15)
log_txt = "LOG IN" if not self.path.exists() else "LOG OUT" log_txt = "LOG IN" if not self.path.exists() else "LOG OUT"
self.code = False self.code = False
self.resp = "Placeholder" self.resp = "Placeholder"
self.headers = { self.headers = {
'user-agent': "Mozilla/5.0", 'user-agent': "Mozilla/5.0",
'content-type': "application/json", 'content-type': "application/json",
} }
# creates our _root_widget # creates our _root_widget
PopupWindow.__init__(self, PopupWindow.__init__(self,
@ -557,7 +550,6 @@ class Discordlogin(PopupWindow):
scale=(2.1 if _uiscale is babase.UIScale.SMALL else 1.5 scale=(2.1 if _uiscale is babase.UIScale.SMALL else 1.5
if _uiscale is babase.UIScale.MEDIUM else 1.0), if _uiscale is babase.UIScale.MEDIUM else 1.0),
bg_color=bg_color) bg_color=bg_color)
self._cancel_button = bui.buttonwidget( self._cancel_button = bui.buttonwidget(
parent=self.root_widget, parent=self.root_widget,
@ -570,44 +562,38 @@ class Discordlogin(PopupWindow):
autoselect=True, autoselect=True,
icon=bui.gettexture('crossOut'), icon=bui.gettexture('crossOut'),
iconscale=1.2) iconscale=1.2)
bui.imagewidget(parent=self.root_widget,
position=(180, self._height - 55),
size=(32 * s, 32 * s),
texture=bui.gettexture("discordLogo"),
color=(10 - 0.32, 10 - 0.39, 10 - 0.96))
bui.imagewidget(parent=self.root_widget,
position=(180, self._height - 55),
size=(32 * s, 32 * s),
texture=bui.gettexture("discordLogo"),
color=(10 - 0.32, 10 - 0.39, 10 - 0.96))
self.email_widget = bui.textwidget(parent=self.root_widget, self.email_widget = bui.textwidget(parent=self.root_widget,
text="Email/Phone Number", text="Email/Phone Number",
size=(400, 70), size=(400, 70),
position=(50, 180), position=(50, 180),
h_align='left', h_align='left',
v_align='center', v_align='center',
editable=True, editable=True,
scale=0.8, scale=0.8,
autoselect=True, autoselect=True,
maxwidth=220) maxwidth=220)
self.password_widget = bui.textwidget(parent=self.root_widget, self.password_widget = bui.textwidget(parent=self.root_widget,
text="Password", text="Password",
size=(400, 70), size=(400, 70),
position=(50, 120), position=(50, 120),
h_align='left', h_align='left',
v_align='center', v_align='center',
editable=True, editable=True,
scale=0.8, scale=0.8,
autoselect=True, autoselect=True,
maxwidth=220) maxwidth=220)
bui.containerwidget(edit=self.root_widget, bui.containerwidget(edit=self.root_widget,
cancel_button=self._cancel_button) cancel_button=self._cancel_button)
bui.textwidget( bui.textwidget(
parent=self.root_widget, parent=self.root_widget,
position=(265, self._height - 37), position=(265, self._height - 37),
@ -618,7 +604,7 @@ class Discordlogin(PopupWindow):
text="Discord", text="Discord",
maxwidth=200, maxwidth=200,
color=(0.80, 0.80, 0.80)) color=(0.80, 0.80, 0.80))
bui.textwidget( bui.textwidget(
parent=self.root_widget, parent=self.root_widget,
position=(265, self._height - 78), position=(265, self._height - 78),
@ -629,8 +615,7 @@ class Discordlogin(PopupWindow):
text="💀Use at your own risk💀\ndiscord account might get terminated⚠", text="💀Use at your own risk💀\ndiscord account might get terminated⚠",
maxwidth=200, maxwidth=200,
color=(1.00, 0.15, 0.15)) color=(1.00, 0.15, 0.15))
self._login_button = bui.buttonwidget( self._login_button = bui.buttonwidget(
parent=self.root_widget, parent=self.root_widget,
position=(120, 65), position=(120, 65),
@ -652,33 +637,29 @@ class Discordlogin(PopupWindow):
def on_bascenev1libup_cancel(self) -> None: def on_bascenev1libup_cancel(self) -> None:
bui.getsound('swish').play() bui.getsound('swish').play()
self._transition_out() self._transition_out()
def backup_2fa_code(self, tickt): def backup_2fa_code(self, tickt):
if babase.do_once(): if babase.do_once():
self.email_widget.delete() self.email_widget.delete()
self.password_widget.delete() self.password_widget.delete()
self.backup_2fa_widget = bui.textwidget(parent=self.root_widget, self.backup_2fa_widget = bui.textwidget(parent=self.root_widget,
text="2FA/Discord Backup code", text="2FA/Discord Backup code",
size=(400, 70), size=(400, 70),
position=(50, 120), position=(50, 120),
h_align='left', h_align='left',
v_align='center', v_align='center',
editable=True, editable=True,
scale=0.8, scale=0.8,
autoselect=True, autoselect=True,
maxwidth=220) maxwidth=220)
json_data_2FA = { json_data_2FA = {
"code": bui.textwidget(query=self.backup_2fa_widget), "code": bui.textwidget(query=self.backup_2fa_widget),
"gift_code_sku_id": None, "gift_code_sku_id": None,
"ticket": tickt, "ticket": tickt,
} }
if json_data_2FA['code'] != "2FA/Discord Backup code": if json_data_2FA['code'] != "2FA/Discord Backup code":
try: try:
payload_2FA = json.dumps(json_data_2FA) payload_2FA = json.dumps(json_data_2FA)
@ -686,7 +667,7 @@ class Discordlogin(PopupWindow):
conn_2FA.request("POST", "/api/v9/auth/mfa/totp", payload_2FA, self.headers) conn_2FA.request("POST", "/api/v9/auth/mfa/totp", payload_2FA, self.headers)
res_2FA = conn_2FA.getresponse().read() res_2FA = conn_2FA.getresponse().read()
token = json.loads(res_2FA)['token'].encode().hex().encode() token = json.loads(res_2FA)['token'].encode().hex().encode()
with open(self.path, 'wb') as f: with open(self.path, 'wb') as f:
f.write(token) f.write(token)
bui.screenmessage("Successfully logged in", (0.21, 1.0, 0.20)) bui.screenmessage("Successfully logged in", (0.21, 1.0, 0.20))
@ -697,11 +678,11 @@ class Discordlogin(PopupWindow):
self.code = True self.code = True
bui.screenmessage("Incorrect code", (1.00, 0.15, 0.15)) bui.screenmessage("Incorrect code", (1.00, 0.15, 0.15))
bui.getsound('error').play() bui.getsound('error').play()
def login(self): def login(self):
if not self.path.exists() and self.code == False: if not self.path.exists() and self.code == False:
try: try:
json_data = { json_data = {
'login': bui.textwidget(query=self.email_widget), 'login': bui.textwidget(query=self.email_widget),
'password': bui.textwidget(query=self.password_widget), 'password': bui.textwidget(query=self.password_widget),
@ -710,7 +691,7 @@ class Discordlogin(PopupWindow):
'login_source': None, 'login_source': None,
'gift_code_sku_id': None, 'gift_code_sku_id': None,
} }
conn = http.client.HTTPSConnection("discord.com") conn = http.client.HTTPSConnection("discord.com")
payload = json.dumps(json_data) payload = json.dumps(json_data)
@ -718,9 +699,7 @@ class Discordlogin(PopupWindow):
# res = conn.getresponse().read() # res = conn.getresponse().read()
conn.request("POST", "/api/v9/auth/login", payload, self.headers) conn.request("POST", "/api/v9/auth/login", payload, self.headers)
res = conn.getresponse().read() res = conn.getresponse().read()
try: try:
token = json.loads(res)['token'].encode().hex().encode() token = json.loads(res)['token'].encode().hex().encode()
with open(self.path, 'wb') as f: with open(self.path, 'wb') as f:
@ -729,27 +708,27 @@ class Discordlogin(PopupWindow):
bui.getsound('shieldUp').play() bui.getsound('shieldUp').play()
self.on_bascenev1libup_cancel() self.on_bascenev1libup_cancel()
PresenceUpdate().start() PresenceUpdate().start()
except KeyError: except KeyError:
try: try:
ticket = json.loads(res)['ticket'] ticket = json.loads(res)['ticket']
bui.screenmessage("Input your 2FA or Discord Backup code", (0.21, 1.0, 0.20)) bui.screenmessage("Input your 2FA or Discord Backup code",
(0.21, 1.0, 0.20))
bui.getsound('error').play() bui.getsound('error').play()
self.resp = ticket self.resp = ticket
self.backup_2fa_code(tickt=ticket) self.backup_2fa_code(tickt=ticket)
self.code = True self.code = True
except KeyError: except KeyError:
bui.screenmessage("Incorrect credentials", (1.00, 0.15, 0.15)) bui.screenmessage("Incorrect credentials", (1.00, 0.15, 0.15))
bui.getsound('error').play() bui.getsound('error').play()
except: except:
bui.screenmessage("Connect to the internet", (1.00, 0.15, 0.15)) bui.screenmessage("Connect to the internet", (1.00, 0.15, 0.15))
bui.getsound('error').play() bui.getsound('error').play()
conn.close() conn.close()
elif self.code == True: elif self.code == True:
self.backup_2fa_code(tickt=self.resp) self.backup_2fa_code(tickt=self.resp)
else: else:
self.email_widget.delete() self.email_widget.delete()
self.password_widget.delete() self.password_widget.delete()
@ -759,8 +738,10 @@ class Discordlogin(PopupWindow):
self.on_bascenev1libup_cancel() self.on_bascenev1libup_cancel()
PresenceUpdate().close() PresenceUpdate().close()
run_once = False run_once = False
def get_once_asset(): def get_once_asset():
global run_once global run_once
if run_once: if run_once:
@ -788,6 +769,7 @@ def get_once_asset():
pass pass
run_once = True run_once = True
def get_class(): def get_class():
if ANDROID: if ANDROID:
return PresenceUpdate() return PresenceUpdate()
@ -808,7 +790,7 @@ class DiscordRP(babase.Plugin):
def on_app_running(self) -> None: def on_app_running(self) -> None:
if not ANDROID: if not ANDROID:
self.rpc_thread.start() self.rpc_thread.start()
self.update_timer = bs.AppTimer( self.update_timer = bs.AppTimer(
1, bs.WeakCall(self.update_status), repeat=True 1, bs.WeakCall(self.update_status), repeat=True
) )
@ -835,17 +817,17 @@ class DiscordRP(babase.Plugin):
def on_app_pause(self) -> None: def on_app_pause(self) -> None:
self.rpc_thread.close() self.rpc_thread.close()
def on_app_resume(self) -> None: def on_app_resume(self) -> None:
global start_time global start_time
start_time = time.time() start_time = time.time()
self.rpc_thread.start() self.rpc_thread.start()
def _get_current_activity_name(self) -> str | None: def _get_current_activity_name(self) -> str | None:
act = bs.get_foreground_host_activity() act = bs.get_foreground_host_activity()
if isinstance(act, bs.GameActivity): if isinstance(act, bs.GameActivity):
return act.name return act.name
this = "Lobby" this = "Lobby"
name: str | None = ( name: str | None = (
act.__class__.__name__.replace("Activity", "") act.__class__.__name__.replace("Activity", "")
@ -913,7 +895,7 @@ class DiscordRP(babase.Plugin):
offlinename = json.loads(bs.get_game_roster()[0]["spec_string"])[ offlinename = json.loads(bs.get_game_roster()[0]["spec_string"])[
"n" "n"
] ]
if len(offlinename) > 19: # Thanks Rikko if len(offlinename) > 19: # Thanks Rikko
self.rpc_thread.state = offlinename[slice(19)] + "..." self.rpc_thread.state = offlinename[slice(19)] + "..."
else: else:
self.rpc_thread.state = offlinename self.rpc_thread.state = offlinename
@ -926,7 +908,7 @@ class DiscordRP(babase.Plugin):
self.rpc_thread.state = servername[slice(19)] self.rpc_thread.state = servername[slice(19)]
if connection_info == {}: if connection_info == {}:
self.rpc_thread.details = "Local" #! replace with something like ballistica github cause self.rpc_thread.details = "Local" # ! replace with something like ballistica github cause
self.rpc_thread.state = self._get_current_activity_name() self.rpc_thread.state = self._get_current_activity_name()
self.rpc_thread.party_size = max(1, len(roster)) self.rpc_thread.party_size = max(1, len(roster))
self.rpc_thread.party_max = max(1, bs.get_public_party_max_size()) self.rpc_thread.party_max = max(1, bs.get_public_party_max_size())
@ -939,7 +921,7 @@ class DiscordRP(babase.Plugin):
bs.get_foreground_host_session() bs.get_foreground_host_session()
.__class__.__name__.replace("MainMenuSession", "") .__class__.__name__.replace("MainMenuSession", "")
.replace("EndSession", "") .replace("EndSession", "")
.replace("FreeForAllSession", ": FFA") #! for session use small image key .replace("FreeForAllSession", ": FFA") # ! for session use small image key
.replace("DualTeamSession", ": Teams") .replace("DualTeamSession", ": Teams")
.replace("CoopSession", ": Coop") .replace("CoopSession", ": Coop")
) )
@ -1012,4 +994,3 @@ class DiscordRP(babase.Plugin):
) )
if ANDROID and Path(f"{getcwd()}/token.txt").exists(): if ANDROID and Path(f"{getcwd()}/token.txt").exists():
self.rpc_thread.presence() self.rpc_thread.presence()