pywbem/0005-python3_12.patch
Tony Asleson 9d11e00e66 Account for changes to python 3.12
Resolves: RHEL-47150

Signed-off-by: Tony Asleson <tasleson@redhat.com>
2024-07-15 21:25:13 -05:00

217 lines
10 KiB
Diff

diff --git a/pywbem/_cim_http.py b/pywbem/_cim_http.py
index b6080058..2779d7aa 100644
--- a/pywbem/_cim_http.py
+++ b/pywbem/_cim_http.py
@@ -57,22 +57,12 @@ from ._utils import _ensure_unicode, _ensure_bytes, _format
_ON_RTD = os.environ.get('READTHEDOCS', None) == 'True'
-if six.PY2 and not _ON_RTD: # RTD has no swig to install M2Crypto
- # pylint: disable=wrong-import-order,wrong-import-position
- from M2Crypto import SSL
- from M2Crypto.Err import SSLError
- from M2Crypto.m2 import OPENSSL_VERSION_TEXT as OPENSSL_VERSION
- _HAVE_M2CRYPTO = True
- # pylint: disable=invalid-name
- SocketErrors = (socket.error, socket.sslerror)
-else:
- # pylint: disable=wrong-import-order,wrong-import-position
- import ssl as SSL
- from ssl import SSLError, CertificateError
- from ssl import OPENSSL_VERSION
- _HAVE_M2CRYPTO = False
- # pylint: disable=invalid-name
- SocketErrors = (socket.error,)
+# pylint: disable=wrong-import-order
+import ssl as SSL
+from ssl import SSLError, CertificateError
+from ssl import OPENSSL_VERSION
+# pylint: disable=invalid-name
+SocketErrors = (socket.error,)
__all__ = ['DEFAULT_CA_CERT_PATHS']
@@ -519,12 +509,25 @@ def wbem_request(url, data, creds, cimxml_headers=None, debug=False, x509=None,
# Note: We do not use strict=True in the following call, because it
# is not clear what side effects that would have, and if no status
# line comes back we'll certainly find out about that.
+ ssl_context = SSL.create_default_context(purpose=SSL.Purpose.SERVER_AUTH)
+ ssl_context.check_hostname = False
+
+ if no_verification:
+ ssl_context.verify_mode = SSL.CERT_NONE
+
+ if cert_file and key_file:
+ ssl_context.load_cert_chain(cert_file, key_file)
+
+ if ca_certs:
+ ssl_context.load_verify_locations(ca_certs)
+
+ # 3.12 removed key_file, cert_file, etc.
httplib.HTTPSConnection.__init__(self, host=host, port=port,
- key_file=key_file,
- cert_file=cert_file,
+ context=ssl_context,
timeout=timeout)
self.ca_certs = ca_certs
self.verify_callback = verify_callback
+ self.ctx = ssl_context
# issue 297: Verify_callback is not used in py 3
if verify_callback is not None and six.PY3:
warnings.warn("The 'verify_callback' parameter was specified "
@@ -534,137 +537,25 @@ def wbem_request(url, data, creds, cimxml_headers=None, debug=False, x509=None,
def connect(self):
# pylint: disable=too-many-branches
"""Connect to a host on a given (SSL) port."""
+ # set up the socket
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.settimeout(self.timeout)
- # Connect for M2Crypto ssl package
- if _HAVE_M2CRYPTO:
- # Calling httplib.HTTPSConnection.connect(self) does not work
- # because of its ssl.wrap_socket() call. So we copy the code of
- # that connect() method modulo the ssl.wrap_socket() call.
-
- # Another change is that we do not pass the timeout value
- # on to the socket call, because that does not work with
- # M2Crypto.
-
- if sys.version_info[0:2] >= (2, 7):
- # the source_address parameter was added in Python 2.7
- self.sock = socket.create_connection(
- (self.host, self.port), None, self.source_address)
- else:
- self.sock = socket.create_connection(
- (self.host, self.port), None)
-
- # Removed code for tunneling support.
-
- # End of code from httplib.HTTPSConnection.connect(self).
-
- ctx = SSL.Context('sslv23')
-
- if self.cert_file:
- ctx.load_cert(self.cert_file, keyfile=self.key_file)
- if self.ca_certs:
- ctx.set_verify(
- SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
- depth=9, callback=verify_callback)
- # M2Crypto requires binary strings as path names and
- # otherwise raises TypeError.
- ca_certs = _ensure_bytes(self.ca_certs)
- if os.path.isdir(self.ca_certs):
- ctx.load_verify_locations(capath=ca_certs)
- else:
- ctx.load_verify_locations(cafile=ca_certs)
- try:
- self.sock = SSL.Connection(ctx, self.sock)
-
- # Below is a body of SSL.Connection.connect() method
- # except for the first line (socket connection).
-
- # Removed code for tunneling support.
-
- # Setting the timeout on the input socket does not work
- # with M2Crypto, with such a timeout set it calls a
- # different low level function (nbio instead of bio)
- # that does not work. The symptom is that reading the
- # response returns None.
- # Therefore, we set the timeout at the level of the outer
- # M2Crypto socket object.
- # pylint: disable=using-constant-test
-
- if self.timeout is not None:
- self.sock.set_socket_read_timeout(
- SSL.timeout(self.timeout))
- self.sock.set_socket_write_timeout(
- SSL.timeout(self.timeout))
-
- self.sock.addr = (self.host, self.port)
- self.sock.setup_ssl()
- self.sock.set_connect_state()
- ret = self.sock.connect_ssl()
- if self.ca_certs:
- check = getattr(self.sock, 'postConnectionCheck',
- self.sock.clientPostConnectionCheck)
- if check is not None:
- if not check(self.sock.get_peer_cert(), self.host):
- raise ConnectionError(
- 'SSL error: post connection check failed',
- conn_id=conn_id)
- return ret
-
- except (SSLError, SSL.SSLError,
- SSL.Checker.SSLVerificationError) as arg:
- raise ConnectionError(
- _format("SSL error {0}: {1}; OpenSSL version: {2}",
- arg.__class__, arg, OPENSSL_VERSION),
- conn_id=conn_id)
-
- # Connect using Python SSL module
- else:
- # Setup the socket context
-
- # Note: PROTOCOL_SSLv23 allows talking to servers with TLS but
- # not with SSL. For details, see the table in
- # https://docs.python.org/3/library/ssl.html#ssl.wrap_socket
- # Within the defined set of protocol versions, SSLv23 selects
- # the highest protocol version that both client and server
- # support.
- # Issue #893: Consider the use of default_context()
- ctx = SSL.SSLContext(SSL.PROTOCOL_SSLv23)
-
- if self.cert_file:
- ctx.load_cert_chain(self.cert_file, keyfile=self.key_file)
- if self.ca_certs:
- # We need to use CERT_REQUIRED to require that the server
- # certificate is being validated by the client (against the
- # certificates in ca_certs).
- ctx.verify_mode = SSL.CERT_REQUIRED
- if os.path.isdir(self.ca_certs):
- ctx.load_verify_locations(capath=self.ca_certs)
- else:
- ctx.load_verify_locations(cafile=self.ca_certs)
- ctx.check_hostname = True
- else:
- ctx.check_hostname = False
- ctx.verify_mode = SSL.CERT_NONE
-
- # setup the socket
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.settimeout(self.timeout)
-
- try:
- self.sock = ctx.wrap_socket(sock,
- server_hostname=self.host)
- return self.sock.connect((self.host, self.port))
+ try:
+ self.sock = self.ctx.wrap_socket(sock)
+ return self.sock.connect((self.host, self.port))
- except SSLError as arg:
- raise ConnectionError(
- _format("SSL error {0}: {1}; OpenSSL version: {2}",
- arg.__class__, arg, OPENSSL_VERSION),
- conn_id=conn_id)
- except CertificateError as arg:
- raise ConnectionError(
- _format("SSL certificate error {0}: {1}; "
- "OpenSSL version: {2}",
- arg.__class__, arg, OPENSSL_VERSION),
- conn_id=conn_id)
+ except SSLError as arg:
+ raise ConnectionError(
+ _format("SSL error {0}: {1}; OpenSSL version: {2}",
+ arg.__class__, arg, OPENSSL_VERSION),
+ conn_id=conn_id)
+ except CertificateError as arg:
+ raise ConnectionError(
+ _format("SSL certificate error {0}: {1}; "
+ "OpenSSL version: {2}",
+ arg.__class__, arg, OPENSSL_VERSION),
+ conn_id=conn_id)
class FileHTTPConnection(HTTPBaseConnection, httplib.HTTPConnection):
"""Execute client connection based on a unix domain socket. """