Delete dist directory

This commit is contained in:
Mikahael 2024-02-25 23:59:01 +05:30 committed by GitHub
parent b2bfb6afd8
commit 9e59089810
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
1776 changed files with 0 additions and 548278 deletions

View file

@ -1,359 +0,0 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import os
import logging
import binascii
import time
import re
import copy
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec, utils as ecutils
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.exceptions import InvalidSignature
from py_vapid.utils import b64urldecode, b64urlencode
from py_vapid.jwt import sign
# Show compliance version. For earlier versions see previously tagged releases.
VERSION = "VAPID-RFC/ECE-RFC"
class VapidException(Exception):
"""An exception wrapper for Vapid."""
pass
class Vapid01(object):
"""Minimal VAPID Draft 01 signature generation library.
https://tools.ietf.org/html/draft-ietf-webpush-vapid-01
"""
_private_key = None
_public_key = None
_schema = "WebPush"
def __init__(self, private_key=None, conf=None):
"""Initialize VAPID with an optional private key.
:param private_key: A private key object
:type private_key: ec.EllipticCurvePrivateKey
"""
if conf is None:
conf = {}
self.conf = conf
self.private_key = private_key
if private_key:
self._public_key = self.private_key.public_key()
@classmethod
def from_raw(cls, private_raw):
"""Initialize VAPID using a private key point in "raw" or
"uncompressed" form. Raw keys consist of a single, 32 octet
encoded integer.
:param private_raw: A private key point in uncompressed form.
:type private_raw: bytes
"""
key = ec.derive_private_key(
int(binascii.hexlify(b64urldecode(private_raw)), 16),
curve=ec.SECP256R1(),
backend=default_backend())
return cls(key)
@classmethod
def from_raw_public(cls, public_raw):
key = ec.EllipticCurvePublicKey.from_encoded_point(
curve=ec.SECP256R1(),
data=b64urldecode(public_raw)
)
ss = cls()
ss._public_key = key
return ss
@classmethod
def from_pem(cls, private_key):
"""Initialize VAPID using a private key in PEM format.
:param private_key: A private key in PEM format.
:type private_key: bytes
"""
# not sure why, but load_pem_private_key fails to deserialize
return cls.from_der(
b''.join(private_key.splitlines()[1:-1]))
@classmethod
def from_der(cls, private_key):
"""Initialize VAPID using a private key in DER format.
:param private_key: A private key in DER format and Base64-encoded.
:type private_key: bytes
"""
key = serialization.load_der_private_key(b64urldecode(private_key),
password=None,
backend=default_backend())
return cls(key)
@classmethod
def from_file(cls, private_key_file=None):
"""Initialize VAPID using a file containing a private key in PEM or
DER format.
:param private_key_file: Name of the file containing the private key
:type private_key_file: str
"""
if not os.path.isfile(private_key_file):
logging.info("Private key not found, generating key...")
vapid = cls()
vapid.generate_keys()
vapid.save_key(private_key_file)
return vapid
with open(private_key_file, 'r') as file:
private_key = file.read()
try:
if "-----BEGIN" in private_key:
vapid = cls.from_pem(private_key.encode('utf8'))
else:
vapid = cls.from_der(private_key.encode('utf8'))
return vapid
except Exception as exc:
logging.error("Could not open private key file: %s", repr(exc))
raise VapidException(exc)
@classmethod
def from_string(cls, private_key):
"""Initialize VAPID using a string containing the private key. This
will try to determine if the key is in RAW or DER format.
:param private_key: String containing the key info
:type private_key: str
"""
pkey = private_key.encode().replace(b"\n", b"")
key = b64urldecode(pkey)
if len(key) == 32:
return cls.from_raw(pkey)
return cls.from_der(pkey)
@classmethod
def verify(cls, key, auth):
"""Verify a VAPID authorization token.
:param key: base64 serialized public key
:type key: str
:param auth: authorization token
type key: str
"""
tokens = auth.rsplit(' ', 1)[1].rsplit('.', 1)
kp = cls().from_raw_public(key.encode())
return kp.verify_token(
validation_token=tokens[0].encode(),
verification_token=tokens[1]
)
@property
def private_key(self):
"""The VAPID private ECDSA key"""
if not self._private_key:
raise VapidException("No private key. Call generate_keys()")
return self._private_key
@private_key.setter
def private_key(self, value):
"""Set the VAPID private ECDSA key
:param value: the byte array containing the private ECDSA key data
:type value: ec.EllipticCurvePrivateKey
"""
self._private_key = value
if value:
self._public_key = self.private_key.public_key()
@property
def public_key(self):
"""The VAPID public ECDSA key
The public key is currently read only. Set it via the `.private_key`
method. This will autogenerate a public and private key if no value
has been set.
:returns ec.EllipticCurvePublicKey
"""
return self._public_key
def generate_keys(self):
"""Generate a valid ECDSA Key Pair."""
self.private_key = ec.generate_private_key(ec.SECP256R1,
default_backend())
def private_pem(self):
return self.private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
def public_pem(self):
return self.public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
def save_key(self, key_file):
"""Save the private key to a PEM file.
:param key_file: The file path to save the private key data
:type key_file: str
"""
with open(key_file, "wb") as file:
file.write(self.private_pem())
file.close()
def save_public_key(self, key_file):
"""Save the public key to a PEM file.
:param key_file: The name of the file to save the public key
:type key_file: str
"""
with open(key_file, "wb") as file:
file.write(self.public_pem())
file.close()
def verify_token(self, validation_token, verification_token):
"""Internally used to verify the verification token is correct.
:param validation_token: Provided validation token string
:type validation_token: str
:param verification_token: Generated verification token
:type verification_token: str
:returns: Boolean indicating if verifictation token is valid.
:rtype: boolean
"""
hsig = b64urldecode(verification_token.encode('utf8'))
r = int(binascii.hexlify(hsig[:32]), 16)
s = int(binascii.hexlify(hsig[32:]), 16)
try:
self.public_key.verify(
ecutils.encode_dss_signature(r, s),
validation_token,
signature_algorithm=ec.ECDSA(hashes.SHA256())
)
return True
except InvalidSignature:
return False
def _base_sign(self, claims):
cclaims = copy.deepcopy(claims)
if not cclaims.get('exp'):
cclaims['exp'] = int(time.time()) + 86400
if not self.conf.get('no-strict', False):
valid = _check_sub(cclaims.get('sub', ''))
else:
valid = cclaims.get('sub') is not None
if not valid:
raise VapidException(
"Missing 'sub' from claims. "
"'sub' is your admin email as a mailto: link.")
if not re.match(r"^https?://[^/:]+(:\d+)?$",
cclaims.get("aud", ""),
re.IGNORECASE):
raise VapidException(
"Missing 'aud' from claims. "
"'aud' is the scheme, host and optional port for this "
"transaction e.g. https://example.com:8080")
return cclaims
def sign(self, claims, crypto_key=None):
"""Sign a set of claims.
:param claims: JSON object containing the JWT claims to use.
:type claims: dict
:param crypto_key: Optional existing crypto_key header content. The
vapid public key will be appended to this data.
:type crypto_key: str
:returns: a hash containing the header fields to use in
the subscription update.
:rtype: dict
"""
sig = sign(self._base_sign(claims), self.private_key)
pkey = 'p256ecdsa='
pkey += b64urlencode(
self.public_key.public_bytes(
serialization.Encoding.X962,
serialization.PublicFormat.UncompressedPoint
))
if crypto_key:
crypto_key = crypto_key + ';' + pkey
else:
crypto_key = pkey
return {"Authorization": "{} {}".format(self._schema, sig.strip('=')),
"Crypto-Key": crypto_key}
class Vapid02(Vapid01):
"""Minimal Vapid RFC8292 signature generation library
https://tools.ietf.org/html/rfc8292
"""
_schema = "vapid"
def sign(self, claims, crypto_key=None):
sig = sign(self._base_sign(claims), self.private_key)
pkey = self.public_key.public_bytes(
serialization.Encoding.X962,
serialization.PublicFormat.UncompressedPoint
)
return{
"Authorization": "{schema} t={t},k={k}".format(
schema=self._schema,
t=sig,
k=b64urlencode(pkey)
)
}
@classmethod
def verify(cls, auth):
pref_tok = auth.rsplit(' ', 1)
assert pref_tok[0].lower() == cls._schema, (
"Incorrect schema specified")
parts = {}
for tok in pref_tok[1].split(','):
kv = tok.split('=', 1)
parts[kv[0]] = kv[1]
assert 'k' in parts.keys(), (
"Auth missing public key 'k' value")
assert 't' in parts.keys(), (
"Auth missing token set 't' value")
kp = cls().from_raw_public(parts['k'].encode())
tokens = parts['t'].rsplit('.', 1)
return kp.verify_token(
validation_token=tokens[0].encode(),
verification_token=tokens[1]
)
def _check_sub(sub):
pattern =(
r"^(mailto:.+@((localhost|[%\w-]+(\.[%\w-]+)+|([0-9a-f]{1,4}):+([0-9a-f]{1,4})?)))|https:\/\/(localhost|[\w-]+\.[\w\.-]+|([0-9a-f]{1,4}:+)+([0-9a-f]{1,4})?)$"
)
return re.match(pattern, sub, re.IGNORECASE) is not None
Vapid = Vapid02

