-
Notifications
You must be signed in to change notification settings - Fork 717
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Acl icmp #451
Closed
Anandaraj-Maharajan
wants to merge
4
commits into
sonic-net:master
from
Anandaraj-Maharajan:acl_icmp
Closed
Acl icmp #451
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
27bd1e0
[ansible/roles] Added icmp filter test case
Anandaraj-Maharajan 06d021d
[ansible/roles] Added icmp filter test case
Anandaraj-Maharajan c653199
Merge branch 'acl_icmp' of https://github.com/Anandaraj-Maharajan/son…
Anandaraj-Maharajan 469be53
Fixed minor errors in files
Anandaraj-Maharajan File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,303 @@ | ||
''' | ||
Description: This file contains the ACL test for SONiC testbed | ||
Implemented according to the https://github.com/Azure/SONiC/wiki/ACL-test-plan | ||
Usage: Examples of how to use: | ||
ptf --test-dir acstests acltb_test.AclTest --platform remote -t 'router_mac="00:02:03:04:05:00";verbose=True;route_info="/tmp/route_info.txt"' | ||
''' | ||
|
||
#--------------------------------------------------------------------- | ||
# Global imports | ||
#--------------------------------------------------------------------- | ||
import random | ||
import time | ||
import logging | ||
import ptf.packet as scapy | ||
import socket | ||
import ptf.dataplane as dataplane | ||
|
||
from ptf.testutils import * | ||
from ptf.mask import Mask | ||
import ipaddress | ||
|
||
import os | ||
import logging | ||
import unittest | ||
|
||
import ptf | ||
from ptf.base_tests import BaseTest | ||
from ptf import config | ||
import ptf.dataplane as dataplane | ||
import ptf.testutils as testutils | ||
|
||
import pprint | ||
|
||
class AclTest(BaseTest): | ||
''' | ||
@summary: ACL tests on testbed topo: t1 | ||
''' | ||
|
||
#--------------------------------------------------------------------- | ||
# Class variables | ||
#--------------------------------------------------------------------- | ||
PORT_COUNT = 31 # temporary exclude the last port | ||
|
||
def __init__(self): | ||
''' | ||
@summary: constructor | ||
''' | ||
BaseTest.__init__(self) | ||
self.test_params = testutils.test_params_get() | ||
#--------------------------------------------------------------------- | ||
|
||
def setUp(self): | ||
''' | ||
@summary: Setup for the test | ||
''' | ||
|
||
self.dataplane = ptf.dataplane_instance | ||
self.router_mac = self.test_params['router_mac'] | ||
self.testbed_type = self.test_params['testbed_type'] | ||
|
||
#--------------------------------------------------------------------- | ||
|
||
''' | ||
For diagnostic purposes only | ||
''' | ||
def print_route_info(self): | ||
pprint.pprint(self.route_info) | ||
return | ||
#--------------------------------------------------------------------- | ||
|
||
def verify_packet_any_port(self, pkt, ports=[], device_number=0): | ||
""" | ||
@summary: Check that the packet is received on _any_ of the specified ports belonging to | ||
the given device (default device_number is 0). | ||
The function returns when either the expected packet is received or timeout (1 second). | ||
Also verifies that the packet is or received on any other ports for this | ||
device, and that no other packets are received on the device (unless --relax | ||
is in effect). | ||
@param pkt : packet to verify | ||
@param ports : list of ports | ||
@return: index of the port on which the packet is received and the packet. | ||
""" | ||
received = False | ||
match_index = 0 | ||
(rcv_device, rcv_port, rcv_pkt, pkt_time) = dp_poll(self, device_number=device_number, exp_pkt=pkt, timeout=1) | ||
|
||
if rcv_port in ports: | ||
match_index = ports.index(rcv_port) | ||
received = True | ||
|
||
return (match_index, rcv_pkt, received) | ||
#--------------------------------------------------------------------- | ||
|
||
def runSendReceiveTest(self, pkt2send, src_port , pkt2recv, destination_ports): | ||
""" | ||
@summary Send packet and verify it is received/not received on the expected ports | ||
""" | ||
|
||
masked2recv = Mask(pkt2recv) | ||
masked2recv.set_do_not_care_scapy(scapy.Ether, "dst") | ||
masked2recv.set_do_not_care_scapy(scapy.Ether, "src") | ||
|
||
send_packet(self, src_port, pkt2send) | ||
(index, rcv_pkt, received) = self.verify_packet_any_port(masked2recv, destination_ports) | ||
|
||
self.tests_total += 1 | ||
|
||
return received | ||
|
||
#--------------------------------------------------------------------- | ||
def runAclTests(self, dst_ip, dst_ip_blocked, src_port, dst_ports): | ||
""" | ||
@summary: Crete and send packet to verify each ACL rule | ||
@return: Number of tests passed | ||
""" | ||
|
||
tests_passed = 0 | ||
self.tests_total = 0 | ||
|
||
print "\nPort to sent packets to: %d" % src_port | ||
print "Destination IP: %s" % dst_ip_blocked | ||
print "Ports to expect packet from: ", | ||
pprint.pprint(dst_ports) | ||
print "Dst IP expected to be blocked: ", dst_ip_blocked | ||
|
||
pkt0 = simple_tcp_packet( | ||
eth_dst = self.router_mac, | ||
eth_src = self.dataplane.get_mac(0, 0), | ||
ip_src = "10.0.0.1", | ||
ip_dst = dst_ip, | ||
tcp_sport = 0x1234, | ||
tcp_dport = 0x50, | ||
ip_ttl = 64 | ||
) | ||
#exp_pkt = pkt.deepcopy() | ||
exp_pkt0 = simple_tcp_packet( | ||
eth_dst = self.dataplane.get_mac(0, 0), | ||
eth_src = self.router_mac, | ||
ip_src = "10.0.0.1", | ||
ip_dst = dst_ip, | ||
tcp_sport = 0x1234, | ||
tcp_dport = 0x50, | ||
ip_ttl = 63 | ||
) | ||
|
||
print "" | ||
# Test #1 - Verify source IP match | ||
pkt = pkt0.copy() | ||
exp_pkt = exp_pkt0.copy() | ||
pkt['IP'].src = "10.0.0.2" | ||
exp_pkt['IP'].src = "10.0.0.2" | ||
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports) | ||
tests_passed += (0 if res else 1) | ||
print "Test #1 %s" % ("FAILED" if res else "PASSED") | ||
|
||
# Test #2 - Verify destination IP match | ||
pkt = pkt0.copy() | ||
exp_pkt = exp_pkt0.copy() | ||
pkt['IP'].dst = dst_ip_blocked | ||
exp_pkt['IP'].dst = dst_ip_blocked | ||
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports) | ||
tests_passed += (0 if res else 1) | ||
print "Test #2 %s" % ("FAILED" if res else "PASSED") | ||
|
||
# Test #3 - Verify L4 source port match | ||
pkt = pkt0.copy() | ||
exp_pkt = exp_pkt0.copy() | ||
pkt['TCP'].sport = 0x1235 | ||
exp_pkt['TCP'].sport = 0x1235 | ||
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports) | ||
tests_passed += (0 if res else 1) | ||
print "Test #3 %s" % ("FAILED" if res else "PASSED") | ||
|
||
# Test #4 - Verify L4 destination port match | ||
pkt = pkt0.copy() | ||
exp_pkt = exp_pkt0.copy() | ||
pkt['TCP'].dport = 0x1235 | ||
exp_pkt['TCP'].dport = 0x1235 | ||
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports) | ||
tests_passed += (0 if res else 1) | ||
print "Test #4 %s" % ("FAILED" if res else "PASSED") | ||
|
||
# Test #5 - Verify ether type match | ||
pkt = pkt0.copy() | ||
exp_pkt = exp_pkt0.copy() | ||
pkt['Ethernet'].type = 0x1234 | ||
exp_pkt['Ethernet'].type = 0x1234 | ||
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports) | ||
tests_passed += (0 if res else 1) | ||
print "Test #5 %s" % ("FAILED" if res else "PASSED") | ||
|
||
# Test #6 - Verify ip protocol match | ||
pkt = pkt0.copy() | ||
exp_pkt = exp_pkt0.copy() | ||
pkt['IP'].proto = 0x7E | ||
exp_pkt['IP'].proto = 0x7E | ||
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports) | ||
tests_passed += (0 if res else 1) | ||
print "Test #6 %s" % ("FAILED" if res else "PASSED") | ||
|
||
# Test #7 - Verify TCP flags match | ||
pkt = pkt0.copy() | ||
exp_pkt = exp_pkt0.copy() | ||
pkt['TCP'].flags = 0x12 | ||
exp_pkt['TCP'].flags = 0x12 | ||
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports) | ||
tests_passed += (0 if res else 1) | ||
print "Test #7 %s" % ("FAILED" if res else "PASSED") | ||
|
||
# Test #9 - Verify source port range match | ||
pkt = pkt0.copy() | ||
exp_pkt = exp_pkt0.copy() | ||
pkt['TCP'].sport = 0x123A | ||
exp_pkt['TCP'].sport = 0x123A | ||
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports) | ||
tests_passed += (0 if res else 1) | ||
print "Test #9 %s" % ("FAILED" if res else "PASSED") | ||
|
||
# Test #10 - Verify destination port range match | ||
pkt = pkt0.copy() | ||
exp_pkt = exp_pkt0.copy() | ||
pkt['TCP'].dport = 0x123A | ||
exp_pkt['TCP'].dport = 0x123A | ||
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports) | ||
tests_passed += (0 if res else 1) | ||
print "Test #10 %s" % ("FAILED" if res else "PASSED") | ||
|
||
# Test #11 - Verify rules priority | ||
pkt = pkt0.copy() | ||
exp_pkt = exp_pkt0.copy() | ||
pkt['IP'].src = "10.0.0.3" | ||
exp_pkt['IP'].src = "10.0.0.3" | ||
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports) | ||
tests_passed += (1 if res else 0) | ||
print "Test #11 %s" % ("PASSED" if res else "FAILED") | ||
|
||
#Creates a ICMP packet | ||
pkt0 = simple_icmp_packet( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Incorrect alignment and else where in file. Please correct. |
||
eth_dst = self.router_mac, | ||
eth_src = self.dataplane.get_mac(0, 0), | ||
ip_src = "10.0.0.1", | ||
ip_dst = dst_ip, | ||
icmp_type=8, | ||
icmp_code=0, | ||
ip_ttl = 64 | ||
) | ||
#exp_pkt = pkt.deepcopy() | ||
exp_pkt0 = simple_icmp_packet( | ||
eth_dst = self.dataplane.get_mac(0, 0), | ||
eth_src = self.router_mac, | ||
ip_src = "10.0.0.1", | ||
ip_dst = dst_ip, | ||
icmp_type=8, | ||
icmp_code=0, | ||
ip_ttl = 63 | ||
) | ||
|
||
# Test #12 - Verify IP protocol & source IP match | ||
pkt = pkt0.copy() | ||
exp_pkt = exp_pkt0.copy() | ||
pkt['IP'].src = "10.0.0.2" | ||
exp_pkt['IP'].src = "10.0.0.2" | ||
pkt['IP'].proto=0x1 | ||
exp_pkt['IP'].proto=0x1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you fix the alignment here? |
||
res = self.runSendReceiveTest(pkt, src_port, exp_pkt, dst_ports) | ||
tests_passed += (0 if res else 1) | ||
print "Test #12 %s" % ("FAILED" if res else "PASSED") | ||
return tests_passed, self.tests_total | ||
|
||
#--------------------------------------------------------------------- | ||
|
||
def runTest(self): | ||
""" | ||
@summary: Crete and send packet to verify each ACL rule | ||
""" | ||
|
||
test_result = False | ||
|
||
self.switch_info = open(self.test_params["switch_info"], 'r').readlines() | ||
if self.testbed_type in [ 't1', 't1-lag' ]: | ||
self.tor_ports = map(int, self.switch_info[0].rstrip(",\n").split(",")) | ||
self.spine_ports = map(int, self.switch_info[1].rstrip(",\n").split(",")) | ||
self.dest_ip_addr_spine = self.switch_info[2].strip() | ||
self.dest_ip_addr_spine_blocked = self.switch_info[3].strip() | ||
self.dest_ip_addr_tor = self.switch_info[4].strip() | ||
self.dest_ip_addr_tor_blocked = self.switch_info[5].strip() | ||
|
||
# Verify ACLs on tor port | ||
(tests_passed, tests_total) = self.runAclTests(self.dest_ip_addr_spine, self.dest_ip_addr_spine_blocked, self.tor_ports[0], self.spine_ports) | ||
assert(tests_passed == tests_total) | ||
|
||
# Verify ACLs on spine port | ||
(tests_passed, tests_total) = self.runAclTests(self.dest_ip_addr_tor, self.dest_ip_addr_tor_blocked, self.spine_ports[0], self.tor_ports) | ||
assert(tests_passed == tests_total) | ||
elif self.testbed_type == 't0': | ||
src_port = map(int, self.switch_info[0].rstrip(",\n").split(",")) | ||
dst_ports = map(int, self.switch_info[1].rstrip(",\n").split(",")) | ||
dst_ip = self.switch_info[2].strip() | ||
dst_ip_blocked = self.switch_info[3].strip() | ||
|
||
(tests_passed, tests_total) = self.runAclTests(dst_ip, dst_ip_blocked, src_port[0], dst_ports) | ||
assert(tests_passed == tests_total) | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't use
verify_packets_any
from ptf utils instead of defining new function?