Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dhcp_server] add config dhcp server range #17741

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@
"ips": "100.1.1.10,10.1.1.11"
},
"DHCP_SERVER_IPV4_PORT|Vlan100|Ethernet7": {
"ranges": "range1,range2"
"ranges": "range1,range3"
},
"DHCP_SERVER_IPV4_PORT|Vlan200|Ethernet8": {
"ranges": "range3,range4"
"ranges": "range1,range4"
},
"DHCP_SERVER_IPV4_PORT|Ethernet9": {
"ranges": "range5,range6"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,3 +240,133 @@ def test_config_dhcp_server_ipv4_update_wrong_netmask(self, mock_db):
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["update"], ["Vlan100", "--netmask=255.255.254"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)

def test_config_dhcp_server_ipv4_range_add(self, mock_db):
expected_value = {
"range": "10.10.10.10,10.10.10.11"
}
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["add"], \
["range4", "10.10.10.10", "10.10.10.11"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert mock_db.get_all("CONFIG_DB", "DHCP_SERVER_IPV4_RANGE|range4") == expected_value

def test_config_dhcp_server_ipv4_range_add_existing(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["add"], \
["range1", "10.10.10.10", "10.10.10.11"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)

def test_config_dhcp_server_ipv4_range_add_single_ip(self, mock_db):
expected_value = {
"range": "10.10.10.10,10.10.10.10"
}
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["add"], \
["range4", "10.10.10.10"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert mock_db.get_all("CONFIG_DB", "DHCP_SERVER_IPV4_RANGE|range4") == expected_value

def test_config_dhcp_server_ipv4_range_add_wrong_ip(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["add"], \
["range4", "10.10.10"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)

def test_config_dhcp_server_ipv4_range_add_wrong_order(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["add"], \
["range4", "10.10.10.10", "10.10.10.9"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)

def test_config_dhcp_server_ipv4_range_update(self, mock_db):
expected_value = {
"range": "10.10.10.10,10.10.10.11"
}
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["update"], \
["range1", "10.10.10.10", "10.10.10.11"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert mock_db.get_all("CONFIG_DB", "DHCP_SERVER_IPV4_RANGE|range1") == expected_value

def test_config_dhcp_server_ipv4_range_update_nonexisting(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["update"], \
["range4", "10.10.10.10", "10.10.10.11"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)

def test_config_dhcp_server_ipv4_range_update_single_ip(self, mock_db):
expected_value = {
"range": "10.10.10.10,10.10.10.10"
}
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["update"], \
["range1", "10.10.10.10"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert mock_db.get_all("CONFIG_DB", "DHCP_SERVER_IPV4_RANGE|range1") == expected_value

def test_config_dhcp_server_ipv4_range_update_wrong_ip(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["update"], \
["range1", "10.10.10"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)

def test_config_dhcp_server_ipv4_range_add_wrong_order(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["update"], \
["range1", "10.10.10.10", "10.10.10.9"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)

def test_config_dhcp_server_ipv4_range_delete(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["del"], \
["range2"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert mock_db.exists("CONFIG_DB", "DHCP_SERVER_IPV4_RANGE|range2") == False

def test_config_dhcp_server_ipv4_range_delete_nonexisting(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["del"], \
["range4"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)

def test_config_dhcp_server_ipv4_range_delete_referenced(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["del"], \
["range1"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)

def test_config_dhcp_server_ipv4_range_delete_referenced_force(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["del"], \
["range1", "--force"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert mock_db.exists("CONFIG_DB", "DHCP_SERVER_IPV4_RANGE|range1") == False

Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,9 @@ def test_show_dhcp_server_ipv4_port_without_intf(self, mock_db):
| | 10.1.1.11 |
+-------------------+------------+
| Vlan100|Ethernet7 | range1 |
| | range2 |
| | range3 |
+-------------------+------------+
| Vlan200|Ethernet8 | range3 |
| Vlan200|Ethernet8 | range1 |
| | range4 |
+-------------------+------------+
| Ethernet9 | range5 |
Expand All @@ -237,7 +237,7 @@ def test_show_dhcp_server_ipv4_port_with_port(self, mock_db):
| Interface | Bind |
+===================+========+
| Vlan100|Ethernet7 | range1 |
| | range2 |
| | range3 |
+-------------------+--------+
"""
runner = CliRunner()
Expand All @@ -256,7 +256,7 @@ def test_show_dhcp_server_ipv4_port_with_vlan(self, mock_db):
| | 10.1.1.11 |
+-------------------+------------+
| Vlan100|Ethernet7 | range1 |
| | range2 |
| | range3 |
+-------------------+------------+
"""
runner = CliRunner()
Expand All @@ -271,7 +271,7 @@ def test_show_dhcp_server_ipv4_port_with_port_and_vlan(self, mock_db):
+-------------------+--------+
| Interface | Bind |
+===================+========+
| Vlan200|Ethernet8 | range3 |
| Vlan200|Ethernet8 | range1 |
| | range4 |
+-------------------+--------+
"""
Expand Down
88 changes: 80 additions & 8 deletions dockers/docker-dhcp-server/cli/config/plugins/dhcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
SUPPORT_TYPE = ["binary", "boolean", "ipv4-address", "string", "uint8", "uint16", "uint32"]


def validate_str_type(type, value):
def validate_str_type(type_, value):
"""
To validate whether type is consistent with string value
Args:
Expand All @@ -20,27 +20,27 @@ def validate_str_type(type, value):
"""
if not isinstance(value, str):
return False
if type not in SUPPORT_TYPE:
if type_ not in SUPPORT_TYPE:
return False
if type == "string":
if type_ == "string":
return True
if type == "binary":
if type_ == "binary":
if len(value) == 0 or len(value) % 2 != 0:
return False
return all(c in set(string.hexdigits) for c in value)
if type == "boolean":
if type_ == "boolean":
return value in ["true", "false"]
if type == "ipv4-address":
if type_ == "ipv4-address":
try:
if len(value.split(".")) != 4:
return False
return ipaddress.ip_address(value).version == 4
except ValueError:
return False
if type.startswith("uint"):
if type_.startswith("uint"):
if not value.isdigit():
return False
length = int("".join([c for c in type if c.isdigit()]))
length = int("".join([c for c in type_ if c.isdigit()]))
return 0 <= int(value) <= int(pow(2, length)) - 1
return False

Expand Down Expand Up @@ -186,6 +186,78 @@ def dhcp_server_ipv4_disable(db, dhcp_interface):
ctx.fail("Failed to disable, dhcp interface {} does not exist".format(dhcp_interface))


@dhcp_server_ipv4.group(cls=clicommon.AliasedGroup, name="range")
def dhcp_server_ipv4_range():
pass


def count_ipv4(start, end):
ip1 = int(ipaddress.IPv4Address(start))
ip2 = int(ipaddress.IPv4Address(end))
return ip2 - ip1 + 1


@dhcp_server_ipv4_range.command(name="add")
@click.argument("range_name", required=True)
@click.argument("ip_start", required=True)
@click.argument("ip_end", required=False)
@clicommon.pass_db
def dhcp_server_ipv4_range_add(db, range_name, ip_start, ip_end):
ctx = click.get_current_context()
if not ip_end:
ip_end = ip_start
if not validate_str_type("ipv4-address", ip_start) or not validate_str_type("ipv4-address", ip_end):
ctx.fail("ip_start or ip_end is not valid ipv4 address")
if count_ipv4(ip_start, ip_end) < 1:
ctx.fail("range value is illegal")
dbconn = db.db
key = "DHCP_SERVER_IPV4_RANGE|" + range_name
if dbconn.exists("CONFIG_DB", key):
ctx.fail("Range {} already exist".format(range_name))
else:
dbconn.hmset("CONFIG_DB", key, {"range": ip_start + "," + ip_end})


@dhcp_server_ipv4_range.command(name="update")
@click.argument("range_name", required=True)
@click.argument("ip_start", required=True)
@click.argument("ip_end", required=False)
@clicommon.pass_db
def dhcp_server_ipv4_range_update(db, range_name, ip_start, ip_end):
ctx = click.get_current_context()
if not ip_end:
ip_end = ip_start
if not validate_str_type("ipv4-address", ip_start) or not validate_str_type("ipv4-address", ip_end):
ctx.fail("ip_start or ip_end is not valid ipv4 address")
if count_ipv4(ip_start, ip_end) < 1:
ctx.fail("range value is illegal")
dbconn = db.db
key = "DHCP_SERVER_IPV4_RANGE|" + range_name
if dbconn.exists("CONFIG_DB", key):
dbconn.set("CONFIG_DB", key, "range", ip_start + "," + ip_end)
else:
ctx.fail("Range {} does not exist, cannot update".format(range_name))


@dhcp_server_ipv4_range.command(name="del")
@click.argument("range_name", required=True)
@click.option("--force", required=False, default=False, is_flag=True)
@clicommon.pass_db
def dhcp_sever_ipv4_range_del(db, range_name, force):
ctx = click.get_current_context()
dbconn = db.db
key = "DHCP_SERVER_IPV4_RANGE|" + range_name
if dbconn.exists("CONFIG_DB", key):
Copy link
Contributor

@yaqiangz yaqiangz Jan 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about add checks to make sure range is not used in DHCP_SERVER_IPV4 table and add a not required argument '--force' to bypass the check?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

if not force:
for port in dbconn.keys("CONFIG_DB", "DHCP_SERVER_IPV4_PORT*"):
ranges = dbconn.get("CONFIG_DB", port, "ranges")
if ranges and range_name in ranges.split(","):
ctx.fail("Range {} is referenced in {}, cannot delete, add --force to bypass".format(range_name, port))
dbconn.delete("CONFIG_DB", key)
else:
ctx.fail("Range {} does not exist, cannot delete".format(range_name))


def register(cli):
# cli.add_command(dhcp_server)
pass
Expand Down
Loading