Skip to content

Commit

Permalink
Reduce identifier complexity
Browse files Browse the repository at this point in the history
  • Loading branch information
adferrand committed Apr 18, 2022
1 parent e067277 commit c73ea30
Showing 1 changed file with 30 additions and 11 deletions.
41 changes: 30 additions & 11 deletions lexicon/providers/misaka.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Module provider for Misaka.IO"""
from __future__ import absolute_import
import base64
import json
import logging
from typing import Tuple

import requests
from lexicon.providers.base import Provider as BaseProvider
Expand Down Expand Up @@ -57,8 +59,8 @@ def _authenticate(self):
def _create_record(self, rtype, name, content):
name = self._relative_name(name)

identifier = f'{self.domain_id}/recordsets/{name}/{rtype}'
endpoint = f'/zones/{identifier}'
identifier = self._identifier_encode(rtype, name)
endpoint = f'/zones/{self.domain_id}/recordsets/{name}/{rtype}'
ttl = self._get_lexicon_option('ttl')
record = {'value': content}

Expand Down Expand Up @@ -108,9 +110,7 @@ def _list_records(self, rtype=None, name=None, content=None):
'name': self._full_name(recordset['name']),
'type': recordset['type'],
'ttl': recordset['ttl'],

# id is useless here as we don't have record ID exposed to customers
'id': f'{self.domain_id}/recordsets/{recordset["name"]}/{recordset["type"]}',
'id': self._identifier_encode(recordset["type"], recordset["name"]),
}

for record in recordset['records']:
Expand All @@ -127,21 +127,23 @@ def _update_record(self, identifier, rtype=None, name=None, content=None):
if name:
name = self._relative_name(name)

new_identifier = f'{self.domain_id}/recordsets/{name}/{rtype}'
new_identifier = self._identifier_encode(rtype, name)

if (new_identifier == identifier or (rtype is None and name is None)):
# the identifier hasn't changed, or type and name are both unspecified,
# only update the content.
data = {
'records': {"value": content}
}
self._put(f'/zones/{identifier}', data)
target_rtype, target_name = self._identifier_decode(identifier)
self._put(f'/zones/{self.domain_id}/recordsets/{target_name}/{target_rtype}', data)
else:
if not identifier:
identifier = new_identifier
# identifiers are different
# get the old record, create a new one with updated data, delete the old record.
old_record = self._get(f'/zones/{identifier}')
target_rtype, target_name = self._identifier_decode(identifier)
old_record = self._get(f'/zones/{self.domain_id}/recordsets/{target_name}/{target_rtype}')
self.create_record(
rtype or old_record['type'],
name or old_record['domain'],
Expand All @@ -157,11 +159,12 @@ def _delete_record(self, identifier=None, rtype=None, name=None, content=None):
if not identifier:
if name:
name = self._relative_name(name)
identifier = f'{self.domain_id}/recordsets/{name}/{rtype}'
identifier = self._identifier_encode(rtype, name)
should_call_delete_api = not self._delete_record_with_identifier(identifier, rtype, name, content)

if should_call_delete_api:
self._delete(f'/zones/{identifier}')
rtype, name = self._identifier_decode(identifier)
self._delete(f'/zones/{self.domain_id}/recordsets/{name}/{rtype}')

LOGGER.debug('delete_record: %s', True)
return True
Expand All @@ -184,7 +187,8 @@ def _delete_record_with_identifier(self, identifier, rtype, name, content):
if not recordset['records']:
return False

self._put(f'/zones/{identifier}', recordset)
rtype, name = self._identifier_decode(identifier)
self._put(f'/zones/{self.domain_id}/recordsets/{name}/{rtype}', recordset)
return True

# Helpers
Expand Down Expand Up @@ -226,3 +230,18 @@ def _get_recordset(self, name, rtype):
'records': payload['records'],
'filters': payload['filters'],
}

def _identifier_decode(self, identifier: str) -> Tuple[str, str]:
padding = 4 - (len(identifier) % 4)
chain = identifier + ("=" * padding)
decoded = base64.urlsafe_b64decode(chain)
if not decoded:
raise ValueError(f"Invalid identifier: {identifier}")
extracted = decoded.decode("utf-8").split("/")
if len(extracted) < 2:
raise ValueError(f"Invalid identifier: {identifier}")
return extracted[0], extracted[1]

def _identifier_encode(self, rtype: str, name: str) -> str:
encoded = base64.urlsafe_b64encode(f"{rtype}/{self._relative_name(name)}".encode("utf-8"))
return encoded.decode("utf-8").rstrip("=")

0 comments on commit c73ea30

Please sign in to comment.