[ Avaa Bypassed ]




Upload:

Command:

www-data@18.217.35.130: ~ $
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.

import datetime
import typing

from cryptography import utils, x509
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.backends.openssl.decode_asn1 import (
    _CRL_ENTRY_REASON_CODE_TO_ENUM,
    _asn1_integer_to_int,
    _asn1_string_to_bytes,
    _decode_x509_name,
    _obj2txt,
    _parse_asn1_generalized_time,
)
from cryptography.hazmat.backends.openssl.x509 import _Certificate
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.x509.ocsp import (
    OCSPCertStatus,
    OCSPRequest,
    OCSPResponse,
    OCSPResponseStatus,
    _CERT_STATUS_TO_ENUM,
    _OIDS_TO_HASH,
    _RESPONSE_STATUS_TO_ENUM,
)


def _issuer_key_hash(backend, cert_id):
    key_hash = backend._ffi.new("ASN1_OCTET_STRING **")
    res = backend._lib.OCSP_id_get0_info(
        backend._ffi.NULL,
        backend._ffi.NULL,
        key_hash,
        backend._ffi.NULL,
        cert_id,
    )
    backend.openssl_assert(res == 1)
    backend.openssl_assert(key_hash[0] != backend._ffi.NULL)
    return _asn1_string_to_bytes(backend, key_hash[0])


def _issuer_name_hash(backend, cert_id):
    name_hash = backend._ffi.new("ASN1_OCTET_STRING **")
    res = backend._lib.OCSP_id_get0_info(
        name_hash,
        backend._ffi.NULL,
        backend._ffi.NULL,
        backend._ffi.NULL,
        cert_id,
    )
    backend.openssl_assert(res == 1)
    backend.openssl_assert(name_hash[0] != backend._ffi.NULL)
    return _asn1_string_to_bytes(backend, name_hash[0])


def _serial_number(backend, cert_id):
    num = backend._ffi.new("ASN1_INTEGER **")
    res = backend._lib.OCSP_id_get0_info(
        backend._ffi.NULL, backend._ffi.NULL, backend._ffi.NULL, num, cert_id
    )
    backend.openssl_assert(res == 1)
    backend.openssl_assert(num[0] != backend._ffi.NULL)
    return _asn1_integer_to_int(backend, num[0])


def _hash_algorithm(backend, cert_id):
    asn1obj = backend._ffi.new("ASN1_OBJECT **")
    res = backend._lib.OCSP_id_get0_info(
        backend._ffi.NULL,
        asn1obj,
        backend._ffi.NULL,
        backend._ffi.NULL,
        cert_id,
    )
    backend.openssl_assert(res == 1)
    backend.openssl_assert(asn1obj[0] != backend._ffi.NULL)
    oid = _obj2txt(backend, asn1obj[0])
    try:
        return _OIDS_TO_HASH[oid]
    except KeyError:
        raise UnsupportedAlgorithm(
            "Signature algorithm OID: {} not recognized".format(oid)
        )