View file

@ -1,87 +0,0 @@
import binascii
import json
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric import ec, utils
from cryptography.hazmat.primitives import hashes
from py_vapid.utils import b64urldecode, b64urlencode, num_to_bytes
def extract_signature(auth):
"""Extracts the payload and signature from a JWT, converting from RFC7518
to RFC 3279
:param auth: A JWT Authorization Token.
:type auth: str
:return tuple containing the signature material and signature
"""
payload, asig = auth.encode('utf8').rsplit(b'.', 1)
sig = b64urldecode(asig)
if len(sig) != 64:
raise InvalidSignature()
encoded = utils.encode_dss_signature(
s=int(binascii.hexlify(sig[32:]), 16),
r=int(binascii.hexlify(sig[:32]), 16)
)
return payload, encoded
def decode(token, key):
"""Decode a web token into an assertion dictionary
:param token: VAPID auth token
:type token: str
:param key: bitarray containing the public key
:type key: str
:return dict of the VAPID claims
:raise InvalidSignature
"""
try:
sig_material, signature = extract_signature(token)
dkey = b64urldecode(key.encode('utf8'))
pkey = ec.EllipticCurvePublicKey.from_encoded_point(
ec.SECP256R1(),
dkey,
)
pkey.verify(
signature,
sig_material,
ec.ECDSA(hashes.SHA256())
)
return json.loads(
b64urldecode(sig_material.split(b'.')[1]).decode('utf8')
)
except InvalidSignature:
raise
except(ValueError, TypeError, binascii.Error):
raise InvalidSignature()
def sign(claims, key):
"""Sign the claims
:param claims: list of JWS claims
:type claims: dict
:param key: Private key for signing
:type key: ec.EllipticCurvePrivateKey
:param algorithm: JWT "alg" descriptor
:type algorithm: str
"""
header = b64urlencode(b"""{"typ":"JWT","alg":"ES256"}""")
# Unfortunately, chrome seems to require the claims to be sorted.
claims = b64urlencode(json.dumps(claims,
separators=(',', ':'),
sort_keys=True).encode('utf8'))
token = "{}.{}".format(header, claims)
rsig = key.sign(token.encode('utf8'), ec.ECDSA(hashes.SHA256()))
(r, s) = utils.decode_dss_signature(rsig)
sig = b64urlencode(num_to_bytes(r, 32) + num_to_bytes(s, 32))
return "{}.{}".format(token, sig)

