Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support more content types #1575

Merged
merged 15 commits into from
Sep 8, 2020
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,12 @@ if(BUILD_TESTS)
CONSENSUS raft
)

add_e2e_test(
NAME content_types_test
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/content_types.py
CONSENSUS raft
)

if(QUOTES_ENABLED)
add_e2e_test(
NAME reconfiguration_test
Expand Down
96 changes: 62 additions & 34 deletions python/ccf/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,20 @@ def truncate(string: str, max_len: int = 256):
CCF_GLOBAL_COMMIT_HEADER = "x-ccf-global-commit"

DEFAULT_CONNECTION_TIMEOUT_SEC = 3
DEFAULT_REQUEST_TIMEOUT_SEC = 3
DEFAULT_REQUEST_TIMEOUT_SEC = 10
DEFAULT_COMMIT_TIMEOUT_SEC = 3

CONTENT_TYPE_TEXT = "text/plain"
CONTENT_TYPE_JSON = "application/json"
CONTENT_TYPE_BINARY = "application/octet-stream"


@dataclass
class Request:
#: Resource path (with optional query string)
path: str
#: Body of request
body: Optional[Union[dict, str]]
body: Optional[Union[dict, str, bytes]]
#: HTTP verb
http_verb: str
#: HTTP headers
Expand Down Expand Up @@ -85,7 +89,7 @@ class Response:
#: Response HTTP status code
status_code: int
#: Response body
body: Optional[Union[str, dict]]
body: Optional[Union[str, dict, bytes]]
#: CCF sequence number
seqno: Optional[int]
#: CCF consensus view
Expand All @@ -106,14 +110,12 @@ def __str__(self):
@staticmethod
def from_requests_response(rr):
content_type = rr.headers.get("content-type")
if content_type == "application/json":
if content_type == CONTENT_TYPE_JSON:
parsed_body = rr.json()
elif content_type == "text/plain":
elif content_type == CONTENT_TYPE_TEXT:
parsed_body = rr.text
elif content_type is None:
parsed_body = None
else:
raise ValueError(f"Unhandled content type: {content_type}")
parsed_body = rr.content

return Response(
status_code=rr.status_code,
Expand All @@ -132,14 +134,12 @@ def from_raw(raw):
raw_body = response.read(raw)

content_type = response.headers.get("content-type")
if content_type == "application/json":
if content_type == CONTENT_TYPE_JSON:
parsed_body = json.loads(raw_body)
elif content_type == "text/plain":
elif content_type == CONTENT_TYPE_TEXT:
parsed_body = raw_body.decode()
elif content_type is None:
parsed_body = None
else:
raise ValueError(f"Unhandled content type: {content_type}")
parsed_body = raw_body

return Response(
response.status,
Expand Down Expand Up @@ -223,18 +223,27 @@ def request(self, request, signed=False, timeout=DEFAULT_REQUEST_TIMEOUT_SEC):
if isinstance(request.body, str) and request.body.startswith("@"):
# Request is already a file path - pass it directly
cmd.extend(["--data-binary", request.body])
if request.body.lower().endswith(".json"):
content_type = CONTENT_TYPE_JSON
else:
content_type = CONTENT_TYPE_BINARY
else:
# Write request body to temp file
if isinstance(request.body, bytes):
if isinstance(request.body, str):
msg_bytes = request.body.encode()
content_type = CONTENT_TYPE_TEXT
elif isinstance(request.body, bytes):
msg_bytes = request.body
content_type = CONTENT_TYPE_BINARY
else:
msg_bytes = json.dumps(request.body).encode()
content_type = CONTENT_TYPE_JSON
LOG.debug(f"Writing request body: {truncate(msg_bytes)}")
nf.write(msg_bytes)
nf.flush()
cmd.extend(["--data-binary", f"@{nf.name}"])
if not "content-type" in request.headers:
request.headers["content-type"] = "application/json"
request.headers["content-type"] = content_type

# Set requested headers first - so they take precedence over defaults
for k, v in request.headers.items():
Expand Down Expand Up @@ -343,32 +352,38 @@ def request(
headers=["(request-target)", "Digest", "Content-Length"],
)

request_args = {
"method": request.http_verb,
"url": f"https://{self.host}:{self.port}{request.path}",
"auth": auth_value,
"headers": extra_headers,
"allow_redirects": False,
}

request_body = None
if request.body is not None:
if isinstance(request.body, str) and request.body.startswith("@"):
# Request is a file path - read contents, assume json
request_body = json.load(open(request.body[1:]))
else:
# Request is a file path - read contents
with open(request.body[1:], "rb") as f:
request_body = f.read()
if request.body.lower().endswith(".json"):
content_type = CONTENT_TYPE_JSON
else:
content_type = CONTENT_TYPE_BINARY
elif isinstance(request.body, str):
request_body = request.body.encode()
content_type = CONTENT_TYPE_TEXT
elif isinstance(request.body, bytes):
request_body = request.body
request_args["json"] = request_body
content_type = CONTENT_TYPE_BINARY
else:
request_body = json.dumps(request.body).encode()
content_type = CONTENT_TYPE_JSON

if not "content-type" in request.headers:
extra_headers["content-type"] = content_type

try:
response = self.session.request(
method=request.http_verb,
request.http_verb,
url=f"https://{self.host}:{self.port}{request.path}",
auth=auth_value,
headers=extra_headers,
allow_redirects=False,
json=request_body,
timeout=timeout,
data=request_body,
)
except requests.exceptions.ReadTimeout as exc:
raise TimeoutError from exc
Expand Down Expand Up @@ -437,7 +452,12 @@ def request(

assert self.ws is not None

payload = json.dumps(request.body).encode()
if isinstance(request.body, str):
payload = request.body.encode()
elif isinstance(request.body, bytes):
payload = request.body
else:
payload = json.dumps(request.body).encode()
path = (request.path).encode()
header = struct.pack("<h", len(path)) + path
# FIN, no RSV, BIN, UNMASKED every time, because it's all we support right now
Expand All @@ -452,7 +472,14 @@ def request(
global_commit = unpack_seqno_or_view(out[18:26])
payload = out[26:]
if status_code == 200:
body = json.loads(payload) if payload else None
if payload:
# Ideally, the content type should be sent like for HTTP.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The view here is that websockets is the efficient option, and it is the efficient option because it does away with the large variable header. Endpoints are expected to expose their schema out of band rather than send (or expect) self-describing traffic.

An application that wishes to replicate the content-type concept across their websockets endpoints would do so in the outer frame of their schema.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, is it fine to keep the try-except? If not, then maybe for WebSocket it should always return bytes?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably should, but then we have to adjust the logging test accordingly. Let's keep it as is and do that clean up in a separate PR?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try:
body = json.loads(payload)
except json.JSONDecodeError:
body = payload
else:
body = None
else:
body = payload.decode()
return Response(status_code, body, seqno, view, global_commit, headers={})
Expand Down Expand Up @@ -513,7 +540,7 @@ def _response(self, response: Response) -> Response:
def _direct_call(
self,
path: str,
body: Optional[Union[str, dict]] = None,
body: Optional[Union[str, dict, bytes]] = None,
http_verb: str = "POST",
headers: Optional[dict] = None,
signed: bool = False,
Expand All @@ -532,7 +559,7 @@ def _direct_call(
def call(
self,
path: str,
body: Optional[Union[str, dict]] = None,
body: Optional[Union[str, dict, bytes]] = None,
http_verb: str = "POST",
headers: Optional[dict] = None,
signed: bool = False,
Expand All @@ -542,7 +569,8 @@ def call(
Issues one request, synchronously, and returns the response.

:param str path: URI of the targeted resource. Must begin with '/'
:param dict body: Request body (optional).
:param body: Request body (optional).
:type body: str or dict or bytes
:param str http_verb: HTTP verb (e.g. "POST" or "GET").
:param dict headers: HTTP request headers (optional).
:param bool signed: Sign request with client private key.
Expand Down
Loading