diff --git a/config/main.py b/config/main.py index de461557da..b70fd52264 100644 --- a/config/main.py +++ b/config/main.py @@ -2956,36 +2956,50 @@ def warm_restart_enable(ctx, module): @click.argument('seconds', metavar='', required=True, type=int) @click.pass_context def warm_restart_neighsyncd_timer(ctx, seconds): - db = ctx.obj['db'] - if seconds not in range(1, 9999): - ctx.fail("neighsyncd warm restart timer must be in range 1-9999") - db.mod_entry('WARM_RESTART', 'swss', {'neighsyncd_timer': seconds}) + db = ValidatedConfigDBConnector(ctx.obj['db']) + if ADHOC_VALIDATION: + if seconds not in range(1, 9999): + ctx.fail("neighsyncd warm restart timer must be in range 1-9999") + try: + db.mod_entry('WARM_RESTART', 'swss', {'neighsyncd_timer': seconds}) + except ValueError as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) @warm_restart.command('bgp_timer') @click.argument('seconds', metavar='', required=True, type=int) @click.pass_context def warm_restart_bgp_timer(ctx, seconds): - db = ctx.obj['db'] - if seconds not in range(1, 3600): - ctx.fail("bgp warm restart timer must be in range 1-3600") - db.mod_entry('WARM_RESTART', 'bgp', {'bgp_timer': seconds}) + db = ValidatedConfigDBConnector(ctx.obj['db']) + if ADHOC_VALIDATION: + if seconds not in range(1, 3600): + ctx.fail("bgp warm restart timer must be in range 1-3600") + try: + db.mod_entry('WARM_RESTART', 'bgp', {'bgp_timer': seconds}) + except ValueError as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) @warm_restart.command('teamsyncd_timer') @click.argument('seconds', metavar='', required=True, type=int) @click.pass_context def warm_restart_teamsyncd_timer(ctx, seconds): - db = ctx.obj['db'] - if seconds not in range(1, 3600): - ctx.fail("teamsyncd warm restart timer must be in range 1-3600") - db.mod_entry('WARM_RESTART', 'teamd', {'teamsyncd_timer': seconds}) + db = ValidatedConfigDBConnector(ctx.obj['db']) + if ADHOC_VALIDATION: + if seconds not in range(1, 3600): + ctx.fail("teamsyncd warm restart timer must be in range 1-3600") + try: + db.mod_entry('WARM_RESTART', 'teamd', {'teamsyncd_timer': seconds}) + except ValueError as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) @warm_restart.command('bgp_eoiu') @click.argument('enable', metavar='', default='true', required=False, type=click.Choice(["true", "false"])) @click.pass_context def warm_restart_bgp_eoiu(ctx, enable): - db = ctx.obj['db'] - db.mod_entry('WARM_RESTART', 'bgp', {'bgp_eoiu': enable}) - + db = ValidatedConfigDBConnector(ctx.obj['db']) + try: + db.mod_entry('WARM_RESTART', 'bgp', {'bgp_eoiu': enable}) + except ValueError as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) def vrf_add_management_vrf(config_db): """Enable management vrf in config DB""" @@ -2994,7 +3008,11 @@ def vrf_add_management_vrf(config_db): if entry and entry['mgmtVrfEnabled'] == 'true' : click.echo("ManagementVRF is already Enabled.") return None - config_db.mod_entry('MGMT_VRF_CONFIG', "vrf_global", {"mgmtVrfEnabled": "true"}) + try: + config_db.mod_entry('MGMT_VRF_CONFIG', "vrf_global", {"mgmtVrfEnabled": "true"}) + except ValueError as e: + ctx = click.get_current_context() + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) def vrf_delete_management_vrf(config_db): @@ -3004,7 +3022,11 @@ def vrf_delete_management_vrf(config_db): if not entry or entry['mgmtVrfEnabled'] == 'false' : click.echo("ManagementVRF is already Disabled.") return None - config_db.mod_entry('MGMT_VRF_CONFIG', "vrf_global", {"mgmtVrfEnabled": "false"}) + try: + config_db.mod_entry('MGMT_VRF_CONFIG', "vrf_global", {"mgmtVrfEnabled": "false"}) + except ValueError as e: + ctx = click.get_current_context() + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) @config.group(cls=clicommon.AbbreviationGroup) @@ -4833,26 +4855,30 @@ def remove_queue(db, interface_name, queue_map): @click.pass_context def cable_length(ctx, interface_name, length): """Set interface cable length""" - config_db = ctx.obj["config_db"] + config_db = ValidatedConfigDBConnector(ctx.obj["config_db"]) if not is_dynamic_buffer_enabled(config_db): ctx.fail("This command can only be supported on a system with dynamic buffer enabled") + + if ADHOC_VALIDATION: + # Check whether port is legal + ports = config_db.get_entry("PORT", interface_name) + if not ports: + ctx.fail("Port {} doesn't exist".format(interface_name)) - # Check whether port is legal - ports = config_db.get_entry("PORT", interface_name) - if not ports: - ctx.fail("Port {} doesn't exist".format(interface_name)) - - try: - assert "m" == length[-1] - except Exception: - ctx.fail("Invalid cable length. Should be in format m, like 300m".format(cable_length)) + try: + assert "m" == length[-1] + except Exception: + ctx.fail("Invalid cable length. Should be in format m, like 300m".format(cable_length)) keys = config_db.get_keys("CABLE_LENGTH") cable_length_set = {} cable_length_set[interface_name] = length - config_db.mod_entry("CABLE_LENGTH", keys[0], cable_length_set) + try: + config_db.mod_entry("CABLE_LENGTH", keys[0], cable_length_set) + except ValueError as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) # # 'transceiver' subgroup ('config interface transceiver ...') @@ -5269,7 +5295,7 @@ def vrf(ctx): @click.pass_context def add_vrf(ctx, vrf_name): """Add vrf""" - config_db = ctx.obj['config_db'] + config_db = ValidatedConfigDBConnector(ctx.obj['config_db']) if not vrf_name.startswith("Vrf") and not (vrf_name == 'mgmt') and not (vrf_name == 'management'): ctx.fail("'vrf_name' must begin with 'Vrf' or named 'mgmt'/'management' in case of ManagementVRF.") if len(vrf_name) > 15: @@ -5279,14 +5305,17 @@ def add_vrf(ctx, vrf_name): elif (vrf_name == 'mgmt' or vrf_name == 'management'): vrf_add_management_vrf(config_db) else: - config_db.set_entry('VRF', vrf_name, {"NULL": "NULL"}) + try: + config_db.set_entry('VRF', vrf_name, {"NULL": "NULL"}) + except ValueError as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) @vrf.command('del') @click.argument('vrf_name', metavar='', required=True) @click.pass_context def del_vrf(ctx, vrf_name): """Del vrf""" - config_db = ctx.obj['config_db'] + config_db = ValidatedConfigDBConnector(ctx.obj['config_db']) if not vrf_name.startswith("Vrf") and not (vrf_name == 'mgmt') and not (vrf_name == 'management'): ctx.fail("'vrf_name' must begin with 'Vrf' or named 'mgmt'/'management' in case of ManagementVRF.") if len(vrf_name) > 15: @@ -5303,7 +5332,10 @@ def del_vrf(ctx, vrf_name): vrf_delete_management_vrf(config_db) else: del_interface_bind_to_vrf(config_db, vrf_name) - config_db.set_entry('VRF', vrf_name, None) + try: + config_db.set_entry('VRF', vrf_name, None) + except JsonPatchConflict as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) click.echo("VRF {} deleted and all associated IP addresses removed.".format(vrf_name)) @vrf.command('add_vrf_vni_map') @@ -6424,7 +6456,7 @@ def sflow(ctx): @click.pass_context def enable(ctx): """Enable sFlow""" - config_db = ctx.obj['db'] + config_db = ValidatedConfigDBConnector(ctx.obj['db']) sflow_tbl = config_db.get_table('SFLOW') if not sflow_tbl: @@ -6432,7 +6464,10 @@ def enable(ctx): else: sflow_tbl['global']['admin_state'] = 'up' - config_db.mod_entry('SFLOW', 'global', sflow_tbl['global']) + try: + config_db.mod_entry('SFLOW', 'global', sflow_tbl['global']) + except ValueError as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) try: proc = subprocess.Popen("systemctl is-active sflow", shell=True, text=True, stdout=subprocess.PIPE) @@ -6452,7 +6487,7 @@ def enable(ctx): @click.pass_context def disable(ctx): """Disable sFlow""" - config_db = ctx.obj['db'] + config_db = ValidatedConfigDBConnector(ctx.obj['db']) sflow_tbl = config_db.get_table('SFLOW') if not sflow_tbl: @@ -6460,7 +6495,10 @@ def disable(ctx): else: sflow_tbl['global']['admin_state'] = 'down' - config_db.mod_entry('SFLOW', 'global', sflow_tbl['global']) + try: + config_db.mod_entry('SFLOW', 'global', sflow_tbl['global']) + except ValueError as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) # # 'sflow' command ('config sflow polling-interval ...') @@ -6471,17 +6509,21 @@ def disable(ctx): @click.pass_context def polling_int(ctx, interval): """Set polling-interval for counter-sampling (0 to disable)""" - if interval not in range(5, 301) and interval != 0: - click.echo("Polling interval must be between 5-300 (0 to disable)") + if ADHOC_VALIDATION: + if interval not in range(5, 301) and interval != 0: + click.echo("Polling interval must be between 5-300 (0 to disable)") - config_db = ctx.obj['db'] + config_db = ValidatedConfigDBConnector(ctx.obj['db']) sflow_tbl = config_db.get_table('SFLOW') if not sflow_tbl: sflow_tbl = {'global': {'admin_state': 'down'}} sflow_tbl['global']['polling_interval'] = interval - config_db.mod_entry('SFLOW', 'global', sflow_tbl['global']) + try: + config_db.mod_entry('SFLOW', 'global', sflow_tbl['global']) + except ValueError as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) def is_valid_sample_rate(rate): return rate.isdigit() and int(rate) in range(256, 8388608 + 1) @@ -6503,18 +6545,25 @@ def interface(ctx): @click.argument('ifname', metavar='', required=True, type=str) @click.pass_context def enable(ctx, ifname): - config_db = ctx.obj['db'] - if not interface_name_is_valid(config_db, ifname) and ifname != 'all': - click.echo("Invalid interface name") - return + config_db = ValidatedConfigDBConnector(ctx.obj['db']) + if ADHOC_VALIDATION: + if not interface_name_is_valid(config_db, ifname) and ifname != 'all': + click.echo("Invalid interface name") + return intf_dict = config_db.get_table('SFLOW_SESSION') if intf_dict and ifname in intf_dict: intf_dict[ifname]['admin_state'] = 'up' - config_db.mod_entry('SFLOW_SESSION', ifname, intf_dict[ifname]) + try: + config_db.mod_entry('SFLOW_SESSION', ifname, intf_dict[ifname]) + except ValueError as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) else: - config_db.mod_entry('SFLOW_SESSION', ifname, {'admin_state': 'up'}) + try: + config_db.mod_entry('SFLOW_SESSION', ifname, {'admin_state': 'up'}) + except ValueError as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) # # 'sflow' command ('config sflow interface disable ...') @@ -6523,19 +6572,26 @@ def enable(ctx, ifname): @click.argument('ifname', metavar='', required=True, type=str) @click.pass_context def disable(ctx, ifname): - config_db = ctx.obj['db'] - if not interface_name_is_valid(config_db, ifname) and ifname != 'all': - click.echo("Invalid interface name") - return + config_db = ValidatedConfigDBConnector(ctx.obj['db']) + if ADHOC_VALIDATION: + if not interface_name_is_valid(config_db, ifname) and ifname != 'all': + click.echo("Invalid interface name") + return intf_dict = config_db.get_table('SFLOW_SESSION') if intf_dict and ifname in intf_dict: intf_dict[ifname]['admin_state'] = 'down' - config_db.mod_entry('SFLOW_SESSION', ifname, intf_dict[ifname]) + try: + config_db.mod_entry('SFLOW_SESSION', ifname, intf_dict[ifname]) + except ValueError as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) else: - config_db.mod_entry('SFLOW_SESSION', ifname, - {'admin_state': 'down'}) + try: + config_db.mod_entry('SFLOW_SESSION', ifname, + {'admin_state': 'down'}) + except ValueError as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) # # 'sflow' command ('config sflow interface sample-rate ...') @@ -6545,13 +6601,14 @@ def disable(ctx, ifname): @click.argument('rate', metavar='', required=True, type=str) @click.pass_context def sample_rate(ctx, ifname, rate): - config_db = ctx.obj['db'] - if not interface_name_is_valid(config_db, ifname) and ifname != 'all': - click.echo('Invalid interface name') - return - if not is_valid_sample_rate(rate) and rate != 'default': - click.echo('Error: Sample rate must be between 256 and 8388608 or default') - return + config_db = ValidatedConfigDBConnector(ctx.obj['db']) + if ADHOC_VALIDATION: + if not interface_name_is_valid(config_db, ifname) and ifname != 'all': + click.echo('Invalid interface name') + return + if not is_valid_sample_rate(rate) and rate != 'default': + click.echo('Error: Sample rate must be between 256 and 8388608 or default') + return sess_dict = config_db.get_table('SFLOW_SESSION') @@ -6560,13 +6617,22 @@ def sample_rate(ctx, ifname, rate): if 'sample_rate' not in sess_dict[ifname]: return del sess_dict[ifname]['sample_rate'] - config_db.set_entry('SFLOW_SESSION', ifname, sess_dict[ifname]) + try: + config_db.set_entry('SFLOW_SESSION', ifname, sess_dict[ifname]) + except ValueError as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) return sess_dict[ifname]['sample_rate'] = rate - config_db.mod_entry('SFLOW_SESSION', ifname, sess_dict[ifname]) + try: + config_db.mod_entry('SFLOW_SESSION', ifname, sess_dict[ifname]) + except ValueError as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) else: if rate != 'default': - config_db.mod_entry('SFLOW_SESSION', ifname, {'sample_rate': rate}) + try: + config_db.mod_entry('SFLOW_SESSION', ifname, {'sample_rate': rate}) + except ValueError as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) # @@ -6668,11 +6734,12 @@ def agent_id(ctx): @click.pass_context def add(ctx, ifname): """Add sFlow agent information""" - if ifname not in netifaces.interfaces(): - click.echo("Invalid interface name") - return + if ADHOC_VALIDATION: + if ifname not in netifaces.interfaces(): + click.echo("Invalid interface name") + return - config_db = ctx.obj['db'] + config_db = ValidatedConfigDBConnector(ctx.obj['db']) sflow_tbl = config_db.get_table('SFLOW') if not sflow_tbl: @@ -6683,7 +6750,10 @@ def add(ctx, ifname): return sflow_tbl['global']['agent_id'] = ifname - config_db.mod_entry('SFLOW', 'global', sflow_tbl['global']) + try: + config_db.mod_entry('SFLOW', 'global', sflow_tbl['global']) + except ValueError as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) # # 'sflow' command ('config sflow agent-id del') @@ -6692,7 +6762,7 @@ def add(ctx, ifname): @click.pass_context def delete(ctx): """Delete sFlow agent information""" - config_db = ctx.obj['db'] + config_db = ValidatedConfigDBConnector(ctx.obj['db']) sflow_tbl = config_db.get_table('SFLOW') if not sflow_tbl: @@ -6703,7 +6773,10 @@ def delete(ctx): return sflow_tbl['global'].pop('agent_id') - config_db.set_entry('SFLOW', 'global', sflow_tbl['global']) + try: + config_db.set_entry('SFLOW', 'global', sflow_tbl['global']) + except ValueError as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) # # set ipv6 link local mode on a given interface diff --git a/config/vxlan.py b/config/vxlan.py index be0a961001..71377d5609 100644 --- a/config/vxlan.py +++ b/config/vxlan.py @@ -1,6 +1,10 @@ import click import utilities_common.cli as clicommon +from jsonpatch import JsonPatchConflict +from .validated_config_db_connector import ValidatedConfigDBConnector + +ADHOC_VALIDATION = True # # 'vxlan' group ('config vxlan ...') # @@ -15,9 +19,11 @@ def vxlan(): def add_vxlan(db, vxlan_name, src_ip): """Add VXLAN""" ctx = click.get_current_context() + config_db = ValidatedConfigDBConnector(db.cfgdb) - if not clicommon.is_ipaddress(src_ip): - ctx.fail("{} invalid src ip address".format(src_ip)) + if ADHOC_VALIDATION: + if not clicommon.is_ipaddress(src_ip): + ctx.fail("{} invalid src ip address".format(src_ip)) vxlan_keys = db.cfgdb.get_keys('VXLAN_TUNNEL') if not vxlan_keys: @@ -29,7 +35,10 @@ def add_vxlan(db, vxlan_name, src_ip): ctx.fail("VTEP already configured.") fvs = {'src_ip': src_ip} - db.cfgdb.set_entry('VXLAN_TUNNEL', vxlan_name, fvs) + try: + config_db.set_entry('VXLAN_TUNNEL', vxlan_name, fvs) + except ValueError as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) @vxlan.command('del') @click.argument('vxlan_name', metavar='', required=True) @@ -37,6 +46,7 @@ def add_vxlan(db, vxlan_name, src_ip): def del_vxlan(db, vxlan_name): """Del VXLAN""" ctx = click.get_current_context() + config_db = ValidatedConfigDBConnector(db.cfgdb) vxlan_keys = db.cfgdb.get_keys('VXLAN_TUNNEL') if vxlan_name not in vxlan_keys: @@ -66,7 +76,10 @@ def del_vxlan(db, vxlan_name): if ('vxlan_tunnel' in vnet_table[vnet_key] and vnet_table[vnet_key]['vxlan_tunnel'] == vxlan_name): ctx.fail("Please delete all VNET configuration referencing the tunnel " + vxlan_name) - db.cfgdb.set_entry('VXLAN_TUNNEL', vxlan_name, None) + try: + config_db.set_entry('VXLAN_TUNNEL', vxlan_name, None) + except JsonPatchConflict as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) @vxlan.group('evpn_nvo') def vxlan_evpn_nvo(): @@ -79,6 +92,7 @@ def vxlan_evpn_nvo(): def add_vxlan_evpn_nvo(db, nvo_name, vxlan_name): """Add NVO""" ctx = click.get_current_context() + config_db = ValidatedConfigDBConnector(db.cfgdb) vxlan_keys = db.cfgdb.get_keys("VXLAN_EVPN_NVO|*") if not vxlan_keys: vxlan_count = 0 @@ -92,7 +106,10 @@ def add_vxlan_evpn_nvo(db, nvo_name, vxlan_name): ctx.fail("VTEP {} not configured".format(vxlan_name)) fvs = {'source_vtep': vxlan_name} - db.cfgdb.set_entry('VXLAN_EVPN_NVO', nvo_name, fvs) + try: + config_db.set_entry('VXLAN_EVPN_NVO', nvo_name, fvs) + except ValueError as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) @vxlan_evpn_nvo.command('del') @click.argument('nvo_name', metavar='', required=True) @@ -100,6 +117,7 @@ def add_vxlan_evpn_nvo(db, nvo_name, vxlan_name): def del_vxlan_evpn_nvo(db, nvo_name): """Del NVO""" ctx = click.get_current_context() + config_db = ValidatedConfigDBConnector(db.cfgdb) vxlan_keys = db.cfgdb.get_keys('VXLAN_TUNNEL_MAP') if not vxlan_keys: vxlan_count = 0 @@ -107,8 +125,11 @@ def del_vxlan_evpn_nvo(db, nvo_name): vxlan_count = len(vxlan_keys) if(vxlan_count > 0): - ctx.fail("Please delete all VLAN VNI mappings.") - db.cfgdb.set_entry('VXLAN_EVPN_NVO', nvo_name, None) + ctx.fail("Please delete all VLAN VNI mappings.") + try: + config_db.set_entry('VXLAN_EVPN_NVO', nvo_name, None) + except JsonPatchConflict as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) @vxlan.group('map') def vxlan_map(): @@ -122,6 +143,7 @@ def vxlan_map(): def add_vxlan_map(db, vxlan_name, vlan, vni): """Add VLAN-VNI map entry""" ctx = click.get_current_context() + config_db = ValidatedConfigDBConnector(db.cfgdb) if not vlan.isdigit(): ctx.fail("Invalid vlan {}. Only valid vlan is accepted".format(vni)) @@ -152,7 +174,10 @@ def add_vxlan_map(db, vxlan_name, vlan, vni): fvs = {'vni': vni, 'vlan' : vlan_name} mapname = vxlan_name + '|' + 'map_' + vni + '_' + vlan_name - db.cfgdb.set_entry('VXLAN_TUNNEL_MAP', mapname, fvs) + try: + config_db.set_entry('VXLAN_TUNNEL_MAP', mapname, fvs) + except ValueError as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) @vxlan_map.command('del') @click.argument('vxlan_name', metavar='', required=True) @@ -162,6 +187,7 @@ def add_vxlan_map(db, vxlan_name, vlan, vni): def del_vxlan_map(db, vxlan_name, vlan, vni): """Del VLAN-VNI map entry""" ctx = click.get_current_context() + config_db = ValidatedConfigDBConnector(db.cfgdb) if not vlan.isdigit(): ctx.fail("Invalid vlan {}. Only valid vlan is accepted".format(vni)) @@ -189,7 +215,10 @@ def del_vxlan_map(db, vxlan_name, vlan, vni): mapname = vxlan_name + '|' + 'map_' + vni + '_' + vlan db.cfgdb.set_entry('VXLAN_TUNNEL_MAP', mapname, None) mapname = vxlan_name + '|' + 'map_' + vni + '_Vlan' + vlan - db.cfgdb.set_entry('VXLAN_TUNNEL_MAP', mapname, None) + try: + config_db.set_entry('VXLAN_TUNNEL_MAP', mapname, None) + except JsonPatchConflict as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) @vxlan.group('map_range') def vxlan_map_range(): @@ -203,6 +232,7 @@ def vxlan_map_range(): @clicommon.pass_db def add_vxlan_map_range(db, vxlan_name, vlan_start, vlan_end, vni_start): """Add Range of vlan-vni mappings""" + config_db = ValidatedConfigDBConnector(db.cfgdb) ctx = click.get_current_context() if clicommon.is_vlanid_in_range(vlan_start) is False: ctx.fail(" Invalid Vlan Id , Valid Range : 1 to 4094 ") @@ -244,7 +274,10 @@ def add_vxlan_map_range(db, vxlan_name, vlan_start, vlan_end, vni_start): fvs = {'vni': vni_name, 'vlan' : vlan_name} mapname = vxlan_name + '|' + 'map_' + vni_name + '_' + vlan_name - db.cfgdb.set_entry('VXLAN_TUNNEL_MAP', mapname, fvs) + try: + config_db.set_entry('VXLAN_TUNNEL_MAP', mapname, fvs) + except ValueError as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) @vxlan_map_range.command('del') @click.argument('vxlan_name', metavar='', required=True) @@ -255,6 +288,7 @@ def add_vxlan_map_range(db, vxlan_name, vlan_start, vlan_end, vni_start): def del_vxlan_map_range(db, vxlan_name, vlan_start, vlan_end, vni_start): """Del Range of vlan-vni mappings""" ctx = click.get_current_context() + config_db = ValidatedConfigDBConnector(db.cfgdb) if clicommon.is_vlanid_in_range(vlan_start) is False: ctx.fail(" Invalid Vlan Id , Valid Range : 1 to 4094 ") if clicommon.is_vlanid_in_range(vlan_end) is False: @@ -279,5 +313,8 @@ def del_vxlan_map_range(db, vxlan_name, vlan_start, vlan_end, vni_start): continue mapname = vxlan_name + '|' + 'map_' + vni_name + '_' + vlan_name - db.cfgdb.set_entry('VXLAN_TUNNEL_MAP', mapname, None) + try: + config_db.set_entry('VXLAN_TUNNEL_MAP', mapname, None) + except JsonPatchConflict as e: + ctx.fail("Invalid ConfigDB. Error: {}".format(e)) diff --git a/tests/config_test.py b/tests/config_test.py index 6fdc020ec4..06e984b7ff 100644 --- a/tests/config_test.py +++ b/tests/config_test.py @@ -1777,6 +1777,167 @@ def teardown_class(cls): print("TEARDOWN") +class TestConfigWarmRestart(object): + @classmethod + def setup_class(cls): + print("SETUP") + import config.main + importlib.reload(config.main) + + @patch("validated_config_db_connector.device_info.is_yang_config_validation_enabled", mock.Mock(return_value=True)) + @patch("config.validated_config_db_connector.ValidatedConfigDBConnector.validated_mod_entry", mock.Mock(side_effect=ValueError)) + def test_warm_restart_neighsyncd_timer_yang_validation(self): + config.ADHOC_VALIDATION = False + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + result = runner.invoke(config.config.commands["warm_restart"].commands["neighsyncd_timer"], ["2000"], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0 + assert "Invalid ConfigDB. Error" in result.output + + def test_warm_restart_neighsyncd_timer(self): + config.ADHOC_VALIDATION = True + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + result = runner.invoke(config.config.commands["warm_restart"].commands["neighsyncd_timer"], ["0"], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0 + assert "neighsyncd warm restart timer must be in range 1-9999" in result.output + + @patch("validated_config_db_connector.device_info.is_yang_config_validation_enabled", mock.Mock(return_value=True)) + @patch("config.validated_config_db_connector.ValidatedConfigDBConnector.validated_mod_entry", mock.Mock(side_effect=ValueError)) + def test_warm_restart_bgp_timer_yang_validation(self): + config.ADHOC_VALIDATION = False + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + result = runner.invoke(config.config.commands["warm_restart"].commands["bgp_timer"], ["2000"], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0 + assert "Invalid ConfigDB. Error" in result.output + + def test_warm_restart_bgp_timer(self): + config.ADHOC_VALIDATION = True + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + result = runner.invoke(config.config.commands["warm_restart"].commands["bgp_timer"], ["0"], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0 + assert "bgp warm restart timer must be in range 1-3600" in result.output + + @patch("validated_config_db_connector.device_info.is_yang_config_validation_enabled", mock.Mock(return_value=True)) + @patch("config.validated_config_db_connector.ValidatedConfigDBConnector.validated_mod_entry", mock.Mock(side_effect=ValueError)) + def test_warm_restart_teamsyncd_timer_yang_validation(self): + config.ADHOC_VALIDATION = False + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + result = runner.invoke(config.config.commands["warm_restart"].commands["teamsyncd_timer"], ["2000"], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0 + assert "Invalid ConfigDB. Error" in result.output + + def test_warm_restart_teamsyncd_timer(self): + config.ADHOC_VALIDATION = True + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + result = runner.invoke(config.config.commands["warm_restart"].commands["teamsyncd_timer"], ["0"], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0 + assert "teamsyncd warm restart timer must be in range 1-3600" in result.output + + @patch("validated_config_db_connector.device_info.is_yang_config_validation_enabled", mock.Mock(return_value=True)) + @patch("config.validated_config_db_connector.ValidatedConfigDBConnector.validated_mod_entry", mock.Mock(side_effect=ValueError)) + def test_warm_restart_bgp_eoiu_yang_validation(self): + config.ADHOC_VALIDATION = False + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + result = runner.invoke(config.config.commands["warm_restart"].commands["bgp_eoiu"], ["true"], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0 + assert "Invalid ConfigDB. Error" in result.output + + @classmethod + def teardown_class(cls): + print("TEARDOWN") + + +class TestConfigCableLength(object): + @classmethod + def setup_class(cls): + print("SETUP") + import config.main + importlib.reload(config.main) + + @patch("config.main.is_dynamic_buffer_enabled", mock.Mock(return_value=True)) + @patch("config.main.ConfigDBConnector.get_entry", mock.Mock(return_value=False)) + def test_add_cablelength_with_nonexistent_name_valid_length(self): + config.ADHOC_VALIDATION = True + runner = CliRunner() + db = Db() + obj = {'config_db':db.cfgdb} + + result = runner.invoke(config.config.commands["interface"].commands["cable-length"], ["Ethernet0","40m"], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0 + assert "Port Ethernet0 doesn't exist" in result.output + + @patch("validated_config_db_connector.device_info.is_yang_config_validation_enabled", mock.Mock(return_value=True)) + @patch("config.validated_config_db_connector.ValidatedConfigDBConnector.validated_mod_entry", mock.Mock(side_effect=ValueError)) + @patch("config.main.ConfigDBConnector.get_entry", mock.Mock(return_value="Port Info")) + @patch("config.main.is_dynamic_buffer_enabled", mock.Mock(return_value=True)) + @patch("config.main.ConfigDBConnector.get_keys", mock.Mock(return_value=["sample_key"])) + def test_add_cablelength_invalid_yang_validation(self): + config.ADHOC_VALIDATION = False + runner = CliRunner() + db = Db() + obj = {'config_db':db.cfgdb} + + result = runner.invoke(config.config.commands["interface"].commands["cable-length"], ["Ethernet0","40"], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0 + assert "Invalid ConfigDB. Error" in result.output + + @patch("config.main.ConfigDBConnector.get_entry", mock.Mock(return_value="Port Info")) + @patch("config.main.is_dynamic_buffer_enabled", mock.Mock(return_value=True)) + def test_add_cablelength_with_invalid_name_invalid_length(self): + config.ADHOC_VALIDATION = True + runner = CliRunner() + db = Db() + obj = {'config_db':db.cfgdb} + + result = runner.invoke(config.config.commands["interface"].commands["cable-length"], ["Ethernet0","40x"], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0 + assert "Invalid cable length" in result.output + + @classmethod + def teardown_class(cls): + print("TEARDOWN") + + class TestConfigLoopback(object): @classmethod def setup_class(cls): diff --git a/tests/ip_config_test.py b/tests/ip_config_test.py index 24d09c86e3..2b92546d9b 100644 --- a/tests/ip_config_test.py +++ b/tests/ip_config_test.py @@ -3,11 +3,14 @@ import os import traceback from unittest import mock +from mock import patch from click.testing import CliRunner +from jsonpatch import JsonPatchConflict import config.main as config import show.main as show +import config.validated_config_db_connector as validated_config_db_connector from utilities_common.db import Db test_path = os.path.dirname(os.path.abspath(__file__)) @@ -274,13 +277,56 @@ def test_intf_unknown_vrf_bind(self): print(result.exit_code, result.output) assert result.exit_code != 0 assert result.output == INVALID_MGMT_VRF_MSG - + result = runner.invoke(config.config.commands["vrf"].commands["add"], ["mgmt"], obj=obj) print(result.exit_code, result.output) result = runner.invoke(config.config.commands["interface"].commands["vrf"].commands["bind"], ["Ethernet64", "mgmt"], obj=obj) print(result.exit_code, result.output) assert result.exit_code == 0 + @patch("config.validated_config_db_connector.ValidatedConfigDBConnector.validated_mod_entry", mock.Mock(side_effect=ValueError)) + @patch("config.validated_config_db_connector.ValidatedConfigDBConnector.validated_set_entry", mock.Mock(side_effect=ValueError)) + @patch("validated_config_db_connector.device_info.is_yang_config_validation_enabled", mock.Mock(return_value=True)) + @patch("config.main.ConfigDBConnector.get_entry", mock.Mock(return_value={"mgmtVrfEnabled": "false"})) + def test_add_vrf_invalid_configdb_yang_validation(self): + runner = CliRunner() + db = Db() + obj = {'config_db':db.cfgdb, 'namespace':db.db.namespace} + + result = runner.invoke(config.config.commands["vrf"].commands["add"], ["mgmt"], obj=obj) + print(result.exit_code) + print(result.output) + assert "Invalid ConfigDB. Error" in result.output + assert result.exit_code != 0 + + result = runner.invoke(config.config.commands["vrf"].commands["add"], ["Vrf01"], obj=obj) + print(result.exit_code) + print(result.output) + assert "Invalid ConfigDB. Error" in result.output + assert result.exit_code != 0 + + @patch("config.validated_config_db_connector.ValidatedConfigDBConnector.validated_mod_entry", mock.Mock(side_effect=ValueError)) + @patch("config.validated_config_db_connector.ValidatedConfigDBConnector.validated_set_entry", mock.Mock(side_effect=JsonPatchConflict)) + @patch("validated_config_db_connector.device_info.is_yang_config_validation_enabled", mock.Mock(return_value=True)) + @patch("config.main.is_vrf_exists", mock.Mock(return_value=True)) + @patch("config.main.ConfigDBConnector.get_entry", mock.Mock(return_value={"mgmtVrfEnabled": "true"})) + def test_del_vrf_invalid_configdb_yang_validation(self): + runner = CliRunner() + db = Db() + obj = {'config_db':db.cfgdb, 'namespace':db.db.namespace} + + result = runner.invoke(config.config.commands["vrf"].commands["del"], ["mgmt"], obj=obj) + print(result.exit_code) + print(result.output) + assert "Invalid ConfigDB. Error" in result.output + assert result.exit_code != 0 + + result = runner.invoke(config.config.commands["vrf"].commands["del"], ["Vrf01"], obj=obj) + print(result.exit_code) + print(result.output) + assert "Invalid ConfigDB. Error" in result.output + assert result.exit_code != 0 + @classmethod def teardown_class(cls): os.environ['UTILITIES_UNIT_TESTING'] = "0" diff --git a/tests/sflow_test.py b/tests/sflow_test.py index ecb2782534..226e52ae5e 100644 --- a/tests/sflow_test.py +++ b/tests/sflow_test.py @@ -5,9 +5,11 @@ from click.testing import CliRunner from utilities_common.db import Db +from mock import patch import show.main as show import config.main as config +import config.validated_config_db_connector as validated_config_db_connector config.asic_type = mock.MagicMock(return_value = "broadcom") @@ -59,6 +61,21 @@ def test_show_sflow_intf(self): assert result.exit_code == 0 assert result.output == show_sflow_intf_output + @patch("validated_config_db_connector.device_info.is_yang_config_validation_enabled", mock.Mock(return_value=True)) + @patch("config.validated_config_db_connector.ValidatedConfigDBConnector.validated_mod_entry", mock.Mock(side_effect=ValueError)) + def test_config_sflow_disable_enable_yang_validation(self): + db = Db() + runner = CliRunner() + obj = {'db':db.cfgdb} + + result = runner.invoke(config.config.commands["sflow"].commands["enable"], [], obj=obj) + print(result.exit_code, result.output) + assert "Invalid ConfigDB. Error" in result.output + + result = runner.invoke(config.config.commands["sflow"].commands["disable"], [], obj=obj) + print(result.exit_code, result.output) + assert "Invalid ConfigDB. Error" in result.output + def test_config_sflow_disable_enable(self): # config sflow db = Db() @@ -176,11 +193,31 @@ def test_config_sflow_collector(self): assert result.output == show_sflow_output return + + @patch("validated_config_db_connector.device_info.is_yang_config_validation_enabled", mock.Mock(return_value=True)) + @patch("config.validated_config_db_connector.ValidatedConfigDBConnector.validated_mod_entry", mock.Mock(side_effect=ValueError)) + def test_config_sflow_polling_interval_yang_validation(self): + db = Db() + runner = CliRunner() + obj = {'db':db.cfgdb} + + config.ADHOC_VALIDATION = False + result = runner.invoke(config.config.commands["sflow"]. + commands["polling-interval"], ["500"], obj=obj) + print(result.exit_code, result.output) + assert "Invalid ConfigDB. Error" in result.output def test_config_sflow_polling_interval(self): db = Db() runner = CliRunner() obj = {'db':db.cfgdb} + config.ADHOC_VALIDATION = True + + # set to 500 out of range + result = runner.invoke(config.config.commands["sflow"]. + commands["polling-interval"], ["500"], obj=obj) + print(result.exit_code, result.output) + assert "Polling interval must be between 5-300" in result.output # set to 20 result = runner.invoke(config.config.commands["sflow"]. @@ -208,10 +245,38 @@ def test_config_sflow_polling_interval(self): return + @patch("config.main.ConfigDBConnector.get_table", mock.Mock(return_value={'Ethernet1': {'admin_state': 'sample_state'}})) + @patch("validated_config_db_connector.device_info.is_yang_config_validation_enabled", mock.Mock(return_value=True)) + @patch("config.validated_config_db_connector.ValidatedConfigDBConnector.validated_mod_entry", mock.Mock(side_effect=ValueError)) + def test_config_sflow_existing_intf_enable_yang_validation(self): + db = Db() + runner = CliRunner() + obj = {'db':db.cfgdb} + config.ADHOC_VALIDATION = False + + result = runner.invoke(config.config.commands["sflow"]. + commands["interface"].commands["enable"], ["Ethernet1"], obj=obj) + print(result.exit_code, result.output) + assert "Invalid ConfigDB. Error" in result.output + + @patch("config.main.ConfigDBConnector.get_table", mock.Mock(return_value=None)) + @patch("validated_config_db_connector.device_info.is_yang_config_validation_enabled", mock.Mock(return_value=True)) + @patch("config.validated_config_db_connector.ValidatedConfigDBConnector.validated_mod_entry", mock.Mock(side_effect=ValueError)) + def test_config_sflow_nonexistent_intf_enable_yang_validation(self): + db = Db() + runner = CliRunner() + obj = {'db':db.cfgdb} + + result = runner.invoke(config.config.commands["sflow"]. + commands["interface"].commands["enable"], ["Ethernet1"], obj=obj) + print(result.exit_code, result.output) + assert "Invalid ConfigDB. Error" in result.output + def test_config_sflow_intf_enable_disable(self): db = Db() runner = CliRunner() obj = {'db':db.cfgdb} + config.ADHOC_VALIDATION = True # mock interface_name_is_valid config.interface_name_is_valid = mock.MagicMock(return_value = True) @@ -237,6 +302,17 @@ def test_config_sflow_intf_enable_disable(self): sflowSession = db.cfgdb.get_table('SFLOW_SESSION') assert sflowSession["Ethernet1"]["admin_state"] == "down" + config.interface_name_is_valid = mock.MagicMock(return_value = False) + result = runner.invoke(config.config.commands["sflow"]. + commands["interface"].commands["enable"], ["Ethernetx"], obj=obj) + print(result.exit_code, result.output) + assert "Invalid interface name" in result.output + + result = runner.invoke(config.config.commands["sflow"]. + commands["interface"].commands["disable"], ["Ethernetx"], obj=obj) + print(result.exit_code, result.output) + assert "Invalid interface name" in result.output + return def test_config_sflow_intf_sample_rate(self): diff --git a/tests/vxlan_test.py b/tests/vxlan_test.py index 61c78dc429..9775fe96d0 100644 --- a/tests/vxlan_test.py +++ b/tests/vxlan_test.py @@ -3,8 +3,11 @@ from unittest import mock from click.testing import CliRunner +from mock import patch +from jsonpatch import JsonPatchConflict import config.main as config +import config.validated_config_db_connector as validated_config_db_connector import show.main as show from utilities_common.db import Db from .mock_tables import dbconnector @@ -246,6 +249,34 @@ def test_show_vxlan_remotevni_specific_cnt(self): print(result.output) assert result.exit_code == 0 assert result.output == show_vxlan_remotevni_specific_cnt_output + + @patch("validated_config_db_connector.device_info.is_yang_config_validation_enabled", mock.Mock(return_value=True)) + @patch("config.validated_config_db_connector.ValidatedConfigDBConnector.validated_set_entry", mock.Mock(side_effect=ValueError)) + @patch("config.main.ConfigDBConnector.get_entry", mock.Mock(return_value="Vlan Data")) + @patch("config.main.ConfigDBConnector.get_table", mock.Mock(return_value={'sample_key': 'sample_value'})) + def test_config_vxlan_add_yang_validation(self): + runner = CliRunner() + db = Db() + + result = runner.invoke(config.config.commands["vxlan"].commands["map_range"].commands["del"], ["vtep1", "100", "102", "100"], obj=db) + print(result.exit_code) + assert result.exit_code != 0 + + result = runner.invoke(config.config.commands["vxlan"].commands["map_range"].commands["add"], ["vtep1", "100", "102", "100"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0 + + @patch("validated_config_db_connector.device_info.is_yang_config_validation_enabled", mock.Mock(return_value=True)) + @patch("config.validated_config_db_connector.ValidatedConfigDBConnector.validated_set_entry", mock.Mock(side_effect=JsonPatchConflict)) + def test_config_vxlan_add_yang_validation_json_error(self): + runner = CliRunner() + db = Db() + + result = runner.invoke(config.config.commands["vxlan"].commands["map"].commands["del"], ["vtep1", "200", "200"], obj=db) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0 def test_show_vxlan_remotemac(self): runner = CliRunner()