Bombsquad-Ballistica-Modded.../dist/ba_data/python/bacommon/cloud.py

215 lines
5 KiB
Python
Raw Normal View History

2022-06-09 01:26:46 +05:30
# Released under the MIT License. See LICENSE for details.
#
"""Message, response, and related data types for cloud functionality."""
from __future__ import annotations
2022-06-30 00:31:52 +05:30
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Annotated
2022-06-09 01:26:46 +05:30
from enum import Enum
from efro.message import Message, Response
from efro.dataclassio import ioprepped, IOAttrs
2022-06-30 00:31:52 +05:30
from bacommon.transfer import DirectoryManifest
2022-12-25 00:39:49 +05:30
from bacommon.login import LoginType
2022-06-09 01:26:46 +05:30
if TYPE_CHECKING:
pass
@ioprepped
@dataclass
class LoginProxyRequestMessage(Message):
    """Request sent to the cloud to ask for a login-proxy.

    Carries no fields of its own; the declared response type is
    LoginProxyRequestResponse.
    """

    @classmethod
    def get_response_types(cls) -> list[type[Response] | None]:
        """Return the list of response types declared for this message."""
        return [LoginProxyRequestResponse]
@ioprepped
@dataclass
class LoginProxyRequestResponse(Response):
    """Reply to a login-proxy request, describing the proxy to use."""

    # Address the user should be directed to in order to log in.
    url: Annotated[str, IOAttrs('u')]

    # Id of the proxy-login; used when querying for results.
    proxyid: Annotated[str, IOAttrs('p')]

    # Key of the proxy-login; used when querying for results.
    proxykey: Annotated[str, IOAttrs('k')]
@ioprepped
@dataclass
class LoginProxyStateQueryMessage(Message):
    """Ask the cloud how a previously requested login proxy is going.

    The declared response type is LoginProxyStateQueryResponse.
    """

    # Proxy-login id and key; these share their names and serialized
    # keys ('p' / 'k') with the fields of LoginProxyRequestResponse.
    proxyid: Annotated[str, IOAttrs('p')]
    proxykey: Annotated[str, IOAttrs('k')]

    @classmethod
    def get_response_types(cls) -> list[type[Response] | None]:
        """Return the list of response types declared for this message."""
        return [LoginProxyStateQueryResponse]
@ioprepped
@dataclass
class LoginProxyStateQueryResponse(Response):
    """Reply to a login-proxy state query, reporting the proxy's state."""

    class State(Enum):
        """States a login-proxy can be in."""

        WAITING = 'waiting'
        SUCCESS = 'success'
        FAIL = 'fail'

    # Current state of the queried proxy.
    state: Annotated[State, IOAttrs('s')]

    # On success, this will be filled out; it may be None otherwise.
    credentials: Annotated[str | None, IOAttrs('tk')]
2022-06-09 01:26:46 +05:30
@ioprepped
@dataclass
class LoginProxyCompleteMessage(Message):
    """Tell the cloud that we are done with a login proxy.

    Unlike the other messages in this module, this class does not
    define its own get_response_types().
    """

    # Id of the proxy-login we are finished with.
    proxyid: Annotated[str, IOAttrs('p')]
2022-07-16 17:59:14 +05:30
@ioprepped
@dataclass
class PingMessage(Message):
    """Standard ping.

    Carries no fields; the declared response type is PingResponse.
    """

    @classmethod
    def get_response_types(cls) -> list[type[Response] | None]:
        """Return the list of response types declared for this message."""
        return [PingResponse]
@ioprepped
@dataclass
class PingResponse(Response):
    """Pong: the reply to a PingMessage. Carries no fields."""
2022-06-09 01:26:46 +05:30
@ioprepped
@dataclass
class TestMessage(Message):
    """Test message carrying a single integer value.

    The declared response type is TestResponse.
    """

    # Integer test payload.
    testfoo: Annotated[int, IOAttrs('f')]

    @classmethod
    def get_response_types(cls) -> list[type[Response] | None]:
        """Return the list of response types declared for this message."""
        return [TestResponse]
2022-06-09 01:26:46 +05:30
@ioprepped
@dataclass
class TestResponse(Response):
    """Reply to a TestMessage, carrying a single integer value."""

    # Integer test payload.
    testfoo: Annotated[int, IOAttrs('f')]