class _OCSPResponse(OCSPResponse):
    def __init__(self, backend, ocsp_response):
        self._backend = backend
        self._ocsp_response = ocsp_response
        status = self._backend._lib.OCSP_response_status(self._ocsp_response)
        self._backend.openssl_assert(status in _RESPONSE_STATUS_TO_ENUM)
        self._status = _RESPONSE_STATUS_TO_ENUM[status]
        if self._status is OCSPResponseStatus.SUCCESSFUL:
            basic = self._backend._lib.OCSP_response_get1_basic(
                self._ocsp_response
            )
            self._backend.openssl_assert(basic != self._backend._ffi.NULL)
            self._basic = self._backend._ffi.gc(
                basic, self._backend._lib.OCSP_BASICRESP_free
            )
            num_resp = self._backend._lib.OCSP_resp_count(self._basic)
            if num_resp != 1:
                raise ValueError(
                    "OCSP response contains more than one SINGLERESP structure"
                    ", which this library does not support. "
                    "{} found".format(num_resp)
                )
            self._single = self._backend._lib.OCSP_resp_get0(self._basic, 0)
            self._backend.openssl_assert(
                self._single != self._backend._ffi.NULL
            )
            self._cert_id = self._backend._lib.OCSP_SINGLERESP_get0_id(
                self._single
            )
            self._backend.openssl_assert(
                self._cert_id != self._backend._ffi.NULL
            )

    response_status = utils.read_only_property("_status")

    def _requires_successful_response(self) -> None:
        if self.response_status != OCSPResponseStatus.SUCCESSFUL:
            raise ValueError(
                "OCSP response status is not successful so the property "
                "has no value"
            )

    @property
    def signature_algorithm_oid(self) -> x509.ObjectIdentifier:
        self._requires_successful_response()
        alg = self._backend._lib.OCSP_resp_get0_tbs_sigalg(self._basic)
        self._backend.openssl_assert(alg != self._backend._ffi.NULL)
        oid = _obj2txt(self._backend, alg.algorithm)
        return x509.ObjectIdentifier(oid)

    @property
    def signature_hash_algorithm(
        self,
    ) -> typing.Optional[hashes.HashAlgorithm]:
        self._requires_successful_response()
        oid = self.signature_algorithm_oid
        try:
            return x509._SIG_OIDS_TO_HASH[oid]
        except KeyError:
            raise UnsupportedAlgorithm(
                "Signature algorithm OID:{} not recognized".format(oid)
            )

    @property
    def signature(self) -> bytes:
        self._requires_successful_response()
        sig = self._backend._lib.OCSP_resp_get0_signature(self._basic)
        self._backend.openssl_assert(sig != self._backend._ffi.NULL)
        return _asn1_string_to_bytes(self._backend, sig)

    @property
    def tbs_response_bytes(self) -> bytes:
        self._requires_successful_response()
        respdata = self._backend._lib.OCSP_resp_get0_respdata(self._basic)
        self._backend.openssl_assert(respdata != self._backend._ffi.NULL)
        pp = self._backend._ffi.new("unsigned char **")
        res = self._backend._lib.i2d_OCSP_RESPDATA(respdata, pp)
        self._backend.openssl_assert(pp[0] != self._backend._ffi.NULL)
        pp = self._backend._ffi.gc(
            pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
        )
        self._backend.openssl_assert(res > 0)
        return self._backend._ffi.buffer(pp[0], res)[:]

    @property
    def certificates(self) -> typing.List[x509.Certificate]:
        self._requires_successful_response()
        sk_x509 = self._backend._lib.OCSP_resp_get0_certs(self._basic)
        num = self._backend._lib.sk_X509_num(sk_x509)
        certs: typing.List[x509.Certificate] = []
        for i in range(num):
            x509_ptr = self._backend._lib.sk_X509_value(sk_x509, i)
            self._backend.openssl_assert(x509_ptr != self._backend._ffi.NULL)
            cert = _Certificate(self._backend, x509_ptr)
            # We need to keep the OCSP response that the certificate came from
            # alive until the Certificate object itself goes out of scope, so
            # we give it a private reference.
            cert._ocsp_resp_ref = self
            certs.append(cert)

        return certs

    @property
    def responder_key_hash(self) -> typing.Optional[bytes]:
        self._requires_successful_response()
        _, asn1_string = self._responder_key_name()
        if asn1_string == self._backend._ffi.NULL:
            return None
        else:
            return _asn1_string_to_bytes(self._backend, asn1_string)

    @property
    def responder_name(self) -> typing.Optional[x509.Name]:
        self._requires_successful_response()
        x509_name, _ = self._responder_key_name()
        if x509_name == self._backend._ffi.NULL:
            return None
        else:
            return _decode_x509_name(self._backend, x509_name)

    def _responder_key_name(self):
        asn1_string = self._backend._ffi.new("ASN1_OCTET_STRING **")
        x509_name = self._backend._ffi.new("X509_NAME **")
        res = self._backend._lib.OCSP_resp_get0_id(
            self._basic, asn1_string, x509_name
        )
        self._backend.openssl_assert(res == 1)
        return x509_name[0], asn1_string[0]

    @property
    def produced_at(self) -> datetime.datetime:
        self._requires_successful_response()
        produced_at = self._backend._lib.OCSP_resp_get0_produced_at(
            self._basic
        )
        return _parse_asn1_generalized_time(self._backend, produced_at)

    @property
    def certificate_status(self) -> OCSPCertStatus:
        self._requires_successful_response()
        status = self._backend._lib.OCSP_single_get0_status(
            self._single,
            self._backend._ffi.NULL,
            self._backend._ffi.NULL,
            self._backend._ffi.NULL,
            self._backend._ffi.NULL,
        )
        self._backend.openssl_assert(status in _CERT_STATUS_TO_ENUM)
        return _CERT_STATUS_TO_ENUM[status]

    @property
    def revocation_time(self) -> typing.Optional[datetime.datetime]:
        self._requires_successful_response()
        if self.certificate_status is not OCSPCertStatus.REVOKED:
            return None

        asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **")
        self._backend._lib.OCSP_single_get0_status(
            self._single,
            self._backend._ffi.NULL,
            asn1_time,
            self._backend._ffi.NULL,
            self._backend._ffi.NULL,
        )
        self._backend.openssl_assert(asn1_time[0] != self._backend._ffi.NULL)
        return _parse_asn1_generalized_time(self._backend, asn1_time[0])

    @property
    def revocation_reason(self) -> typing.Optional[x509.ReasonFlags]:
        self._requires_successful_response()
        if self.certificate_status is not OCSPCertStatus.REVOKED:
            return None

        reason_ptr = self._backend._ffi.new("int *")
        self._backend._lib.OCSP_single_get0_status(
            self._single,
            reason_ptr,
            self._backend._ffi.NULL,
            self._backend._ffi.NULL,
            self._backend._ffi.NULL,
        )
        # If no reason is encoded OpenSSL returns -1
        if reason_ptr[0] == -1:
            return None
        else:
            self._backend.openssl_assert(
                reason_ptr[0] in _CRL_ENTRY_REASON_CODE_TO_ENUM
            )
            return _CRL_ENTRY_REASON_CODE_TO_ENUM[reason_ptr[0]]

    @property
    def this_update(self) -> datetime.datetime:
        self._requires_successful_response()
        asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **")
        self._backend._lib.OCSP_single_get0_status(
            self._single,
            self._backend._ffi.NULL,
            self._backend._ffi.NULL,
            asn1_time,
            self._backend._ffi.NULL,
        )
        self._backend.openssl_assert(asn1_time[0] != self._backend._ffi.NULL)
        return _parse_asn1_generalized_time(self._backend, asn1_time[0])

    @property
    def next_update(self) -> typing.Optional[datetime.datetime]:
        self._requires_successful_response()
        asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **")
        self._backend._lib.OCSP_single_get0_status(
            self._single,
            self._backend._ffi.NULL,
            self._backend._ffi.NULL,
            self._backend._ffi.NULL,
            asn1_time,
        )
        if asn1_time[0] != self._backend._ffi.NULL:
            return _parse_asn1_generalized_time(self._backend, asn1_time[0])
        else:
            return None

    @property
    def issuer_key_hash(self) -> bytes:
        self._requires_successful_response()
        return _issuer_key_hash(self._backend, self._cert_id)

    @property
    def issuer_name_hash(self) -> bytes:
        self._requires_successful_response()
        return _issuer_name_hash(self._backend, self._cert_id)

    @property
    def hash_algorithm(self) -> hashes.HashAlgorithm:
        self._requires_successful_response()
        return _hash_algorithm(self._backend, self._cert_id)

    @property
    def serial_number(self) -> int:
        self._requires_successful_response()
        return _serial_number(self._backend, self._cert_id)

    @utils.cached_property
    def extensions(self) -> x509.Extensions:
        self._requires_successful_response()
        return self._backend._ocsp_basicresp_ext_parser.parse(self._basic)

    @utils.cached_property
    def single_extensions(self) -> x509.Extensions:
        self._requires_successful_response()
        return self._backend._ocsp_singleresp_ext_parser.parse(self._single)

    def public_bytes(self, encoding: serialization.Encoding) -> bytes:
        if encoding is not serialization.Encoding.DER:
            raise ValueError("The only allowed encoding value is Encoding.DER")

        bio = self._backend._create_mem_bio_gc()
        res = self._backend._lib.i2d_OCSP_RESPONSE_bio(
            bio, self._ocsp_response
        )
        self._backend.openssl_assert(res > 0)
        return self._backend._read_mem_bio(bio)


