Skip to content

Commit

Permalink
Add item_callback hook.
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisdoehring committed Jun 5, 2021
1 parent 5e69bde commit 3da8704
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 12 deletions.
32 changes: 22 additions & 10 deletions cdip_connector/core/connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
logger = logging.getLogger(__name__)
logger.setLevel(cdip_settings.LOG_LEVEL)

CLIENT_TIMEOUT_TOTAL=180 # seconds
CLIENT_TIMEOUT_TOTAL = 180 # seconds


# todo
async def gather_with_semaphore(n, *tasks):
Expand Down Expand Up @@ -57,7 +58,8 @@ async def main(self) -> None:
logger.info(result)
except Exception as ex:
self.metrics.incr_count(MetricsEnum.ERRORS)
logger.exception(f'Exception raised {ex}')
logger.exception('Uncaught exception in main.')
raise

async def extract_load(self,
session: ClientSession,
Expand All @@ -68,17 +70,17 @@ async def extract_load(self,
async for extracted in self.extract(session, integration):

if extracted is not None:
logger.info(f'{integration.login}:{integration.id} {len(extracted)} recs to send')
if extracted:
logger.info(f'first transformed payload: {extracted[0]}')
logger.info(f'{integration.login}:{integration.id} {len(extracted)} records to send')

await self.load(session, extracted)
await self.update_state(session, integration)

self.metrics.incr_count(MetricsEnum.TO_CDIP, len(extracted))
total += len(extracted)

if not total:
logger.info(f'{integration.login}:{integration.id} Nothing to send to SIntegrate')

return total

@abstractmethod
Expand All @@ -87,6 +89,9 @@ async def extract(self,
integration_info: IntegrationInformation) -> AsyncGenerator[List[CDIPBaseModel], None]:
s = yield 0 # unreachable, but makes the return type AsyncGenerator, expected by caller

def item_callback(self, item):
pass

async def update_state(self,
session: ClientSession,
integration_info: IntegrationInformation) -> None:
Expand All @@ -100,21 +105,28 @@ async def load(self,

def generate_batches(iterable, n=self.load_batch_size):
for i in range(0, len(iterable), n):
yield iterable[i:i+n]
yield iterable[i:i + n]

logger.info(f'Posting to: {cdip_settings.CDIP_API_ENDPOINT}')
for i, batch in enumerate(generate_batches(transformed_data)):

logger.debug(f'r1 is: {batch[0]}')
clean_batch = [json.loads(r.json()) for r in batch]

for j in range(2):
logger.debug('sending batch no. %d, length=%d', i, len(batch))
clean_batch = [json.loads(r.json()) for r in batch]

logger.debug('sending batch.', extra={'batch_no': i, 'length': len(batch), 'attempt': j})

client_response = await session.post(url=cdip_settings.CDIP_API_ENDPOINT,
headers=headers,
json=clean_batch)
headers=headers,
json=clean_batch)

# Catch to attemp to re-authorized
if client_response.status == 401:
headers = await self.portal.get_auth_header(session)
else:
[self.item_callback(item) for item in batch]
client_response.raise_for_status()
break


2 changes: 1 addition & 1 deletion cdip_connector/core/portal_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ async def update_states_with_dict(self,
headers = await self.get_auth_header(session)
response = await session.post(url=f'{self.device_states_endpoint}/update/{inbound_id}',
headers=headers,
json=states_dict)
json=states_dict,)
response.raise_for_status()
text = await response.text()
logger.info(f'update device_states resp: {response.status}')
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "cdip_connector"
version = "0.6.3"
version = "0.6.4"
description = "SMART Integrate Connector Library"
authors = [
"Rohit Chaudhri <rohitc@vulcan.com>",
Expand Down

0 comments on commit 3da8704

Please sign in to comment.