Refactor get_filename so it can be tested
Just pass in the headers instead of the whole response object so that it can be tested without needing an actual server.
This commit is contained in:
parent
d0676dbf7c
commit
d4b99b5ee9
@ -26,6 +26,12 @@ from composer.unix_socket import UnixHTTPConnectionPool
|
|||||||
def api_url(api_version, url):
|
def api_url(api_version, url):
|
||||||
"""Return the versioned path to the API route
|
"""Return the versioned path to the API route
|
||||||
|
|
||||||
|
:param api_version: The version of the API to talk to. eg. "0"
|
||||||
|
:type api_version: str
|
||||||
|
:param url: The API route to talk to
|
||||||
|
:type url: str
|
||||||
|
:returns: The full url to use for the route and API version
|
||||||
|
:rtype: str
|
||||||
"""
|
"""
|
||||||
return os.path.normpath("/api/v%s/%s" % (api_version, url))
|
return os.path.normpath("/api/v%s/%s" % (api_version, url))
|
||||||
|
|
||||||
@ -124,7 +130,7 @@ def post_url_json(socket_path, url, body):
|
|||||||
headers={"Content-Type": "application/json"})
|
headers={"Content-Type": "application/json"})
|
||||||
return json.loads(r.data.decode("utf-8"))
|
return json.loads(r.data.decode("utf-8"))
|
||||||
|
|
||||||
def get_filename(response):
|
def get_filename(headers):
|
||||||
"""Get the filename from the response header
|
"""Get the filename from the response header
|
||||||
|
|
||||||
:param response: The urllib3 response object
|
:param response: The urllib3 response object
|
||||||
@ -133,12 +139,12 @@ def get_filename(response):
|
|||||||
:returns: Filename from content-disposition header
|
:returns: Filename from content-disposition header
|
||||||
:rtype: str
|
:rtype: str
|
||||||
"""
|
"""
|
||||||
log.debug("Headers = %s", response.headers)
|
log.debug("Headers = %s", headers)
|
||||||
if "content-disposition" not in response.headers:
|
if "content-disposition" not in headers:
|
||||||
raise RuntimeError("No Content-Disposition header; cannot get filename")
|
raise RuntimeError("No Content-Disposition header; cannot get filename")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
k, _, v = response.headers["content-disposition"].split(";")[1].strip().partition("=")
|
k, _, v = headers["content-disposition"].split(";")[1].strip().partition("=")
|
||||||
if k != "filename":
|
if k != "filename":
|
||||||
raise RuntimeError("No filename= found in content-disposition header")
|
raise RuntimeError("No filename= found in content-disposition header")
|
||||||
except RuntimeError:
|
except RuntimeError:
|
||||||
@ -163,7 +169,7 @@ def download_file(socket_path, url, progress=True):
|
|||||||
if not err["status"]:
|
if not err["status"]:
|
||||||
raise RuntimeError(err["error"]["msg"])
|
raise RuntimeError(err["error"]["msg"])
|
||||||
|
|
||||||
filename = get_filename(r)
|
filename = get_filename(r.headers)
|
||||||
if os.path.exists(filename):
|
if os.path.exists(filename):
|
||||||
msg = "%s exists, skipping download" % filename
|
msg = "%s exists, skipping download" % filename
|
||||||
log.error(msg)
|
log.error(msg)
|
||||||
|
Loading…
Reference in New Issue
Block a user