Skip to content

Commit

Permalink
Version 3.19.0
Browse files Browse the repository at this point in the history
  • Loading branch information
adferrand committed Dec 6, 2024
1 parent dde8b5f commit 537b64f
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 65 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changelog

## master - CURRENT

## 3.19.0 - 06/12/2024
### Added
* Add `regfish` provider (#2102)
* Add `ionos` provider (#2127)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "dns-lexicon"
version = "3.18.0"
version = "3.19.0"
description = "Manipulate DNS records on various DNS providers in a standardized/agnostic way"
license = "MIT"
keywords = [
Expand Down
77 changes: 38 additions & 39 deletions src/lexicon/_private/providers/ionos.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,65 +2,69 @@

from lexicon.interfaces import Provider as BaseProvider


_ZONES_API = 'https://api.hosting.ionos.com/dns/v1/zones'
_ZONES_API = "https://api.hosting.ionos.com/dns/v1/zones"


class Provider(BaseProvider):

@staticmethod
def get_nameservers():
return ['ui-dns.com', 'ui-dns.org', 'ui-dns.de', 'ui-dns.biz']
return ["ui-dns.com", "ui-dns.org", "ui-dns.de", "ui-dns.biz"]

@staticmethod
def configure_parser(parser):
parser.add_argument(
'--api-key',
"--api-key",
required=True,
help='IONOS api key: public prefix + period + key proper',
help="IONOS api key: public prefix + period + key proper",
)

def authenticate(self):
zones = self._get(_ZONES_API)
for zone in zones:
if zone['name'] == self.domain:
self.domain_id = zone['id']
if zone["name"] == self.domain:
self.domain_id = zone["id"]
return
raise Exception('domain not found: ' + self.domain)
raise Exception("domain not found: " + self.domain)

def create_record(self, rtype, name, content):
self._post(
_ZONES_API + '/' + self.domain_id + '/records',
data=[{
'name': self._full_name(name),
'type': rtype,
'content': content,
'ttl': self._get_lexicon_option('ttl'),
'prio': 0,
'disabled': False,
}],
_ZONES_API + "/" + self.domain_id + "/records",
data=[
{
"name": self._full_name(name),
"type": rtype,
"content": content,
"ttl": self._get_lexicon_option("ttl"),
"prio": 0,
"disabled": False,
}
],
)
return True

def list_records(self, rtype=None, name=None, content=None):
query_params = {}
if rtype:
query_params['recordType'] = rtype
query_params["recordType"] = rtype
if name:
query_params['recordName'] = self._full_name(name)
data = self._get(_ZONES_API + '/' + self.domain_id, query_params)
records = data['records']
records = [{
'type': r['type'],
'name': r['name'],
'ttl': r['ttl'],
'content': r['content'],
'id': r['id'],
} for r in records]
query_params["recordName"] = self._full_name(name)
data = self._get(_ZONES_API + "/" + self.domain_id, query_params)
records = data["records"]
records = [
{
"type": r["type"],
"name": r["name"],
"ttl": r["ttl"],
"content": r["content"],
"id": r["id"],
}
for r in records
]
for record in records:
self._clean_TXT_record(record)
if content:
records = [r for r in records if r['content'] == content]
records = [r for r in records if r["content"] == content]
return records

def update_record(self, identifier, rtype, name, content):
Expand All @@ -71,25 +75,20 @@ def delete_record(self, identifier=None, rtype=None, name=None, content=None):
if identifier:
identifiers = [identifier]
else:
identifiers = [
r['id']
for r in self.list_records(rtype, name, content)
]
identifiers = [r["id"] for r in self.list_records(rtype, name, content)]
for identifier in identifiers:
self._delete(
_ZONES_API + '/' + self.domain_id + '/records/' + identifier
)
self._delete(_ZONES_API + "/" + self.domain_id + "/records/" + identifier)
return True

def _request(self, action='GET', url='/', data=None, query_params=None):
def _request(self, action="GET", url="/", data=None, query_params=None):
response = requests.request(
action,
url,
params=query_params,
json=data,
headers={
'accept': 'application/json',
'x-api-key': self._get_provider_option('api_key'),
"accept": "application/json",
"x-api-key": self._get_provider_option("api_key"),
},
)
response.raise_for_status()
Expand Down
24 changes: 17 additions & 7 deletions src/lexicon/_private/providers/regfish.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ def create_record(self, rtype, name, content):
payload = self._post("/dns/rr", data)
except requests.exceptions.HTTPError as err:
json_res = err.response.json()
if not json_res["code"] if "code" in json_res else 0 != 15003: # ResourceRecordAlreadyExists
if (
not json_res["code"] if "code" in json_res else 0 != 15003
): # ResourceRecordAlreadyExists
raise

if payload["code"] if "code" in payload else 0 != 0:
Expand Down Expand Up @@ -91,10 +93,14 @@ def list_records(self, rtype=None, name=None, content=None):
# Apply filters on rtype, name, and content if they are provided
if rtype or name or content:
records = [
record for record in records
if (not rtype or record['type'] == rtype) and
(not name or record['name'] == self._full_name(name)) and
(not content or record['content'] == self._format_content(rtype, content))
record
for record in records
if (not rtype or record["type"] == rtype)
and (not name or record["name"] == self._full_name(name))
and (
not content
or record["content"] == self._format_content(rtype, content)
)
]

LOGGER.debug("list_records after filtering: %s", records)
Expand All @@ -108,9 +114,13 @@ def update_record(self, identifier, rtype=None, name=None, content=None):
if len(records) == 1:
identifier = records[0]["id"]
elif len(records) < 1:
raise Exception("No records found matching type and name - won't update")
raise Exception(
"No records found matching type and name - won't update"
)
else:
raise Exception("Multiple records found matching type and name - won't update")
raise Exception(
"Multiple records found matching type and name - won't update"
)

content = self._format_content(rtype, content)
data = {}
Expand Down
5 changes: 3 additions & 2 deletions tests/providers/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ def __init__(self):
self.provider_module = None

def setup_method(self, _):
warnings.filterwarnings("ignore", category=ResourceWarning,
message="unclosed.*<ssl.SSLSocket.*>")
warnings.filterwarnings(
"ignore", category=ResourceWarning, message="unclosed.*<ssl.SSLSocket.*>"
)
self.provider_module = load_provider_module(self.provider_name)

###########################################################################
Expand Down
33 changes: 18 additions & 15 deletions tests/providers/test_ionos.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Integration tests for IONOS"""

import json
import os
from unittest import TestCase
Expand All @@ -12,36 +13,38 @@
class IONOSProviderTests(TestCase, IntegrationTestsV2):
"""Integration tests for IONOS provider"""

provider_name = 'ionos'
domain = os.environ.get('LEXICON_IONOS_DOMAIN', 'example.com')
provider_name = "ionos"
domain = os.environ.get("LEXICON_IONOS_DOMAIN", "example.com")

def _filter_request(self, request):
request.uri = request.uri.replace(self.domain, 'example.com')
request.uri = request.uri.replace(self.domain, "example.com")
if request.body:
body = request.body.decode('utf-8')
body = body.replace(self.domain, 'example.com')
request.body = body.encode('utf-8')
body = request.body.decode("utf-8")
body = body.replace(self.domain, "example.com")
request.body = body.encode("utf-8")
return request

def _filter_headers(self):
return ['x-api-key']
return ["x-api-key"]

def _filter_response(self, response):
for key in ['Set-Cookie', 'x-b3-traceid']:
response['headers'].pop(key, None)
body = response['body']['string'].decode('utf-8')
for key in ["Set-Cookie", "x-b3-traceid"]:
response["headers"].pop(key, None)
body = response["body"]["string"].decode("utf-8")
try:
data = json.loads(body)
if isinstance(data, list):
data = [e for e in data if not self._is_unrelated_zone(e)]
body = json.dumps(data)
except json.JSONDecodeError:
pass
body = body.replace(self.domain, 'example.com')
response['body']['string'] = body.encode('utf-8')
body = body.replace(self.domain, "example.com")
response["body"]["string"] = body.encode("utf-8")
return response

def _is_unrelated_zone(self, entry):
return isinstance(entry, dict) \
and 'name' in entry \
and not entry['name'].endswith(self.domain)
return (
isinstance(entry, dict)
and "name" in entry
and not entry["name"].endswith(self.domain)
)
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 537b64f

Please sign in to comment.