Api 9 release

This commit is contained in:
brostosjoined 2025-01-26 20:33:06 +03:00
parent 9b4ee194a1
commit ba27a96ede
4 changed files with 867 additions and 533 deletions

View file

@ -1439,6 +1439,7 @@
}
],
"versions": {
"1.5.2" : null,
"1.5.1": {
"api_version": 9,
"commit_sha": "a93fda3",
@ -1681,6 +1682,7 @@
}
],
"versions": {
"1.5.0": null,
"1.0.0": {
"api_version": 8,
"commit_sha": "48bd0da",
@ -1688,6 +1690,20 @@
"md5sum": "15f969c23d19118d4898570cfae71c7b"
}
}
},
"wave_emote": {
"description": "Type `hello` in chat while in game and get a wave emote",
"external_url": "",
"authors": [
{
"name": "brostosjoined",
"email": "",
"discord": "brostos"
}
],
"versions": {
"1.5.0": null
}
}
}
}

View file

@ -4,6 +4,9 @@
# ba_meta require api 9
#!"Made to you by @brostos & @Dliwk"
# TODO
# - Update to the latest libs
# - Use account id to hash the tkn
from __future__ import annotations
@ -12,12 +15,13 @@ from pathlib import Path
from os import getcwd, remove
from bauiv1lib.popup import PopupWindow
import asyncio
import sys
import http.client
import ast
import uuid
import json
import socket
import time
import threading
import shutil
@ -35,17 +39,93 @@ if TYPE_CHECKING:
from typing import Any, Tuple
MAPNAME_ID = {
"bombsquadicon": "963448129900908595",
"zigzagpreview": "963448133130522624",
"tiptoppreview": "963448133168279582",
"towerdpreview": "963448135886200912",
"thepadpreview": "963448137916248084",
"steprightuppreview": "963448141728862248",
"roundaboutpreview": "963448143997972550",
"rampagepreview": "963448146422296676",
"monkeyfacepreview": "963448151182831626",
"footballstadiumpreview": "963448158719983646",
"doomshroompreview": "963448160993292368",
"cragcastlepreview": "963448163048513536",
"courtyardpreview": "963448166127120504",
"bridgitpreview": "963448169180565654",
"biggpreview": "963448172905127996",
"alwayslandpreview": "963448174163423252",
"bigg": "1013013392455376977",
"bridgit": "1013013400139333632",
"courtyard": "1013013410776096788",
"cragcastle": "1013013423132528700",
"doomshroom": "1013013438223622224",
"footballstadium": "1013013452517810226",
"hockeystadium": "1013013464060547112",
"monkeyface": "1013013477721383023",
"rampage": "1013013484830728273",
"roundabout": "1013013508323037264",
"steprightup": "1013013567768907826",
"thepad": "1013013577197699163",
"tiptop": "1013013593089904721",
"towerd": "1013013604531970131",
"zigzag": "1013013618188619816",
"bombsquadlogo2": "1013016083701190726",
"windows": "1084050785488338984",
"linux": "1084078945944739920",
"lobby": "1084180821973418226",
"ranking": "1084224689272004719",
"rampagelevelcolor": "1086989941541703741",
"landmine": "1087000404866371766",
"rgbstripes": "1087000416492990474",
"shrapnel1color": "1087151233225195590",
"bonescolor": "1087151164077899928",
"bridgitlevelcolor": "1087151178674094182",
"crossout": "1087151197963681902",
"naturebackgroundcolor": "1087151209896476782",
"zigzaglevelcolor": "1087151253206876241",
"zoeicon": "1087151266989363240",
"bg": "1087564057890000906",
"alwayslandlevelcolor": "1087564765406167080",
"hockeystadiumpreview": "1087574349285961768",
"mac": "1087584375287336992",
"flyer": "1087584543147561051",
"replay": "1087592122393301102",
"coop": "1097697042891018311",
"ffa": "1097697050214269008",
"lobbysmall": "1097697055926923335",
"replaysmall": "1097697062746853386",
"teams": "1097697068727935036",
"bacongreece": "1097700754623565894",
"basketballstadium": "1097700771501441167",
"flapland": "1097700783622979664",
"alwaysland": "1097700794213613610",
"hoveringwood": "1097700802321199224",
"jrmponslaught": "1097700810479124520",
"jrmprunaround": "1097700817194205286",
"lakefrigid": "1097700828023898203",
"mushfeud": "1097700836920000594",
"pillar_bases": "1097700846340407427",
"powerup_factory": "1097700854422851656",
"snowballpit": "1097700869673341009",
"stoneishfort": "1097700887826272308",
"toiletdonut": "1097700898584666193",
"whereeaglesdare": "1097700904972587109",
"android": "1097728392280932453",
}
ANDROID = babase.app.classic.platform == "android"
DIRPATH = Path(
f"{_babase.app.python_directory_user if build_number < 21282 else _babase.app.env.python_directory_user}/image_id.json")
APP_VERSION = _babase.app.version if build_number < 21282 else (
_babase.app.env.engine_version if build_number > 21823 else _babase.app.env.version)
if ANDROID: # !can add ios in future
# Installing websocket
def get_module():
install_path = Path(f"{getcwd()}/ba_data/python") # For the guys like me on windows
install_path = Path(
f"{getcwd()}/ba_data/python"
) # For the guys like me on windows
path = Path(f"{install_path}/websocket.tar.gz")
file_path = Path(f"{install_path}/websocket")
source_dir = Path(f"{install_path}/websocket-client-1.6.1/websocket")
@ -60,8 +140,11 @@ if ANDROID: # !can add ios in future
filename, headers = urlretrieve(url, filename=path)
with open(filename, "rb") as f:
content = f.read()
assert hashlib.md5(content).hexdigest() == "86bc69b61947943627afc1b351c0b5db"
shutil.unpack_archive(filename, install_path)
assert (
hashlib.md5(content).hexdigest()
== "86bc69b61947943627afc1b351c0b5db"
)
shutil.unpack_archive(filename, install_path, format='gztar')
remove(path)
shutil.copytree(source_dir, file_path)
shutil.rmtree(Path(f"{install_path}/websocket-client-1.6.1"))
@ -70,6 +153,7 @@ if ANDROID: # !can add ios in future
shutil.rmtree(Path(f"{install_path}/websocket-client-1.6.1"))
else:
pass
get_module()
from websocket import WebSocketConnectionClosedException
@ -79,16 +163,18 @@ if ANDROID: # !can add ios in future
class PresenceUpdate:
def __init__(self):
self.ws = websocket.WebSocketApp("wss://gateway.discord.gg/?encoding=json&v=10",
self.ws = websocket.WebSocketApp(
"wss://gateway.discord.gg/?encoding=json&v=10",
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)
on_close=self.on_close,
)
self.heartbeat_interval = int(41250)
self.resume_gateway_url: str | None = None
self.session_id: str | None = None
self.stop_heartbeat_thread = threading.Event()
self.do_once = True
self.do_once: bool = True
self.state: str | None = "In Game"
self.details: str | None = "Main Menu"
self.start_timestamp = time.time()
@ -96,7 +182,8 @@ if ANDROID: # !can add ios in future
self.large_image_text: str | None = "BombSquad Icon"
self.small_image_key: str | None = None
self.small_image_text: str | None = (
f"{_babase.app.classic.platform.capitalize()}({APP_VERSION})")
f"{_babase.app.classic.platform.capitalize()}({APP_VERSION})"
)
self.media_proxy = "mp:/app-assets/963434684669382696/{}.png"
self.identify: bool = False
self.party_id: str = str(uuid.uuid4())
@ -104,15 +191,13 @@ if ANDROID: # !can add ios in future
self.party_max = 8
def presence(self):
with open(DIRPATH, "r") as maptxt:
largetxt = json.load(maptxt)[self.large_image_key]
with open(DIRPATH, "r") as maptxt:
smalltxt = json.load(maptxt)[self.small_image_key]
largetxt = MAPNAME_ID[self.large_image_key]
smalltxt = MAPNAME_ID[self.small_image_key]
presencepayload = {
"op": 3,
"d": {
"since": None, # used to show how long the user went idle will add afk to work with this and then set the status to idle
"since": start_time, # Fixed the unlimited time bug
"status": "online",
"afk": "false",
"activities": [
@ -122,12 +207,10 @@ if ANDROID: # !can add ios in future
"application_id": "963434684669382696",
"state": self.state,
"details": self.details,
"timestamps": {
"start": start_time
},
# "timestamps": {"start": start_time},
"party": {
"id": self.party_id,
"size": [self.party_size, self.party_max]
"size": [self.party_size, self.party_max],
},
"assets": {
"large_image": self.media_proxy.format(largetxt),
@ -172,7 +255,13 @@ if ANDROID: # !can add ios in future
babase.print_exception(error)
def on_close(self, ws, close_status_code, close_msg):
(
print("Closed Discord Connection Successfully")
if close_status_code == 1000
else print(
f"Closed Discord Connection with code {close_status_code} and message {close_msg}"
)
)
def on_open(self, ws):
print("Connected to Discord Websocket")
@ -189,8 +278,8 @@ if ANDROID: # !can add ios in future
def identify():
"""Identifying to the gateway and enable by using user token and the intents we will be using e.g 256->For Presence"""
with open(f"{_babase.app.env.python_directory_user}/__pycache__/token.txt", 'r') as f:
token = bytes.fromhex(f.read()).decode('utf-8')
byt_tkn = babase.app.config.get("token")
token = bytes.fromhex(byt_tkn).decode("utf-8")
identify_payload = {
"op": 2,
"d": {
@ -204,6 +293,7 @@ if ANDROID: # !can add ios in future
},
} # step 3 send an identify
self.ws.send(json.dumps(identify_payload))
identify()
while True:
heartbeat_payload = {"op": 1, "d": self.heartbeat_interval}
@ -221,8 +311,34 @@ if ANDROID: # !can add ios in future
threading.Thread(target=heartbeats, daemon=True, name="heartbeat").start()
def start(self):
if Path(f"{_babase.app.env.python_directory_user}/__pycache__/token.txt").exists():
threading.Thread(target=self.ws.run_forever, daemon=True, name="websocket").start()
if (
Path(
f"{_babase.app.env.python_directory_user}/__pycache__/token.txt"
).exists()
or Path(f"{getcwd()}/token.txt").exists()
):
try:
with open(f"{getcwd()}/token.txt", "r") as f:
token = bytes.fromhex(f.read()).decode("utf-8")
except FileNotFoundError:
with open(
f"{_babase.app.env.python_directory_user}/__pycache__/token.txt",
"r",
) as f:
token = bytes.fromhex(f.read()).decode("utf-8")
babase.app.config["token"] = token
babase.app.config.commit()
if babase.app.config.get("token"):
try:
while True:
urlopen("http://www.google.com", timeout=5)
threading.Thread(
target=self.ws.run_forever, daemon=True, name="websocket"
).start()
return
except Exception:
return
def close(self):
self.stop_heartbeat_thread.set()
@ -243,62 +359,25 @@ if not ANDROID:
filename, headers = urlretrieve(url, filename=path)
with open(filename, "rb") as f:
content = f.read()
assert hashlib.md5(content).hexdigest() == "f7c163cdd001af2456c09e241b90bad7"
shutil.unpack_archive(filename, install_path)
assert (
hashlib.md5(content).hexdigest()
== "f7c163cdd001af2456c09e241b90bad7"
)
shutil.unpack_archive(filename, install_path, format='gztar')
shutil.copytree(source_dir, file_path)
shutil.rmtree(Path(f"{install_path}/pypresence-4.3.0"))
remove(path)
except:
pass
# Make modifications for it to work on windows
if babase.app.classic.platform == "windows":
with open(Path(f"{getcwd()}/ba_data/python/pypresence/utils.py"), "r") as file:
data = file.readlines()
data[45] = """
def get_event_loop(force_fresh=False):
loop = asyncio.ProactorEventLoop() if sys.platform == 'win32' else asyncio.new_event_loop()
if force_fresh:
return loop
try:
running = asyncio.get_running_loop()
except RuntimeError:
return loop
if running.is_closed():
return loop
else:
if sys.platform in ('linux', 'darwin'):
return running
if sys.platform == 'win32':
if isinstance(running, asyncio.ProactorEventLoop):
return running
else:
return loop"""
# Thanks Loup
with open(Path(f"{getcwd()}/ba_data/python/pypresence/utils.py"), "w") as file:
for number, line in enumerate(data):
if number not in range(46, 56):
file.write(line)
# fix the mess i did with the previous
elif file_path.exists():
with open(Path(f"{getcwd()}/ba_data/python/pypresence/utils.py"), "r") as file:
data = file.readlines()
first_line = data[0].rstrip("\n")
if not first_line == '"""Util functions that are needed but messy."""':
shutil.rmtree(file_path)
get_module()
get_module()
from pypresence import PipeClosed, DiscordError, DiscordNotFound
from pypresence.utils import get_event_loop
import pypresence
import socket
DEBUG = True
_last_server_addr = 'localhost'
_last_server_port = 43210
def print_error(err: str, include_exception: bool = False) -> None:
if DEBUG:
if include_exception:
@ -327,8 +406,9 @@ def get_event_loop(force_fresh=False):
global _last_server_addr
global _last_server_port
old_connect(*args, **kwargs)
c = kwargs.get("address") or args[0]
_last_server_port = kwargs.get("port") or args[1]
_last_server_addr = kwargs.get("address") or args[0]
# ! Joining a game on same device as host NB check what happens if host is port forwarded you join it and check joining a server port forwarded or not
_last_server_port = kwargs.get("port") or args[1] if len(args) > 1 else 43210
bs.connect_to_party = new_connect
@ -353,6 +433,7 @@ def get_event_loop(force_fresh=False):
self._last_secret_update_time: float = 0
self._last_connect_time: float = 0
self.should_close = False
self.connection_to_host_info = None
@staticmethod
def is_discord_running():
@ -360,45 +441,49 @@ def get_event_loop(force_fresh=False):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(0.01)
try:
conn = s.connect_ex(('localhost', i))
conn = s.connect_ex(("localhost", i))
s.close()
if (conn == 0):
if conn == 0:
s.close()
return (True)
return True
except:
s.close()
return (False)
return False
def _generate_join_secret(self):
# resp = requests.get('https://legacy.ballistica.net/bsAccessCheck').text
try:
connection_info = bs.get_connection_to_host_info_2()
connection_info = self.connection_to_host_info
if connection_info:
addr = _last_server_addr
port = _last_server_port
else:
with urlopen(
"https://legacy.ballistica.net/bsAccessCheck"
) as resp:
with urlopen("https://legacy.ballistica.net/bsAccessCheck") as resp:
resp = resp.read().decode()
resp = ast.literal_eval(resp)
addr = resp["address"]
port = 43210
port = resp["port"]
addr, port = addr, port
secret_dict = {
"format_version": 1,
"hostname": addr,
"port": port,
}
self.join_secret = json.dumps(secret_dict)
except:
except Exception as _:
pass
def _update_secret(self):
#! use in game thread
threading.Thread(target=self._generate_join_secret, daemon=True).start()
self._last_secret_update_time = time.time()
def run(self) -> None:
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
asyncio.set_event_loop(get_event_loop())
while not self.should_close:
if time.time() - self._last_update_time > 0.1:
self._do_update_presence()
@ -456,6 +541,7 @@ def get_event_loop(force_fresh=False):
party_id=self.party_id,
party_size=[self.party_size, self.party_max],
join=self.join_secret,
)
# buttons = [ #!cant use buttons together with join
# {
# "label": "Discord Server",
@ -465,8 +551,6 @@ def get_event_loop(force_fresh=False):
# "label": "Download Bombsquad",
# "url": "https://bombsquad.ga/download"}
# ]
)
self.handle_event(data)
except (PipeClosed, DiscordError, AssertionError, AttributeError):
try:
@ -503,28 +587,31 @@ def get_event_loop(force_fresh=False):
user = data.get("user", {})
uid = user.get("id")
username = user.get("username")
discriminator = user.get("discriminator", None)
avatar = user.get("avatar")
self.on_join_request(username, uid, discriminator, avatar)
self.on_join_request(username, uid, avatar)
def _connect_to_party(self, hostname, port) -> None:
babase.pushcall(
babase.Call(bs.connect_to_party, hostname, port), from_other_thread=True
)
def on_join_request(self, username, uid, discriminator, avatar) -> None:
def on_join_request(self, username, uid, avatar) -> None:
del uid # unused
del avatar # unused
babase.pushcall(
babase.Call(
bui.screenmessage,
"Discord: {}{} wants to join!".format(
username, discriminator if discriminator != "#0" else ""),
"Discord: {} wants to join!".format(
username
),
color=(0.0, 1.0, 0.0),
),
from_other_thread=True,
)
babase.pushcall(lambda: bui.getsound('bellMed').play(), from_other_thread=True)
#! check this function for sound creation error
babase.pushcall(
lambda: bui.getsound("bellMed").play(), from_other_thread=True
)
class Discordlogin(PopupWindow):
@ -533,92 +620,112 @@ class Discordlogin(PopupWindow):
# pylint: disable=too-many-locals
_uiscale = bui.app.ui_v1.uiscale
self._transitioning_out = False
s = 1.25 if _uiscale is babase.UIScale.SMALL else 1.27 if _uiscale is babase.UIScale.MEDIUM else 1.3
s = (
1.25
if _uiscale is babase.UIScale.SMALL
else 1.27 if _uiscale is babase.UIScale.MEDIUM else 1.3
)
self._width = 380 * s
self._height = 150 + 150 * s
self.path = Path(f"{_babase.app.env.python_directory_user}/__pycache__/token.txt")
bg_color = (0.5, 0.4, 0.6)
log_btn_colour = (0.10, 0.95, 0.10) if not self.path.exists() else (1.00, 0.15, 0.15)
log_txt = "LOG IN" if not self.path.exists() else "LOG OUT"
log_btn_colour = (
(0.10, 0.95, 0.10)
if not babase.app.config.get("token")
else (1.00, 0.15, 0.15)
)
log_txt = "LOG IN" if not babase.app.config.get("token") else "LOG OUT"
self.code = False
self.resp = "Placeholder"
self.headers = {
'user-agent': "Mozilla/5.0",
'content-type': "application/json",
"user-agent": "Mozilla/5.0",
"content-type": "application/json",
}
# creates our _root_widget
PopupWindow.__init__(self,
PopupWindow.__init__(
self,
position=(0.0, 0.0),
size=(self._width, self._height),
scale=(2.1 if _uiscale is babase.UIScale.SMALL else 1.5
if _uiscale is babase.UIScale.MEDIUM else 1.0),
bg_color=bg_color)
scale=(
2.1
if _uiscale is babase.UIScale.SMALL
else 1.5 if _uiscale is babase.UIScale.MEDIUM else 1.0
),
bg_color=bg_color,
)
self._cancel_button = bui.buttonwidget(
parent=self.root_widget,
position=(25, self._height - 40),
size=(50, 50),
scale=0.58,
label='',
label="",
color=bg_color,
on_activate_call=self._on_cancel_press,
autoselect=True,
icon=bui.gettexture('crossOut'),
iconscale=1.2)
icon=bui.gettexture("crossOut"),
iconscale=1.2,
)
bui.imagewidget(parent=self.root_widget,
bui.imagewidget(
parent=self.root_widget,
position=(180, self._height - 55),
size=(32 * s, 32 * s),
texture=bui.gettexture("discordLogo"),
color=(10 - 0.32, 10 - 0.39, 10 - 0.96))
color=(10 - 0.32, 10 - 0.39, 10 - 0.96),
)
self.email_widget = bui.textwidget(parent=self.root_widget,
self.email_widget = bui.textwidget(
parent=self.root_widget,
text="Email/Phone Number",
size=(400, 70),
position=(50, 180),
h_align='left',
v_align='center',
h_align="left",
v_align="center",
editable=True,
scale=0.8,
autoselect=True,
maxwidth=220)
maxwidth=220,
)
self.password_widget = bui.textwidget(parent=self.root_widget,
self.password_widget = bui.textwidget(
parent=self.root_widget,
text="Password",
size=(400, 70),
position=(50, 120),
h_align='left',
v_align='center',
h_align="left",
v_align="center",
editable=True,
scale=0.8,
autoselect=True,
maxwidth=220)
maxwidth=220,
)
bui.containerwidget(edit=self.root_widget,
cancel_button=self._cancel_button)
bui.containerwidget(edit=self.root_widget, cancel_button=self._cancel_button)
bui.textwidget(
parent=self.root_widget,
position=(265, self._height - 37),
size=(0, 0),
h_align='center',
v_align='center',
h_align="center",
v_align="center",
scale=1.0,
text="Discord",
maxwidth=200,
color=(0.80, 0.80, 0.80))
color=(0.80, 0.80, 0.80),
)
bui.textwidget(
parent=self.root_widget,
position=(265, self._height - 78),
size=(0, 0),
h_align='center',
v_align='center',
h_align="center",
v_align="center",
scale=1.0,
text="💀Use at your own risk💀\ndiscord account might get terminated⚠",
text="💀Use at your own risk💀\n︝discord account might get terminated⚠︝",
maxwidth=200,
color=(1.00, 0.15, 0.15))
color=(1.00, 0.15, 0.15),
)
self._login_button = bui.buttonwidget(
parent=self.root_widget,
@ -628,7 +735,8 @@ class Discordlogin(PopupWindow):
label=log_txt,
color=log_btn_colour,
on_activate_call=self.login,
autoselect=True)
autoselect=True,
)
def _on_cancel_press(self) -> None:
self._transition_out()
@ -636,10 +744,10 @@ class Discordlogin(PopupWindow):
def _transition_out(self) -> None:
if not self._transitioning_out:
self._transitioning_out = True
bui.containerwidget(edit=self.root_widget, transition='out_scale')
bui.containerwidget(edit=self.root_widget, transition="out_scale")
def on_bascenev1libup_cancel(self) -> None:
bui.getsound('swish').play()
bui.getsound("swish").play()
self._transition_out()
def backup_2fa_code(self, tickt):
@ -647,16 +755,18 @@ class Discordlogin(PopupWindow):
self.email_widget.delete()
self.password_widget.delete()
self.backup_2fa_widget = bui.textwidget(parent=self.root_widget,
self.backup_2fa_widget = bui.textwidget(
parent=self.root_widget,
text="2FA/Discord Backup code",
size=(400, 70),
position=(50, 120),
h_align='left',
v_align='center',
h_align="left",
v_align="center",
editable=True,
scale=0.8,
autoselect=True,
maxwidth=220)
maxwidth=220,
)
json_data_2FA = {
"code": bui.textwidget(query=self.backup_2fa_widget),
@ -664,70 +774,70 @@ class Discordlogin(PopupWindow):
"ticket": tickt,
}
if json_data_2FA['code'] != "2FA/Discord Backup code":
if json_data_2FA["code"] != "2FA/Discord Backup code":
try:
payload_2FA = json.dumps(json_data_2FA)
conn_2FA = http.client.HTTPSConnection("discord.com")
conn_2FA.request("POST", "/api/v9/auth/mfa/totp", payload_2FA, self.headers)
conn_2FA.request(
"POST", "/api/v9/auth/mfa/totp", payload_2FA, self.headers
)
res_2FA = conn_2FA.getresponse().read()
token = json.loads(res_2FA)['token'].encode().hex().encode()
with open(self.path, 'wb') as f:
f.write(token)
token = json.loads(res_2FA)["token"].encode().hex().encode()
babase.app.config["token"] = token
babase.app.config.commit()
bui.screenmessage("Successfully logged in", (0.21, 1.0, 0.20))
bui.getsound('shieldUp').play()
bui.getsound("shieldUp").play()
self.on_bascenev1libup_cancel()
PresenceUpdate().start()
except:
self.code = True
bui.screenmessage("Incorrect code", (1.00, 0.15, 0.15))
bui.getsound('error').play()
bui.getsound("error").play()
def login(self):
if not self.path.exists() and self.code == False:
if not babase.app.config.get("token") and self.code == False:
try:
json_data = {
'login': bui.textwidget(query=self.email_widget),
'password': bui.textwidget(query=self.password_widget),
'undelete': False,
'captcha_key': None,
'login_source': None,
'gift_code_sku_id': None,
"login": bui.textwidget(query=self.email_widget),
"password": bui.textwidget(query=self.password_widget),
"undelete": False,
"captcha_key": None,
"login_source": None,
"gift_code_sku_id": None,
}
conn = http.client.HTTPSConnection("discord.com")
payload = json.dumps(json_data)
# conn.request("POST", "/api/v9/auth/login", payload, headers)
# res = conn.getresponse().read()
conn.request("POST", "/api/v9/auth/login", payload, self.headers)
res = conn.getresponse().read()
try:
token = json.loads(res)['token'].encode().hex().encode()
with open(self.path, 'wb') as f:
f.write(token)
token = json.loads(res)["token"].encode().hex().encode()
babase.app.config["token"] = token
babase.app.config.commit()
bui.screenmessage("Successfully logged in", (0.21, 1.0, 0.20))
bui.getsound('shieldUp').play()
bui.getsound("shieldUp").play()
self.on_bascenev1libup_cancel()
PresenceUpdate().start()
except KeyError:
try:
ticket = json.loads(res)['ticket']
bui.screenmessage("Input your 2FA or Discord Backup code",
(0.21, 1.0, 0.20))
bui.getsound('error').play()
ticket = json.loads(res)["ticket"]
bui.screenmessage(
"Input your 2FA or Discord Backup code", (0.21, 1.0, 0.20)
)
bui.getsound("error").play()
self.resp = ticket
self.backup_2fa_code(tickt=ticket)
self.code = True
except KeyError:
bui.screenmessage("Incorrect credentials", (1.00, 0.15, 0.15))
bui.getsound('error').play()
bui.getsound("error").play()
except:
bui.screenmessage("Connect to the internet", (1.00, 0.15, 0.15))
bui.getsound('error').play()
bui.getsound("error").play()
conn.close()
elif self.code == True:
@ -736,42 +846,14 @@ class Discordlogin(PopupWindow):
else:
self.email_widget.delete()
self.password_widget.delete()
remove(self.path)
bui.getsound('shieldDown').play()
del babase.app.config["token"]
babase.app.config.commit()
bui.getsound("shieldDown").play()
bui.screenmessage("Account successfully removed!!", (0.10, 0.10, 1.00))
self.on_bascenev1libup_cancel()
PresenceUpdate().close()
run_once = False
def get_once_asset():
global run_once
if run_once:
return
response = Request(
"https://discordapp.com/api/oauth2/applications/963434684669382696/assets",
headers={"User-Agent": "Mozilla/5.0"},
)
try:
with urlopen(response) as assets:
assets = json.loads(assets.read())
asset = []
asset_id = []
for x in assets:
dem = x["name"]
don = x["id"]
asset_id.append(don)
asset.append(dem)
asset_id_dict = dict(zip(asset, asset_id))
with open(DIRPATH, "w") as imagesets:
jsonfile = json.dumps(asset_id_dict)
json.dump(asset_id_dict, imagesets, indent=4)
except:
pass
run_once = True
def get_class():
@ -790,11 +872,12 @@ class DiscordRP(babase.Plugin):
if not ANDROID:
_run_overrides()
get_once_asset()
def on_app_running(self) -> None:
if not ANDROID:
self.rpc_thread.start()
threading.Thread(
target=self.rpc_thread.start, daemon=True, name="start_rpc"
).start()
self.update_timer = bs.AppTimer(
1, bs.WeakCall(self.update_status), repeat=True
@ -806,13 +889,12 @@ class DiscordRP(babase.Plugin):
)
def has_settings_ui(self):
if ANDROID:
return True
else:
return False
def show_settings_ui(self, button):
if not ANDROID:
bui.screenmessage("Nothing here achievement!!!", (0.26, 0.65, 0.94))
bui.getsound('achievement').play()
if ANDROID:
Discordlogin()
def on_app_shutdown(self) -> None:
@ -820,6 +902,7 @@ class DiscordRP(babase.Plugin):
self.rpc_thread.rpc.close()
self.rpc_thread.should_close = True
def on_app_pause(self) -> None:
self.rpc_thread.close()
@ -855,7 +938,7 @@ class DiscordRP(babase.Plugin):
if name == this:
self.rpc_thread.large_image_key = "lobby"
self.rpc_thread.large_image_text = "Bombing up"
# self.rpc_thread.small_image_key = "lobbysmall"
self.rpc_thread.small_image_key = "lobbysmall"
if name == "Ranking":
self.rpc_thread.large_image_key = "ranking"
self.rpc_thread.large_image_text = "Viewing Results"
@ -871,7 +954,15 @@ class DiscordRP(babase.Plugin):
def update_status(self) -> None:
roster = bs.get_game_roster()
connection_info = bs.get_connection_to_host_info_2()
try:
connection_info = (
bs.get_connection_to_host_info()
if build_number < 21727
else bs.get_connection_to_host_info_2()
)
self.rpc_thread.connection_to_host_info = connection_info
except (RuntimeError, TypeError):
pass
self.rpc_thread.large_image_key = "bombsquadicon"
self.rpc_thread.large_image_text = "BombSquad"
@ -879,15 +970,24 @@ class DiscordRP(babase.Plugin):
self.rpc_thread.small_image_text = (
f"{_babase.app.classic.platform.capitalize()}({APP_VERSION})"
)
try:
if not ANDROID:
svinfo = str(connection_info)
if self._last_server_info != svinfo:
self._last_server_info = svinfo
self.rpc_thread.party_id = str(uuid.uuid4())
self.rpc_thread._update_secret()
if connection_info:
servername = connection_info.name
hostname = socket.gethostname()
local_ip = socket.gethostbyname(hostname)
if bs.get_connection_to_host_info_2().address == local_ip:
self.rpc_thread.details = "Local Server"
else:
self.rpc_thread.details = "Online"
servername = connection_info.name
self.rpc_thread.party_size = max(
1, sum(len(client["players"]) for client in roster)
)
@ -896,9 +996,9 @@ class DiscordRP(babase.Plugin):
self.rpc_thread.state = "Private Party"
elif servername == "": # A local game joinable from the internet
try:
offlinename = json.loads(bs.get_game_roster()[0]["spec_string"])[
"n"
]
offlinename = json.loads(
bs.get_game_roster()[0]["spec_string"]
)["n"]
if len(offlinename) > 19: # Thanks Rikko
self.rpc_thread.state = offlinename[slice(19)] + "..."
else:
@ -912,7 +1012,9 @@ class DiscordRP(babase.Plugin):
self.rpc_thread.state = servername[slice(19)]
if not connection_info:
self.rpc_thread.details = "Local" # ! replace with something like ballistica github cause
self.rpc_thread.details = (
"Local" # ! replace with something like ballistica github cause
)
self.rpc_thread.state = self._get_current_activity_name()
self.rpc_thread.party_size = max(1, len(roster))
self.rpc_thread.party_max = max(1, bs.get_public_party_max_size())
@ -925,19 +1027,25 @@ class DiscordRP(babase.Plugin):
bs.get_foreground_host_session()
.__class__.__name__.replace("MainMenuSession", "")
.replace("EndSession", "")
.replace("FreeForAllSession", ": FFA") # ! for session use small image key
.replace("FreeForAllSession", ": FFA")
.replace("DualTeamSession", ": Teams")
.replace("CoopSession", ": Coop")
)
#! self.rpc_thread.small_image_key = session.lower()
self.rpc_thread.details = f"{self.rpc_thread.details} {session}"
if len(session) > 1:
self.rpc_thread.small_image_key = session.replace(
": ", ""
).lower()
self.rpc_thread.small_image_text = session.replace(": ", "")
self.rpc_thread.details = self.rpc_thread.details
if (
self.rpc_thread.state == "NoneType"
): # sometimes the game just breaks which means its not really watching replay FIXME
self.rpc_thread.state = "Watching Replay"
self.rpc_thread.large_image_key = "replay"
self.rpc_thread.large_image_text = "Viewing Awesomeness"
#!self.rpc_thread.small_image_key = "replaysmall"
self.rpc_thread.small_image_key = "replaysmall"
except UnboundLocalError:
pass
act = bs.get_foreground_host_activity()
session = bs.get_foreground_host_session()
@ -945,6 +1053,8 @@ class DiscordRP(babase.Plugin):
from bascenev1lib.game.elimination import EliminationGame
from bascenev1lib.game.thelaststand import TheLastStandGame
from bascenev1lib.game.meteorshower import MeteorShowerGame
from bascenev1lib.game.football import FootballCoopGame
from bascenev1lib.game.easteregghunt import EasterEggHuntGame
# noinspection PyUnresolvedReferences,PyProtectedMember
try:
@ -972,6 +1082,26 @@ class DiscordRP(babase.Plugin):
else:
secfmt = f"{int(sec) // 60:02}:{sec:.2f}"
self.rpc_thread.details += f" ({secfmt})"
# elif isinstance(act, OnslaughtGame):
# score = act._score
# level = act._wavenum
# # self.
elif isinstance(act, FootballCoopGame):
# try:
# score = f"{act.teams[0].score} : {act.teams[1].score}"
# except IndexError:
score = f"{act.teams[0].score} : {act._bot_team.score}"
self.rpc_thread.details = score
# elif isinstance(act, RunaroundGame)
# score = act._score
# level = act._wavenum
# lives = act._lives
elif isinstance(act, EasterEggHuntGame):
eggs_collected = len(act._eggs) - 1
self.rpc_thread.details = f"{eggs_collected} eggs collected"
# elif isinstance(act, TargetPracticeGame):
# #for FFA
# scoere = bs.get_foreground_host_activity().players[0].score
# if isinstance(session, ba.DualTeamSession):
# scores = ':'.join([
@ -982,13 +1112,12 @@ class DiscordRP(babase.Plugin):
mapname, short_map_name = self._get_current_map_name()
if mapname:
with open(DIRPATH, 'r') as asset_dict:
asset_keys = json.load(asset_dict).keys()
asset_keys = MAPNAME_ID.keys()
if short_map_name in asset_keys:
self.rpc_thread.large_image_text = mapname
self.rpc_thread.large_image_key = short_map_name
self.rpc_thread.small_image_key = 'bombsquadlogo2'
self.rpc_thread.small_image_text = 'BombSquad'
# self.rpc_thread.small_image_key = 'bombsquadlogo2'
# self.rpc_thread.small_image_text = 'BombSquad'
if _babase.get_idle_time() / (1000 * 60) % 60 >= 0.4:
self.rpc_thread.details = f"AFK in {self.rpc_thread.details}"
@ -996,5 +1125,7 @@ class DiscordRP(babase.Plugin):
self.rpc_thread.large_image_key = (
"https://media.tenor.com/uAqNn6fv7x4AAAAM/bombsquad-spaz.gif"
)
if ANDROID and Path(f"{_babase.app.env.python_directory_user}/__pycache__/token.txt").exists():
self.rpc_thread.presence()
if babase.app.config.get("token"):
#! This function might cause some errors
# self.rpc_thread.presence()
pass

