1.7.17 base update

This commit is contained in:
Ayush Saini 2022-12-25 00:39:49 +05:30
parent e5034cbb6f
commit 0610d132f2
48 changed files with 2309 additions and 511 deletions

View file

@ -252,7 +252,6 @@ def submit_score(
name: Any,
score: int | None,
callback: Callable,
friend_callback: Callable | None,
order: str = 'increasing',
tournament_id: str | None = None,
score_type: str = 'points',

View file

@ -43,7 +43,8 @@ from _ba import (
newnode,
playsound,
printnodes,
printobjects,
ls_objects,
ls_input_devices,
pushcall,
quit,
rowwidget,
@ -316,7 +317,8 @@ __all__ = [
'print_error',
'print_exception',
'printnodes',
'printobjects',
'ls_objects',
'ls_input_devices',
'pushcall',
'quit',
'rowwidget',

View file

@ -48,6 +48,9 @@ class AccountV1Subsystem:
_ba.pushcall(do_auto_sign_in)
def on_app_pause(self) -> None:
"""Should be called when app is pausing."""
def on_app_resume(self) -> None:
"""Should be called when the app is resumed."""

View file

@ -4,13 +4,23 @@
from __future__ import annotations
import hashlib
import logging
from typing import TYPE_CHECKING
from efro.call import tpartial
from efro.error import CommunicationError
from bacommon.login import LoginType
import _ba
if TYPE_CHECKING:
from typing import Any
from ba._login import LoginAdapter
DEBUG_LOG = False
class AccountV2Subsystem:
"""Subsystem for modern account handling in the app.
@ -30,9 +40,23 @@ class AccountV2Subsystem:
self._kicked_off_workspace_load = False
self.login_adapters: dict[LoginType, LoginAdapter] = {}
self._implicit_signed_in_adapter: LoginAdapter | None = None
self._implicit_state_changed = False
self._can_do_auto_sign_in = True
if _ba.app.platform == 'android' and _ba.app.subplatform == 'google':
from ba._login import LoginAdapterGPGS
self.login_adapters[LoginType.GPGS] = LoginAdapterGPGS()
def on_app_launch(self) -> None:
"""Should be called at standard on_app_launch time."""
for adapter in self.login_adapters.values():
adapter.on_app_launch()
def set_primary_credentials(self, credentials: str | None) -> None:
"""Set credentials for the primary app account."""
raise RuntimeError('This should be overridden.')
@ -49,7 +73,7 @@ class AccountV2Subsystem:
@property
def primary(self) -> AccountV2Handle | None:
"""The primary account for the app, or None if not logged in."""
return None
return self.do_get_primary()
def do_get_primary(self) -> AccountV2Handle | None:
"""Internal - should be overridden by subclass."""
@ -60,9 +84,11 @@ class AccountV2Subsystem:
) -> None:
"""Callback run after the primary account changes.
Will be called with None on log-outs or when new credentials
Will be called with None on log-outs and when new credentials
are set but have not yet been verified.
"""
assert _ba.in_logic_thread()
# Currently don't do anything special on sign-outs.
if account is None:
return
@ -99,6 +125,30 @@ class AccountV2Subsystem:
self._initial_login_completed = True
_ba.app.on_initial_login_completed()
def on_active_logins_changed(self, logins: dict[LoginType, str]) -> None:
"""Should be called when logins for the active account change."""
for adapter in self.login_adapters.values():
adapter.set_active_logins(logins)
def on_implicit_sign_in(
self, login_type: LoginType, login_id: str, display_name: str
) -> None:
"""An implicit sign-in happened (called by native layer)."""
from ba._login import LoginAdapter
with _ba.Context('ui'):
self.login_adapters[login_type].set_implicit_login_state(
LoginAdapter.ImplicitLoginState(
login_id=login_id, display_name=display_name
)
)
def on_implicit_sign_out(self, login_type: LoginType) -> None:
"""An implicit sign-out happened (called by native layer)."""
with _ba.Context('ui'):
self.login_adapters[login_type].set_implicit_login_state(None)
def on_no_initial_primary_account(self) -> None:
"""Callback run if the app has no primary account after launch.
@ -110,6 +160,240 @@ class AccountV2Subsystem:
self._initial_login_completed = True
_ba.app.on_initial_login_completed()
@staticmethod
def _hashstr(val: str) -> str:
md5 = hashlib.md5()
md5.update(val.encode())
return md5.hexdigest()
def on_implicit_login_state_changed(
self,
login_type: LoginType,
state: LoginAdapter.ImplicitLoginState | None,
) -> None:
"""Called when implicit login state changes.
Login systems that tend to sign themselves in/out in the
background are considered implicit. We may choose to honor or
ignore their states, allowing the user to opt for other login
types even if the default implicit one can't be explicitly
logged out or otherwise controlled.
"""
from ba._language import Lstr
assert _ba.in_logic_thread()
cfg = _ba.app.config
cfgkey = 'ImplicitLoginStates'
cfgdict = _ba.app.config.setdefault(cfgkey, {})
# Store which (if any) adapter is currently implicitly signed in.
# Making the assumption there will only ever be one implicit
# adapter at a time; may need to update this if that changes.
prev_state = cfgdict.get(login_type.value)
if state is None:
self._implicit_signed_in_adapter = None
new_state = cfgdict[login_type.value] = None
else:
self._implicit_signed_in_adapter = self.login_adapters[login_type]
new_state = cfgdict[login_type.value] = self._hashstr(
state.login_id
)
# Special case: if the user is already signed in but not with
# this implicit login, we may want to let them know that the
# 'Welcome back FOO' they likely just saw is not actually
# accurate.
if (
self.primary is not None
and not self.login_adapters[login_type].is_back_end_active()
):
if login_type is LoginType.GPGS:
service_str = Lstr(resource='googlePlayText')
else:
service_str = None
if service_str is not None:
_ba.timer(
2.0,
tpartial(
_ba.screenmessage,
Lstr(
resource='notUsingAccountText',
subs=[
('${ACCOUNT}', state.display_name),
('${SERVICE}', service_str),
],
),
(1, 0.5, 0),
),
)
cfg.commit()
# We want to respond any time the implicit state changes;
# generally this means the user has explicitly signed in/out or
# switched accounts within that back-end.
if prev_state != new_state:
if DEBUG_LOG:
logging.debug(
'AccountV2: Implicit state changed (%s -> %s);'
' will update app sign-in state accordingly.',
prev_state,
new_state,
)
self._implicit_state_changed = True
# We may want to auto-sign-in based on this new state.
self._update_auto_sign_in()
def on_cloud_connectivity_changed(self, connected: bool) -> None:
"""Should be called with cloud connectivity changes."""
del connected # Unused.
assert _ba.in_logic_thread()
# We may want to auto-sign-in based on this new state.
self._update_auto_sign_in()
def _update_auto_sign_in(self) -> None:
from ba._internal import get_v1_account_state
# If implicit state has changed, try to respond.
if self._implicit_state_changed:
if self._implicit_signed_in_adapter is None:
# If implicit back-end is signed out, follow suit
# immediately; no need to wait for network connectivity.
if DEBUG_LOG:
logging.debug(
'AccountV2: Signing out as result'
' of implicit state change...',
)
_ba.app.accounts_v2.set_primary_credentials(None)
self._implicit_state_changed = False
# Once we've made a move here we don't want to
# do any more automatic ones.
self._can_do_auto_sign_in = False
else:
# Ok; we've got a new implicit state. If we've got
# connectivity, let's attempt to sign in with it.
# Consider this an 'explicit' sign in because the
# implicit-login state change presumably was triggered
# by some user action (signing in, signing out, or
# switching accounts via the back-end).
# NOTE: should test case where we don't have
# connectivity here.
if _ba.app.cloud.is_connected():
if DEBUG_LOG:
logging.debug(
'AccountV2: Signing in as result'
' of implicit state change...',
)
self._implicit_signed_in_adapter.sign_in(
self._on_explicit_sign_in_completed
)
self._implicit_state_changed = False
# Once we've made a move here we don't want to
# do any more automatic ones.
self._can_do_auto_sign_in = False
if not self._can_do_auto_sign_in:
return
# If we're not currently signed in, we have connectivity, and
# we have an available implicit login, auto-sign-in with it.
# The implicit-state-change logic above should keep things
# mostly in-sync, but due to connectivity or other issues that
# might not always be the case. We prefer to keep people signed
# in as a rule, even if there are corner cases where this might
# not be what they want (A user signing out and then restarting
# may be auto-signed back in).
connected = _ba.app.cloud.is_connected()
signed_in_v1 = get_v1_account_state() == 'signed_in'
signed_in_v2 = _ba.app.accounts_v2.have_primary_credentials()
if (
connected
and not signed_in_v1
and not signed_in_v2
and self._implicit_signed_in_adapter is not None
):
if DEBUG_LOG:
logging.debug(
'AccountV2: Signing in due to on-launch-auto-sign-in...',
)
self._can_do_auto_sign_in = False # Only ATTEMPT once
self._implicit_signed_in_adapter.sign_in(
self._on_implicit_sign_in_completed
)
def _on_explicit_sign_in_completed(
self,
adapter: LoginAdapter,
result: LoginAdapter.SignInResult | Exception,
) -> None:
"""A sign-in has completed that the user asked for explicitly."""
from ba._language import Lstr
del adapter # Unused.
# Make some noise on errors since the user knows
# a sign-in attempt is happening in this case.
if isinstance(result, Exception):
# We expect the occasional communication errors;
# Log a full exception for anything else though.
if not isinstance(result, CommunicationError):
logging.warning(
'Error on explicit accountv2 sign in attempt.',
exc_info=result,
)
with _ba.Context('ui'):
_ba.screenmessage(
Lstr(resource='internal.signInErrorText'),
color=(1, 0, 0),
)
_ba.playsound(_ba.getsound('error'))
# Also I suppose we should sign them out in this case since
# it could be misleading to be still signed in with the old
# account.
_ba.app.accounts_v2.set_primary_credentials(None)
return
_ba.app.accounts_v2.set_primary_credentials(result.credentials)
def _on_implicit_sign_in_completed(
self,
adapter: LoginAdapter,
result: LoginAdapter.SignInResult | Exception,
) -> None:
"""A sign-in has completed that the user didn't ask for explicitly."""
from ba._internal import get_v1_account_state
del adapter # Unused.
# Log errors but don't inform the user; they're not aware of this
# attempt and ignorance is bliss.
if isinstance(result, Exception):
# We expect the occasional communication errors;
# Log a full exception for anything else though.
if not isinstance(result, CommunicationError):
logging.warning(
'Error on implicit accountv2 sign in attempt.',
exc_info=result,
)
return
# If we're still connected and still not signed in,
# plug in the credentials we got. We want to be extra cautious
# in case the user has since explicitly signed in since we
# kicked off.
connected = _ba.app.cloud.is_connected()
signed_in_v1 = get_v1_account_state() == 'signed_in'
signed_in_v2 = _ba.app.accounts_v2.have_primary_credentials()
if connected and not signed_in_v1 and not signed_in_v2:
_ba.app.accounts_v2.set_primary_credentials(result.credentials)
def _on_set_active_workspace_completed(self) -> None:
if not self._initial_login_completed:
self._initial_login_completed = True
@ -129,8 +413,17 @@ class AccountV2Handle:
self.workspacename: str | None = None
self.workspaceid: str | None = None
# Login types and their display-names associated with this account.
self.logins: dict[LoginType, str] = {}
def __enter__(self) -> None:
"""Support for "with" statement."""
"""Support for "with" statement.
This allows cloud messages to be sent on our behalf.
"""
def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> Any:
"""Support for "with" statement."""
"""Support for "with" statement.
This allows cloud messages to be sent on our behalf.
"""

View file

@ -697,7 +697,7 @@ class Achievement:
# Even though there are technically achievements when we're not
# signed in, lets not show them (otherwise we tend to get
# confusing 'controller connected' achievements popping up while
# waiting to log in which can be confusing).
# waiting to sign in which can be confusing).
if _internal.get_v1_account_state() != 'signed_in':
return

View file

@ -32,6 +32,7 @@ if TYPE_CHECKING:
from bastd.actor import spazappearance
from ba._accountv2 import AccountV2Subsystem
from ba._level import Level
from ba._apputils import AppHealthMonitor
class App:
@ -50,7 +51,9 @@ class App:
# Implementations for these will be filled in by internal libs.
accounts_v2: AccountV2Subsystem
cloud: CloudSubsystem
log_handler: efro.log.LogHandler
health_monitor: AppHealthMonitor
class State(Enum):
"""High level state the app can be in."""
@ -346,7 +349,6 @@ class App:
# pylint: disable=cyclic-import
# pylint: disable=too-many-locals
from ba import _asyncio
from ba import _apputils
from ba import _appconfig
from ba import _map
from ba import _campaign
@ -354,10 +356,16 @@ class App:
from bastd import maps as stdmaps
from bastd.actor import spazappearance
from ba._generated.enums import TimeType
from ba._apputils import (
log_dumped_app_state,
handle_leftover_v1_cloud_log_file,
AppHealthMonitor,
)
assert _ba.in_logic_thread()
self._aioloop = _asyncio.setup_asyncio()
self.health_monitor = AppHealthMonitor()
cfg = self.config
@ -401,15 +409,15 @@ class App:
# If there's a leftover log file, attempt to upload it to the
# master-server and/or get rid of it.
_apputils.handle_leftover_v1_cloud_log_file()
handle_leftover_v1_cloud_log_file()
# Only do this stuff if our config file is healthy so we don't
# overwrite a broken one or whatnot and wipe out data.
if not self.config_file_healthy:
if self.platform in ('mac', 'linux', 'windows'):
from bastd.ui import configerror
from bastd.ui.configerror import ConfigErrorWindow
configerror.ConfigErrorWindow()
_ba.pushcall(ConfigErrorWindow)
return
# For now on other systems we just overwrite the bum config.
@ -459,6 +467,9 @@ class App:
'on_app_launch found state %s; expected LAUNCHING.', self.state
)
# If any traceback dumps happened last run, log and clear them.
log_dumped_app_state()
self._launch_completed = True
self._update_state()
@ -483,8 +494,23 @@ class App:
assert _ba.in_logic_thread()
if self._app_paused:
# Entering paused state:
if self.state is not self.State.PAUSED:
self.state = self.State.PAUSED
self.cloud.on_app_pause()
self.accounts_v1.on_app_pause()
self.plugins.on_app_pause()
self.health_monitor.on_app_pause()
else:
# Leaving paused state:
if self.state is self.State.PAUSED:
self.fg_state += 1
self.cloud.on_app_resume()
self.accounts_v1.on_app_resume()
self.music.on_app_resume()
self.plugins.on_app_resume()
self.health_monitor.on_app_resume()
if self._initial_login_completed and self._meta_scan_completed:
self.state = self.State.RUNNING
if not self._called_on_app_running:
@ -498,19 +524,16 @@ class App:
def on_app_pause(self) -> None:
"""Called when the app goes to a suspended state."""
assert not self._app_paused # Should avoid redundant calls.
self._app_paused = True
self._update_state()
self.plugins.on_app_pause()
def on_app_resume(self) -> None:
"""Run when the app resumes from a suspended state."""
assert self._app_paused # Should avoid redundant calls.
self._app_paused = False
self._update_state()
self.fg_state += 1
self.accounts_v1.on_app_resume()
self.music.on_app_resume()
self.plugins.on_app_resume()
def on_app_shutdown(self) -> None:
"""(internal)"""

View file

@ -5,12 +5,17 @@ from __future__ import annotations
import gc
import os
import logging
from threading import Thread
from dataclasses import dataclass
from typing import TYPE_CHECKING
from efro.log import LogLevel
from efro.dataclassio import ioprepped, dataclass_to_json, dataclass_from_json
import _ba
if TYPE_CHECKING:
from typing import Any
from typing import Any, TextIO
import ba
@ -260,3 +265,262 @@ def print_corrupt_file_error() -> None:
_ba.timer(
2.0, Call(_ba.playsound, _ba.getsound('error')), timetype=TimeType.REAL
)
_tbfiles: list[TextIO] = []
@ioprepped
@dataclass
class DumpedAppStateMetadata:
"""High level info about a dumped app state."""
reason: str
app_time: float
log_level: LogLevel
def dump_app_state(
delay: float = 0.0,
reason: str = 'Unspecified',
log_level: LogLevel = LogLevel.WARNING,
) -> None:
"""Dump various app state for debugging purposes.
This includes stack traces for all Python threads (and potentially
other info in the future).
This is intended for use debugging deadlock situations. It will dump
to preset file location(s) in the app config dir, and will attempt to
log and clear the results after dumping. If that should fail (due to
a hung app, etc.), then the results will be logged and cleared on the
next app run.
Do not use this call during regular smooth operation of the app; it
is should only be used for debugging or in response to confirmed
problems as it can leak file descriptors, cause hitches, etc.
"""
# pylint: disable=consider-using-with
import faulthandler
from ba._generated.enums import TimeType
# Dump our metadata immediately. If a delay is passed, it generally
# means we expect things to hang momentarily, so we should not delay
# writing our metadata or it will likely not happen. Though we
# should remember that metadata doesn't line up perfectly in time with
# the dump in that case.
try:
mdpath = os.path.join(
os.path.dirname(_ba.app.config_file_path), '_appstate_dump_md'
)
with open(mdpath, 'w', encoding='utf-8') as outfile:
outfile.write(
dataclass_to_json(
DumpedAppStateMetadata(
reason=reason,
app_time=_ba.time(TimeType.REAL),
log_level=log_level,
)
)
)
except Exception:
# Abandon whole dump if we can't write metadata.
logging.exception('Error writing app state dump metadata.')
return
tbpath = os.path.join(
os.path.dirname(_ba.app.config_file_path), '_appstate_dump_tb'
)
# faulthandler needs the raw file descriptor to still be valid when
# it fires, so stuff this into a global var to make sure it doesn't get
# cleaned up.
tbfile = open(tbpath, 'w', encoding='utf-8')
_tbfiles.append(tbfile)
if delay > 0.0:
faulthandler.dump_traceback_later(delay, file=tbfile)
else:
faulthandler.dump_traceback(file=tbfile)
# Attempt to log shortly after dumping.
# Allow sufficient time since we don't know how long the dump takes.
# We want this to work from any thread, so need to kick this part
# over to the logic thread so timer works.
_ba.pushcall(
lambda: _ba.timer(
delay + 1.0, log_dumped_app_state, timetype=TimeType.REAL
),
from_other_thread=True,
suppress_other_thread_warning=True,
)
def log_dumped_app_state() -> None:
"""If an app-state dump exists, log it and clear it. No-op otherwise."""
try:
out = ''
mdpath = os.path.join(
os.path.dirname(_ba.app.config_file_path), '_appstate_dump_md'
)
if os.path.exists(mdpath):
with open(mdpath, 'r', encoding='utf-8') as infile:
metadata = dataclass_from_json(
DumpedAppStateMetadata, infile.read()
)
os.unlink(mdpath)
out += (
f'App state dump:\nReason: {metadata.reason}\n'
f'Time: {metadata.app_time:.2f}'
)
tbpath = os.path.join(
os.path.dirname(_ba.app.config_file_path), '_appstate_dump_tb'
)
if os.path.exists(tbpath):
with open(tbpath, 'r', encoding='utf-8') as infile:
out += '\nPython tracebacks:\n' + infile.read()
os.unlink(tbpath)
logging.log(metadata.log_level.python_logging_level, out)
except Exception:
logging.exception('Error logging dumped app state.')
class AppHealthMonitor:
"""Logs things like app-not-responding issues."""
def __init__(self) -> None:
assert _ba.in_logic_thread()
self._running = True
self._thread = Thread(target=self._app_monitor_thread_main, daemon=True)
self._thread.start()
self._response = False
self._first_check = True
def _app_monitor_thread_main(self) -> None:
try:
self._monitor_app()
except Exception:
logging.exception('Error in AppHealthMonitor thread.')
def _set_response(self) -> None:
assert _ba.in_logic_thread()
self._response = True
def _check_running(self) -> bool:
# Workaround for the fact that mypy assumes _running
# doesn't change during the course of a function.
return self._running
def _monitor_app(self) -> None:
import time
while bool(True):
# Always sleep a bit between checks.
time.sleep(1.234)
# Do nothing while backgrounded.
while not self._running:
time.sleep(2.3456)
# Wait for the logic thread to run something we send it.
starttime = time.monotonic()
self._response = False
_ba.pushcall(self._set_response, raw=True)
while not self._response:
# Abort this check if we went into the background.
if not self._check_running():
break
# Wait a bit longer the first time through since the app
# could still be starting up; we generally don't want to
# report that.
threshold = 10 if self._first_check else 5
# If we've been waiting too long (and the app is running)
# dump the app state and bail. Make an exception for the
# first check though since the app could just be taking
# a while to get going; we don't want to report that.
duration = time.monotonic() - starttime
if duration > threshold:
dump_app_state(
reason=f'Logic thread unresponsive'
f' for {threshold} seconds.'
)
# We just do one alert for now.
return
time.sleep(1.042)
self._first_check = False
def on_app_pause(self) -> None:
"""Should be called when the app pauses."""
assert _ba.in_logic_thread()
self._running = False
def on_app_resume(self) -> None:
"""Should be called when the app resumes."""
assert _ba.in_logic_thread()
self._running = True
def on_too_many_file_descriptors() -> None:
"""Called when too many file descriptors are open; trying to debug."""
from ba._generated.enums import TimeType
real_time = _ba.time(TimeType.REAL)
def _do_log() -> None:
import subprocess
pid = os.getpid()
out = f'TOO MANY FDS at {real_time}.\nWe are pid {pid}\n'
out += (
'FD Count: '
+ subprocess.run(
f'ls -l /proc/{pid}/fd | wc -l',
shell=True,
check=False,
capture_output=True,
).stdout.decode()
+ '\n'
)
out += (
'lsof output:\n'
+ subprocess.run(
f'lsof -p {pid}',
shell=True,
check=False,
capture_output=True,
).stdout.decode()
+ '\n'
)
logging.warning(out)
Thread(target=_do_log, daemon=True).start()
# import io
# from efro.debug import printtypes
# with io.StringIO() as fstr:
# fstr.write('Too many FDs.\n')
# printtypes(file=fstr)
# fstr.seek(0)
# logging.warning(fstr.read())
# import socket
# objs: list[Any] = []
# for obj in gc.get_objects():
# if isinstance(obj, socket.socket):
# objs.append(obj)
# test = open('/Users/ericf/.zshrc', 'r', encoding='utf-8')
# reveal_type(test)
# print('FOUND', len(objs))

View file

@ -47,7 +47,7 @@ def bootstrap() -> None:
# Give a soft warning if we're being used with a different binary
# version than we expect.
expected_build = 20921
expected_build = 20982
running_build: int = env['build_number']
if running_build != expected_build:
print(
@ -120,7 +120,8 @@ def bootstrap() -> None:
import __main__
# Clear out the standard quit/exit messages since they don't
# work for us.
# work in our embedded situation (should revisit this once we're
# usable from a standard interpreter).
del __main__.__builtins__.quit
del __main__.__builtins__.exit

View file

@ -4,6 +4,7 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, overload
import _ba
@ -14,6 +15,8 @@ if TYPE_CHECKING:
from efro.message import Message, Response
import bacommon.cloud
DEBUG_LOG = False
# TODO: Should make it possible to define a protocol in bacommon.cloud and
# autogenerate this. That would give us type safety between this and
# internal protocols.
@ -30,6 +33,21 @@ class CloudSubsystem:
"""
return False # Needs to be overridden
def on_app_pause(self) -> None:
"""Should be called when the app pauses."""
def on_app_resume(self) -> None:
"""Should be called when the app resumes."""
def on_connectivity_changed(self, connected: bool) -> None:
"""Called when cloud connectivity state changes."""
if DEBUG_LOG:
logging.debug('CloudSubsystem: Connectivity is now %s.', connected)
# Inform things that use this.
# (TODO: should generalize this into some sort of registration system)
_ba.app.accounts_v2.on_cloud_connectivity_changed(connected)
@overload
def send_message_cb(
self,
@ -66,6 +84,26 @@ class CloudSubsystem:
) -> None:
...
@overload
def send_message_cb(
self,
msg: bacommon.cloud.SignInMessage,
on_response: Callable[
[bacommon.cloud.SignInResponse | Exception], None
],
) -> None:
...
@overload
def send_message_cb(
self,
msg: bacommon.cloud.ManageAccountMessage,
on_response: Callable[
[bacommon.cloud.ManageAccountResponse | Exception], None
],
) -> None:
...
def send_message_cb(
self,
msg: Message,
@ -93,6 +131,12 @@ class CloudSubsystem:
) -> bacommon.cloud.WorkspaceFetchResponse:
...
@overload
def send_message(
self, msg: bacommon.cloud.MerchAvailabilityMessage
) -> bacommon.cloud.MerchAvailabilityResponse:
...
@overload
def send_message(
self, msg: bacommon.cloud.TestMessage
@ -110,7 +154,6 @@ class CloudSubsystem:
def cloud_console_exec(code: str) -> None:
"""Called by the cloud console to run code in the logic thread."""
import sys
import logging
import __main__
from ba._generated.enums import TimeType

View file

@ -11,7 +11,7 @@ from ba._gameactivity import GameActivity
from ba._general import WeakCall
if TYPE_CHECKING:
from typing import Any, Sequence
from typing import Sequence
from bastd.actor.playerspaz import PlayerSpaz
import ba
@ -56,56 +56,6 @@ class CoopGameActivity(GameActivity[PlayerType, TeamType]):
# Preload achievement images in case we get some.
_ba.timer(2.0, WeakCall(self._preload_achievements))
def _show_standard_scores_to_beat_ui(
self, scores: list[dict[str, Any]]
) -> None:
from efro.util import asserttype
from ba._gameutils import timestring, animate
from ba._nodeactor import NodeActor
from ba._generated.enums import TimeFormat
display_type = self.get_score_type()
if scores is not None:
# Sort by originating date so that the most recent is first.
scores.sort(reverse=True, key=lambda s: asserttype(s['time'], int))
# Now make a display for the most recent challenge.
for score in scores:
if score['type'] == 'score_challenge':
tval = (
score['player']
+ ': '
+ timestring(
int(score['value']) * 10,
timeformat=TimeFormat.MILLISECONDS,
).evaluate()
if display_type == 'time'
else str(score['value'])
)
hattach = 'center' if display_type == 'time' else 'left'
halign = 'center' if display_type == 'time' else 'left'
pos = (20, -70) if display_type == 'time' else (20, -130)
txt = NodeActor(
_ba.newnode(
'text',
attrs={
'v_attach': 'top',
'h_attach': hattach,
'h_align': halign,
'color': (0.7, 0.4, 1, 1),
'shadow': 0.5,
'flatness': 1.0,
'position': pos,
'scale': 0.6,
'text': tval,
},
)
).autoretain()
assert txt.node is not None
animate(txt.node, 'scale', {1.0: 0.0, 1.1: 0.7, 1.2: 0.6})
break
# FIXME: this is now redundant with activityutils.getscoreconfig();
# need to kill this.
def get_score_type(self) -> str:

View file

@ -278,7 +278,7 @@ def show_damage_count(
def timestring(
timeval: float,
timeval: float | int,
centi: bool = True,
timeformat: ba.TimeFormat = TimeFormat.SECONDS,
suppress_format_warning: bool = False,

View file

@ -247,6 +247,14 @@ def google_play_purchases_not_available_message() -> None:
)
def google_play_services_not_available_message() -> None:
from ba._language import Lstr
_ba.screenmessage(
Lstr(resource='googlePlayServicesNotAvailableText'), color=(1, 0, 0)
)
def empty_call() -> None:
pass
@ -449,3 +457,41 @@ def hash_strings(inputs: list[str]) -> str:
def have_account_v2_credentials() -> bool:
"""Do we have primary account-v2 credentials set?"""
return _ba.app.accounts_v2.have_primary_credentials()
def implicit_sign_in(
login_type_str: str, login_id: str, display_name: str
) -> None:
"""An implicit login happened."""
from bacommon.login import LoginType
_ba.app.accounts_v2.on_implicit_sign_in(
login_type=LoginType(login_type_str),
login_id=login_id,
display_name=display_name,
)
def implicit_sign_out(login_type_str: str) -> None:
"""An implicit logout happened."""
from bacommon.login import LoginType
_ba.app.accounts_v2.on_implicit_sign_out(
login_type=LoginType(login_type_str)
)
def login_adapter_get_sign_in_token_response(
login_type_str: str, attempt_id_str: str, result_str: str
) -> None:
"""Login adapter do-sign-in completed."""
from bacommon.login import LoginType
from ba._login import LoginAdapterNative
login_type = LoginType(login_type_str)
attempt_id = int(attempt_id_str)
result = None if result_str == '' else result_str
with _ba.Context('ui'):
adapter = _ba.app.accounts_v2.login_adapters[login_type]
assert isinstance(adapter, LoginAdapterNative)
adapter.on_sign_in_complete(attempt_id=attempt_id, result=result)

View file

@ -104,7 +104,6 @@ def submit_score(
name: Any,
score: int | None,
callback: Callable,
friend_callback: Callable | None,
order: str = 'increasing',
tournament_id: str | None = None,
score_type: str = 'points',
@ -125,7 +124,6 @@ def submit_score(
name=name,
score=score,
callback=callback,
friend_callback=friend_callback,
order=order,
tournament_id=tournament_id,
score_type=score_type,
@ -321,7 +319,9 @@ def get_v1_account_state() -> str:
"""(internal)"""
if HAVE_INTERNAL:
return _bainternal.get_v1_account_state()
raise _no_bainternal_error()
# Without internal present just consider ourself always signed out.
return 'signed_out'
def get_v1_account_display_string(full: bool = True) -> str:

View file

@ -67,38 +67,39 @@ class LanguageSubsystem:
def _get_default_language(self) -> str:
languages = {
'ar': 'Arabic',
'be': 'Belarussian',
'zh': 'Chinese',
'hr': 'Croatian',
'cs': 'Czech',
'da': 'Danish',
'nl': 'Dutch',
'eo': 'Esperanto',
'fil': 'Filipino',
'fr': 'French',
'de': 'German',
'el': 'Greek',
'hi': 'Hindi',
'hu': 'Hungarian',
'id': 'Indonesian',
'it': 'Italian',
'ko': 'Korean',
'ms': 'Malay',
'fa': 'Persian',
'pl': 'Polish',
'pt': 'Portuguese',
'ro': 'Romanian',
'ru': 'Russian',
'sr': 'Serbian',
'es': 'Spanish',
'sk': 'Slovak',
'it': 'Italian',
'nl': 'Dutch',
'da': 'Danish',
'pt': 'Portuguese',
'fr': 'French',
'el': 'Greek',
'ru': 'Russian',
'pl': 'Polish',
'sv': 'Swedish',
'eo': 'Esperanto',
'cs': 'Czech',
'hr': 'Croatian',
'hu': 'Hungarian',
'be': 'Belarussian',
'ro': 'Romanian',
'ko': 'Korean',
'fa': 'Persian',
'ar': 'Arabic',
'zh': 'Chinese',
'tr': 'Turkish',
'th': 'Thai',
'id': 'Indonesian',
'sr': 'Serbian',
'uk': 'Ukrainian',
'vi': 'Vietnamese',
'vec': 'Venetian',
'hi': 'Hindi',
'ta': 'Tamil',
'fil': 'Filipino',
'th': 'Thai',
'tr': 'Turkish',
'uk': 'Ukrainian',
'vec': 'Venetian',
'vi': 'Vietnamese',
}
# Special case for Chinese: map specific variations to traditional.

330
dist/ba_data/python/ba/_login.py vendored Normal file
View file

@ -0,0 +1,330 @@
# Released under the MIT License. See LICENSE for details.
#
"""Login related functionality."""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, final
from bacommon.login import LoginType
import _ba
if TYPE_CHECKING:
from typing import Callable
DEBUG_LOG = False
class LoginAdapter:
"""Allows using implicit login types in an explicit way.
Some login types such as Google Play Game Services or Game Center are
basically always present and often do not provide a way to log out
from within a running app, so this adapter exists to use them in a
flexible manner by 'attaching' and 'detaching' from an always-present
login, allowing for its use alongside other login types. It also
provides common functionality for server-side account verification and
other handy bits.
"""
@dataclass
class SignInResult:
"""Describes the final result of a sign-in attempt."""
credentials: str
@dataclass
class ImplicitLoginState:
"""Describes the current state of an implicit login."""
login_id: str
display_name: str
def __init__(self, login_type: LoginType):
assert _ba.in_logic_thread()
self.login_type = login_type
self._implicit_login_state: LoginAdapter.ImplicitLoginState | None = (
None
)
self._on_app_launch_called = False
self._implicit_login_state_dirty = False
self._back_end_active = False
# Which login of our type (if any) is associated with the
# current active primary account.
self._active_login_id: str | None = None
def on_app_launch(self) -> None:
"""Should be called for each adapter in on_app_launch."""
assert not self._on_app_launch_called
self._on_app_launch_called = True
# Any implicit state we received up until now needs to be pushed
# to the app account subsystem.
self._update_implicit_login_state()
def set_implicit_login_state(
self, state: ImplicitLoginState | None
) -> None:
"""Keep the adapter informed of implicit login states.
This should be called by the adapter back-end when an account
of their associated type gets logged in or out.
"""
assert _ba.in_logic_thread()
# Ignore redundant sets.
if state == self._implicit_login_state:
return
if DEBUG_LOG:
if state is None:
logging.debug(
'LoginAdapter: %s implicit state changed;'
' now signed out.',
self.login_type.name,
)
else:
logging.debug(
'LoginAdapter: %s implicit state changed;'
' now signed in as %s.',
self.login_type.name,
state.display_name,
)
self._implicit_login_state = state
self._implicit_login_state_dirty = True
# (possibly) push it to the app for handling.
self._update_implicit_login_state()
# This might affect whether we consider that back-end as 'active'.
self._update_back_end_active()
def set_active_logins(self, logins: dict[LoginType, str]) -> None:
"""Keep the adapter informed of actively used logins.
This should be called by the app's account subsystem to
keep adapters up to date on the full set of logins attached
to the currently-in-use account.
Note that the logins dict passed in should be immutable as
only a reference to it is stored, not a copy.
"""
assert _ba.in_logic_thread()
if DEBUG_LOG:
logging.debug(
'LoginAdapter: %s adapter got active logins %s.',
self.login_type.name,
{k: v[:4] + '...' + v[-4:] for k, v in logins.items()},
)
self._active_login_id = logins.get(self.login_type)
self._update_back_end_active()
def on_back_end_active_change(self, active: bool) -> None:
"""Called when active state for the back-end is (possibly) changing.
Meant to be overridden by subclasses.
Being active means that the implicit login provided by the back-end
is actually being used by the app. It should therefore register
unlocked achievements, leaderboard scores, allow viewing native
UIs, etc. When not active it should ignore everything and behave
as if logged out, even if it technically is still logged in.
"""
assert _ba.in_logic_thread()
del active # Unused.
@final
def sign_in(
self,
result_cb: Callable[[LoginAdapter, SignInResult | Exception], None],
) -> None:
"""Attempt an explicit sign in via this adapter.
This can be called even if the back-end is not implicitly signed in;
the adapter will attempt to sign in if possible. An exception will
be returned if the sign-in attempt fails.
"""
assert _ba.in_logic_thread()
from ba._general import Call
if DEBUG_LOG:
logging.debug(
'LoginAdapter: %s adapter sign_in() called;'
' fetching sign-in-token...',
self.login_type.name,
)
def _got_sign_in_token_result(result: str | None) -> None:
import bacommon.cloud
# Failed to get a sign-in-token.
if result is None:
if DEBUG_LOG:
logging.debug(
'LoginAdapter: %s adapter sign-in-token fetch failed;'
' aborting sign-in.',
self.login_type.name,
)
_ba.pushcall(
Call(
result_cb,
self,
RuntimeError('fetch-sign-in-token failed.'),
)
)
return
# Got a sign-in token! Now pass it to the cloud which will use
# it to verify our identity and give us app credentials on
# success.
if DEBUG_LOG:
logging.debug(
'LoginAdapter: %s adapter sign-in-token fetch succeeded;'
' passing to cloud for verification...',
self.login_type.name,
)
def _got_sign_in_response(
response: bacommon.cloud.SignInResponse | Exception,
) -> None:
if isinstance(response, Exception):
if DEBUG_LOG:
logging.debug(
'LoginAdapter: %s adapter got error'
' sign-in response: %s',
self.login_type.name,
response,
)
_ba.pushcall(Call(result_cb, self, response))
else:
if DEBUG_LOG:
logging.debug(
'LoginAdapter: %s adapter got successful'
' sign-in response',
self.login_type.name,
)
if response.credentials is None:
result2: LoginAdapter.SignInResult | Exception = (
RuntimeError(
'No credentials returned after'
' submitting sign-in-token.'
)
)
else:
result2 = self.SignInResult(
credentials=response.credentials
)
_ba.pushcall(Call(result_cb, self, result2))
_ba.app.cloud.send_message_cb(
bacommon.cloud.SignInMessage(self.login_type, result),
on_response=_got_sign_in_response,
)
# Kick off the process by fetching a sign-in token.
self.get_sign_in_token(completion_cb=_got_sign_in_token_result)
def is_back_end_active(self) -> bool:
"""Is this adapter's back-end currently active?"""
return self._back_end_active
def get_sign_in_token(
self, completion_cb: Callable[[str | None], None]
) -> None:
"""Get a sign-in token from the adapter back end.
This token is then passed to the master-server to complete the
login process.
The adapter can use this opportunity to bring up account creation
UI, call its internal sign_in function, etc. as needed.
The provided completion_cb should then be called with either a token
or None if sign in failed or was cancelled.
"""
from ba._general import Call
# Default implementation simply fails immediately.
_ba.pushcall(Call(completion_cb, None))
def _update_implicit_login_state(self) -> None:
# If we've received an implicit login state, schedule it to be
# sent along to the app. We wait until on-app-launch has been
# called so that account-client-v2 has had a chance to load
# any existing state so it can properly respond to this.
if self._implicit_login_state_dirty and self._on_app_launch_called:
from ba._general import Call
if DEBUG_LOG:
logging.debug(
'LoginAdapter: %s adapter sending'
' implicit-state-changed to app.',
self.login_type.name,
)
_ba.pushcall(
Call(
_ba.app.accounts_v2.on_implicit_login_state_changed,
self.login_type,
self._implicit_login_state,
)
)
self._implicit_login_state_dirty = False
def _update_back_end_active(self) -> None:
was_active = self._back_end_active
if self._implicit_login_state is None:
is_active = False
else:
is_active = (
self._implicit_login_state.login_id == self._active_login_id
)
if was_active != is_active:
if DEBUG_LOG:
logging.debug(
'LoginAdapter: %s adapter back-end-active is now %s.',
self.login_type.name,
is_active,
)
self.on_back_end_active_change(is_active)
self._back_end_active = is_active
class LoginAdapterNative(LoginAdapter):
"""A login adapter that does its work in the native layer."""
def __init__(self) -> None:
super().__init__(LoginType.GPGS)
# Store int ids for in-flight attempts since they may go through
# various platform layers and back.
self._sign_in_attempt_num = 123
self._sign_in_attempts: dict[int, Callable[[str | None], None]] = {}
def get_sign_in_token(
self, completion_cb: Callable[[str | None], None]
) -> None:
attempt_id = self._sign_in_attempt_num
self._sign_in_attempts[attempt_id] = completion_cb
self._sign_in_attempt_num += 1
_ba.login_adapter_get_sign_in_token(self.login_type.value, attempt_id)
def on_back_end_active_change(self, active: bool) -> None:
_ba.login_adapter_back_end_active_change(self.login_type.value, active)
def on_sign_in_complete(self, attempt_id: int, result: str | None) -> None:
"""Called by the native layer on a completed attempt."""
assert _ba.in_logic_thread()
if attempt_id not in self._sign_in_attempts:
logging.exception('sign-in attempt_id %d not found', attempt_id)
return
callback = self._sign_in_attempts.pop(attempt_id)
callback(result)
class LoginAdapterGPGS(LoginAdapterNative):
"""Google Play Game Services adapter."""

View file

@ -4,6 +4,7 @@
from __future__ import annotations
import _ba
import copy
import logging
from typing import Any, TYPE_CHECKING
@ -39,6 +40,7 @@ def filter_playlist(
goodlist: list[dict] = []
unowned_maps: Sequence[str]
available_maps: list[str] = list(_ba.app.maps.keys())
if remove_unowned or mark_unowned:
unowned_maps = get_unowned_maps()
unowned_game_types = get_unowned_game_types()
@ -157,6 +159,9 @@ def filter_playlist(
gameclass = getclass(entry['type'], GameActivity)
if entry['settings']['map'] not in available_maps:
raise ImportError(f"Map not found: '{entry['settings']['map']}'")
if remove_unowned and gameclass in unowned_game_types:
continue
if add_resolved_type:

View file

@ -173,7 +173,7 @@ class PluginSubsystem:
color=(1, 1, 0),
)
plugnames = ', '.join(disappeared_plugs)
logging.warning(
logging.info(
'%d plugin(s) no longer found: %s.',
len(disappeared_plugs),
plugnames,

View file

@ -418,6 +418,7 @@ class ServerController:
# Call set-enabled last (will push state to the cloud).
_ba.set_public_party_max_size(self._config.max_party_size)
_ba.set_public_party_queue_enabled(self._config.enable_queue)
_ba.set_public_party_name(self._config.party_name)
_ba.set_public_party_stats_url(self._config.stats_url)
_ba.set_public_party_enabled(self._config.party_is_public)

View file

@ -30,6 +30,8 @@ def get_store_item_name_translated(item_name: str) -> ba.Lstr:
return _language.Lstr(
translate=('characterNames', item_info['character'])
)
if item_name in ['merch']:
return _language.Lstr(resource='merchText')
if item_name in ['upgrades.pro', 'pro']:
return _language.Lstr(
resource='store.bombSquadProNameText',
@ -50,8 +52,17 @@ def get_store_item_display_size(item_name: str) -> tuple[float, float]:
"""(internal)"""
if item_name.startswith('characters.'):
return 340 * 0.6, 430 * 0.6
if item_name in ['pro', 'upgrades.pro']:
return 650 * 0.9, 500 * 0.85
if item_name in ['pro', 'upgrades.pro', 'merch']:
from ba._generated.enums import UIScale
return 650 * 0.9, 500 * (
0.72
if (
_ba.app.config.get('Merch Link')
and _ba.app.ui.uiscale is UIScale.SMALL
)
else 0.85
)
if item_name.startswith('maps.'):
return 510 * 0.6, 450 * 0.6
if item_name.startswith('icons.'):
@ -96,6 +107,7 @@ def get_store_items() -> dict[str, dict]:
'characters.taobaomascot': {'character': 'Taobao Mascot'},
'characters.santa': {'character': 'Santa Claus'},
'characters.bunny': {'character': 'Easter Bunny'},
'merch': {},
'pro': {},
'maps.lake_frigid': {'map_type': maps.LakeFrigid},
'games.ninja_fight': {
@ -193,9 +205,7 @@ def get_store_items() -> dict[str, dict]:
'icons.fireball': {'icon': _ba.charstr(SpecialChar.FIREBALL)},
'icons.mikirog': {'icon': _ba.charstr(SpecialChar.MIKIROG)},
}
store_items = _ba.app.store_items
assert store_items is not None
return store_items
return _ba.app.store_items
def get_store_layout() -> dict[str, list[dict[str, Any]]]:
@ -261,7 +271,6 @@ def get_store_layout() -> dict[str, list[dict[str, Any]]]:
],
}
store_layout = _ba.app.store_layout
assert store_layout is not None
store_layout['characters'] = [
{
'items': [
@ -302,6 +311,12 @@ def get_store_layout() -> dict[str, list[dict[str, Any]]]:
'items': ['games.easter_egg_hunt'],
}
)
# This will cause merch to show only if the master-server has
# given us a link (which means merch is available in our region).
store_layout['extras'] = [{'items': ['pro']}]
if _ba.app.config.get('Merch Link'):
store_layout['extras'][0]['items'].append('merch')
return store_layout

View file

@ -40,6 +40,7 @@ from _ba import (
get_public_party_max_size,
set_public_party_name,
set_public_party_max_size,
set_public_party_queue_enabled,
set_authenticate_clients,
set_public_party_enabled,
reset_random_player_names,
@ -80,6 +81,7 @@ from _ba import (
get_replays_dir,
)
from ba._login import LoginAdapter
from ba._map import (
get_map_class,
register_map,
@ -99,6 +101,8 @@ from ba._apputils import (
is_browser_likely_available,
get_remote_app_name,
should_submit_debug_info,
dump_app_state,
log_dumped_app_state,
)
from ba._benchmark import (
run_gpu_benchmark,
@ -178,6 +182,7 @@ from ba._internal import (
)
__all__ = [
'LoginAdapter',
'show_online_score_ui',
'set_ui_input_device',
'is_party_icon_visible',
@ -209,6 +214,7 @@ __all__ = [
'get_public_party_max_size',
'set_public_party_name',
'set_public_party_max_size',
'set_public_party_queue_enabled',
'set_authenticate_clients',
'set_public_party_enabled',
'reset_random_player_names',
@ -247,7 +253,6 @@ __all__ = [
'set_telnet_access_enabled',
'new_replay_session',
'get_replays_dir',
# DIVIDER
'get_unowned_maps',
'get_unowned_game_types',
'get_map_class',
@ -329,4 +334,6 @@ __all__ = [
'sign_out_v1',
'sign_in_v1',
'mark_config_dirty',
'dump_app_state',
'log_dumped_app_state',
]

View file

@ -87,7 +87,7 @@ def show_user_scripts() -> None:
' See settings/advanced'
' in the game for more info.'
)
_ba.android_media_scan_file(file_name)
except Exception:
from ba import _error

View file

@ -10,6 +10,7 @@ from enum import Enum
from efro.message import Message, Response
from efro.dataclassio import ioprepped, IOAttrs
from bacommon.transfer import DirectoryManifest
from bacommon.login import LoginType
if TYPE_CHECKING:
pass
@ -154,3 +155,60 @@ class WorkspaceFetchResponse(Response):
] = field(default_factory=dict)
done: Annotated[bool, IOAttrs('d')] = False
@ioprepped
@dataclass
class MerchAvailabilityMessage(Message):
"""Can we show merch link?"""
@classmethod
def get_response_types(cls) -> list[type[Response] | None]:
return [MerchAvailabilityResponse]
@ioprepped
@dataclass
class MerchAvailabilityResponse(Response):
"""About that merch..."""
url: Annotated[str | None, IOAttrs('u')]
@ioprepped
@dataclass
class SignInMessage(Message):
"""Can I sign in please?"""
login_type: Annotated[LoginType, IOAttrs('l')]
sign_in_token: Annotated[str, IOAttrs('t')]
@classmethod
def get_response_types(cls) -> list[type[Response] | None]:
return [SignInResponse]
@ioprepped
@dataclass
class SignInResponse(Response):
"""Here's that sign-in result you asked for, boss."""
credentials: Annotated[str | None, IOAttrs('c')]
@ioprepped
@dataclass
class ManageAccountMessage(Message):
"""Message asking for a manage-account url."""
@classmethod
def get_response_types(cls) -> list[type[Response] | None]:
return [ManageAccountResponse]
@ioprepped
@dataclass
class ManageAccountResponse(Response):
"""Here's that sign-in result you asked for, boss."""
url: Annotated[str | None, IOAttrs('u')]

31
dist/ba_data/python/bacommon/login.py vendored Normal file
View file

@ -0,0 +1,31 @@
# Released under the MIT License. See LICENSE for details.
#
"""Functionality related to cloud based assets."""
from __future__ import annotations
from enum import Enum
from typing import TYPE_CHECKING
if TYPE_CHECKING:
pass
class LoginType(Enum):
"""Types of logins available."""
# Email/password
EMAIL = 'email'
# Google Play Game Services
GPGS = 'gpgs'
@property
def displayname(self) -> str:
"""Human readable name for this value."""
cls = type(self)
match self:
case cls.EMAIL:
return 'Email/Password'
case cls.GPGS:
return 'Google Play Games'

View file

@ -138,6 +138,11 @@ class ServerConfig:
tuple[float, float, float], tuple[float, float, float]
] | None = None
# Whether to enable the queue where players can line up before entering
# your server. Disabling this can be used as a workaround to deal with
# queue spamming attacks.
enable_queue: bool = True
# (internal) stress-testing mode.
stress_test_players: int | None = None

View file

@ -116,7 +116,6 @@ class CoopScoreScreen(ba.Activity[ba.Player, ba.Team]):
self._newly_complete: bool | None = None
self._is_more_levels: bool | None = None
self._next_level_name: str | None = None
self._show_friend_scores: bool | None = None
self._show_info: dict[str, Any] | None = None
self._name_str: str | None = None
self._friends_loading_status: ba.Actor | None = None
@ -177,12 +176,6 @@ class CoopScoreScreen(ba.Activity[ba.Player, ba.Team]):
.replace(' ', '_')
)
# If game-center/etc scores are available we show our friends'
# scores. Otherwise we show our local high scores.
self._show_friend_scores = ba.internal.game_service_has_leaderboard(
self._game_name_str, self._game_config_str
)
try:
self._old_best_rank = self._campaign.getlevel(
self._level_name
@ -366,21 +359,6 @@ class CoopScoreScreen(ba.Activity[ba.Player, ba.Team]):
ba.internal.set_ui_input_device(None) # Menu is up for grabs.
if self._show_friend_scores:
ba.buttonwidget(
parent=rootc,
color=(0.45, 0.4, 0.5),
position=(h_offs - 520, v_offs + 480),
size=(300, 60),
label=ba.Lstr(resource='topFriendsText'),
on_activate_call=ba.WeakCall(self._ui_gc),
transition_delay=delay + 0.5,
icon=self._game_service_leaderboards_texture,
icon_color=self._game_service_icon_color,
autoselect=True,
selectable=can_select_extra_buttons,
)
if self._have_achievements and self._account_has_achievements:
ba.buttonwidget(
parent=rootc,
@ -773,18 +751,6 @@ class CoopScoreScreen(ba.Activity[ba.Player, ba.Team]):
[p.name for p in self._playerinfos]
)
if self._show_friend_scores:
self._friends_loading_status = Text(
ba.Lstr(
value='${A}...',
subs=[('${A}', ba.Lstr(resource='loadingText'))],
),
position=(-405, 150 + 30),
color=(1, 1, 1, 0.4),
transition=Text.Transition.FADE_IN,
scale=0.7,
transition_delay=2.0,
)
self._score_loading_status = Text(
ba.Lstr(
value='${A}...',
@ -850,8 +816,6 @@ class CoopScoreScreen(ba.Activity[ba.Player, ba.Team]):
# We expect this only in kiosk mode; complain otherwise.
if not (ba.app.demo_mode or ba.app.arcade_mode):
print('got not-signed-in at score-submit; unexpected')
if self._show_friend_scores:
ba.pushcall(ba.WeakCall(self._got_friend_score_results, None))
ba.pushcall(ba.WeakCall(self._got_score_results, None))
else:
assert self._game_name_str is not None
@ -862,9 +826,6 @@ class CoopScoreScreen(ba.Activity[ba.Player, ba.Team]):
name_str,
self._score,
ba.WeakCall(self._got_score_results),
ba.WeakCall(self._got_friend_score_results)
if self._show_friend_scores
else None,
order=self._score_order,
tournament_id=self.session.tournament_id,
score_type=self._score_type,
@ -899,26 +860,6 @@ class CoopScoreScreen(ba.Activity[ba.Player, ba.Team]):
assert txt.node
txt.node.client_only = True
# If we have no friend scores, display local best scores.
if self._show_friend_scores:
# Host has a button, so we need client-only text.
ts_height = 300
ts_h_offs = -480
v_offs = 40
txt = Text(
ba.Lstr(resource='topFriendsText'),
maxwidth=210,
position=(ts_h_offs - 10, ts_height / 2 + 25 + v_offs + 20),
transition=Text.Transition.IN_RIGHT,
v_align=Text.VAlign.CENTER,
scale=1.2,
transition_delay=1.8,
).autoretain()
assert txt.node
txt.node.client_only = True
else:
ts_height = 300
ts_h_offs = -480
v_offs = 40

View file

@ -616,9 +616,6 @@ class FootballCoopGame(ba.CoopGameActivity[Player, Team]):
for bottype in self._bot_types_initial:
self._spawn_bot(bottype)
def _on_got_scores_to_beat(self, scores: list[dict[str, Any]]) -> None:
self._show_standard_scores_to_beat_ui(scores)
def _on_bot_spawn(self, spaz: SpazBot) -> None:
# We want to move to the left by default.
spaz.target_point_default = ba.Vec3(0, 0, 0)

View file

@ -682,9 +682,6 @@ class OnslaughtGame(ba.CoopGameActivity[Player, Team]):
self._bots = SpazBotSet()
ba.timer(4.0, self._start_updating_waves)
def _on_got_scores_to_beat(self, scores: list[dict[str, Any]]) -> None:
self._show_standard_scores_to_beat_ui(scores)
def _get_dist_grp_totals(self, grps: list[Any]) -> tuple[int, int]:
totalpts = 0
totaldudes = 0

View file

@ -684,9 +684,6 @@ class RunaroundGame(ba.CoopGameActivity[Player, Team]):
},
)
def _on_got_scores_to_beat(self, scores: list[dict[str, Any]]) -> None:
self._show_standard_scores_to_beat_ui(scores)
def _update_waves(self) -> None:
# pylint: disable=too-many-branches

View file

@ -326,9 +326,6 @@ class TheLastStandGame(ba.CoopGameActivity[Player, Team]):
else:
super().handlemessage(msg)
def _on_got_scores_to_beat(self, scores: list[dict[str, Any]]) -> None:
self._show_standard_scores_to_beat_ui(scores)
def end_game(self) -> None:
# Tell our bots to celebrate just to rub it in.
self._bots.final_celebrate()

View file

@ -6,13 +6,21 @@
from __future__ import annotations
import time
import logging
from typing import TYPE_CHECKING
import bacommon.cloud
from bacommon.login import LoginType
import ba
import ba.internal
if TYPE_CHECKING:
pass
from ba.internal import LoginAdapter
# These days we're directing people to the web based account settings
# for V2 account linking and trying to get them to disconnect remaining
# V1 links, but leaving this escape hatch here in case needed.
FORCE_ENABLE_V1_LINKING = False
class AccountSettingsWindow(ba.Window):
@ -27,12 +35,17 @@ class AccountSettingsWindow(ba.Window):
):
# pylint: disable=too-many-statements
self._sign_in_v2_button: ba.Widget | None = None
self._sign_in_v2_proxy_button: ba.Widget | None = None
self._sign_in_device_button: ba.Widget | None = None
self._show_legacy_unlink_button = False
self._signing_in_adapter: LoginAdapter | None = None
self._close_once_signed_in = close_once_signed_in
ba.set_analytics_screen('Account Window')
self._explicitly_signed_out_of_gpgs = False
# If they provided an origin-widget, scale up from that.
scale_origin: tuple[float, float] | None
if origin_widget is not None:
@ -46,14 +59,8 @@ class AccountSettingsWindow(ba.Window):
self._r = 'accountSettingsWindow'
self._modal = modal
self._needs_refresh = False
self._signed_in = ba.internal.get_v1_account_state() == 'signed_in'
self._account_state_num = ba.internal.get_v1_account_state_num()
self._show_linked = (
self._signed_in
and ba.internal.get_v1_account_misc_read_val(
'allowAccountLinking2', False
)
)
self._v1_signed_in = ba.internal.get_v1_account_state() == 'signed_in'
self._v1_account_state_num = ba.internal.get_v1_account_state_num()
self._check_sign_in_timer = ba.Timer(
1.0,
ba.WeakCall(self._update),
@ -62,12 +69,12 @@ class AccountSettingsWindow(ba.Window):
)
# Currently we can only reset achievements on game-center.
account_type: str | None
if self._signed_in:
account_type = ba.internal.get_v1_account_type()
v1_account_type: str | None
if self._v1_signed_in:
v1_account_type = ba.internal.get_v1_account_type()
else:
account_type = None
self._can_reset_achievements = account_type == 'Game Center'
v1_account_type = None
self._can_reset_achievements = v1_account_type == 'Game Center'
app = ba.app
uiscale = app.ui.uiscale
@ -92,16 +99,15 @@ class AccountSettingsWindow(ba.Window):
# Determine which sign-in/sign-out buttons we should show.
self._show_sign_in_buttons: list[str] = []
if app.platform == 'android' and app.subplatform == 'google':
if LoginType.GPGS in ba.app.accounts_v2.login_adapters:
self._show_sign_in_buttons.append('Google Play')
# Local accounts are generally always available with a few key
# exceptions.
self._show_sign_in_buttons.append('Local')
# Always want to show our web-based v2 login option.
self._show_sign_in_buttons.append('V2Proxy')
# Ditto with shiny new V2 ones.
if bool(True):
self._show_sign_in_buttons.append('V2')
# Legacy v1 device accounts are currently always available
# (though we need to start phasing them out at some point).
self._show_sign_in_buttons.append('Device')
top_extra = 15 if uiscale is ba.UIScale.SMALL else 0
super().__init__(
@ -180,31 +186,25 @@ class AccountSettingsWindow(ba.Window):
def _update(self) -> None:
# If they want us to close once we're signed in, do so.
if self._close_once_signed_in and self._signed_in:
if self._close_once_signed_in and self._v1_signed_in:
self._back()
return
# Hmm should update this to use get_account_state_num.
# Theoretically if we switch from one signed-in account to another
# in the background this would break.
account_state_num = ba.internal.get_v1_account_state_num()
account_state = ba.internal.get_v1_account_state()
show_linked = (
self._signed_in
and ba.internal.get_v1_account_misc_read_val(
'allowAccountLinking2', False
)
)
v1_account_state_num = ba.internal.get_v1_account_state_num()
v1_account_state = ba.internal.get_v1_account_state()
show_legacy_unlink_button = self._should_show_legacy_unlink_button()
if (
account_state_num != self._account_state_num
or self._show_linked != show_linked
v1_account_state_num != self._v1_account_state_num
or show_legacy_unlink_button != self._show_legacy_unlink_button
or self._needs_refresh
):
self._show_linked = show_linked
self._account_state_num = account_state_num
self._signed_in = account_state == 'signed_in'
self._v1_account_state_num = v1_account_state_num
self._v1_signed_in = v1_account_state == 'signed_in'
self._show_legacy_unlink_button = show_legacy_unlink_button
self._refresh()
# Go ahead and refresh some individual things
@ -226,128 +226,138 @@ class AccountSettingsWindow(ba.Window):
# pylint: disable=cyclic-import
from bastd.ui import confirm
account_state = ba.internal.get_v1_account_state()
account_type = (
primary_v2_account = ba.app.accounts_v2.primary
v1_state = ba.internal.get_v1_account_state()
v1_account_type = (
ba.internal.get_v1_account_type()
if account_state == 'signed_in'
if v1_state == 'signed_in'
else 'unknown'
)
is_google = account_type == 'Google Play'
# We expose GPGS-specific functionality only if it is 'active'
# (meaning the current GPGS player matches one of our account's
# logins).
gpgs_adapter = ba.app.accounts_v2.login_adapters.get(LoginType.GPGS)
is_gpgs = (
False if gpgs_adapter is None else gpgs_adapter.is_back_end_active()
)
show_local_signed_in_as = False
local_signed_in_as_space = 50.0
show_signed_in_as = self._signed_in
show_signed_in_as = self._v1_signed_in
signed_in_as_space = 95.0
show_sign_in_benefits = not self._signed_in
show_sign_in_benefits = not self._v1_signed_in
sign_in_benefits_space = 80.0
show_signing_in_text = account_state == 'signing_in'
show_signing_in_text = (
v1_state == 'signing_in' or self._signing_in_adapter is not None
)
signing_in_text_space = 80.0
show_google_play_sign_in_button = (
account_state == 'signed_out'
v1_state == 'signed_out'
and self._signing_in_adapter is None
and 'Google Play' in self._show_sign_in_buttons
)
show_device_sign_in_button = (
account_state == 'signed_out'
and 'Local' in self._show_sign_in_buttons
show_v2_proxy_sign_in_button = (
v1_state == 'signed_out'
and self._signing_in_adapter is None
and 'V2Proxy' in self._show_sign_in_buttons
)
show_v2_sign_in_button = (
account_state == 'signed_out' and 'V2' in self._show_sign_in_buttons
show_device_sign_in_button = (
v1_state == 'signed_out'
and self._signing_in_adapter is None
and 'Device' in self._show_sign_in_buttons
)
sign_in_button_space = 70.0
deprecated_space = 60
show_game_service_button = self._signed_in and account_type in [
show_game_service_button = self._v1_signed_in and v1_account_type in [
'Game Center'
]
game_service_button_space = 60.0
show_linked_accounts_text = (
self._signed_in
and ba.internal.get_v1_account_misc_read_val(
'allowAccountLinking2', False
)
)
show_what_is_v2 = self._v1_signed_in and v1_account_type == 'V2'
show_linked_accounts_text = self._v1_signed_in
linked_accounts_text_space = 60.0
show_achievements_button = self._signed_in and account_type in (
show_achievements_button = self._v1_signed_in and v1_account_type in (
'Google Play',
'Alibaba',
'Local',
'OUYA',
'V2',
)
achievements_button_space = 60.0
show_achievements_text = (
self._signed_in and not show_achievements_button
self._v1_signed_in and not show_achievements_button
)
achievements_text_space = 27.0
show_leaderboards_button = self._signed_in and is_google
show_leaderboards_button = self._v1_signed_in and is_gpgs
leaderboards_button_space = 60.0
show_campaign_progress = self._signed_in
show_campaign_progress = self._v1_signed_in
campaign_progress_space = 27.0
show_tickets = self._signed_in
show_tickets = self._v1_signed_in
tickets_space = 27.0
show_reset_progress_button = False
reset_progress_button_space = 70.0
show_manage_v2_account_button = (
self._signed_in and account_type == 'V2' and bool(False)
) # Disabled for now.
self._v1_signed_in and v1_account_type == 'V2'
)
manage_v2_account_button_space = 100.0
show_player_profiles_button = self._signed_in
show_player_profiles_button = self._v1_signed_in
player_profiles_button_space = (
70.0 if show_manage_v2_account_button else 100.0
)
show_link_accounts_button = (
self._signed_in
and ba.internal.get_v1_account_misc_read_val(
'allowAccountLinking2', False
)
show_link_accounts_button = self._v1_signed_in and (
primary_v2_account is None or FORCE_ENABLE_V1_LINKING
)
link_accounts_button_space = 70.0
show_unlink_accounts_button = show_link_accounts_button
unlink_accounts_button_space = 90.0
show_sign_out_button = self._signed_in and account_type in [
show_v2_link_info = self._v1_signed_in and not show_link_accounts_button
v2_link_info_space = 70.0
legacy_unlink_button_space = 120.0
show_sign_out_button = self._v1_signed_in and v1_account_type in [
'Local',
'Google Play',
'V2',
]
sign_out_button_space = 70.0
show_cancel_v2_sign_in_button = (
account_state == 'signing_in'
and ba.app.accounts_v2.have_primary_credentials()
# We can show cancel if we're either waiting on an adapter to
# provide us with v2 credentials or waiting for those credentials
# to be verified.
show_cancel_sign_in_button = self._signing_in_adapter is not None or (
ba.app.accounts_v2.have_primary_credentials()
and primary_v2_account is None
)
cancel_v2_sign_in_button_space = 70.0
cancel_sign_in_button_space = 70.0
if self._subcontainer is not None:
self._subcontainer.delete()
self._sub_height = 60.0
if show_local_signed_in_as:
self._sub_height += local_signed_in_as_space
if show_signed_in_as:
self._sub_height += signed_in_as_space
if show_signing_in_text:
self._sub_height += signing_in_text_space
if show_google_play_sign_in_button:
self._sub_height += sign_in_button_space
if show_v2_proxy_sign_in_button:
self._sub_height += sign_in_button_space
if show_device_sign_in_button:
self._sub_height += sign_in_button_space
if show_v2_sign_in_button:
self._sub_height += sign_in_button_space
self._sub_height += sign_in_button_space + deprecated_space
if show_game_service_button:
self._sub_height += game_service_button_space
if show_linked_accounts_text:
@ -374,10 +384,14 @@ class AccountSettingsWindow(ba.Window):
self._sub_height += link_accounts_button_space
if show_unlink_accounts_button:
self._sub_height += unlink_accounts_button_space
if show_v2_link_info:
self._sub_height += v2_link_info_space
if self._show_legacy_unlink_button:
self._sub_height += legacy_unlink_button_space
if show_sign_out_button:
self._sub_height += sign_out_button_space
if show_cancel_v2_sign_in_button:
self._sub_height += cancel_v2_sign_in_button_space
if show_cancel_sign_in_button:
self._sub_height += cancel_sign_in_button_space
self._subcontainer = ba.containerwidget(
parent=self._scrollwidget,
size=(self._sub_width, self._sub_height),
@ -390,27 +404,8 @@ class AccountSettingsWindow(ba.Window):
first_selectable = None
v = self._sub_height - 10.0
if show_local_signed_in_as:
v -= local_signed_in_as_space * 0.6
ba.textwidget(
parent=self._subcontainer,
position=(self._sub_width * 0.5, v),
size=(0, 0),
text=ba.Lstr(
resource='accountSettingsWindow.deviceSpecificAccountText',
subs=[
('${NAME}', ba.internal.get_v1_account_display_string())
],
),
scale=0.7,
color=(0.5, 0.5, 0.6),
maxwidth=self._sub_width * 0.9,
flatness=1.0,
h_align='center',
v_align='center',
)
v -= local_signed_in_as_space * 0.4
self._account_name_what_is_text: ba.Widget | None
self._account_name_what_is_y = 0.0
self._account_name_text: ba.Widget | None
if show_signed_in_as:
v -= signed_in_as_space * 0.2
@ -429,7 +424,7 @@ class AccountSettingsWindow(ba.Window):
h_align='center',
v_align='center',
)
v -= signed_in_as_space * 0.4
v -= signed_in_as_space * 0.5
self._account_name_text = ba.textwidget(
parent=self._subcontainer,
position=(self._sub_width * 0.5, v),
@ -441,10 +436,39 @@ class AccountSettingsWindow(ba.Window):
h_align='center',
v_align='center',
)
if show_what_is_v2:
self._account_name_what_is_y = v - 23.0
self._account_name_what_is_text = ba.textwidget(
parent=self._subcontainer,
position=(0.0, self._account_name_what_is_y),
size=(200.0, 60),
text=ba.Lstr(
value='${WHAT} -->',
subs=[('${WHAT}', ba.Lstr(resource='whatIsThisText'))],
),
scale=0.6,
color=(0.3, 0.7, 0.05),
maxwidth=200.0,
h_align='right',
v_align='center',
autoselect=True,
selectable=True,
on_activate_call=show_what_is_v2_page,
click_activate=True,
)
if first_selectable is None:
first_selectable = self._account_name_what_is_text
else:
self._account_name_what_is_text = None
self._refresh_account_name_text()
v -= signed_in_as_space * 0.4
else:
self._account_name_text = None
self._account_name_what_is_text = None
if self._back_button is None:
bbtn = ba.internal.get_special_widget('back_button')
@ -528,7 +552,7 @@ class AccountSettingsWindow(ba.Window):
),
],
),
on_activate_call=lambda: self._sign_in_press('Google Play'),
on_activate_call=lambda: self._sign_in_press(LoginType.GPGS),
)
if first_selectable is None:
first_selectable = btn
@ -541,16 +565,16 @@ class AccountSettingsWindow(ba.Window):
ba.widget(edit=btn, show_buffer_bottom=40, show_buffer_top=100)
self._sign_in_text = None
if show_v2_sign_in_button:
if show_v2_proxy_sign_in_button:
button_width = 350
v -= sign_in_button_space
self._sign_in_v2_button = btn = ba.buttonwidget(
self._sign_in_v2_proxy_button = btn = ba.buttonwidget(
parent=self._subcontainer,
position=((self._sub_width - button_width) * 0.5, v - 20),
autoselect=True,
size=(button_width, 60),
label='',
on_activate_call=self._v2_sign_in_press,
on_activate_call=self._v2_proxy_sign_in_press,
)
ba.textwidget(
parent=self._subcontainer,
@ -598,7 +622,7 @@ class AccountSettingsWindow(ba.Window):
if show_device_sign_in_button:
button_width = 350
v -= sign_in_button_space
v -= sign_in_button_space + deprecated_space
self._sign_in_device_button = btn = ba.buttonwidget(
parent=self._subcontainer,
position=((self._sub_width - button_width) * 0.5, v - 20),
@ -607,6 +631,18 @@ class AccountSettingsWindow(ba.Window):
label='',
on_activate_call=lambda: self._sign_in_press('Local'),
)
ba.textwidget(
parent=self._subcontainer,
h_align='center',
v_align='center',
size=(0, 0),
position=(self._sub_width * 0.5, v + 60),
text=ba.Lstr(resource='deprecatedText'),
scale=0.8,
maxwidth=300,
color=(0.6, 0.55, 0.45),
)
ba.textwidget(
parent=self._subcontainer,
draw_controller=btn,
@ -663,9 +699,7 @@ class AccountSettingsWindow(ba.Window):
color=(0.55, 0.5, 0.6),
icon=ba.gettexture('settingsIcon'),
textcolor=(0.75, 0.7, 0.8),
on_activate_call=lambda: ba.open_url(
'https://ballistica.net/accountsettings'
),
on_activate_call=ba.WeakCall(self._on_manage_account_press),
)
if first_selectable is None:
first_selectable = btn
@ -703,12 +737,12 @@ class AccountSettingsWindow(ba.Window):
if show_game_service_button:
button_width = 300
v -= game_service_button_space * 0.85
account_type = ba.internal.get_v1_account_type()
if account_type == 'Game Center':
account_type_name = ba.Lstr(resource='gameCenterText')
v1_account_type = ba.internal.get_v1_account_type()
if v1_account_type == 'Game Center':
v1_account_type_name = ba.Lstr(resource='gameCenterText')
else:
raise ValueError(
"unknown account type: '" + str(account_type) + "'"
"unknown account type: '" + str(v1_account_type) + "'"
)
self._game_service_button = btn = ba.buttonwidget(
parent=self._subcontainer,
@ -718,7 +752,7 @@ class AccountSettingsWindow(ba.Window):
autoselect=True,
on_activate_call=ba.internal.show_online_score_ui,
size=(button_width, 50),
label=account_type_name,
label=v1_account_type_name,
)
if first_selectable is None:
first_selectable = btn
@ -761,11 +795,15 @@ class AccountSettingsWindow(ba.Window):
autoselect=True,
icon=ba.gettexture(
'googlePlayAchievementsIcon'
if is_google
if is_gpgs
else 'achievementsIcon'
),
icon_color=(0.8, 0.95, 0.7) if is_google else (0.85, 0.8, 0.9),
on_activate_call=self._on_achievements_press,
icon_color=(0.8, 0.95, 0.7) if is_gpgs else (0.85, 0.8, 0.9),
on_activate_call=(
self._on_custom_achievements_press
if is_gpgs
else self._on_achievements_press
),
size=(button_width, 50),
label='',
)
@ -897,6 +935,7 @@ class AccountSettingsWindow(ba.Window):
scale=0.9,
color=(0.75, 0.7, 0.8),
maxwidth=self._sub_width * 0.95,
text=ba.Lstr(resource=self._r + '.linkedAccountsText'),
h_align='center',
v_align='center',
)
@ -905,6 +944,8 @@ class AccountSettingsWindow(ba.Window):
else:
self._linked_accounts_text = None
# Show link/unlink buttons only for V1 accounts.
if show_link_accounts_button:
v -= link_accounts_button_space
self._link_accounts_button = btn = ba.buttonwidget(
@ -984,6 +1025,50 @@ class AccountSettingsWindow(ba.Window):
else:
self._unlink_accounts_button = None
if show_v2_link_info:
v -= v2_link_info_space
ba.textwidget(
parent=self._subcontainer,
h_align='center',
v_align='center',
size=(0, 0),
position=(self._sub_width * 0.5, v + v2_link_info_space - 20),
text=ba.Lstr(resource='v2AccountLinkingInfoText'),
flatness=1.0,
scale=0.8,
maxwidth=450,
color=(0.5, 0.45, 0.55),
)
if self._show_legacy_unlink_button:
v -= legacy_unlink_button_space
button_width_w = button_width * 1.5
ba.textwidget(
parent=self._subcontainer,
position=(self._sub_width * 0.5 - 150.0, v + 75),
size=(300.0, 60),
text=ba.Lstr(resource='whatIsThisText'),
scale=0.8,
color=(0.3, 0.7, 0.05),
maxwidth=200.0,
h_align='center',
v_align='center',
autoselect=True,
selectable=True,
on_activate_call=show_what_is_legacy_unlinking_page,
click_activate=True,
)
btn = ba.buttonwidget(
parent=self._subcontainer,
position=((self._sub_width - button_width_w) * 0.5, v + 25),
autoselect=True,
size=(button_width_w, 60),
label=ba.Lstr(resource=self._r + '.unlinkLegacyV1AccountsText'),
textcolor=(0.8, 0.4, 0),
color=(0.55, 0.5, 0.6),
on_activate_call=self._unlink_accounts_press,
)
if show_sign_out_button:
v -= sign_out_button_space
self._sign_out_button = btn = ba.buttonwidget(
@ -1005,9 +1090,9 @@ class AccountSettingsWindow(ba.Window):
)
ba.widget(edit=btn, left_widget=bbtn, show_buffer_bottom=15)
if show_cancel_v2_sign_in_button:
v -= cancel_v2_sign_in_button_space
self._cancel_v2_sign_in_button = btn = ba.buttonwidget(
if show_cancel_sign_in_button:
v -= cancel_sign_in_button_space
self._cancel_sign_in_button = btn = ba.buttonwidget(
parent=self._subcontainer,
position=((self._sub_width - button_width) * 0.5, v),
size=(button_width, 60),
@ -1015,7 +1100,7 @@ class AccountSettingsWindow(ba.Window):
color=(0.55, 0.5, 0.6),
textcolor=(0.75, 0.7, 0.8),
autoselect=True,
on_activate_call=self._cancel_v2_sign_in_press,
on_activate_call=self._cancel_sign_in_press,
)
if first_selectable is None:
first_selectable = btn
@ -1038,33 +1123,51 @@ class AccountSettingsWindow(ba.Window):
)
self._needs_refresh = False
def _on_achievements_press(self) -> None:
# pylint: disable=cyclic-import
from bastd.ui import achievements
account_state = ba.internal.get_v1_account_state()
account_type = (
ba.internal.get_v1_account_type()
if account_state == 'signed_in'
else 'unknown'
)
# for google play we use the built-in UI; otherwise pop up our own
if account_type == 'Google Play':
def _on_custom_achievements_press(self) -> None:
ba.timer(
0.15,
ba.Call(ba.internal.show_online_score_ui, 'achievements'),
timetype=ba.TimeType.REAL,
)
elif account_type != 'unknown':
def _on_achievements_press(self) -> None:
# pylint: disable=cyclic-import
from bastd.ui import achievements
assert self._achievements_button is not None
achievements.AchievementsWindow(
position=self._achievements_button.get_screen_space_center()
)
else:
print(
'ERROR: unknown account type in on_achievements_press:',
account_type,
def _on_what_is_v2_press(self) -> None:
show_what_is_v2_page()
def _on_manage_account_press(self) -> None:
ba.screenmessage(ba.Lstr(resource='oneMomentText'))
# We expect to have a v2 account signed in if we get here.
if ba.app.accounts_v2.primary is None:
logging.exception(
'got manage-account press without v2 account present'
)
return
with ba.app.accounts_v2.primary:
ba.app.cloud.send_message_cb(
bacommon.cloud.ManageAccountMessage(),
on_response=ba.WeakCall(self._on_manage_account_response),
)
def _on_manage_account_response(
self, response: bacommon.cloud.ManageAccountResponse | Exception
) -> None:
if isinstance(response, Exception) or response.url is None:
ba.screenmessage(ba.Lstr(resource='errorText'), color=(1, 0, 0))
ba.playsound(ba.getsound('error'))
return
ba.open_url(response.url)
def _on_leaderboards_press(self) -> None:
ba.timer(
@ -1073,7 +1176,7 @@ class AccountSettingsWindow(ba.Window):
timetype=ba.TimeType.REAL,
)
def _have_unlinkable_accounts(self) -> bool:
def _have_unlinkable_v1_accounts(self) -> bool:
# if this is not present, we haven't had contact from the server so
# let's not proceed..
if ba.internal.get_public_login_id() is None:
@ -1086,16 +1189,33 @@ class AccountSettingsWindow(ba.Window):
def _update_unlink_accounts_button(self) -> None:
if self._unlink_accounts_button is None:
return
if self._have_unlinkable_accounts():
if self._have_unlinkable_v1_accounts():
clr = (0.75, 0.7, 0.8, 1.0)
else:
clr = (1.0, 1.0, 1.0, 0.25)
ba.textwidget(edit=self._unlink_accounts_button_label, color=clr)
def _should_show_legacy_unlink_button(self) -> bool:
# Only show this when fully signed in to a v2 account.
if not self._v1_signed_in or ba.app.accounts_v2.primary is None:
return False
out = self._have_unlinkable_v1_accounts()
return out
def _update_linked_accounts_text(self) -> None:
if self._linked_accounts_text is None:
return
# Disable this by default when signed in to a V2 account
# (since this shows V1 links which we should no longer care about).
if (
ba.app.accounts_v2.primary is not None
and not FORCE_ENABLE_V1_LINKING
):
return
# if this is not present, we haven't had contact from the server so
# let's not proceed..
if ba.internal.get_public_login_id() is None:
@ -1105,13 +1225,9 @@ class AccountSettingsWindow(ba.Window):
accounts = ba.internal.get_v1_account_misc_read_val_2(
'linkedAccounts', []
)
# our_account = _bs.get_v1_account_display_string()
# accounts = [a for a in accounts if a != our_account]
# accounts_str = u', '.join(accounts) if accounts else
# ba.Lstr(translate=('settingNames', 'None'))
# UPDATE - we now just print the number here; not the actual
# accounts
# (they can see that in the unlink section if they're curious)
# accounts (they can see that in the unlink section if they're
# curious)
accounts_str = str(max(0, len(accounts) - 1))
ba.textwidget(
edit=self._linked_accounts_text,
@ -1162,6 +1278,7 @@ class AccountSettingsWindow(ba.Window):
)
def _refresh_account_name_text(self) -> None:
if self._account_name_text is None:
return
try:
@ -1169,7 +1286,20 @@ class AccountSettingsWindow(ba.Window):
except Exception:
ba.print_exception()
name_str = '??'
ba.textwidget(edit=self._account_name_text, text=name_str)
if self._account_name_what_is_text is not None:
swidth = ba.internal.get_string_width(
name_str, suppress_warning=True
)
# Eww; number-fudging. Need to recalibrate this if
# account name scaling changes.
x = self._sub_width * 0.5 - swidth * 0.75 - 170
ba.textwidget(
edit=self._account_name_what_is_text,
position=(x, self._account_name_what_is_y),
)
def _refresh_achievements(self) -> None:
if (
@ -1199,7 +1329,7 @@ class AccountSettingsWindow(ba.Window):
# pylint: disable=cyclic-import
from bastd.ui.account import unlink
if not self._have_unlinkable_accounts():
if not self._have_unlinkable_v1_accounts():
ba.playsound(ba.getsound('error'))
return
unlink.AccountUnlinkWindow(origin_widget=self._unlink_accounts_button)
@ -1214,16 +1344,27 @@ class AccountSettingsWindow(ba.Window):
origin_widget=self._player_profiles_button
)
def _cancel_v2_sign_in_press(self) -> None:
# Just say we don't wanna be signed in anymore.
def _cancel_sign_in_press(self) -> None:
# If we're waiting on an adapter to give us credentials, abort.
self._signing_in_adapter = None
# Say we don't wanna be signed in anymore if we are.
ba.app.accounts_v2.set_primary_credentials(None)
self._needs_refresh = True
# Speed UI updates along.
ba.timer(0.1, ba.WeakCall(self._update), timetype=ba.TimeType.REAL)
def _sign_out_press(self) -> None:
if ba.app.accounts_v2.have_primary_credentials():
if (
ba.app.accounts_v2.primary is not None
and LoginType.GPGS in ba.app.accounts_v2.primary.logins
):
self._explicitly_signed_out_of_gpgs = True
ba.app.accounts_v2.set_primary_credentials(None)
else:
ba.internal.sign_out_v1()
@ -1242,25 +1383,90 @@ class AccountSettingsWindow(ba.Window):
# Speed UI updates along.
ba.timer(0.1, ba.WeakCall(self._update), timetype=ba.TimeType.REAL)
def _sign_in_press(
self, account_type: str, show_test_warning: bool = True
) -> None:
del show_test_warning # unused
ba.internal.sign_in_v1(account_type)
def _sign_in_press(self, login_type: str | LoginType) -> None:
# Make note of the type account we're *wanting* to be signed in with.
# V1 login types are strings.
if isinstance(login_type, str):
ba.internal.sign_in_v1(login_type)
# Make note of the type account we're *wanting*
# to be signed in with.
cfg = ba.app.config
cfg['Auto Account State'] = account_type
cfg['Auto Account State'] = login_type
cfg.commit()
self._needs_refresh = True
ba.timer(0.1, ba.WeakCall(self._update), timetype=ba.TimeType.REAL)
return
def _v2_sign_in_press(self) -> None:
# V2 login sign-in buttons generally go through adapters.
adapter = ba.app.accounts_v2.login_adapters.get(login_type)
if adapter is not None:
self._signing_in_adapter = adapter
adapter.sign_in(
result_cb=ba.WeakCall(self._on_adapter_sign_in_result)
)
# Will get 'Signing in...' to show.
self._needs_refresh = True
ba.timer(0.1, ba.WeakCall(self._update), timetype=ba.TimeType.REAL)
else:
ba.screenmessage(f'Unsupported login_type: {login_type.name}')
def _on_adapter_sign_in_result(
self,
adapter: LoginAdapter,
result: LoginAdapter.SignInResult | Exception,
) -> None:
is_us = self._signing_in_adapter is adapter
# If this isn't our current one we don't care.
if not is_us:
return
# If it is us, note that we're done.
self._signing_in_adapter = None
if isinstance(result, Exception):
# For now just make a bit of noise if anything went wrong;
# can get more specific as needed later.
ba.screenmessage(ba.Lstr(resource='errorText'), color=(1, 0, 0))
ba.playsound(ba.getsound('error'))
else:
# Success! Plug in these credentials which will begin
# verifying them and set our primary account-handle
# when finished.
ba.app.accounts_v2.set_primary_credentials(result.credentials)
# Special case - if the user has explicitly logged out and
# logged in again with GPGS via this button, warn them that
# they need to use the app if they want to switch to a
# different GPGS account.
if (
self._explicitly_signed_out_of_gpgs
and adapter.login_type is LoginType.GPGS
):
# Delay this slightly so it hopefully pops up after
# credentials go through and the account name shows up.
ba.timer(
1.5,
ba.Call(
ba.screenmessage,
ba.Lstr(
resource=self._r
+ '.googlePlayGamesAccountSwitchText'
),
),
)
# Speed any UI updates along.
self._needs_refresh = True
ba.timer(0.1, ba.WeakCall(self._update), timetype=ba.TimeType.REAL)
def _v2_proxy_sign_in_press(self) -> None:
# pylint: disable=cyclic-import
from bastd.ui.account.v2 import V2SignInWindow
from bastd.ui.account.v2proxy import V2ProxySignInWindow
assert self._sign_in_v2_button is not None
V2SignInWindow(origin_widget=self._sign_in_v2_button)
assert self._sign_in_v2_proxy_button is not None
V2ProxySignInWindow(origin_widget=self._sign_in_v2_proxy_button)
def _reset_progress(self) -> None:
try:
@ -1319,3 +1525,15 @@ class AccountSettingsWindow(ba.Window):
ba.containerwidget(edit=self._root_widget, selected_child=sel)
except Exception:
ba.print_exception(f'Error restoring state for {self}.')
def show_what_is_v2_page() -> None:
"""Show the webpage describing V2 accounts."""
bamasteraddr = ba.internal.get_master_server_address(version=2)
ba.open_url(f'{bamasteraddr}/whatisv2')
def show_what_is_legacy_unlinking_page() -> None:
"""Show the webpage describing legacy unlinking."""
bamasteraddr = ba.internal.get_master_server_address(version=2)
ba.open_url(f'{bamasteraddr}/whatarev1links')

View file

@ -0,0 +1,236 @@
# Released under the MIT License. See LICENSE for details.
#
"""V2 account ui bits."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
import ba
import ba.internal
from efro.error import CommunicationError
import bacommon.cloud
if TYPE_CHECKING:
pass
STATUS_CHECK_INTERVAL_SECONDS = 2.0
class V2ProxySignInWindow(ba.Window):
"""A window allowing signing in to a v2 account."""
def __init__(self, origin_widget: ba.Widget):
self._width = 600
self._height = 550
self._proxyid: str | None = None
self._proxykey: str | None = None
uiscale = ba.app.ui.uiscale
super().__init__(
root_widget=ba.containerwidget(
size=(self._width, self._height),
transition='in_scale',
scale_origin_stack_offset=(
origin_widget.get_screen_space_center()
),
scale=(
1.25
if uiscale is ba.UIScale.SMALL
else 1.05
if uiscale is ba.UIScale.MEDIUM
else 0.9
),
)
)
self._loading_text = ba.textwidget(
parent=self._root_widget,
position=(self._width * 0.5, self._height * 0.5),
h_align='center',
v_align='center',
size=(0, 0),
maxwidth=0.9 * self._width,
text=ba.Lstr(
value='${A}...',
subs=[('${A}', ba.Lstr(resource='loadingText'))],
),
)
self._cancel_button = ba.buttonwidget(
parent=self._root_widget,
position=(30, self._height - 65),
size=(130, 50),
scale=0.8,
label=ba.Lstr(resource='cancelText'),
on_activate_call=self._done,
autoselect=True,
color=(0.55, 0.5, 0.6),
textcolor=(0.75, 0.7, 0.8),
)
if bool(False):
ba.containerwidget(
edit=self._root_widget, cancel_button=self._cancel_button
)
self._update_timer: ba.Timer | None = None
# Ask the cloud for a proxy login id.
ba.app.cloud.send_message_cb(
bacommon.cloud.LoginProxyRequestMessage(),
on_response=ba.WeakCall(self._on_proxy_request_response),
)
def _on_proxy_request_response(
self, response: bacommon.cloud.LoginProxyRequestResponse | Exception
) -> None:
from ba.internal import is_browser_likely_available
# Something went wrong. Show an error message and that's it.
if isinstance(response, Exception):
ba.textwidget(
edit=self._loading_text,
text=ba.Lstr(resource='internal.unavailableNoConnectionText'),
color=(1, 0, 0),
)
return
# Show link(s) the user can use to sign in.
address = (
ba.internal.get_master_server_address(version=2) + response.url
)
address_pretty = address.removeprefix('https://')
ba.textwidget(
parent=self._root_widget,
position=(self._width * 0.5, self._height - 95),
size=(0, 0),
text=ba.Lstr(
resource='accountSettingsWindow.v2LinkInstructionsText'
),
color=ba.app.ui.title_color,
maxwidth=self._width * 0.9,
h_align='center',
v_align='center',
)
button_width = 450
if is_browser_likely_available():
ba.buttonwidget(
parent=self._root_widget,
position=(
(self._width * 0.5 - button_width * 0.5),
self._height - 185,
),
autoselect=True,
size=(button_width, 60),
label=ba.Lstr(value=address_pretty),
color=(0.55, 0.5, 0.6),
textcolor=(0.75, 0.7, 0.8),
on_activate_call=lambda: ba.open_url(address),
)
qroffs = 0.0
else:
ba.textwidget(
parent=self._root_widget,
position=(self._width * 0.5, self._height - 145),
size=(0, 0),
text=ba.Lstr(value=address_pretty),
flatness=1.0,
maxwidth=self._width,
scale=0.75,
h_align='center',
v_align='center',
)
qroffs = 20.0
qr_size = 270
ba.imagewidget(
parent=self._root_widget,
position=(
self._width * 0.5 - qr_size * 0.5,
self._height * 0.36 + qroffs - qr_size * 0.5,
),
size=(qr_size, qr_size),
texture=ba.internal.get_qrcode_texture(address),
)
# Start querying for results.
self._proxyid = response.proxyid
self._proxykey = response.proxykey
ba.timer(
STATUS_CHECK_INTERVAL_SECONDS, ba.WeakCall(self._ask_for_status)
)
def _ask_for_status(self) -> None:
assert self._proxyid is not None
assert self._proxykey is not None
ba.app.cloud.send_message_cb(
bacommon.cloud.LoginProxyStateQueryMessage(
proxyid=self._proxyid, proxykey=self._proxykey
),
on_response=ba.WeakCall(self._got_status),
)
def _got_status(
self, response: bacommon.cloud.LoginProxyStateQueryResponse | Exception
) -> None:
# For now, if anything goes wrong on the server-side, just abort
# with a vague error message. Can be more verbose later if need be.
if (
isinstance(response, bacommon.cloud.LoginProxyStateQueryResponse)
and response.state is response.State.FAIL
):
ba.playsound(ba.getsound('error'))
ba.screenmessage(ba.Lstr(resource='errorText'), color=(1, 0, 0))
self._done()
return
# If we got a token, set ourself as signed in. Hooray!
if (
isinstance(response, bacommon.cloud.LoginProxyStateQueryResponse)
and response.state is response.State.SUCCESS
):
assert response.credentials is not None
ba.app.accounts_v2.set_primary_credentials(response.credentials)
# As a courtesy, tell the server we're done with this proxy
# so it can clean up (not a huge deal if this fails)
assert self._proxyid is not None
try:
ba.app.cloud.send_message_cb(
bacommon.cloud.LoginProxyCompleteMessage(
proxyid=self._proxyid
),
on_response=ba.WeakCall(self._proxy_complete_response),
)
except CommunicationError:
pass
except Exception:
logging.warning(
'Unexpected error sending login-proxy-complete message',
exc_info=True,
)
self._done()
return
# If we're still waiting, ask again soon.
if (
isinstance(response, Exception)
or response.state is response.State.WAITING
):
ba.timer(
STATUS_CHECK_INTERVAL_SECONDS, ba.WeakCall(self._ask_for_status)
)
def _proxy_complete_response(self, response: None | Exception) -> None:
del response # Not used.
# We could do something smart like retry on exceptions here, but
# this isn't critical so we'll just let anything slide.
def _done(self) -> None:
ba.containerwidget(edit=self._root_widget, transition='out_scale')

View file

@ -413,6 +413,9 @@ def handle_app_invites_press(force_code: bool = False) -> None:
and ba.internal.get_v1_account_misc_read_val('enableAppInvites', False)
and not app.on_tv
)
# Update: google's app invites are deprecated.
do_app_invites = False
if force_code:
do_app_invites = False

View file

@ -20,12 +20,12 @@ class ConfigErrorWindow(ba.Window):
self._config_file_path = ba.app.config_file_path
width = 800
super().__init__(
ba.containerwidget(size=(width, 300), transition='in_right')
ba.containerwidget(size=(width, 400), transition='in_right')
)
padding = 20
ba.textwidget(
parent=self._root_widget,
position=(padding, 220),
position=(padding, 220 + 60),
size=(width - 2 * padding, 100 - 2 * padding),
h_align='center',
v_align='top',
@ -41,7 +41,7 @@ class ConfigErrorWindow(ba.Window):
)
ba.textwidget(
parent=self._root_widget,
position=(padding, 198),
position=(padding, 198 + 60),
size=(width - 2 * padding, 100 - 2 * padding),
h_align='center',
v_align='top',

View file

@ -44,6 +44,7 @@ class GatherTab:
The tab should create and return a container widget covering the
specified region.
"""
raise RuntimeError('Should not get here.')
def on_deactivate(self) -> None:
"""Called when the tab will no longer be the active one."""

View file

@ -1094,7 +1094,6 @@ class PublicGatherTab(GatherTab):
self._parties_sorted.sort(
key=lambda p: (
p[1].queue is None, # Show non-queued last.
p[1].ping if p[1].ping is not None else 999999.0,
p[1].index,
)

View file

@ -515,7 +515,20 @@ class MainMenuWindow(ba.Window):
self._tdelay = 2.0
self._t_delay_inc = 0.02
self._t_delay_play = 1.7
self._next_refresh_allow_time = ba.time(ba.TimeType.REAL) + 2.01
def _set_allow_time() -> None:
self._next_refresh_allow_time = ba.time(ba.TimeType.REAL) + 2.5
# Slight hack: widget transitions currently only progress when
# frames are being drawn, but this tends to get called before
# frame drawing even starts, meaning we don't know exactly how
# long we should wait before refreshing to avoid interrupting
# the transition. To make things a bit better, let's do a
# redundant set of the time in a deferred call which hopefully
# happens closer to actual frame draw times.
_set_allow_time()
ba.pushcall(_set_allow_time)
ba.app.did_menu_intro = True
self._width = 400.0
self._height = 200.0

View file

@ -63,7 +63,7 @@ class PlaylistBrowserWindow(ba.Window):
)
uiscale = ba.app.ui.uiscale
self._width = 900 if uiscale is ba.UIScale.SMALL else 800
self._width = 900.0 if uiscale is ba.UIScale.SMALL else 800.0
x_inset = 50 if uiscale is ba.UIScale.SMALL else 0
self._height = (
480
@ -365,7 +365,7 @@ class PlaylistBrowserWindow(ba.Window):
self._sub_width = self._scroll_width
self._sub_height = (
40 + rows * (button_height + 2 * button_buffer_v) + 90
40.0 + rows * (button_height + 2 * button_buffer_v) + 90
)
assert self._sub_width is not None
assert self._sub_height is not None

View file

@ -4,18 +4,25 @@
# pylint: disable=too-many-lines
from __future__ import annotations
import time
import copy
import math
import logging
import weakref
from enum import Enum
from threading import Thread
from typing import TYPE_CHECKING
from efro.error import CommunicationError
import bacommon.cloud
import ba
import ba.internal
if TYPE_CHECKING:
from typing import Any, Callable, Sequence
MERCH_LINK_KEY = 'Merch Link'
class StoreBrowserWindow(ba.Window):
"""Window for browsing the store."""
@ -593,8 +600,14 @@ class StoreBrowserWindow(ba.Window):
else:
self._last_buy_time = curtime
# Pro is an actual IAP; the rest are ticket purchases.
if item == 'pro':
# Merch is a special case - just a link.
if item == 'merch':
url = ba.app.config.get('Merch Link')
if isinstance(url, str):
ba.open_url(url)
# Pro is an actual IAP, and the rest are ticket purchases.
elif item == 'pro':
ba.playsound(ba.getsound('click01'))
# Purchase either pro or pro_sale depending on whether
@ -681,7 +694,9 @@ class StoreBrowserWindow(ba.Window):
assert self.button_infos is not None
for b_type, b_info in self.button_infos.items():
if b_type in ['upgrades.pro', 'pro']:
if b_type == 'merch':
purchased = False
elif b_type in ['upgrades.pro', 'pro']:
purchased = ba.app.accounts_v1.have_pro()
else:
purchased = ba.internal.get_purchased(b_type)
@ -707,7 +722,11 @@ class StoreBrowserWindow(ba.Window):
color = (0.4, 0.8, 0.1)
extra_image_opacity = 1.0
call = b_info['call'] if 'call' in b_info else None
if b_type in ['upgrades.pro', 'pro']:
if b_type == 'merch':
price_text = ''
price_text_left = ''
price_text_right = ''
elif b_type in ['upgrades.pro', 'pro']:
sale_time = get_available_sale_time('extras')
if sale_time is not None:
priceraw = ba.internal.get_price('pro')
@ -888,7 +907,14 @@ class StoreBrowserWindow(ba.Window):
dummy_name
)
section['v_spacing'] = (
-17 if self._tab == 'characters' else 0
-25
if (
self._tab == 'extras'
and uiscale is ba.UIScale.SMALL
)
else -17
if self._tab == 'characters'
else 0
)
if 'title' not in section:
section['title'] = ''
@ -900,7 +926,13 @@ class StoreBrowserWindow(ba.Window):
else 0
)
section['y_offs'] = (
55
20
if (
self._tab == 'extras'
and uiscale is ba.UIScale.SMALL
and ba.app.config.get('Merch Link')
)
else 55
if (
self._tab == 'extras'
and uiscale is ba.UIScale.SMALL
@ -917,7 +949,9 @@ class StoreBrowserWindow(ba.Window):
# pylint: disable=too-many-locals
# pylint: disable=too-many-branches
# pylint: disable=too-many-nested-blocks
from bastd.ui.store import item as storeitemui
from bastd.ui.store.item import (
instantiate_store_item_display,
)
title_spacing = 40
button_border = 20
@ -1102,7 +1136,7 @@ class StoreBrowserWindow(ba.Window):
+ (b_width + button_spacing) * col,
v - b_height + boffs_v2,
)
storeitemui.instantiate_store_item_display(
instantiate_store_item_display(
item_name,
item,
parent_widget=cnt2,
@ -1325,3 +1359,41 @@ class StoreBrowserWindow(ba.Window):
)
if self._on_close_call is not None:
self._on_close_call()
def _check_merch_availability_in_bg_thread() -> None:
# pylint: disable=cell-var-from-loop
# Merch is available from some countries only.
# Make a reasonable check to ask the master-server about this at
# launch and store the results.
for _i in range(15):
try:
if ba.app.cloud.is_connected():
response = ba.app.cloud.send_message(
bacommon.cloud.MerchAvailabilityMessage()
)
def _store_in_logic_thread() -> None:
cfg = ba.app.config
current: str | None = cfg.get(MERCH_LINK_KEY)
if not isinstance(current, str | None):
current = None
if current != response.url:
cfg[MERCH_LINK_KEY] = response.url
cfg.commit()
# If we successfully get a response, kick it over to the
# logic thread to store and we're done.
ba.pushcall(_store_in_logic_thread, from_other_thread=True)
return
except CommunicationError:
pass
except Exception:
logging.warning(
'Unexpected error in merch-availability-check.', exc_info=True
)
time.sleep(1.1934) # A bit randomized to avoid aliasing.
Thread(target=_check_merch_availability_in_bg_thread, daemon=True).start()

View file

@ -45,19 +45,23 @@ def instantiate_store_item_display(
item['name'] = title = get_store_item_name_translated(item_name)
btn: ba.Widget | None
# Hack; showbuffer stuff isn't working well when we're showing merch.
showbuffer = 10 if item_name in {'merch', 'pro', 'pro_sale'} else 76.0
if button:
item['button'] = btn = ba.buttonwidget(
parent=parent_widget,
position=b_pos,
transition_delay=delay,
show_buffer_top=76.0,
show_buffer_top=showbuffer,
enable_sound=False,
button_type='square',
size=(b_width, b_height),
autoselect=True,
label='',
)
ba.widget(edit=btn, show_buffer_bottom=76.0)
ba.widget(edit=btn, show_buffer_bottom=showbuffer)
else:
btn = None
@ -92,6 +96,10 @@ def instantiate_store_item_display(
tint_tex = character.icon_mask_texture
title_v = 0.255
price_v = 0.145
elif item_name == 'merch':
base_text_scale = 0.6
title_v = 0.85
price_v = 0.15
elif item_name in ['upgrades.pro', 'pro']:
base_text_scale = 0.6
title_v = 0.85
@ -165,6 +173,23 @@ def instantiate_store_item_display(
tint2_color=tint2_color,
)
if item_name == 'merch':
frame_size = b_width * 0.65
im_dim = frame_size * (100.0 / 113.0)
im_pos = (
b_pos[0] + b_width * 0.5 - im_dim * 0.5 + b_offs_x,
b_pos[1] + b_height * 0.47 - im_dim * 0.5,
)
ba.imagewidget(
parent=parent_widget,
position=im_pos,
size=(im_dim, im_dim),
transition_delay=delay,
draw_controller=btn,
opacity=1.0,
texture=ba.gettexture('merch'),
)
if item_name in ['pro', 'upgrades.pro']:
frame_size = b_width * 0.5
im_dim = frame_size * (100.0 / 113.0)
@ -184,7 +209,6 @@ def instantiate_store_item_display(
)
txt = ba.Lstr(resource='store.bombSquadProNewDescriptionText')
# t = 'foo\nfoo\nfoo\nfoo\nfoo\nfoo'
item['descriptionText'] = ba.textwidget(
parent=parent_widget,
text=txt,

View file

@ -0,0 +1,117 @@
# Released under the MIT License. See LICENSE for details.
#
"""UI for upgrading V1 accounts to V2."""
from __future__ import annotations
import ba
import ba.internal
class V2UpgradeWindow(ba.Window):
"""A window presenting a URL to the user visually."""
def __init__(self, login_name: str, code: str):
from bastd.ui.account.settings import show_what_is_v2_page
app = ba.app
uiscale = app.ui.uiscale
self._code = code
self._width = 700
self._height = 270
super().__init__(
root_widget=ba.containerwidget(
size=(self._width, self._height + 40),
transition='in_right',
scale=(
1.25
if uiscale is ba.UIScale.SMALL
else 1.25
if uiscale is ba.UIScale.MEDIUM
else 1.25
),
)
)
ba.playsound(ba.getsound('error'))
ba.textwidget(
parent=self._root_widget,
position=(self._width * 0.5, self._height - 46),
size=(0, 0),
color=ba.app.ui.title_color,
h_align='center',
v_align='center',
text=ba.Lstr(
resource='deviceAccountUpgradeText',
subs=[('${NAME}', login_name)],
),
maxwidth=self._width * 0.95,
)
ba.textwidget(
parent=self._root_widget,
position=(self._width * 0.5, 125),
size=(0, 0),
scale=0.8,
color=(0.7, 0.8, 0.7),
h_align='center',
v_align='center',
text=(
ba.charstr(ba.SpecialChar.LOCAL_ACCOUNT)
+ login_name
+ ' ----> '
+ ba.charstr(ba.SpecialChar.V2_LOGO)
+ login_name
),
maxwidth=self._width * 0.95,
)
button_width = 200
cancel_button = ba.buttonwidget(
parent=self._root_widget,
position=(20, 25),
size=(button_width, 65),
autoselect=True,
label=ba.Lstr(resource='notNowText'),
on_activate_call=self._done,
)
_what_is_this_button = ba.buttonwidget(
parent=self._root_widget,
position=(self._width * 0.5 - button_width * 0.5, 25),
size=(button_width, 65),
autoselect=True,
label=ba.Lstr(resource='whatIsThisText'),
color=(0.55, 0.5, 0.6),
textcolor=(0.75, 0.7, 0.8),
on_activate_call=show_what_is_v2_page,
)
upgrade_button = ba.buttonwidget(
parent=self._root_widget,
position=(self._width - button_width - 20, 25),
size=(button_width, 65),
autoselect=True,
label=ba.Lstr(resource='upgradeText'),
on_activate_call=self._upgrade_press,
)
ba.containerwidget(
edit=self._root_widget,
selected_child=upgrade_button,
cancel_button=cancel_button,
)
def _upgrade_press(self) -> None:
# Get rid of the window and sign out before kicking the
# user over to a browser to do the upgrade. This hopefully
# makes it more clear when they come back that they need to
# sign in with the 'BombSquad account' option.
ba.containerwidget(edit=self._root_widget, transition='out_left')
ba.internal.sign_out_v1()
bamasteraddr = ba.internal.get_master_server_address(version=2)
ba.open_url(f'{bamasteraddr}/v2uda/{self._code}')
def _done(self) -> None:
ba.containerwidget(edit=self._root_widget, transition='out_left')

View file

@ -274,6 +274,44 @@ if TYPE_CHECKING:
) -> _CallNoArgs[OutT]:
...
# 4 arg call; 3 args bundled.
# noinspection PyPep8Naming
@overload
def Call(
call: Callable[[In1T, In2T, In3T, In4T], OutT],
arg1: In1T,
arg2: In2T,
arg3: In3T,
) -> _Call1Arg[In4T, OutT]:
...
# 4 arg call; 2 args bundled.
# noinspection PyPep8Naming
@overload
def Call(
call: Callable[[In1T, In2T, In3T, In4T], OutT],
arg1: In1T,
arg2: In2T,
) -> _Call2Args[In3T, In4T, OutT]:
...
# 4 arg call; 1 arg bundled.
# noinspection PyPep8Naming
@overload
def Call(
call: Callable[[In1T, In2T, In3T, In4T], OutT],
arg1: In1T,
) -> _Call3Args[In2T, In3T, In4T, OutT]:
...
# 4 arg call; no args bundled.
# noinspection PyPep8Naming
@overload
def Call(
call: Callable[[In1T, In2T, In3T, In4T], OutT],
) -> _Call4Args[In1T, In2T, In3T, In4T, OutT]:
...
# 5 arg call; 5 args bundled.
# noinspection PyPep8Naming
@overload

View file

@ -111,6 +111,8 @@ class IOAttrs:
boundaries (see efro.util.utc_today()).
'whole_hours', if True, requires datetime values to lie exactly on hour
boundaries (see efro.util.utc_this_hour()).
'whole_minutes', if True, requires datetime values to lie exactly on minute
boundaries (see efro.util.utc_this_minute()).
'soft_default', if passed, injects a default value into dataclass
instantiation when the field is not present in the input data.
This allows dataclasses to add new non-optional fields while
@ -136,6 +138,7 @@ class IOAttrs:
store_default: bool = True
whole_days: bool = False
whole_hours: bool = False
whole_minutes: bool = False
soft_default: Any = MISSING
soft_default_factory: Callable[[], Any] | _MissingType = MISSING
@ -145,6 +148,7 @@ class IOAttrs:
store_default: bool = store_default,
whole_days: bool = whole_days,
whole_hours: bool = whole_hours,
whole_minutes: bool = whole_minutes,
soft_default: Any = MISSING,
soft_default_factory: Callable[[], Any] | _MissingType = MISSING,
):
@ -160,6 +164,8 @@ class IOAttrs:
self.whole_days = whole_days
if whole_hours != cls.whole_hours:
self.whole_hours = whole_hours
if whole_minutes != cls.whole_minutes:
self.whole_minutes = whole_minutes
if soft_default is not cls.soft_default:
# Do what dataclasses does with its default types and
@ -216,13 +222,18 @@ class IOAttrs:
raise ValueError(
f'Value {value} at {fieldpath} is not a whole day.'
)
if self.whole_hours:
elif self.whole_hours:
if any(
x != 0 for x in (value.minute, value.second, value.microsecond)
):
raise ValueError(
f'Value {value} at {fieldpath}' f' is not a whole hour.'
)
elif self.whole_minutes:
if any(x != 0 for x in (value.second, value.microsecond)):
raise ValueError(
f'Value {value} at {fieldpath}' f' is not a whole minute.'
)
def _get_origin(anntype: Any) -> Any:

View file

@ -80,6 +80,15 @@ class IntegrityError(ValueError):
"""Data has been tampered with or corrupted in some form."""
class AuthenticationError(Exception):
"""Authentication has failed for some operation.
This can be raised if server-side-verification does not match
client-supplied credentials, if an invalid password is supplied
for a sign-in attempt, etc.
"""
def is_urllib_communication_error(exc: BaseException, url: str | None) -> bool:
"""Is the provided exception from urllib a communication-related error?
@ -188,6 +197,7 @@ def is_asyncio_streams_communication_error(exc: BaseException) -> bool:
firewall/connectivity issues, etc. These issues can often be safely
ignored or presented to the user as general 'connection-lost' events.
"""
# pylint: disable=too-many-return-statements
import ssl
if isinstance(
@ -227,4 +237,8 @@ def is_asyncio_streams_communication_error(exc: BaseException) -> bool:
if 'SSL: WRONG_VERSION_NUMBER' in excstr:
return True
# And seeing this very rarely; assuming its just data corruption?
if 'SSL: DECRYPTION_FAILED_OR_BAD_RECORD_MAC' in excstr:
return True
return False

View file

@ -37,7 +37,27 @@ class LogLevel(Enum):
ERROR = 3
CRITICAL = 4
@property
def python_logging_level(self) -> int:
"""Give the corresponding logging level."""
return LOG_LEVEL_LEVELNOS[self]
@classmethod
def from_python_logging_level(cls, levelno: int) -> LogLevel:
"""Given a Python logging level, return a LogLevel."""
return LEVELNO_LOG_LEVELS[levelno]
# Python logging levels from LogLevels
LOG_LEVEL_LEVELNOS = {
LogLevel.DEBUG: logging.DEBUG,
LogLevel.INFO: logging.INFO,
LogLevel.WARNING: logging.WARNING,
LogLevel.ERROR: logging.ERROR,
LogLevel.CRITICAL: logging.CRITICAL,
}
# LogLevels from Python logging levels
LEVELNO_LOG_LEVELS = {
logging.DEBUG: LogLevel.DEBUG,
logging.INFO: LogLevel.INFO,
@ -128,7 +148,7 @@ class LogHandler(logging.Handler):
self._cache_lock = Lock()
self._printed_callback_error = False
self._thread_bootstrapped = False
self._thread = Thread(target=self._thread_main, daemon=True)
self._thread = Thread(target=self._log_thread_main, daemon=True)
self._thread.start()
# Spin until our thread is up and running; otherwise we could
@ -145,7 +165,7 @@ class LogHandler(logging.Handler):
with self._callbacks_lock:
self._callbacks.append(call)
def _thread_main(self) -> None:
def _log_thread_main(self) -> None:
self._event_loop = asyncio.new_event_loop()
# NOTE: if we ever use default threadpool at all we should allow
# setting it for our loop.

View file

@ -43,6 +43,20 @@ class SysResponse:
users of the api never see them.
"""
def set_local_exception(self, exc: Exception) -> None:
"""Attach a local exception to facilitate better logging/handling.
Be aware that this data does not get serialized and only
exists on the local object.
"""
setattr(self, '_sr_local_exception', exc)
def get_local_exception(self) -> Exception | None:
"""Fetch a local attached exception."""
value = getattr(self, '_sr_local_exception', None)
assert isinstance(value, Exception | None)
return value
# Some standard response types:

View file

@ -6,7 +6,6 @@ Supports static typing for message types and possible return types.
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from efro.error import CleanError, RemoteError, CommunicationError
@ -158,17 +157,18 @@ class MessageSender:
bound_obj, msg_encoded
)
except Exception as exc:
# Any error in the raw send call gets recorded as either
# a local or communication error.
return ErrorSysResponse(
error_message=f'Error in MessageSender @send_method'
f' ({type(exc)}): {exc}',
response = ErrorSysResponse(
error_message='Error in MessageSender @send_method.',
error_type=(
ErrorSysResponse.ErrorType.COMMUNICATION
if isinstance(exc, CommunicationError)
else ErrorSysResponse.ErrorType.LOCAL
),
)
# Can include the actual exception since we'll be looking at
# this locally; might be helpful.
response.set_local_exception(exc)
return response
return self._decode_raw_response(bound_obj, message, response_encoded)
async def fetch_raw_response_async(
@ -193,17 +193,18 @@ class MessageSender:
bound_obj, msg_encoded
)
except Exception as exc:
# Any error in the raw send call gets recorded as either
# a local or communication error.
return ErrorSysResponse(
error_message=f'Error in MessageSender @send_async_method'
f' ({type(exc)}): {exc}',
response = ErrorSysResponse(
error_message='Error in MessageSender @send_async_method.',
error_type=(
ErrorSysResponse.ErrorType.COMMUNICATION
if isinstance(exc, CommunicationError)
else ErrorSysResponse.ErrorType.LOCAL
),
)
# Can include the actual exception since we'll be looking at
# this locally; might be helpful.
response.set_local_exception(exc)
return response
return self._decode_raw_response(bound_obj, message, response_encoded)
def unpack_raw_response(
@ -250,18 +251,14 @@ class MessageSender:
self._decode_filter_call(
bound_obj, message, response_dict, response
)
except Exception:
# If we got to this point, we successfully communicated
# with the other end so errors represent protocol mismatches
# or other invalid data. For now let's just log it but perhaps
# we'd want to somehow embed it in the ErrorSysResponse to be
# available directly to the user later.
logging.exception('Error decoding raw response')
except Exception as exc:
response = ErrorSysResponse(
error_message='Error decoding raw response;'
' see log for details.',
error_message='Error decoding raw response.',
error_type=ErrorSysResponse.ErrorType.LOCAL,
)
# Since we'll be looking at this locally, we can include
# extra info for logging/etc.
response.set_local_exception(exc)
return response
def _unpack_raw_response(
@ -282,16 +279,24 @@ class MessageSender:
# Some error occurred. Raise a local Exception for it.
if isinstance(raw_response, ErrorSysResponse):
# Errors that happened locally can attach their exceptions
# here for extra logging goodness.
local_exception = raw_response.get_local_exception()
if (
raw_response.error_type
is ErrorSysResponse.ErrorType.COMMUNICATION
):
raise CommunicationError(raw_response.error_message)
raise CommunicationError(
raw_response.error_message
) from local_exception
# If something went wrong on *our* end of the connection,
# don't say it was a remote error.
if raw_response.error_type is ErrorSysResponse.ErrorType.LOCAL:
raise RuntimeError(raw_response.error_message)
raise RuntimeError(
raw_response.error_message
) from local_exception
# If they want to support clean errors, do those.
if (
@ -299,14 +304,18 @@ class MessageSender:
and raw_response.error_type
is ErrorSysResponse.ErrorType.REMOTE_CLEAN
):
raise CleanError(raw_response.error_message)
raise CleanError(
raw_response.error_message
) from local_exception
if (
self.protocol.forward_communication_errors
and raw_response.error_type
is ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION
):
raise CommunicationError(raw_response.error_message)
raise CommunicationError(
raw_response.error_message
) from local_exception
# Everything else gets lumped in as a remote error.
raise RemoteError(
@ -316,7 +325,7 @@ class MessageSender:
if self._peer_desc_call is None
else self._peer_desc_call(bound_obj)
),
)
) from local_exception
assert isinstance(raw_response, Response)
return raw_response

View file

@ -628,7 +628,8 @@ class RPCEndpoint:
# Now just sit and handle stuff as it comes in.
while True:
assert not self._closing
if self._closing:
return
# Read message type.
mtype = _PacketType(await self._read_int_8())

View file

@ -39,6 +39,7 @@ class _EmptyObj:
pass
# TODO: kill this and just use efro.call.tpartial
if TYPE_CHECKING:
Call = Call
else:
@ -273,13 +274,14 @@ class DispatchMethodWrapper(Generic[ArgT, RetT]):
"""Type-aware standin for the dispatch func returned by dispatchmethod."""
def __call__(self, arg: ArgT) -> RetT:
pass
raise RuntimeError('Should not get here')
@staticmethod
def register(
func: Callable[[Any, Any], RetT]
) -> Callable[[Any, Any], RetT]:
"""Register a new dispatch handler for this dispatch-method."""
raise RuntimeError('Should not get here')
registry: dict[Any, Callable]