View file

@ -1,115 +0,0 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import argparse
import os
import json
from cryptography.hazmat.primitives import serialization
from py_vapid import Vapid01, Vapid02, b64urlencode
def prompt(prompt):
# Not sure why, but python3 throws and exception if you try to
# monkeypatch for this. It's ugly, but this seems to play nicer.
try:
return input(prompt)
except NameError:
return raw_input(prompt) # noqa: F821
def main():
parser = argparse.ArgumentParser(description="VAPID tool")
parser.add_argument('--sign', '-s', help='claims file to sign')
parser.add_argument('--gen', '-g', help='generate new key pairs',
default=False, action="store_true")
parser.add_argument('--version2', '-2', help="use RFC8292 VAPID spec",
default=True, action="store_true")
parser.add_argument('--version1', '-1', help="use VAPID spec Draft-01",
default=False, action="store_true")
parser.add_argument('--json', help="dump as json",
default=False, action="store_true")
parser.add_argument('--no-strict', help='Do not be strict about "sub"',
default=False, action="store_true")
parser.add_argument('--applicationServerKey',
help="show applicationServerKey value",
default=False, action="store_true")
parser.add_argument('--private-key', '-k', help='private key pem file',
default="private_key.pem")
args = parser.parse_args()
# Added to solve 2.7 => 3.* incompatibility
Vapid = Vapid02
if args.version1:
Vapid = Vapid01
if args.gen or not os.path.exists(args.private_key):
if not args.gen:
print("No private key file found.")
answer = None
while answer not in ['y', 'n']:
answer = prompt("Do you want me to create one for you? (Y/n)")
if not answer:
answer = 'y'
answer = answer.lower()[0]
if answer == 'n':
print("Sorry, can't do much for you then.")
exit(1)
vapid = Vapid(conf=args)
vapid.generate_keys()
print("Generating private_key.pem")
vapid.save_key('private_key.pem')
print("Generating public_key.pem")
vapid.save_public_key('public_key.pem')
vapid = Vapid.from_file(args.private_key)
claim_file = args.sign
result = dict()
if args.applicationServerKey:
raw_pub = vapid.public_key.public_bytes(
serialization.Encoding.X962,
serialization.PublicFormat.UncompressedPoint
)
print("Application Server Key = {}\n\n".format(
b64urlencode(raw_pub)))
if claim_file:
if not os.path.exists(claim_file):
print("No {} file found.".format(claim_file))
print("""
The claims file should be a JSON formatted file that holds the
information that describes you. There are three elements in the claims
file you'll need:
"sub" This is your site's admin email address
(e.g. "mailto:admin@example.com")
"exp" This is the expiration time for the claim in seconds. If you don't
have one, I'll add one that expires in 24 hours.
You're also welcome to add additional fields to the claims which could be
helpful for the Push Service operations team to pass along to your operations
team (e.g. "ami-id": "e-123456", "cust-id": "a3sfa10987"). Remember to keep
these values short to prevent some servers from rejecting the transaction due
to overly large headers. See https://jwt.io/introduction/ for details.
For example, a claims.json file could contain:
{"sub": "mailto:admin@example.com"}
""")
exit(1)
try:
claims = json.loads(open(claim_file).read())
result.update(vapid.sign(claims))
except Exception as exc:
print("Crap, something went wrong: {}".format(repr(exc)))
raise exc
if args.json:
print(json.dumps(result))
return
print("Include the following headers in your request:\n")
for key, value in result.items():
print("{}: {}\n".format(key, value))
print("\n")
if __name__ == '__main__':
main()