View file

@ -1,14 +1,19 @@
# ba_meta require api 8
# ba_meta require api 9
#! Try patching upnpclient to use defusedxml replacement for lxml for more device support
import babase
import bauiv1 as bui
import bascenev1 as bs
import shutil
import platform
import os
import hashlib
import zipfile
import tarfile
import threading
import ast
import time
from urllib.parse import urlparse, unquote
from pathlib import Path
from os import remove, getcwd
from urllib.request import urlretrieve, urlopen
@ -36,63 +41,214 @@ def threaded(func):
@threaded
def get_modules() -> None:
install_path = Path(f"{getcwd()}/ba_data/python") # For the guys like me on windows
upnpy_path = Path(f"{install_path}/upnp.tar.gz")
nat_pmp_path = Path(f"{install_path}/natpmp.tar.gz")
upnpy_file_path = Path(f"{install_path}/upnpy")
nat_pmp_file_path = Path(f"{install_path}/natpmp")
nat_pmp_source_dir = Path(f"{install_path}/NAT-PMP-1.3.2/natpmp")
upnpy_source_dir = Path(f"{install_path}/UPnPy-1.1.8/upnpy")
if (
not Path(f"{nat_pmp_file_path}/__init__.py").exists()
and not Path(f"{upnpy_file_path}/__init__.py").exists()
): # YouKnowDev
nat_pmp_url = "https://files.pythonhosted.org/packages/dc/0c/28263fb4a623e6718a179bca1f360a6ae38f0f716a6cacdf47e15a5fa23e/NAT-PMP-1.3.2.tar.gz"
upnpy_url = "https://files.pythonhosted.org/packages/80/66/d4e721ff8766ea3e78730574669f6feeb71e438a8c2d7a62b2c3456a5c12/UPnPy-1.1.8.tar.gz"
if babase.app.classic.platform == "mac":
install_path = bs.app.env.python_directory_app
else:
install_path = Path(
f"{getcwd()}/ba_data/python"
) # For the guys like me on windows
packages = {
"upnp-client": {
"url": "https://files.pythonhosted.org/packages/dd/69/4d38d9fa757c328df93e7037eb8c1da8ca48e62828c23ba3c421e9335e30/upnpclient-1.0.3.tar.gz",
"md5": "f936c8de89705555f6bd736a66d3af5d",
"folder": "upnpclient",
},
"python-dateutil": {
"url": "https://files.pythonhosted.org/packages/66/c0/0c8b6ad9f17a802ee498c46e004a0eb49bc148f2fd230864601a86dcf6db/python-dateutil-2.9.0.post0.tar.gz",
"md5": "81cb6aad924ef40ebfd3d62eaebe47c6",
"folder": "dateutil",
},
"six": {
"url": "https://files.pythonhosted.org/packages/94/e7/b2c673351809dca68a0e064b6af791aa332cf192da575fd474ed7d6f16a2/six-1.17.0.tar.gz",
"md5": "a0387fe15662c71057b4fb2b7aa9056a",
"folder": "six.py",
},
"requests": {
"url": "https://files.pythonhosted.org/packages/63/70/2bf7780ad2d390a8d301ad0b550f1581eadbd9a20f896afe06353c2a2913/requests-2.32.3.tar.gz",
"md5": "fa3ee5ac3f1b3f4368bd74ab530d3f0f",
"folder": "requests",
},
"idna": {
"url": "https://files.pythonhosted.org/packages/f1/70/7703c29685631f5a7590aa73f1f1d3fa9a380e654b86af429e0934a32f7d/idna-3.10.tar.gz",
"md5": "28448b00665099117b6daa9887812cc4",
"folder": "idna",
},
#! Api 9 already has urllib3 module
# "urllib3": {
# "url": "https://files.pythonhosted.org/packages/7a/50/7fd50a27caa0652cd4caf224aa87741ea41d3265ad13f010886167cfcc79/urllib3-2.2.1.tar.gz",
# "md5": "872f7f43af1b48e7c116c7542ab39fab",
# "folder": "urllib3",
# },
"ifaddr": {
"url": "https://files.pythonhosted.org/packages/e8/ac/fb4c578f4a3256561548cd825646680edcadb9440f3f68add95ade1eb791/ifaddr-0.2.0.tar.gz",
"md5": "b1cac02b5dc354d68dd6d853bc9565a7",
"folder": "ifaddr",
},
"NAT-PMP": {
"url": "https://files.pythonhosted.org/packages/dc/0c/28263fb4a623e6718a179bca1f360a6ae38f0f716a6cacdf47e15a5fa23e/NAT-PMP-1.3.2.tar.gz",
"md5": "7e5faa22acb0935f75664e9c4941fda4",
"folder": "natpmp",
},
}
system = platform.platform()
if "Windows" in system:
packages["lxml"] = {
"url": "https://files.pythonhosted.org/packages/36/88/684d4e800f5aa28df2a991a6a622783fb73cf0e46235cfa690f9776f032e/lxml-5.3.0-cp312-cp312-win32.whl",
"md5": "a5579cb068a3fbfb5989fbeb4024c599",
"folder": "lxml",
}
packages["charset_normalizer"] = {
"url": "https://files.pythonhosted.org/packages/b2/21/2b6b5b860781a0b49427309cb8670785aa543fb2178de875b87b9cc97746/charset_normalizer-3.4.1-cp312-cp312-win32.whl",
"md5": "babec153025b1270d6a2fd76e2c3772f",
"folder": "charset_normalizer",
}
elif "Darwin" in system and "arm64" in system:
packages["lxml"] = {
"url": "https://files.pythonhosted.org/packages/eb/6d/d1f1c5e40c64bf62afd7a3f9b34ce18a586a1cccbf71e783cd0a6d8e8971/lxml-5.3.0-cp312-cp312-macosx_10_9_universal2.whl",
"md5": "0200ca09c13892c80b47cf4c713786ed",
"folder": "lxml",
}
packages["charset_normalizer"] = {
"url": "https://files.pythonhosted.org/packages/0a/9a/dd1e1cdceb841925b7798369a09279bd1cf183cef0f9ddf15a3a6502ee45/charset_normalizer-3.4.1-cp312-cp312-macosx_10_13_universal2.whl",
"md5": "572c9f4f64469518d6a6b4c15710201a",
"folder": "charset_normalizer",
}
elif "Darwin" in system and "x86_64" in system:
packages["lxml"] = {
"url": "https://files.pythonhosted.org/packages/bd/83/26b1864921869784355459f374896dcf8b44d4af3b15d7697e9156cb2de9/lxml-5.3.0-cp312-cp312-macosx_10_9_x86_64.whl",
"md5": "96b82c1e6d24472af28c48d9bb21605e",
"folder": "lxml",
}
packages["charset_normalizer"] = {
"url": "https://files.pythonhosted.org/packages/0a/9a/dd1e1cdceb841925b7798369a09279bd1cf183cef0f9ddf15a3a6502ee45/charset_normalizer-3.4.1-cp312-cp312-macosx_10_13_universal2.whl",
"md5": "572c9f4f64469518d6a6b4c15710201a",
"folder": "charset_normalizer",
}
elif "glibc" in system and "x86_64" in system:
packages["lxml"] = {
"url": "https://files.pythonhosted.org/packages/0a/6e/94537acfb5b8f18235d13186d247bca478fea5e87d224644e0fe907df976/lxml-5.3.0-cp312-cp312-manylinux_2_28_x86_64.whl",
"md5": "d63bf3d33e46a3b0262176b1a815b4b0",
"folder": "lxml",
}
packages["charset_normalizer"] = {
"url": "https://files.pythonhosted.org/packages/3e/a2/513f6cbe752421f16d969e32f3583762bfd583848b763913ddab8d9bfd4f/charset_normalizer-3.4.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
"md5": "1edb315f82fa657b8ee5d564117e057c",
"folder": "charset_normalizer",
}
elif "glibc" in system and "aarch64" in system:
packages["lxml"] = {
"url": "https://files.pythonhosted.org/packages/88/69/6972bfafa8cd3ddc8562b126dd607011e218e17be313a8b1b9cc5a0ee876/lxml-5.3.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl",
"md5": "663ccdccd076b26b5607901799c671be",
"folder": "lxml",
}
packages["charset_normalizer"] = {
"url": "https://files.pythonhosted.org/packages/d3/8c/90bfabf8c4809ecb648f39794cf2a84ff2e7d2a6cf159fe68d9a26160467/charset_normalizer-3.4.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl",
"md5": "d2e8c76665fb9fb013882d4052f46b95",
"folder": "charset_normalizer",
}
elif not "glibc" in system and "x86_64" in system:
packages["lxml"] = {
"url": "https://files.pythonhosted.org/packages/7d/ed/e6276c8d9668028213df01f598f385b05b55a4e1b4662ee12ef05dab35aa/lxml-5.3.0-cp312-cp312-musllinux_1_2_x86_64.whl",
"md5": "659bdaee4672e8409b277b570e3e3e39",
"folder": "lxml",
}
packages["charset_normalizer"] = {
"url": "https://files.pythonhosted.org/packages/13/0e/9c8d4cb99c98c1007cc11eda969ebfe837bbbd0acdb4736d228ccaabcd22/charset_normalizer-3.4.1-cp312-cp312-musllinux_1_2_x86_64.whl",
"md5": "7a60860d64616d5a0af22d034963ab11",
"folder": "charset_normalizer",
}
elif not "glibc" in system and "aarch64" in system:
packages["lxml"] = {
"url": "https://files.pythonhosted.org/packages/8d/e8/4b15df533fe8e8d53363b23a41df9be907330e1fa28c7ca36893fad338ee/lxml-5.3.0-cp312-cp312-musllinux_1_2_aarch64.whl",
"md5": "3ec71cd198cc28525f4c1d65d41a7689",
"folder": "lxml",
}
packages["charset_normalizer"] = {
"url": "https://files.pythonhosted.org/packages/7c/5f/6d352c51ee763623a98e31194823518e09bfa48be2a7e8383cf691bbb3d0/charset_normalizer-3.4.1-cp312-cp312-musllinux_1_2_aarch64.whl",
"md5": "ed3a63cc79137f316ee386cd7aaea7e6",
"folder": "charset_normalizer",
}
else:
packages["lxml"] = {
"url": "https://files.pythonhosted.org/packages/e0/d2/e9bff9fb359226c25cda3538f664f54f2804f4b37b0d7c944639e1a51f69/lxml-5.3.0-cp312-cp312-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl",
"md5": "ecfccadd587adb67ca54a24977e1a436",
"folder": "lxml",
}
packages["charset_normalizer"] = {
"url": "https://files.pythonhosted.org/packages/74/94/8a5277664f27c3c438546f3eb53b33f5b19568eb7424736bdc440a88a31f/charset_normalizer-3.4.1-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl",
"md5": "9bdbf872c3bdbcb7191d5cdf3176c38a",
"folder": "charset_normalizer",
}
for package, details in packages.items():
parsed_url = urlparse(details["url"])
path = unquote(parsed_url.path)
filename = os.path.basename(path)
if details["url"].endswith(".whl"):
file_format = "whl"
folder_name = "-".join(filename.split("-")[:2])
elif details["url"].endswith(".tar.gz"):
file_format = "tar.gz"
folder_name = filename.rsplit(".", 2)[0]
package_path = os.path.join(install_path, f"{package}.{file_format}")
package_path = Path(f"{install_path}/{package}.{file_format}")
package_source_dir = Path(f"{install_path}/{details['folder']}")
if not Path(f"{package_source_dir}/__init__.py").exists():
try:
# fix issue where the file delete themselves
try:
shutil.rmtree(nat_pmp_file_path)
shutil.rmtree(upnpy_file_path)
shutil.rmtree(package_source_dir)
except:
pass
nat_pmp_filename, headers = urlretrieve(nat_pmp_url, filename=nat_pmp_path)
upnpy_filename, headers = urlretrieve(upnpy_url, filename=upnpy_path)
with open(nat_pmp_filename, "rb") as f:
content = f.read()
assert (
hashlib.md5(content).hexdigest()
== "7e5faa22acb0935f75664e9c4941fda4"
)
with open(upnpy_filename, "rb") as f:
content = f.read()
assert (
hashlib.md5(content).hexdigest()
== "b33ad0b38e39af258e2c8f38813abf7b"
)
shutil.unpack_archive(nat_pmp_filename, install_path)
shutil.unpack_archive(upnpy_filename, install_path)
remove(upnpy_path)
remove(nat_pmp_path)
shutil.copytree(nat_pmp_source_dir, nat_pmp_file_path)
shutil.copytree(upnpy_source_dir, upnpy_file_path)
shutil.rmtree(Path(f"{install_path}/NAT-PMP-1.3.2"))
shutil.rmtree(Path(f"{install_path}/UPnPy-1.1.8"))
except Exception as e:
if type(e) == shutil.Error:
shutil.rmtree(Path(f"{install_path}/NAT-PMP-1.3.2"))
shutil.rmtree(Path(f"{install_path}/UPnPy-1.1.8"))
else:
pass
package_filename, headers = urlretrieve(
details["url"], filename=package_path
)
with open(package_filename, "rb") as f:
content = f.read()
assert hashlib.md5(content).hexdigest() == details["md5"]
try:
shutil.unpack_archive(package_filename, install_path, format='gztar')
extracted_package_files = Path(f"{install_path}/{folder_name}")
for root, dirs, files in os.walk(extracted_package_files):
for dir in dirs:
subfolder = os.path.join(root, dir)
if subfolder.endswith(details["folder"]):
shutil.copytree(
subfolder, f"{install_path}/{details['folder']}"
)
if details["folder"] == "six.py":
shutil.copy(
Path(f"{install_path}/{folder_name}/six.py"),
f"{install_path}/six.py",
)
try:
shutil.rmtree(Path(f"{install_path}/{folder_name}"))
except FileNotFoundError:
pass
except shutil.ReadError as e:
with zipfile.ZipFile(package_filename, "r") as zip_ref:
zip_ref.extractall(install_path)
try:
shutil.rmtree(Path(f"{install_path}/lxml-5.3.0.dist-info")) #! Remember to update accordingly
except:
shutil.rmtree(Path(f"{install_path}/charset_normalizer-3.4.1.dist-info")) #!
remove(package_path)
else:
return
# Patch to natpmp to work without netifaces
with open(f"{nat_pmp_file_path}/__init__.py", "r") as f:
with open(Path(f"{install_path}/natpmp/__init__.py"), "r") as f:
lines = f.readlines()
# Define the new function as a string
new_function = '''
# Plucked from https://github.com/tenable/upnp_info/blob/d20a1fda8ca4877d61b89fe7126077a3a5f0b322/upnp_info.py#L23
def get_gateway_addr():
"""Returns the gateway ip of the router if upnp service is available"""
"""
Returns the gateway ip of the router if upnp service is available
"""
try:
locations = set()
location_regex = re.compile("location:[ ]*(.+)"+ chr(13) + chr(10), re.IGNORECASE)
@ -126,29 +282,38 @@ def get_gateway_addr():
pass
'''
# Replace the function
lines[224:229] = new_function
lines[21] = "import socket\nimport re\nfrom urllib.parse import urlparse"
with open(f"{nat_pmp_file_path}/__init__.py", "w") as f:
with open(Path(f"{install_path}/natpmp/__init__.py"), "w") as f:
f.writelines(lines)
add_port_mapping()
def play_sound(sound):
with bs.get_foreground_host_activity().context:
bs.getsound(sound).play()
accessible_online = None
@threaded
def confirm_port():
global accessible_online
time.sleep(5)
with urlopen("https://legacy.ballistica.net/bsAccessCheck") as resp:
resp = resp.read().decode()
resp = ast.literal_eval(resp)
return resp["accessible"]
accessible_online = resp["accessible"]
# return resp["accessible"]
@threaded
def add_port_mapping():
if accessible_online:
return
# Try to add UDP port using NAT-PMP
try:
import socket
import natpmp
from natpmp import NATPMPUnsupportedError, NATPMPNetworkError
@ -161,131 +326,103 @@ def add_port_mapping():
14400,
gateway_ip=natpmp.get_gateway_addr(),
)
if confirm_port():
if accessible_online:
babase.screenmessage(
"You are now joinable from the internet", (0.2, 1, 0.2)
)
babase.pushcall(
babase.Call(play_sound, "shieldUp"), from_other_thread=True
)
except (NATPMPUnsupportedError, NATPMPNetworkError):
import upnpy
from upnpy.exceptions import SOAPError
import upnpclient
from upnpclient.soap import SOAPError
from urllib.error import HTTPError
upnp = upnpy.UPnP()
devices = upnp.discover()
devices = upnpclient.discover()
if devices == []:
babase.screenmessage(
"Please enable upnp service on your router", (1.00, 0.15, 0.15)
)
# bui.getsound('shieldDown').play() -> RuntimeError : Sound creation failed
return
local_ip = (
(
[
ip
for ip in socket.gethostbyname_ex(socket.gethostname())[2]
if not ip.startswith("127.")
]
or [
[
(s.connect(("8.8.8.8", 53)), s.getsockname()[0], s.close())
for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]
][0][1]
]
babase.pushcall(
babase.Call(play_sound, "shieldDown"), from_other_thread=True
)
+ ["no IP found"]
)[0]
return
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# connect() for UDP doesn't send packets
s.connect(("10.0.0.0", 0))
local_ip = s.getsockname()[0]
s.close()
except:
pass
try:
for upnp_dev in devices:
for service in upnp_dev.services:
if service in WAN_SERVICE_NAMES:
service = upnp_dev[service]
if service.name in WAN_SERVICE_NAMES:
try:
result = service.GetSpecificPortMappingEntry(
NewRemoteHost="", NewExternalPort=BS_PORT, NewProtocol="UDP"
NewRemoteHost="",
NewExternalPort=BS_PORT,
NewProtocol="UDP",
)
if result and not confirm_port():
print(result["NewEnabled"])
if result["NewEnabled"] and not accessible_online:
if babase.do_once():
babase.screenmessage(
"Oops seems like your network doesn't support upnp",
(1.0, 0.15, 0.15),
)
babase.pushcall(
bui.getsound("error").play(), from_other_thread=True
babase.Call(play_sound, "shieldDown"),
from_other_thread=True,
)
return
except SOAPError:
if confirm_port():
if accessible_online:
return
service.AddPortMapping(
NewRemoteHost="",
NewRemoteHost="0.0.0.0",
NewExternalPort=BS_PORT,
NewProtocol="UDP",
NewInternalPort=BS_PORT,
NewInternalClient=str(local_ip),
NewInternalClient=local_ip,
NewEnabled="1",
NewPortMappingDescription="Bombsquad",
NewLeaseDuration=14400,
)
if confirm_port():
babase.screenmessage(
"You are now joinable from the internet", (0.2, 1, 0.2)
babase.pushcall(
babase.Call(play_sound, "shieldUp"),
from_other_thread=True,
)
bui.getsound("shieldUp").play()
except (SOAPError, HTTPError, UnicodeDecodeError):
babase.screenmessage('You will need to manualy add the port at the router :(')
@threaded
def delete_port_mapping():
import socket
import natpmp
from natpmp import NATPMPUnsupportedError, NATPMPNetworkError
try:
natpmp.map_port(
natpmp.NATPMP_PROTOCOL_UDP,
BS_PORT,
BS_PORT,
0,
gateway_ip=natpmp.get_gateway_addr(),
babase.screenmessage(
"You will need to manualy port forward at the router :("
)
except (NATPMPUnsupportedError, NATPMPNetworkError):
import upnpy
from upnpy.exceptions import SOAPError
upnp = upnpy.UPnP()
devices = upnp.discover()
if devices == []:
return
try:
for upnp_dev in devices:
for service in upnp_dev.services:
if service in WAN_SERVICE_NAMES:
service = upnp_dev[service]
service.DeletePortMapping(
NewRemoteHost="", NewExternalPort=BS_PORT, NewProtocol="UDP")
except:
babase.pushcall(babase.Call(play_sound, "error"),from_other_thread=True,)
except ModuleNotFoundError:
pass
# ba_meta export babase.Plugin
class Joinable(babase.Plugin):
def on_app_running(self) -> None:
get_modules()
if confirm_port():
# try:
confirm_port()
if accessible_online:
return
else:
try:
import upnpclient
add_port_mapping()
def on_app_shutdown(self) -> None:
delete_port_mapping()
def on_app_pause(self) -> None:
delete_port_mapping()
except ImportError:
try:
install_path = Path(f"{getcwd()}/ba_data/python")
shutil.rmtree(f"{install_path}/upnpy")
shutil.rmtree(f"{install_path}/natpmp")
except FileNotFoundError:
get_modules()
def on_app_resume(self) -> None:
confirm_port()
add_port_mapping()

View file

@ -0,0 +1,50 @@
# ba_meta require api 9
#! Fix bug when the previous message is "hello" it will not trigger the wave emote on new round or game
import time
import babase
import bascenev1 as bs
last_len_msg = 0 # Initialize the global variable outside the function
def wave_emote():
global last_len_msg # To modify the global variable
# Check if the players are in game first
try:
act_players = bs.get_foreground_host_activity().players
if not act_players:
return
except AttributeError:
# Except the attribute error if the player is in a server
return
# Incase chats are empty or in replay
try:
lastmsg = bs.get_chat_messages()[-1]
except:
return
# Perform a check to see if the player is playing|spectating
for player in act_players:
try:
if player.actor.node:
continue
except:
return
# Check if the message contains "hello"
if len(bs.get_chat_messages()) != last_len_msg:
if act_players and "hello" in lastmsg:
for player in act_players:
if player.getname() in lastmsg:
# Trigger the wave emote
player.actor.node.handlemessage("celebrate_r", 1000)
last_len_msg = len(bs.get_chat_messages())
print(last_len_msg, "last_len_msg")
# ba_meta export plugin
class brostos(babase.Plugin):
timer = bs.AppTimer(0.5, wave_emote, repeat=True)