diff --git a/dockers/docker-dhcp-server/cli-plugin-tests/mock_config_db.json b/dockers/docker-dhcp-server/cli-plugin-tests/mock_config_db.json index 6183453aff80..773b7e178ab5 100644 --- a/dockers/docker-dhcp-server/cli-plugin-tests/mock_config_db.json +++ b/dockers/docker-dhcp-server/cli-plugin-tests/mock_config_db.json @@ -49,10 +49,10 @@ "ips": "100.1.1.10,10.1.1.11" }, "DHCP_SERVER_IPV4_PORT|Vlan100|Ethernet7": { - "ranges": "range1,range2" + "ranges": "range1,range3" }, "DHCP_SERVER_IPV4_PORT|Vlan200|Ethernet8": { - "ranges": "range3,range4" + "ranges": "range1,range4" }, "DHCP_SERVER_IPV4_PORT|Ethernet9": { "ranges": "range5,range6" diff --git a/dockers/docker-dhcp-server/cli-plugin-tests/test_config_dhcp_server.py b/dockers/docker-dhcp-server/cli-plugin-tests/test_config_dhcp_server.py index 9cc12aa3cacc..612f229cc8ee 100644 --- a/dockers/docker-dhcp-server/cli-plugin-tests/test_config_dhcp_server.py +++ b/dockers/docker-dhcp-server/cli-plugin-tests/test_config_dhcp_server.py @@ -240,3 +240,133 @@ def test_config_dhcp_server_ipv4_update_wrong_netmask(self, mock_db): result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["update"], ["Vlan100", "--netmask=255.255.254"], obj=db) assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + def test_config_dhcp_server_ipv4_range_add(self, mock_db): + expected_value = { + "range": "10.10.10.10,10.10.10.11" + } + runner = CliRunner() + db = clicommon.Db() + db.db = mock_db + result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["add"], \ + ["range4", "10.10.10.10", "10.10.10.11"], obj=db) + assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + assert mock_db.get_all("CONFIG_DB", "DHCP_SERVER_IPV4_RANGE|range4") == expected_value + + def test_config_dhcp_server_ipv4_range_add_existing(self, mock_db): + runner = CliRunner() + db = clicommon.Db() + db.db = mock_db + result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["add"], \ + ["range1", "10.10.10.10", "10.10.10.11"], obj=db) + assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + + def test_config_dhcp_server_ipv4_range_add_single_ip(self, mock_db): + expected_value = { + "range": "10.10.10.10,10.10.10.10" + } + runner = CliRunner() + db = clicommon.Db() + db.db = mock_db + result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["add"], \ + ["range4", "10.10.10.10"], obj=db) + assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + assert mock_db.get_all("CONFIG_DB", "DHCP_SERVER_IPV4_RANGE|range4") == expected_value + + def test_config_dhcp_server_ipv4_range_add_wrong_ip(self, mock_db): + runner = CliRunner() + db = clicommon.Db() + db.db = mock_db + result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["add"], \ + ["range4", "10.10.10"], obj=db) + assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + + def test_config_dhcp_server_ipv4_range_add_wrong_order(self, mock_db): + runner = CliRunner() + db = clicommon.Db() + db.db = mock_db + result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["add"], \ + ["range4", "10.10.10.10", "10.10.10.9"], obj=db) + assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + + def test_config_dhcp_server_ipv4_range_update(self, mock_db): + expected_value = { + "range": "10.10.10.10,10.10.10.11" + } + runner = CliRunner() + db = clicommon.Db() + db.db = mock_db + result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["update"], \ + ["range1", "10.10.10.10", "10.10.10.11"], obj=db) + assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + assert mock_db.get_all("CONFIG_DB", "DHCP_SERVER_IPV4_RANGE|range1") == expected_value + + def test_config_dhcp_server_ipv4_range_update_nonexisting(self, mock_db): + runner = CliRunner() + db = clicommon.Db() + db.db = mock_db + result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["update"], \ + ["range4", "10.10.10.10", "10.10.10.11"], obj=db) + assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + + def test_config_dhcp_server_ipv4_range_update_single_ip(self, mock_db): + expected_value = { + "range": "10.10.10.10,10.10.10.10" + } + runner = CliRunner() + db = clicommon.Db() + db.db = mock_db + result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["update"], \ + ["range1", "10.10.10.10"], obj=db) + assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + assert mock_db.get_all("CONFIG_DB", "DHCP_SERVER_IPV4_RANGE|range1") == expected_value + + def test_config_dhcp_server_ipv4_range_update_wrong_ip(self, mock_db): + runner = CliRunner() + db = clicommon.Db() + db.db = mock_db + result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["update"], \ + ["range1", "10.10.10"], obj=db) + assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + + def test_config_dhcp_server_ipv4_range_add_wrong_order(self, mock_db): + runner = CliRunner() + db = clicommon.Db() + db.db = mock_db + result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["update"], \ + ["range1", "10.10.10.10", "10.10.10.9"], obj=db) + assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + + def test_config_dhcp_server_ipv4_range_delete(self, mock_db): + runner = CliRunner() + db = clicommon.Db() + db.db = mock_db + result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["del"], \ + ["range2"], obj=db) + assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + assert mock_db.exists("CONFIG_DB", "DHCP_SERVER_IPV4_RANGE|range2") == False + + def test_config_dhcp_server_ipv4_range_delete_nonexisting(self, mock_db): + runner = CliRunner() + db = clicommon.Db() + db.db = mock_db + result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["del"], \ + ["range4"], obj=db) + assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + + def test_config_dhcp_server_ipv4_range_delete_referenced(self, mock_db): + runner = CliRunner() + db = clicommon.Db() + db.db = mock_db + result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["del"], \ + ["range1"], obj=db) + assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + + def test_config_dhcp_server_ipv4_range_delete_referenced_force(self, mock_db): + runner = CliRunner() + db = clicommon.Db() + db.db = mock_db + result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["range"].commands["del"], \ + ["range1", "--force"], obj=db) + assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info) + assert mock_db.exists("CONFIG_DB", "DHCP_SERVER_IPV4_RANGE|range1") == False + diff --git a/dockers/docker-dhcp-server/cli-plugin-tests/test_show_dhcp_server.py b/dockers/docker-dhcp-server/cli-plugin-tests/test_show_dhcp_server.py index 1373dfe0bf5b..f09c0a9c86ae 100644 --- a/dockers/docker-dhcp-server/cli-plugin-tests/test_show_dhcp_server.py +++ b/dockers/docker-dhcp-server/cli-plugin-tests/test_show_dhcp_server.py @@ -215,9 +215,9 @@ def test_show_dhcp_server_ipv4_port_without_intf(self, mock_db): | | 10.1.1.11 | +-------------------+------------+ | Vlan100|Ethernet7 | range1 | -| | range2 | +| | range3 | +-------------------+------------+ -| Vlan200|Ethernet8 | range3 | +| Vlan200|Ethernet8 | range1 | | | range4 | +-------------------+------------+ | Ethernet9 | range5 | @@ -237,7 +237,7 @@ def test_show_dhcp_server_ipv4_port_with_port(self, mock_db): | Interface | Bind | +===================+========+ | Vlan100|Ethernet7 | range1 | -| | range2 | +| | range3 | +-------------------+--------+ """ runner = CliRunner() @@ -256,7 +256,7 @@ def test_show_dhcp_server_ipv4_port_with_vlan(self, mock_db): | | 10.1.1.11 | +-------------------+------------+ | Vlan100|Ethernet7 | range1 | -| | range2 | +| | range3 | +-------------------+------------+ """ runner = CliRunner() @@ -271,7 +271,7 @@ def test_show_dhcp_server_ipv4_port_with_port_and_vlan(self, mock_db): +-------------------+--------+ | Interface | Bind | +===================+========+ -| Vlan200|Ethernet8 | range3 | +| Vlan200|Ethernet8 | range1 | | | range4 | +-------------------+--------+ """ diff --git a/dockers/docker-dhcp-server/cli/config/plugins/dhcp_server.py b/dockers/docker-dhcp-server/cli/config/plugins/dhcp_server.py index 9abd5a275fd8..237550a4b803 100644 --- a/dockers/docker-dhcp-server/cli/config/plugins/dhcp_server.py +++ b/dockers/docker-dhcp-server/cli/config/plugins/dhcp_server.py @@ -8,7 +8,7 @@ SUPPORT_TYPE = ["binary", "boolean", "ipv4-address", "string", "uint8", "uint16", "uint32"] -def validate_str_type(type, value): +def validate_str_type(type_, value): """ To validate whether type is consistent with string value Args: @@ -20,27 +20,27 @@ def validate_str_type(type, value): """ if not isinstance(value, str): return False - if type not in SUPPORT_TYPE: + if type_ not in SUPPORT_TYPE: return False - if type == "string": + if type_ == "string": return True - if type == "binary": + if type_ == "binary": if len(value) == 0 or len(value) % 2 != 0: return False return all(c in set(string.hexdigits) for c in value) - if type == "boolean": + if type_ == "boolean": return value in ["true", "false"] - if type == "ipv4-address": + if type_ == "ipv4-address": try: if len(value.split(".")) != 4: return False return ipaddress.ip_address(value).version == 4 except ValueError: return False - if type.startswith("uint"): + if type_.startswith("uint"): if not value.isdigit(): return False - length = int("".join([c for c in type if c.isdigit()])) + length = int("".join([c for c in type_ if c.isdigit()])) return 0 <= int(value) <= int(pow(2, length)) - 1 return False @@ -186,6 +186,78 @@ def dhcp_server_ipv4_disable(db, dhcp_interface): ctx.fail("Failed to disable, dhcp interface {} does not exist".format(dhcp_interface)) +@dhcp_server_ipv4.group(cls=clicommon.AliasedGroup, name="range") +def dhcp_server_ipv4_range(): + pass + + +def count_ipv4(start, end): + ip1 = int(ipaddress.IPv4Address(start)) + ip2 = int(ipaddress.IPv4Address(end)) + return ip2 - ip1 + 1 + + +@dhcp_server_ipv4_range.command(name="add") +@click.argument("range_name", required=True) +@click.argument("ip_start", required=True) +@click.argument("ip_end", required=False) +@clicommon.pass_db +def dhcp_server_ipv4_range_add(db, range_name, ip_start, ip_end): + ctx = click.get_current_context() + if not ip_end: + ip_end = ip_start + if not validate_str_type("ipv4-address", ip_start) or not validate_str_type("ipv4-address", ip_end): + ctx.fail("ip_start or ip_end is not valid ipv4 address") + if count_ipv4(ip_start, ip_end) < 1: + ctx.fail("range value is illegal") + dbconn = db.db + key = "DHCP_SERVER_IPV4_RANGE|" + range_name + if dbconn.exists("CONFIG_DB", key): + ctx.fail("Range {} already exist".format(range_name)) + else: + dbconn.hmset("CONFIG_DB", key, {"range": ip_start + "," + ip_end}) + + +@dhcp_server_ipv4_range.command(name="update") +@click.argument("range_name", required=True) +@click.argument("ip_start", required=True) +@click.argument("ip_end", required=False) +@clicommon.pass_db +def dhcp_server_ipv4_range_update(db, range_name, ip_start, ip_end): + ctx = click.get_current_context() + if not ip_end: + ip_end = ip_start + if not validate_str_type("ipv4-address", ip_start) or not validate_str_type("ipv4-address", ip_end): + ctx.fail("ip_start or ip_end is not valid ipv4 address") + if count_ipv4(ip_start, ip_end) < 1: + ctx.fail("range value is illegal") + dbconn = db.db + key = "DHCP_SERVER_IPV4_RANGE|" + range_name + if dbconn.exists("CONFIG_DB", key): + dbconn.set("CONFIG_DB", key, "range", ip_start + "," + ip_end) + else: + ctx.fail("Range {} does not exist, cannot update".format(range_name)) + + +@dhcp_server_ipv4_range.command(name="del") +@click.argument("range_name", required=True) +@click.option("--force", required=False, default=False, is_flag=True) +@clicommon.pass_db +def dhcp_sever_ipv4_range_del(db, range_name, force): + ctx = click.get_current_context() + dbconn = db.db + key = "DHCP_SERVER_IPV4_RANGE|" + range_name + if dbconn.exists("CONFIG_DB", key): + if not force: + for port in dbconn.keys("CONFIG_DB", "DHCP_SERVER_IPV4_PORT*"): + ranges = dbconn.get("CONFIG_DB", port, "ranges") + if ranges and range_name in ranges.split(","): + ctx.fail("Range {} is referenced in {}, cannot delete, add --force to bypass".format(range_name, port)) + dbconn.delete("CONFIG_DB", key) + else: + ctx.fail("Range {} does not exist, cannot delete".format(range_name)) + + def register(cli): # cli.add_command(dhcp_server) pass