Compare commits

...

2 commits

Author SHA1 Message Date
Ayush Saini
4e5156073d updating binary for MFA
Some checks failed
CI / run_server_binary (push) Has been cancelled
2025-09-16 00:22:03 +05:30
Ayush Saini
f26d3cc213 adding MFA for V2 accounts 2025-09-16 00:08:59 +05:30
8 changed files with 162 additions and 7 deletions

View file

@ -32,6 +32,7 @@ def get_player_icon(sessionplayer: bascenev1.SessionPlayer) -> dict[str, Any]:
'tint2_color': info['tint2_color'],
}
def filter_chat_message(msg: str, client_id: int) -> str | None:
try:
import custom_hooks as chooks
@ -45,12 +46,37 @@ def filter_chat_message(msg: str, client_id: int) -> str | None:
to ignore the message.
"""
try:
return chooks.filter_chat_message(msg,client_id)
return chooks.filter_chat_message(msg, client_id)
except:
return msg
def kick_vote_started(by:str,to:str) -> None:
def bcs_verify_client_account_ip(account_id: str, ip: str, client_id: int) -> str | None:
try:
import custom_hooks as chooks
except:
pass
"""Verify a client account ID and IP address.
This is called when a client connects while using BCS servers.
If you return None, the connection is dropped
If you return a string, the connection proceeds as normal.
TODO make this actually do something. For now we have to disconnect client manually
Here account_id is display_string of account ~ V2 account tag not pb-id (which can be spoofed easily).
"""
try:
return chooks.bcs_verify_client_account_ip(account_id, ip, client_id)
except Exception as e:
print(e)
return
def kick_vote_started(by: str, to: str) -> None:
print("kick vot started by"+by+" to"+to)
def local_chat_message(msg: str) -> None:
classic = babase.app.classic
assert classic is not None

View file

@ -446,3 +446,11 @@ def on_classic_app_mode_active():
_bascenev1.set_transparent_kickvote(settings["ShowKickVoteStarterName"])
_bascenev1.set_kickvote_msg_type(settings["KickVoteMsgType"])
_bascenev1.hide_player_device_id(settings["Anti-IdRevealer"])
def bcs_verify_client_account_ip(account_id: str, ip: str, client_id: int) -> str | None:
"""Verify a client account ID and IP address.
"""
if settings["mfa"]["enable"]:
_thread.start_new_thread(servercheck.account_check,
(account_id, ip, client_id))

41
dist/ba_root/mods/repository/db.py vendored Normal file
View file

@ -0,0 +1,41 @@
import os
import sqlite3
import time
import random
DB_PATH = os.path.expanduser("~/shared_bcs_data/bcsserverdata.db")
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
MAX_RETRIES = 5
RETRY_DELAY = (0.1, 0.5)
def get_connection():
"""Get a SQLite connection with WAL enabled."""
conn = sqlite3.connect(DB_PATH, timeout=30)
conn.execute("PRAGMA journal_mode=WAL;")
return conn
def run_query(query, params=(), fetch=False):
"""Execute query with retry logic. Returns rows if fetch=True."""
retries = 0
while True:
try:
conn = get_connection()
cur = conn.cursor()
cur.execute(query, params)
rows = cur.fetchall() if fetch else None
conn.commit()
conn.close()
return rows
except sqlite3.OperationalError as e:
if "database is locked" in str(e).lower():
retries += 1
if retries > MAX_RETRIES:
raise RuntimeError(
"Max retries exceeded due to DB lock") from e
time.sleep(random.uniform(*RETRY_DELAY))
else:
raise

View file

@ -0,0 +1,38 @@
from repository.db import run_query
def init_db():
run_query("""
CREATE TABLE IF NOT EXISTS profiles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
v2Tag TEXT,
lastIP TEXT,
server_profile_created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
init_db()
def get_profile(v2Tag):
rows = run_query(
"SELECT id, v2Tag, lastIP FROM profiles WHERE v2Tag = ?",
(v2Tag,),
fetch=True,
)
if rows:
return {
'id': rows[0][0],
'v2Tag': rows[0][1],
'lastIP': rows[0][2],
}
return None
def upsert_ip(v2Tag, ip):
run_query("""
INSERT INTO profiles (v2Tag, lastIP) VALUES (?, ?)
ON CONFLICT(v2Tag) DO UPDATE SET lastIP=excluded.lastIP
""", (v2Tag, ip))