class _OCSPRequest(OCSPRequest):
    def __init__(self, backend, ocsp_request):
        if backend._lib.OCSP_request_onereq_count(ocsp_request) > 1:
            raise NotImplementedError(
                "OCSP request contains more than one request"
            )
        self._backend = backend
        self._ocsp_request = ocsp_request
        self._request = self._backend._lib.OCSP_request_onereq_get0(
            self._ocsp_request, 0
        )
        self._backend.openssl_assert(self._request != self._backend._ffi.NULL)
        self._cert_id = self._backend._lib.OCSP_onereq_get0_id(self._request)
        self._backend.openssl_assert(self._cert_id != self._backend._ffi.NULL)

    @property
    def issuer_key_hash(self) -> bytes:
        return _issuer_key_hash(self._backend, self._cert_id)

    @property
    def issuer_name_hash(self) -> bytes:
        return _issuer_name_hash(self._backend, self._cert_id)

    @property
    def serial_number(self) -> int:
        return _serial_number(self._backend, self._cert_id)

    @property
    def hash_algorithm(self) -> hashes.HashAlgorithm:
        return _hash_algorithm(self._backend, self._cert_id)

    @utils.cached_property
    def extensions(self) -> x509.Extensions:
        return self._backend._ocsp_req_ext_parser.parse(self._ocsp_request)

    def public_bytes(self, encoding: serialization.Encoding) -> bytes:
        if encoding is not serialization.Encoding.DER:
            raise ValueError("The only allowed encoding value is Encoding.DER")

        bio = self._backend._create_mem_bio_gc()
        res = self._backend._lib.i2d_OCSP_REQUEST_bio(bio, self._ocsp_request)
        self._backend.openssl_assert(res > 0)
        return self._backend._read_mem_bio(bio)

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 271 B 0644
aead.py File 5.57 KB 0644
backend.py File 103.85 KB 0644
ciphers.py File 9.01 KB 0644
cmac.py File 2.78 KB 0644
decode_asn1.py File 31.5 KB 0644
dh.py File 10.95 KB 0644
dsa.py File 10.43 KB 0644
ec.py File 13.06 KB 0644
ed25519.py File 5.65 KB 0644
ed448.py File 5.61 KB 0644
encode_asn1.py File 23.43 KB 0644
hashes.py File 3.02 KB 0644
hmac.py File 2.86 KB 0644
ocsp.py File 14.27 KB 0644
poly1305.py File 2.29 KB 0644
rsa.py File 20.43 KB 0644
utils.py File 2.23 KB 0644
x25519.py File 4.51 KB 0644
x448.py File 4.04 KB 0644
x509.py File 22.3 KB 0644