View file

@ -1,282 +0,0 @@
import binascii
import base64
import copy
import os
import json
import unittest
from cryptography.hazmat.primitives import serialization
from mock import patch, Mock
from py_vapid import Vapid01, Vapid02, VapidException, _check_sub
from py_vapid.jwt import decode
TEST_KEY_PRIVATE_DER = """
MHcCAQEEIPeN1iAipHbt8+/KZ2NIF8NeN24jqAmnMLFZEMocY8RboAoGCCqGSM49
AwEHoUQDQgAEEJwJZq/GN8jJbo1GGpyU70hmP2hbWAUpQFKDByKB81yldJ9GTklB
M5xqEwuPM7VuQcyiLDhvovthPIXx+gsQRQ==
"""
key = dict(
d=111971876876285331364078054667935803036831194031221090723024134705696601261147, # noqa
x=7512698603580564493364310058109115206932767156853859985379597995200661812060, # noqa
y=74837673548863147047276043384733294240255217876718360423043754089982135570501 # noqa
)
# This is the same private key, in PEM form.
TEST_KEY_PRIVATE_PEM = (
"-----BEGIN PRIVATE KEY-----{}"
"-----END PRIVATE KEY-----\n").format(TEST_KEY_PRIVATE_DER)
# This is the same private key, as a point in uncompressed form. This should
# be Base64url-encoded without padding.
TEST_KEY_PRIVATE_RAW = """
943WICKkdu3z78pnY0gXw143biOoCacwsVkQyhxjxFs
""".strip().encode('utf8')
# This is a public key in PEM form.
TEST_KEY_PUBLIC_PEM = """-----BEGIN PUBLIC KEY-----
MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEEJwJZq/GN8jJbo1GGpyU70hmP2hb
WAUpQFKDByKB81yldJ9GTklBM5xqEwuPM7VuQcyiLDhvovthPIXx+gsQRQ==
-----END PUBLIC KEY-----
"""
# this is a public key in uncompressed form ('\x04' + 2 * 32 octets)
# Remember, this should have any padding stripped.
TEST_KEY_PUBLIC_RAW = (
"BBCcCWavxjfIyW6NRhqclO9IZj9oW1gFKUBSgwcigfNc"
"pXSfRk5JQTOcahMLjzO1bkHMoiw4b6L7YTyF8foLEEU"
).strip('=').encode('utf8')
def setup_module(self):
with open('/tmp/private', 'w') as ff:
ff.write(TEST_KEY_PRIVATE_PEM)
with open('/tmp/public', 'w') as ff:
ff.write(TEST_KEY_PUBLIC_PEM)
with open('/tmp/private.der', 'w') as ff:
ff.write(TEST_KEY_PRIVATE_DER)
def teardown_module(self):
os.unlink('/tmp/private')
os.unlink('/tmp/public')
class VapidTestCase(unittest.TestCase):
def check_keys(self, v):
assert v.private_key.private_numbers().private_value == key.get('d')
assert v.public_key.public_numbers().x == key.get('x')
assert v.public_key.public_numbers().y == key.get('y')
def test_init(self):
v1 = Vapid01.from_file("/tmp/private")
self.check_keys(v1)
v2 = Vapid01.from_pem(TEST_KEY_PRIVATE_PEM.encode())
self.check_keys(v2)
v3 = Vapid01.from_der(TEST_KEY_PRIVATE_DER.encode())
self.check_keys(v3)
v4 = Vapid01.from_file("/tmp/private.der")
self.check_keys(v4)
no_exist = '/tmp/not_exist'
Vapid01.from_file(no_exist)
assert os.path.isfile(no_exist)
os.unlink(no_exist)
def repad(self, data):
return data + "===="[len(data) % 4:]
@patch("py_vapid.Vapid01.from_pem", side_effect=Exception)
def test_init_bad_read(self, mm):
self.assertRaises(Exception,
Vapid01.from_file,
private_key_file="/tmp/private")
def test_gen_key(self):
v = Vapid01()
v.generate_keys()
assert v.public_key
assert v.private_key
def test_private_key(self):
v = Vapid01()
self.assertRaises(VapidException,
lambda: v.private_key)
def test_public_key(self):
v = Vapid01()
assert v._private_key is None
assert v._public_key is None
def test_save_key(self):
v = Vapid01()
v.generate_keys()
v.save_key("/tmp/p2")
os.unlink("/tmp/p2")
def test_same_public_key(self):
v = Vapid01()
v.generate_keys()
v.save_public_key("/tmp/p2")
os.unlink("/tmp/p2")
def test_from_raw(self):
v = Vapid01.from_raw(TEST_KEY_PRIVATE_RAW)
self.check_keys(v)
def test_from_string(self):
v1 = Vapid01.from_string(TEST_KEY_PRIVATE_DER)
v2 = Vapid01.from_string(TEST_KEY_PRIVATE_RAW.decode())
self.check_keys(v1)
self.check_keys(v2)
def test_sign_01(self):
v = Vapid01.from_string(TEST_KEY_PRIVATE_DER)
claims = {"aud": "https://example.com",
"sub": "mailto:admin@example.com"}
result = v.sign(claims, "id=previous")
assert result['Crypto-Key'] == (
'id=previous;p256ecdsa=' + TEST_KEY_PUBLIC_RAW.decode('utf8'))
pkey = binascii.b2a_base64(
v.public_key.public_bytes(
serialization.Encoding.X962,
serialization.PublicFormat.UncompressedPoint
)
).decode('utf8').replace('+', '-').replace('/', '_').strip()
items = decode(result['Authorization'].split(' ')[1], pkey)
for k in claims:
assert items[k] == claims[k]
result = v.sign(claims)
assert result['Crypto-Key'] == ('p256ecdsa=' +
TEST_KEY_PUBLIC_RAW.decode('utf8'))
# Verify using the same function as Integration
# this should ensure that the r,s sign values are correctly formed
assert Vapid01.verify(
key=result['Crypto-Key'].split('=')[1],
auth=result['Authorization']
)
def test_sign_02(self):
v = Vapid02.from_file("/tmp/private")
claims = {"aud": "https://example.com",
"sub": "mailto:admin@example.com",
"foo": "extra value"}
claim_check = copy.deepcopy(claims)
result = v.sign(claims, "id=previous")
auth = result['Authorization']
assert auth[:6] == 'vapid '
assert ' t=' in auth
assert ',k=' in auth
parts = auth[6:].split(',')
assert len(parts) == 2
t_val = json.loads(base64.urlsafe_b64decode(
self.repad(parts[0][2:].split('.')[1])
).decode('utf8'))
k_val = binascii.a2b_base64(self.repad(parts[1][2:]))
assert binascii.hexlify(k_val)[:2] == b'04'
assert len(k_val) == 65
assert claims == claim_check
for k in claims:
assert t_val[k] == claims[k]
def test_sign_02_localhost(self):
v = Vapid02.from_file("/tmp/private")
claims = {"aud": "http://localhost:8000",
"sub": "mailto:admin@example.com",
"foo": "extra value"}
result = v.sign(claims, "id=previous")
auth = result['Authorization']
assert auth[:6] == 'vapid '
assert ' t=' in auth
assert ',k=' in auth
def test_integration(self):
# These values were taken from a test page. DO NOT ALTER!
key = ("BDd3_hVL9fZi9Ybo2UUzA284WG5FZR30_95YeZJsiApwXKpNcF1rRPF3foI"
"iBHXRdJI2Qhumhf6_LFTeZaNndIo")
auth = ("eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJhdWQiOiJod"
"HRwczovL3VwZGF0ZXMucHVzaC5zZXJ2aWNlcy5tb3ppbGxhLmNvbSIsImV"
"4cCI6MTQ5NDY3MTQ3MCwic3ViIjoibWFpbHRvOnNpbXBsZS1wdXNoLWRlb"
"W9AZ2F1bnRmYWNlLmNvLnVrIn0.LqPi86T-HJ71TXHAYFptZEHD7Wlfjcc"
"4u5jYZ17WpqOlqDcW-5Wtx3x1OgYX19alhJ9oLumlS2VzEvNioZolQA")
assert Vapid01.verify(key=key, auth="webpush {}".format(auth))
assert Vapid02.verify(auth="vapid t={},k={}".format(auth, key))
def test_bad_integration(self):
# These values were taken from a test page. DO NOT ALTER!
key = ("BDd3_hVL9fZi9Ybo2UUzA284WG5FZR30_95YeZJsiApwXKpNcF1rRPF3foI"
"iBHXRdJI2Qhumhf6_LFTeZaNndIo")
auth = ("WebPush eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJhdWQiOiJod"
"HRwczovL3VwZGF0ZXMucHVzaC5zZXJ2aWNlcy5tb3ppbGxhLmNvbSIsImV"
"4cCI6MTQ5NDY3MTQ3MCwic3ViIjoibWFpbHRvOnNpbXBsZS1wdXNoLWRlb"
"W9AZ2F1bnRmYWNlLmNvLnVrIn0.LqPi86T-HJ71TXHAYFptZEHD7Wlfjcc"
"4u5jYZ17WpqOlqDcW-5Wtx3x1OgYX19alhJ9oLumlS2VzEvNioZ_BAD")
assert Vapid01.verify(key=key, auth=auth) == False
def test_bad_sign(self):
v = Vapid01.from_file("/tmp/private")
self.assertRaises(VapidException,
v.sign,
{})
self.assertRaises(VapidException,
v.sign,
{'sub': 'foo',
'aud': "p.example.com"})
self.assertRaises(VapidException,
v.sign,
{'sub': 'mailto:foo@bar.com',
'aud': "p.example.com"})
self.assertRaises(VapidException,
v.sign,
{'sub': 'mailto:foo@bar.com',
'aud': "https://p.example.com:8080/"})
def test_ignore_sub(self):
v = Vapid02.from_file("/tmp/private")
v.conf['no-strict'] = True
assert v.sign({"sub": "foo", "aud": "http://localhost:8000"})
@patch('cryptography.hazmat.primitives.asymmetric'
'.ec.EllipticCurvePublicNumbers')
def test_invalid_sig(self, mm):
from cryptography.exceptions import InvalidSignature
ve = Mock()
ve.verify.side_effect = InvalidSignature
pk = Mock()
pk.public_key.return_value = ve
mm.from_encoded_point.return_value = pk
self.assertRaises(InvalidSignature,
decode,
'foo.bar.blat',
'aaaa')
self.assertRaises(InvalidSignature,
decode,
'foo.bar.a',
'aaaa')
def test_sub(self):
valid = [
'mailto:me@localhost',
'mailto:me@1.2.3.4',
'mailto:me@1234::',
'mailto:me@1234::5678',
'mailto:admin@example.org',
'mailto:admin-test-case@example-test-case.test.org',
'https://localhost',
'https://exmample-test-case.test.org',
'https://8001::',
'https://8001:1000:0001',
'https://1.2.3.4'
]
invalid = [
'mailto:@foobar.com',
'mailto:example.org',
'mailto:0123:',
'mailto:::1234',
'https://somehost',
'https://xyz:123',
]
for val in valid:
assert _check_sub(val) is True
for val in invalid:
assert _check_sub(val) is False

View file

@ -1,39 +0,0 @@
import base64
import binascii
def b64urldecode(data):
"""Decodes an unpadded Base64url-encoded string.
:param data: data bytes to decode
:type data: bytes
:returns bytes
"""
return base64.urlsafe_b64decode(data + b"===="[len(data) % 4:])
def b64urlencode(data):
"""Encode a byte string into a Base64url-encoded string without padding
:param data: data bytes to encode
:type data: bytes
:returns str
"""
return base64.urlsafe_b64encode(data).replace(b'=', b'').decode('utf8')
def num_to_bytes(n, pad_to):
"""Returns the byte representation of an integer, in big-endian order.
:param n: The integer to encode.
:type n: int
:param pad_to: Expected length of result, zeropad if necessary.
:type pad_to: int
:returns bytes
"""
h = '%x' % n
r = binascii.unhexlify('0' * (len(h) % 2) + h)
return b'\x00' * (pad_to - len(r)) + r