Skip to content
This repository has been archived by the owner on Sep 30, 2022. It is now read-only.

Commit

Permalink
Improve aiohttp usage for ddi (readchunk and timeout)
Browse files Browse the repository at this point in the history
Replaces read(chunk_size) with readchunk which reads as much as it can.
This is the better approach than to choose a chunk_size that works well
under all circumstances (HTTP, HTTPS).

Also, sets the timeouts properly. Before the context helper didn't do
anything since the requests would timeout after 5 minutes anyway.
Now, the timeouts are respected as expected.

Signed-off-by: Livio Bieri <livio@livio.li>
  • Loading branch information
livioso committed Jun 14, 2018
1 parent 7abb898 commit ace4fee
Showing 1 changed file with 41 additions and 33 deletions.
74 changes: 41 additions & 33 deletions rauc_hawkbit/ddi/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import hashlib
import logging

from aiohttp.client import ClientTimeout
from datetime import datetime
from enum import Enum

Expand Down Expand Up @@ -163,17 +164,17 @@ async def get_resource(self, api_path, query_params={}, **kwargs):
**kwargs))

self.logger.debug('GET {}'.format(url))
with aiohttp.Timeout(self.timeout):
async with self.session.get(url, headers=get_headers,
params=query_params) as resp:
await self.check_http_status(resp)
json = await resp.json()
self.logger.debug(json)
return json
async with self.session.get(url, headers=get_headers,
params=query_params,
timeout=ClientTimeout(self.timeout)) as resp:
await self.check_http_status(resp)
json = await resp.json()
self.logger.debug(json)
return json

async def get_binary_resource(self, api_path, dl_location,
mime='application/octet-stream',
chunk_size=512, timeout=3600, **kwargs):
timeout=3600, **kwargs):
"""
Helper method for binary HTTP GET API requests.
Expand All @@ -185,7 +186,6 @@ async def get_binary_resource(self, api_path, dl_location,
Keyword Args:
mime: mimetype of content to retrieve
(default: 'application/octet-stream')
chunk_size: size of chunk to retrieve
kwargs: Other keyword args used for replacing items in the API path
Returns:
Expand All @@ -196,11 +196,10 @@ async def get_binary_resource(self, api_path, dl_location,
tenant=self.tenant,
controllerId=self.controller_id,
**kwargs))
return await self.get_binary(url, dl_location, mime, chunk_size,
timeout=timeout)
return await self.get_binary(url, dl_location, mime, timeout=timeout)

async def get_binary(self, url, dl_location,
mime='application/octet-stream', chunk_size=512,
mime='application/octet-stream',
timeout=3600):
"""
Actual download method with checksum checking.
Expand All @@ -211,9 +210,8 @@ async def get_binary(self, url, dl_location,
Keyword Args:
mime: mimetype of content to retrieve
(default: 'application/octet-stream')
chunk_size: size of chunk to retrieve
timeout: download timeout
(default: 3600)
(default: 3600)
Returns:
MD5 hash of downloaded content
Expand All @@ -225,17 +223,25 @@ async def get_binary(self, url, dl_location,
hash_md5 = hashlib.md5()

self.logger.debug('GET binary {}'.format(url))
with aiohttp.Timeout(timeout, loop=self.session.loop):
async with self.session.get(url, headers=get_bin_headers) as resp:
await self.check_http_status(resp)
with open(dl_location, 'wb') as fd:
while True:
with aiohttp.Timeout(60):
chunk = await resp.content.read(chunk_size)
if not chunk:
break
fd.write(chunk)
hash_md5.update(chunk)

# session timeout & single socket read timeout
timeout = ClientTimeout(timeout, sock_read=60)

async with self.session.get(url, headers=get_bin_headers,
timeout=timeout) as resp:

await self.check_http_status(resp)
with open(dl_location, 'wb') as fd:
while True:
chunk, _ = await resp.content.readchunk()

# we are EOF
if not chunk:
break

fd.write(chunk)
hash_md5.update(chunk)

return hash_md5.hexdigest()

async def post_resource(self, api_path, data, **kwargs):
Expand All @@ -259,10 +265,11 @@ async def post_resource(self, api_path, data, **kwargs):
controllerId=self.controller_id,
**kwargs))
self.logger.debug('POST {}'.format(url))
with aiohttp.Timeout(self.timeout):
async with self.session.post(url, headers=post_headers,
data=json.dumps(data)) as resp:
await self.check_http_status(resp)

async with self.session.post(url, headers=post_headers,
data=json.dumps(data),
timeout=ClientTimeout(self.timeout)) as resp:
await self.check_http_status(resp)

async def put_resource(self, api_path, data, **kwargs):
"""
Expand All @@ -285,10 +292,11 @@ async def put_resource(self, api_path, data, **kwargs):
**kwargs))
self.logger.debug('PUT {}'.format(url))
self.logger.debug(json.dumps(data))
with aiohttp.Timeout(self.timeout):
async with self.session.put(url, headers=put_headers,
data=json.dumps(data)) as resp:
await self.check_http_status(resp)

async with self.session.put(url, headers=put_headers,
data=json.dumps(data),
timeout=ClientTimeout(self.timeout)) as resp:
await self.check_http_status(resp)

async def check_http_status(self, resp):
"""Log API error message."""
Expand Down

0 comments on commit ace4fee

Please sign in to comment.