@ioprepped
@dataclass
class WorkspaceFetchState:
    """Common state data for a workspace fetch.

    This is a plain io-prepped dataclass (neither a Message nor a
    Response); it is embedded as the 'state' field of both
    WorkspaceFetchMessage and WorkspaceFetchResponse.
    """

    # The only required field; all counters below have defaults.
    manifest: Annotated[DirectoryManifest, IOAttrs('m')]
    iteration: Annotated[int, IOAttrs('i')] = 0
    total_deletes: Annotated[int, IOAttrs('tdels')] = 0
    total_downloads: Annotated[int, IOAttrs('tdlds')] = 0
    # Defaults to None rather than 0, unlike the other totals.
    total_up_to_date: Annotated[int | None, IOAttrs('tunmd')] = None
2022-06-09 01:26:46 +05:30
@ioprepped
@dataclass
class WorkspaceFetchMessage(Message):
    """Can I get some of that workspace action?

    Requests workspace data from the cloud; the declared response type
    is WorkspaceFetchResponse.
    """

    # Id of the workspace being fetched.
    workspaceid: Annotated[str, IOAttrs('w')]

    # Fetch state sent along with the request.
    state: Annotated[WorkspaceFetchState, IOAttrs('s')]

    @classmethod
    def get_response_types(cls) -> list[type[Response] | None]:
        """Return the list of response types declared for this message."""
        return [WorkspaceFetchResponse]
2022-06-09 01:26:46 +05:30
2022-06-30 00:31:52 +05:30
@ioprepped
@dataclass
class WorkspaceFetchResponse(Response):
    """Here's that workspace you asked for, boss.

    Reply to a WorkspaceFetchMessage.
    """

    # Fetch state returned with the response.
    state: Annotated[WorkspaceFetchState, IOAttrs('s')]

    # Both collections default to empty and are declared with
    # store_default=False. default_factory is used so instances never
    # share a mutable default.
    deletes: Annotated[list[str], IOAttrs('dlt', store_default=False)] = field(
        default_factory=list
    )
    downloads_inline: Annotated[
        dict[str, bytes], IOAttrs('dinl', store_default=False)
    ] = field(default_factory=dict)

    # Defaults to False. NOTE(review): presumably set once the fetch
    # has finished; confirm against the code that consumes it.
    done: Annotated[bool, IOAttrs('d')] = False
2022-12-25 00:39:49 +05:30
@ioprepped
@dataclass
class MerchAvailabilityMessage(Message):
    """Ask the cloud whether a merch link can be shown.

    Carries no fields; the declared response type is
    MerchAvailabilityResponse.
    """

    @classmethod
    def get_response_types(cls) -> list[type[Response] | None]:
        """Return the list of response types declared for this message."""
        return [MerchAvailabilityResponse]
@ioprepped
@dataclass
class MerchAvailabilityResponse(Response):
    """Reply to a merch availability query."""

    # Merch url, or None.
    # NOTE(review): None presumably means no link should be shown;
    # confirm against the caller.
    url: Annotated[str | None, IOAttrs('u')]
@ioprepped
@dataclass
class SignInMessage(Message):
    """Request to sign in.

    The declared response type is SignInResponse.
    """

    # Which kind of login is being used.
    login_type: Annotated[LoginType, IOAttrs('l')]

    # Token accompanying the sign-in request.
    sign_in_token: Annotated[str, IOAttrs('t')]

    @classmethod
    def get_response_types(cls) -> list[type[Response] | None]:
        """Return the list of response types declared for this message."""
        return [SignInResponse]
@ioprepped
@dataclass
class SignInResponse(Response):
    """Result of a sign-in request."""

    # Credentials string, or None.
    # NOTE(review): None presumably indicates the sign-in did not
    # succeed; confirm against the caller.
    credentials: Annotated[str | None, IOAttrs('c')]
@ioprepped
@dataclass
class ManageAccountMessage(Message):
    """Request for a manage-account url.

    Carries no fields; the declared response type is
    ManageAccountResponse.
    """

    @classmethod
    def get_response_types(cls) -> list[type[Response] | None]:
        """Return the list of response types declared for this message."""
        return [ManageAccountResponse]
@ioprepped
@dataclass
class ManageAccountResponse(Response):
    """Here's that manage-account url you asked for, boss."""

    # Manage-account url, or None.
    # NOTE(review): None presumably means no url is available; confirm
    # against the caller.
    url: Annotated[str | None, IOAttrs('u')]