1.7.13 , client_ping, player account privacy added

added _ba.get_client_ping(113)
and _ba.hide_player_device_id(True) . to hide players device id from roster of clients.
This commit is contained in:
Ayush Saini 2022-11-06 01:04:52 +05:30
parent 59730e19b3
commit e5034cbb6f
282 changed files with 35005 additions and 23205 deletions

View file

@ -1,6 +1,15 @@
# Released under the MIT License. See LICENSE for details.
#
"""Utilities for debugging memory leaks or other issues."""
"""Utilities for debugging memory leaks or other issues.
IMPORTANT - these functions use the gc module which looks 'under the hood'
at Python and sometimes returns not-fully-initialized objects, which may
cause crashes or errors due to suddenly having references to them that they
didn't expect, etc. See https://github.com/python/cpython/issues/59313.
For this reason, these methods should NEVER be called in production code.
Enable them only for debugging situations and be aware that their use may
itself cause problems. The same is true for the gc module itself.
"""
from __future__ import annotations
import gc
@ -27,7 +36,9 @@ ABS_MAX_LEVEL = 10
# we're showing some temporary objects that we should be ignoring.
def getobjs(cls: type | str, contains: str | None = None) -> list[Any]:
def getobjs(
cls: type | str, contains: str | None = None, expanded: bool = False
) -> list[Any]:
"""Return all garbage-collected objects matching criteria.
'type' can be an actual type or a string in which case objects
@ -45,17 +56,51 @@ def getobjs(cls: type | str, contains: str | None = None) -> list[Any]:
if not isinstance(contains, str | None):
raise TypeError('Expected a string or None for contains')
allobjs = _get_all_objects(expanded=expanded)
if isinstance(cls, str):
objs = [o for o in gc.get_objects() if cls in str(type(o))]
objs = [o for o in allobjs if cls in str(type(o))]
else:
objs = [o for o in gc.get_objects() if isinstance(o, cls)]
objs = [o for o in allobjs if isinstance(o, cls)]
if contains is not None:
objs = [o for o in objs if contains in str(o)]
return objs
def getobj(objid: int) -> Any:
# Recursively expand slists objects into olist, using seen to track
# already processed objects.
def _getr(slist: list[Any], olist: list[Any], seen: set[int]) -> None:
for obj in slist:
if id(obj) in seen:
continue
seen.add(id(obj))
olist.append(obj)
tll = gc.get_referents(obj)
if tll:
_getr(tll, olist, seen)
def _get_all_objects(expanded: bool) -> list[Any]:
"""Return an expanded list of all objects.
See https://utcc.utoronto.ca/~cks/space/blog/python/GetAllObjects
"""
gcl = gc.get_objects()
if not expanded:
return gcl
olist: list[Any] = []
seen: set[int] = set()
# Just in case:
seen.add(id(gcl))
seen.add(id(olist))
seen.add(id(seen))
# _getr does the real work.
_getr(gcl, olist, seen)
return olist
def getobj(objid: int, expanded: bool = False) -> Any:
"""Return a garbage-collected object by its id.
Remember that this is VERY inefficient and should only ever be used
@ -65,7 +110,10 @@ def getobj(objid: int) -> Any:
raise TypeError(f'Expected an int for objid; got a {type(objid)}.')
# Don't wanna return stuff waiting to be garbage-collected.
for obj in gc.get_objects():
gc.collect()
allobjs = _get_all_objects(expanded=expanded)
for obj in allobjs:
if id(obj) == objid:
return obj
raise RuntimeError(f'Object with id {objid} not found.')
@ -80,13 +128,15 @@ def getrefs(obj: Any) -> list[Any]:
def printfiles(file: TextIO | None = None) -> None:
"""Print info about open files in the current app."""
import io
file = sys.stderr if file is None else file
try:
import psutil
except ImportError:
print(
"Error: printfiles requires the 'psutil' module to be installed.",
file=file)
file=file,
)
return
proc = psutil.Process()
@ -110,16 +160,20 @@ def printfiles(file: TextIO | None = None) -> None:
textio_s = id(textio) if textio is not None else '<not found>'
fileio = fileio_ids.get(ofile.fd)
fileio_s = id(fileio) if fileio is not None else '<not found>'
print(f'#{i+1}: path={ofile.path!r},'
f' fd={ofile.fd}, mode={mode!r}, TextIOWrapper={textio_s},'
f' FileIO={fileio_s}')
print(
f'#{i+1}: path={ofile.path!r},'
f' fd={ofile.fd}, mode={mode!r}, TextIOWrapper={textio_s},'
f' FileIO={fileio_s}'
)
def printrefs(obj: Any,
max_level: int = 2,
exclude_objs: list[Any] | None = None,
expand_ids: list[int] | None = None,
file: TextIO | None = None) -> None:
def printrefs(
obj: Any,
max_level: int = 2,
exclude_objs: list[Any] | None = None,
expand_ids: list[int] | None = None,
file: TextIO | None = None,
) -> None:
"""Print human readable list of objects referring to an object.
'max_level' specifies how many levels of recursion are printed.
@ -129,20 +183,24 @@ def printrefs(obj: Any,
'expand_ids' can be a list of object ids; if that particular object is
found, it will always be expanded even if max_level has been reached.
"""
_printrefs(obj,
level=0,
max_level=max_level,
exclude_objs=[] if exclude_objs is None else exclude_objs,
expand_ids=[] if expand_ids is None else expand_ids,
file=sys.stderr if file is None else file)
_printrefs(
obj,
level=0,
max_level=max_level,
exclude_objs=[] if exclude_objs is None else exclude_objs,
expand_ids=[] if expand_ids is None else expand_ids,
file=sys.stderr if file is None else file,
)
def printtypes(limit: int = 50, file: TextIO | None = None) -> None:
def printtypes(
limit: int = 50, file: TextIO | None = None, expanded: bool = False
) -> None:
"""Print a human readable list of which types have the most instances."""
assert limit > 0
objtypes: dict[str, int] = {}
gc.collect() # Recommended before get_objects().
allobjs = gc.get_objects()
allobjs = _get_all_objects(expanded=expanded)
allobjc = len(allobjs)
for obj in allobjs:
modname = type(obj).__module__
@ -160,13 +218,45 @@ def printtypes(limit: int = 50, file: TextIO | None = None) -> None:
print(f'Types most allocated ({allobjc} total objects):', file=file)
for i, tpitem in enumerate(
sorted(objtypes.items(), key=lambda x: x[1],
reverse=True)[:limit]):
sorted(objtypes.items(), key=lambda x: x[1], reverse=True)[:limit]
):
tpname, tpval = tpitem
percent = tpval / allobjc * 100.0
print(f'{i+1}: {tpname}: {tpval} ({percent:.2f}%)', file=file)
def printsizes(
limit: int = 50, file: TextIO | None = None, expanded: bool = False
) -> None:
"""Print total allocated sizes of different types."""
assert limit > 0
objsizes: dict[str, int] = {}
gc.collect() # Recommended before get_objects().
allobjs = _get_all_objects(expanded=expanded)
totalobjsize = 0
for obj in allobjs:
modname = type(obj).__module__
tpname = type(obj).__qualname__
if modname != 'builtins':
tpname = f'{modname}.{tpname}'
objsize = sys.getsizeof(obj)
objsizes[tpname] = objsizes.get(tpname, 0) + objsize
totalobjsize += objsize
totalobjmb = totalobjsize / (1024 * 1024)
print(
f'Types with most allocated bytes ({totalobjmb:.2f} mb total):',
file=file,
)
for i, tpitem in enumerate(
sorted(objsizes.items(), key=lambda x: x[1], reverse=True)[:limit]
):
tpname, tpval = tpitem
percent = tpval / totalobjsize * 100.0
print(f'{i+1}: {tpname}: {tpval} ({percent:.2f}%)', file=file)
def _desctype(obj: Any) -> str:
cls = type(obj)
if cls is types.ModuleType:
@ -183,8 +273,13 @@ def _desc(obj: Any) -> str:
# Print length and the first few types.
tps = [_desctype(i) for i in obj[:3]]
tpsj = ', '.join(tps)
tpss = (f', contains [{tpsj}, ...]'
if len(obj) > 3 else f', contains [{tpsj}]' if tps else '')
tpss = (
f', contains [{tpsj}, ...]'
if len(obj) > 3
else f', contains [{tpsj}]'
if tps
else ''
)
extra = f' (len {len(obj)}{tpss})'
elif isinstance(obj, dict):
# If it seems to be the vars() for a type or module,
@ -199,16 +294,27 @@ def _desc(obj: Any) -> str:
f'{repr(n)}: {_desctype(v)}' for n, v in list(obj.items())[:3]
]
pairsj = ', '.join(pairs)
pairss = (f', contains {{{pairsj}, ...}}' if len(obj) > 3 else
f', contains {{{pairsj}}}' if pairs else '')
pairss = (
f', contains {{{pairsj}, ...}}'
if len(obj) > 3
else f', contains {{{pairsj}}}'
if pairs
else ''
)
extra = f' (len {len(obj)}{pairss})'
if extra is None:
extra = ''
return f'{_desctype(obj)} @ {id(obj)}{extra}'
def _printrefs(obj: Any, level: int, max_level: int, exclude_objs: list,
expand_ids: list[int], file: TextIO) -> None:
def _printrefs(
obj: Any,
level: int,
max_level: int,
exclude_objs: list,
expand_ids: list[int],
file: TextIO,
) -> None:
ind = ' ' * level
print(ind + _desc(obj), file=file)
v = vars()
@ -233,9 +339,11 @@ def _printrefs(obj: Any, level: int, max_level: int, exclude_objs: list,
# The 'refs' list we just made will be listed as a referrer
# of this obj, so explicitly exclude it from the obj's listing.
_printrefs(ref,
level=level + 1,
max_level=max_level,
exclude_objs=exclude_objs + [refs],
expand_ids=expand_ids,
file=file)
_printrefs(
ref,
level=level + 1,
max_level=max_level,
exclude_objs=exclude_objs + [refs],
expand_ids=expand_ids,
file=file,
)