pcs/SOURCES/bz1783106-02-send-request-f...

534 lines
20 KiB
Diff

From 770252b476bc342ea08da2bc5b83de713463d14a Mon Sep 17 00:00:00 2001
From: Ivan Devat <idevat@redhat.com>
Date: Thu, 12 Mar 2020 15:32:31 +0100
Subject: [PATCH 1/2] send request from python to ruby more directly

Rack protection middleware is launched before
TornadoCommunicationMiddleware. When request parts are unpacked in
TornadoCommunicationMiddleware, they are not checked by rack protection.
This commit changes the communication between python and ruby - the
request is sent to ruby more directly (without the need to unpack the
request in sinatra middleware).
---
pcs/daemon/ruby_pcsd.py | 217 ++++++++++++++--------
pcs_test/tier0/daemon/app/fixtures_app.py | 7 +-
pcs_test/tier0/daemon/test_ruby_pcsd.py | 61 ++----
pcsd/rserver.rb | 39 ++--
4 files changed, 175 insertions(+), 149 deletions(-)
diff --git a/pcs/daemon/ruby_pcsd.py b/pcs/daemon/ruby_pcsd.py
index e612f8da..53c53eaf 100644
--- a/pcs/daemon/ruby_pcsd.py
+++ b/pcs/daemon/ruby_pcsd.py
@@ -7,8 +7,8 @@ from time import time as now
import pycurl
from tornado.gen import convert_yielded
from tornado.web import HTTPError
-from tornado.httputil import split_host_and_port, HTTPServerRequest
-from tornado.httpclient import AsyncHTTPClient
+from tornado.httputil import HTTPServerRequest, HTTPHeaders
+from tornado.httpclient import AsyncHTTPClient, HTTPClientError
from tornado.curl_httpclient import CurlError
@@ -29,6 +29,11 @@ RUBY_LOG_LEVEL_MAP = {
"DEBUG": logging.DEBUG,
}
+__id_dict = {"id": 0}
+def get_request_id():
+ __id_dict["id"] += 1
+ return __id_dict["id"]
+
class SinatraResult(namedtuple("SinatraResult", "headers, status, body")):
@classmethod
def from_response(cls, response):
@@ -60,6 +65,59 @@ def process_response_logs(rb_log_list):
group_id=group_id
)
+class RubyDaemonRequest(namedtuple(
+ "RubyDaemonRequest",
+ "request_type, path, query, headers, method, body"
+)):
+ def __new__(
+ cls,
+ request_type,
+ http_request: HTTPServerRequest = None,
+ payload=None,
+ ):
+ headers = http_request.headers if http_request else HTTPHeaders()
+ headers.add("X-Pcsd-Type", request_type)
+ if payload:
+ headers.add(
+ "X-Pcsd-Payload",
+ b64encode(json.dumps(payload).encode()).decode()
+ )
+ return super(RubyDaemonRequest, cls).__new__(
+ cls,
+ request_type,
+ http_request.path if http_request else "",
+ http_request.query if http_request else "",
+ headers,
+ http_request.method if http_request else "GET",
+ http_request.body if http_request else None,
+ )
+
+ @property
+ def url(self):
+ # We do not need location for communication with ruby itself since we
+ # communicate via unix socket. But it is required by AsyncHTTPClient so
+ # "localhost" is used.
+ query = f"?{self.query}" if self.query else ""
+ return f"localhost/{self.path}{query}"
+
+ @property
+ def is_get(self):
+ return self.method.upper() == "GET"
+
+ @property
+ def has_http_request_detail(self):
+ return self.path or self.query or self.method != "GET" or self.body
+
+def log_ruby_daemon_request(label, request: RubyDaemonRequest):
+ log.pcsd.debug("%s type: '%s'", label, request.request_type)
+ if request.has_http_request_detail:
+ log.pcsd.debug("%s path: '%s'", label, request.path)
+ if request.query:
+ log.pcsd.debug("%s query: '%s'", label, request.query)
+ log.pcsd.debug("%s method: '%s'", label, request.method)
+ if request.body:
+ log.pcsd.debug("%s body: '%s'", label, request.body)
+
class Wrapper:
def __init__(self, pcsd_ruby_socket, debug=False):
self.__debug = debug
@@ -67,74 +125,87 @@ class Wrapper:
self.__client = AsyncHTTPClient()
self.__pcsd_ruby_socket = pcsd_ruby_socket
- @staticmethod
- def get_sinatra_request(request: HTTPServerRequest):
- host, port = split_host_and_port(request.host)
- return {"env": {
- "PATH_INFO": request.path,
- "QUERY_STRING": request.query,
- "REMOTE_ADDR": request.remote_ip,
- "REMOTE_HOST": request.host,
- "REQUEST_METHOD": request.method,
- "REQUEST_URI": f"{request.protocol}://{request.host}{request.uri}",
- "SCRIPT_NAME": "",
- "SERVER_NAME": host,
- "SERVER_PORT": port,
- "SERVER_PROTOCOL": request.version,
- "HTTP_HOST": request.host,
- "HTTP_ACCEPT": "*/*",
- "HTTP_COOKIE": ";".join([
- v.OutputString() for v in request.cookies.values()
- ]),
- "HTTPS": "on" if request.protocol == "https" else "off",
- "HTTP_VERSION": request.version,
- "REQUEST_PATH": request.path,
- "rack.input": request.body.decode("utf8"),
- }}
-
def prepare_curl_callback(self, curl):
curl.setopt(pycurl.UNIX_SOCKET_PATH, self.__pcsd_ruby_socket)
curl.setopt(pycurl.TIMEOUT, 70)
- async def send_to_ruby(self, request_json):
- # We do not need location for communication with ruby itself since we
- # communicate via unix socket. But it is required by AsyncHTTPClient so
- # "localhost" is used.
- tornado_request = b64encode(request_json.encode()).decode()
- return (await self.__client.fetch(
- "localhost",
- method="POST",
- body=f"TORNADO_REQUEST={tornado_request}",
- prepare_curl_callback=self.prepare_curl_callback,
- )).body
-
- async def run_ruby(self, request_type, request=None):
- """
- request_type: SINATRA_GUI|SINATRA_REMOTE|SYNC_CONFIGS
- request: result of get_sinatra_request|None
- i.e. it has structure returned by get_sinatra_request if the request
- is not None - so we can get SERVER_NAME and SERVER_PORT
- """
- request = request or {}
- request.update({"type": request_type})
- request_json = json.dumps(request)
-
- if self.__debug:
- log.pcsd.debug("Ruby daemon request: '%s'", request_json)
+ async def send_to_ruby(self, request: RubyDaemonRequest):
try:
- ruby_response = await self.send_to_ruby(request_json)
+ return (await self.__client.fetch(
+ request.url,
+ headers=request.headers,
+ method=request.method,
+ # Tornado enforces body=None for GET method:
+ # Even with `allow_nonstandard_methods` we disallow GET with a
+ # body (because libcurl doesn't allow it unless we use
+ # CUSTOMREQUEST). While the spec doesn't forbid clients from
+ # sending a body, it arguably disallows the server from doing
+ # anything with them.
+ body=(request.body if not request.is_get else None),
+ prepare_curl_callback=self.prepare_curl_callback,
+ )).body
except CurlError as e:
+ # This error we can get e.g. when ruby daemon is down.
log.pcsd.error(
"Cannot connect to ruby daemon (message: '%s'). Is it running?",
e
)
raise HTTPError(500)
+ except HTTPClientError as e:
+ # This error we can get e.g. when rack protection raises exception.
+ log.pcsd.error(
+ (
+ "Got error from ruby daemon (message: '%s')."
+ " Try checking system logs (e.g. journal, systemctl status"
+ " pcsd.service) for more information.."
+ ),
+ e
+ )
+ raise HTTPError(500)
+
+ async def run_ruby(
+ self,
+ request_type,
+ http_request: HTTPServerRequest = None,
+ payload=None,
+ ):
+ request = RubyDaemonRequest(request_type, http_request, payload)
+ request_id = get_request_id()
+
+ def log_request():
+ log_ruby_daemon_request(
+ f"Ruby daemon request (id: {request_id})",
+ request,
+ )
+
+ if self.__debug:
+ log_request()
+
+ return self.process_ruby_response(
+ f"Ruby daemon response (id: {request_id})",
+ log_request,
+ await self.send_to_ruby(request),
+ )
+
+ def process_ruby_response(self, label, log_request, ruby_response):
+ """
+ Return relevant part of unpacked ruby response. As a side effect
+ relevant logs are writen.
+ string label -- is used as a log prefix
+ callable log_request -- is used to log request when some errors happen;
+ we want to log request before error even if there is not debug mode
+ string ruby_response -- body of response from ruby; it should contain
+ json with dictionary with response specific keys
+ """
try:
response = json.loads(ruby_response)
if "error" in response:
+ if not self.__debug:
+ log_request()
log.pcsd.error(
- "Ruby daemon response contains an error: '%s'",
+ "%s contains an error: '%s'",
+ label,
json.dumps(response)
)
raise HTTPError(500)
@@ -144,56 +215,52 @@ class Wrapper:
body = b64decode(response.pop("body"))
if self.__debug:
log.pcsd.debug(
- "Ruby daemon response (without logs and body): '%s'",
+ "%s (without logs and body): '%s'",
+ label,
json.dumps(response)
)
- log.pcsd.debug("Ruby daemon response body: '%s'", body)
+ log.pcsd.debug("%s body: '%s'", label, body)
response["body"] = body
elif self.__debug:
log.pcsd.debug(
- "Ruby daemon response (without logs): '%s'",
+ "%s (without logs): '%s'",
+ label,
json.dumps(response)
)
process_response_logs(logs)
return response
except (json.JSONDecodeError, binascii.Error) as e:
if self.__debug:
- log.pcsd.debug("Ruby daemon response: '%s'", ruby_response)
+ log.pcsd.debug("%s: '%s'", label, ruby_response)
+ else:
+ log_request()
+
log.pcsd.error("Cannot decode json from ruby pcsd wrapper: '%s'", e)
raise HTTPError(500)
async def request_gui(
self, request: HTTPServerRequest, user, groups, is_authenticated
) -> SinatraResult:
- sinatra_request = self.get_sinatra_request(request)
# Sessions handling was removed from ruby. However, some session
# information is needed for ruby code (e.g. rendering some parts of
# templates). So this information must be sent to ruby by another way.
- sinatra_request.update({
- "session": {
+ return SinatraResult.from_response(
+ await convert_yielded(self.run_ruby(SINATRA_GUI, request, {
"username": user,
"groups": groups,
"is_authenticated": is_authenticated,
- }
- })
- response = await convert_yielded(self.run_ruby(
- SINATRA_GUI,
- sinatra_request
- ))
- return SinatraResult.from_response(response)
+ }))
+ )
async def request_remote(self, request: HTTPServerRequest) -> SinatraResult:
- response = await convert_yielded(self.run_ruby(
- SINATRA_REMOTE,
- self.get_sinatra_request(request)
- ))
- return SinatraResult.from_response(response)
+ return SinatraResult.from_response(
+ await convert_yielded(self.run_ruby(SINATRA_REMOTE, request))
+ )
async def sync_configs(self):
try:
- response = await convert_yielded(self.run_ruby(SYNC_CONFIGS))
- return response["next"]
+ return (await convert_yielded(self.run_ruby(SYNC_CONFIGS)))["next"]
except HTTPError:
log.pcsd.error("Config synchronization failed")
return int(now()) + DEFAULT_SYNC_CONFIG_DELAY
diff --git a/pcs_test/tier0/daemon/app/fixtures_app.py b/pcs_test/tier0/daemon/app/fixtures_app.py
index 8d5b8f4c..590203b4 100644
--- a/pcs_test/tier0/daemon/app/fixtures_app.py
+++ b/pcs_test/tier0/daemon/app/fixtures_app.py
@@ -20,7 +20,12 @@ class RubyPcsdWrapper(ruby_pcsd.Wrapper):
self.headers = {"Some": "value"}
self.body = b"Success action"
- async def run_ruby(self, request_type, request=None):
+ async def run_ruby(
+ self,
+ request_type,
+ http_request=None,
+ payload=None,
+ ):
if request_type != self.request_type:
raise AssertionError(
f"Wrong request type: expected '{self.request_type}'"
diff --git a/pcs_test/tier0/daemon/test_ruby_pcsd.py b/pcs_test/tier0/daemon/test_ruby_pcsd.py
index 28f14c87..32eb74cc 100644
--- a/pcs_test/tier0/daemon/test_ruby_pcsd.py
+++ b/pcs_test/tier0/daemon/test_ruby_pcsd.py
@@ -4,7 +4,7 @@ from base64 import b64encode
from unittest import TestCase, mock
from urllib.parse import urlencode
-from tornado.httputil import HTTPServerRequest
+from tornado.httputil import HTTPServerRequest, HTTPHeaders
from tornado.testing import AsyncTestCase, gen_test
from tornado.web import HTTPError
@@ -22,46 +22,17 @@ def create_http_request():
return HTTPServerRequest(
method="POST",
uri="/pcsd/uri",
- headers={"Cookie": "cookie1=first;cookie2=second"},
+ headers=HTTPHeaders({"Cookie": "cookie1=first;cookie2=second"}),
body=str.encode(urlencode({"post-key": "post-value"})),
host="pcsd-host:2224"
)
-class GetSinatraRequest(TestCase):
- def test_translate_request(self):
- # pylint: disable=invalid-name
- self.maxDiff = None
- self.assertEqual(
- create_wrapper().get_sinatra_request(create_http_request()),
- {
- 'env': {
- 'HTTPS': 'off',
- 'HTTP_ACCEPT': '*/*',
- 'HTTP_COOKIE': 'cookie1=first;cookie2=second',
- 'HTTP_HOST': 'pcsd-host:2224',
- 'HTTP_VERSION': 'HTTP/1.0',
- 'PATH_INFO': '/pcsd/uri',
- 'QUERY_STRING': '',
- 'REMOTE_ADDR': None, # It requires complicated request args
- 'REMOTE_HOST': 'pcsd-host:2224',
- 'REQUEST_METHOD': 'POST',
- 'REQUEST_PATH': '/pcsd/uri',
- 'REQUEST_URI': 'http://pcsd-host:2224/pcsd/uri',
- 'SCRIPT_NAME': '',
- 'SERVER_NAME': 'pcsd-host',
- 'SERVER_PORT': 2224,
- 'SERVER_PROTOCOL': 'HTTP/1.0',
- 'rack.input': 'post-key=post-value'
- }
- }
- )
-
patch_ruby_pcsd = create_patcher(ruby_pcsd)
class RunRuby(AsyncTestCase):
def setUp(self):
self.ruby_response = ""
- self.request = self.create_request()
+ self.request = ruby_pcsd.RubyDaemonRequest(ruby_pcsd.SYNC_CONFIGS)
self.wrapper = create_wrapper()
patcher = mock.patch.object(
self.wrapper,
@@ -72,14 +43,10 @@ class RunRuby(AsyncTestCase):
patcher.start()
super().setUp()
- async def send_to_ruby(self, request_json):
- self.assertEqual(json.loads(request_json), self.request)
+ async def send_to_ruby(self, ruby_request):
+ self.assertEqual(ruby_request, self.request)
return self.ruby_response
- @staticmethod
- def create_request(_type=ruby_pcsd.SYNC_CONFIGS):
- return {"type": _type}
-
def set_run_result(self, run_result):
self.ruby_response = json.dumps({**run_result, "logs": []})
@@ -125,10 +92,10 @@ class RunRuby(AsyncTestCase):
"body": b64encode(str.encode(body)).decode(),
})
http_request = create_http_request()
- self.request = {
- **self.create_request(ruby_pcsd.SINATRA_REMOTE),
- **self.wrapper.get_sinatra_request(http_request),
- }
+ self.request = ruby_pcsd.RubyDaemonRequest(
+ ruby_pcsd.SINATRA_REMOTE,
+ http_request,
+ )
result = yield self.wrapper.request_remote(http_request)
self.assert_sinatra_result(result, headers, status, body)
@@ -148,15 +115,15 @@ class RunRuby(AsyncTestCase):
"body": b64encode(str.encode(body)).decode(),
})
http_request = create_http_request()
- self.request = {
- **self.create_request(ruby_pcsd.SINATRA_GUI),
- **self.wrapper.get_sinatra_request(http_request),
- "session": {
+ self.request = ruby_pcsd.RubyDaemonRequest(
+ ruby_pcsd.SINATRA_GUI,
+ http_request,
+ {
"username": user,
"groups": groups,
"is_authenticated": is_authenticated,
}
- }
+ )
result = yield self.wrapper.request_gui(
http_request,
user=user,
diff --git a/pcsd/rserver.rb b/pcsd/rserver.rb
index 6002a73c..4b58f252 100644
--- a/pcsd/rserver.rb
+++ b/pcsd/rserver.rb
@@ -11,42 +11,25 @@ def pack_response(response)
return [200, {}, [response.to_json.to_str]]
end
-def unpack_request(transport_env)
- return JSON.parse(Base64.strict_decode64(
- transport_env["rack.request.form_hash"]["TORNADO_REQUEST"]
- ))
-end
-
class TornadoCommunicationMiddleware
def initialize(app)
@app = app
end
- def call(transport_env)
+ def call(env)
Thread.current[:pcsd_logger_container] = []
begin
- request = unpack_request(transport_env)
+ type = env["HTTP_X_PCSD_TYPE"]
- if ["sinatra_gui", "sinatra_remote"].include?(request["type"])
- if request["type"] == "sinatra_gui"
- session = request["session"]
+ if ["sinatra_gui", "sinatra_remote"].include?(type)
+ if type == "sinatra_gui"
+ session = JSON.parse(Base64.strict_decode64(env["HTTP_X_PCSD_PAYLOAD"]))
Thread.current[:tornado_username] = session["username"]
Thread.current[:tornado_groups] = session["groups"]
Thread.current[:tornado_is_authenticated] = session["is_authenticated"]
end
- # Keys rack.input and rack.errors are required. We make sure they are
- # there.
- request_env = request["env"]
- request_env["rack.input"] = StringIO.new(request_env["rack.input"])
- request_env["rack.errors"] = StringIO.new()
-
- status, headers, body = @app.call(request_env)
-
- rack_errors = request_env['rack.errors'].string()
- if not rack_errors.empty?()
- $logger.error(rack_errors)
- end
+ status, headers, body = @app.call(env)
return pack_response({
:status => status,
@@ -56,16 +39,20 @@ class TornadoCommunicationMiddleware
})
end
- if request["type"] == "sync_configs"
+ if type == "sync_configs"
return pack_response({
:next => Time.now.to_i + run_cfgsync(),
:logs => Thread.current[:pcsd_logger_container],
})
end
- raise "Unexpected value for key 'type': '#{request['type']}'"
+ return pack_response({
+ :error => "Unexpected value for key 'type': '#{type}'"
+ })
rescue => e
- return pack_response({:error => "Processing request error: '#{e}'"})
+ return pack_response({
+ :error => "Processing request error: '#{e}' '#{e.backtrace}'"
+ })
end
end
end
--
2.21.1