View file

@ -18,7 +18,7 @@
"StumbledScoreScreen": true,
"WarnCooldownMinutes": 30,
"afk_remover": {
"enable": false,
"enable": true,
"ingame_idle_time_in_secs": 60,
"kick_idle_from_lobby": true,
"lobby_idle_time_in_secs": 10
@ -158,7 +158,7 @@
"sameCharacterForTeam": false,
"statsResetAfterDays": 31,
"textonmap": {
"bottom left watermark": "Owner : <owner-name> \nEditor : <bablu>\nScripts : BCS1.7.41",
"bottom left watermark": "Owner : <owner-name> \nEditor : <bablu>\nScripts : BCS1.7.51",
"center highlights": {
"color": [
1,
@ -177,5 +177,10 @@
},
"useV2Account": false,
"warnMsg": "WARNING !!!",
"whitelist": false
"whitelist": false,
"mfa" : {
"enable": true,
"enforce_for_all_players": false,
"enforce_for_accounts": [ "HeySmoothy", "test" ]
}
}

View file

@ -1,3 +1,4 @@
import _thread
import http.client
import json

View file

@ -19,7 +19,7 @@ import babase
import bascenev1 as bs
from babase._general import Call
from tools import logger
from repository import profiles
blacklist = pdata.get_blacklist()
settings = setting.get_settings_data()
@ -63,7 +63,7 @@ class checkserver(object):
else:
ipClientMap[ip].append(ros["client_id"])
if len(ipClientMap[ip]) >= settings['maxAccountPerIP']:
_babase.chatmessage(
bs.chatmessage(
f"Only {settings['maxAccountPerIP']} player per IP allowed, disconnecting this device.",
clients=[
ros["client_id"]])
@ -445,3 +445,39 @@ def on_join_request(ip):
serverdata.ips[ip] = {"lastRequest": time.time(), "count": count}
else:
serverdata.ips[ip] = {"lastRequest": time.time(), "count": 0}
# this method is running in different thread , be careful while editing
def account_check(account_id, ip, client_id):
if not account_id.startswith("\ue063"):
return
account_id = account_id.replace("\ue063", "")
profile = profiles.get_profile(account_id)
if settings["mfa"]["enforce_for_all_players"] or account_id in settings["mfa"]["enforce_for_accounts"]:
if profile is None:
# check bcs master
try:
data = urllib.request.urlopen(
f"https://mods.ballistica.workers.dev/verifyownerip?ip={ip}&tag={account_id}")
except:
_babase.pushcall(Call(bs.chatmessage,
"Click stats button and login your V2 account, to verify your identity", [client_id]), from_other_thread=True)
_babase.pushcall(
Call(bs.disconnect_client, client_id, 2), from_other_thread=True)
return
profiles.upsert_ip(account_id, ip)
else:
if profile["lastIP"] != ip:
# ip changed , do something
# disconnect client for now , wiht warning to login bcs website again
try:
data = urllib.request.urlopen(
f"https://mods.ballistica.workers.dev/verifyownerip?ip={ip}&tag={account_id}")
except:
_babase.pushcall(Call(bs.chatmessage,
"Click stats button and login your V2 account, to verify your identity", [client_id]), from_other_thread=True)
_babase.pushcall(
Call(bs.disconnect_client, client_id, 2), from_other_thread=True)
return
profiles.upsert_ip(account_id, ip)

Binary file not shown.