Skip to content

Commit

Permalink
Feat: Rework k8s_tshark functionality and apply the changes in tests
Browse files Browse the repository at this point in the history
Refs: cnti-testcatalog#2072 cnti-testcatalog#2087
- Prior functionality was bound to fixed time of execution (120s), which introduced problems
in testing (tshark session ending before the test began).
- New functionality mainly implements infinite tshark execution along with the possibility
of terminating it when deemed appropriate. This is complemented with robust error handling
and termination of the tshark process on unexpected crashes during initialization.
NOTE: The main tests currently do not handle states where a crash could occur elsewhere and
thus a hanging tshark session can still happen (although unlikely).
- The module is properly commented which should allow the user to get a quick understanding
of its functionality.
- The user functionality remains the same with easier-to-comprehend function names.
- Handling of PIDs is rather problematic due to the nature of exec_by_node_bg function, which
does not return the PID of the tshark process but rather the PID of the shell executing it
(unverified). This is why the retrieval of PID may seem rather complicated (especially the
pid_command variable). Possible solutions are listed in a comment, but these don't quite
work for various reasons (globbing issues, return of incorrect PID, etc.).
As for the kill -15 and kill -9 repetition, some tshark session would
get stuck in a zombie state if the commands were not executed in this order.

Signed-off-by: svteb <slavo.valko@tietoevry.com>
  • Loading branch information
svteb committed Jul 4, 2024
1 parent fcf22a2 commit 567bf60
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 201 deletions.
6 changes: 4 additions & 2 deletions src/tasks/utils/cnf_manager.cr
Original file line number Diff line number Diff line change
Expand Up @@ -857,8 +857,9 @@ module CNFManager
baselines = JaegerManager.unique_services_total
Log.info { "baselines: #{baselines}" }
end

# todo start tshark monitoring the e2 traffic
tshark_log_name = ORANMonitor.start_e2_capture?(config.cnf_config)
capture = ORANMonitor.start_e2_capture?(config.cnf_config)

# todo separate out install methods into a module/function that accepts a block
liveness_time = 0
Expand Down Expand Up @@ -1053,9 +1054,10 @@ module CNFManager
else
tracing_used = false
end

if ORANMonitor.isCNFaRIC?(config.cnf_config)
sleep 30
e2_found = ORANMonitor.e2_session_established?(tshark_log_name)
e2_found = ORANMonitor.e2_session_established?(capture)
else
e2_found = false
end
Expand Down
302 changes: 176 additions & 126 deletions src/tasks/utils/k8s_tshark.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,148 +5,198 @@ require "halite"


module K8sTshark
class TsharkPacketCapture
property capture_file_path : String
property pid : Int32?
private property node_match : JSON::Any?

def self.log_of_tshark_by_label(command, label_key, label_value, duration="120") : String
Log.info { "log_of_tshark_by_label command label_key label value: #{command} #{label_key} #{label_value}" }
def initialize
@capture_file_path = ""
end

def finalize
if @pid
terminate_capture
end
end

# Starts a tshark packet capture on the node where the pod with the specified label is running.
# label_key and label_value: Used to identify the pod's label.
# command: Parameters to be passed to tshark.
def begin_capture_by_label(label_key : String, label_value : String, command : String = "")
Log.info { "Searching for the pod matching the label '#{label_key}:#{label_value}'."}
all_pods = KubectlClient::Get.pods_by_nodes(KubectlClient::Get.schedulable_nodes_list)
pods = KubectlClient::Get.pods_by_label(all_pods, label_key, label_value)
first_labeled_pod = pods[0]?
Log.info { "first_labeled_pod: #{first_labeled_pod}" }
if first_labeled_pod && first_labeled_pod.dig?("metadata", "name")
Log.info { "first_labeled_pod #{first_labeled_pod} metadata name: #{first_labeled_pod.dig?("metadata", "name")}" }
pod_name = first_labeled_pod.dig("metadata", "name")
Log.info { "pod_name: #{pod_name}" }
nodes = KubectlClient::Get.nodes_by_pod(first_labeled_pod)
node = nodes.first
#create a unique name for the log
# rnd = Random.new
# name_id = rnd.next_int
# tshark_log_name = "/tmp/tshark-#{name_id}.json"
# Log.info { "tshark_log_name #{tshark_log_name}" }
#
# #tshark -ni any -Y nas_5gs.mm.type_id -T json 2>&1 | tee hi.log
# #command= -ni any -Y nas_5gs.mm.type_id -T json
# #todo check if tshark running already to keep from saturating network
# #todo play with reducing default duration
# ClusterTools.exec_by_node_bg("tshark #{command} -a duration:#{duration} 2>&1 | tee #{tshark_log_name}", node)
# ClusterTools.exec_by_node_bg("tshark -ni any -a duration:120 -Y nas_5gs.mm.type_id -T json 2>&1 | tee #{tshark_log_name}", node)
# Log.info { "after exec by node bg" }
# resp = tshark_log_name
resp = log_of_tshark_by_node(command, node, duration)
pod_match = KubectlClient::Get.pods_by_label(all_pods, label_key, label_value).first?

unless pod_match && pod_match.dig?("metadata", "name")
error_message = "Pod with label '#{label_key}:#{label_value}' could not be found."
Log.error { error_message }
raise K8sTsharkError.new(error_message)
end

pod_name = pod_match.dig("metadata", "name")
Log.info { "Pod '#{pod_name}'' matches the label '#{label_key}:#{label_value}'." }

Log.info { "Searching for the node running the pod '#{pod_name}'." }
@node_match = KubectlClient::Get.nodes_by_pod(pod_match).first

unless @node_match && @node_match.not_nil!.dig?("metadata", "name")
error_message = "Node for pod '#{pod_name}' could not be found."
Log.error { error_message }
raise K8sTsharkError.new(error_message)
end

node_name = node_match.not_nil!.dig("metadata", "name")
Log.info { "Pod '#{pod_name}' is running on node '#{node_name}'." }

begin_capture_common(command)
end

# Starts a tshark packet capture on the specified node.
# node: The node where the capture should be performed.
# command: Parameters to be passed to tshark.
# duration: Optional; specifies the capture duration, eliminating the need to call terminate_capture manually.
def begin_capture_by_node(node : JSON::Any, command : String = "")
@node_match = node
begin_capture_common(command)
end

# Common method to unify capture by label and node.
private def begin_capture_common(command : String)
if @pid
Log.warn { "Ongoing capture process exists, terminate it or create a new capture." }
return
end

Log.info { "Starting tshark capture with command: #{command}." }

@capture_file_path = generate_capture_file_path()
Log.info { "Capturing packets on path: #{@capture_file_path}." }

# Other possible options to resolve the pid conundrum:
# 1. pgrep tshark -f -x "tshark #{command}"
# 2. ...; echo $! > /tmp/pidfile
# 3. bake in 'echo $$!', retrieve from some file
# 4. fix kubectl_client to return the pid of the process that is launched, not the shell
pid_file = "/tmp/pidfile"
pid_command = "ps -eo pid,cmd,start --sort=start_time | grep '[t]shark' | tail -1 | awk '{print $1}' > #{pid_file}"
capture_command = "tshark #{command} > #{@capture_file_path} 2>&1"

launch_capture(capture_command)
retrieve_pid(pid_command, pid_file)
end

# Terminates the tshark packet capture process. This should be called if the duration parameter was not set in begin_capture.
def terminate_capture
if @pid
Log.info { "Terminating packet capture with PID: #{@pid}." }
Log.info { "Capture collected on path: #{@capture_file_path}." }

# Some tshark captures were left in zombie states if only kill/kill -9 was invoked.
ClusterTools.exec_by_node_bg("kill -15 #{@pid}", @node_match.not_nil!)
sleep 1
ClusterTools.exec_by_node_bg("kill -9 #{@pid}", @node_match.not_nil!)

@pid = nil
@node_match = nil
else
resp = "label key:#{label_key} value: #{label_value} not found"
Log.warn { "No active capture process to terminate." }
end
Log.info { "resp #{resp}" }
resp
end
end

def self.log_of_tshark_by_label_bg(command, label_key, label_value, duration="120") : String
Log.info { "log_of_tshark_by_label command label_key label value: #{command} #{label_key} #{label_value}" }
all_pods = KubectlClient::Get.pods_by_nodes(KubectlClient::Get.schedulable_nodes_list)
pods = KubectlClient::Get.pods_by_label(all_pods, label_key, label_value)
first_labeled_pod = pods[0]?
Log.info { "first_labeled_pod: #{first_labeled_pod}" }
if first_labeled_pod && first_labeled_pod.dig?("metadata", "name")
Log.info { "first_labeled_pod #{first_labeled_pod} metadata name: #{first_labeled_pod.dig?("metadata", "name")}" }
pod_name = first_labeled_pod.dig("metadata", "name")
Log.info { "pod_name: #{pod_name}" }
nodes = KubectlClient::Get.nodes_by_pod(first_labeled_pod)
node = nodes.first
#create a unique name for the log
# rnd = Random.new
# name_id = rnd.next_int
# tshark_log_name = "/tmp/tshark-#{name_id}.json"
# Log.info { "tshark_log_name #{tshark_log_name}" }
#
# #tshark -ni any -Y nas_5gs.mm.type_id -T json 2>&1 | tee hi.log
# #command= -ni any -Y nas_5gs.mm.type_id -T json
# #todo check if tshark running already to keep from saturating network
# #todo play with reducing default duration
# ClusterTools.exec_by_node_bg("tshark #{command} -a duration:#{duration} 2>&1 | tee #{tshark_log_name}", node)
# ClusterTools.exec_by_node_bg("tshark -ni any -a duration:120 -Y nas_5gs.mm.type_id -T json 2>&1 | tee #{tshark_log_name}", node)
# Log.info { "after exec by node bg" }
# resp = tshark_log_name
resp = log_of_tshark_by_node_bg(command, node, duration: "120")
else
resp = "label key:#{label_key} value: #{label_value} not found"
# Searches the capture file for lines matching the specified regex pattern.
# Returns an array of matching lines.
def regex_search(pattern : Regex) : Array(String)
matches = [] of String

if @capture_file_path.empty?
Log.warn { "Cannot find a match using a regular expression before a capture has been started." }
else
Log.info { "Collecting lines matching the regular expression #{pattern}." }
file_content = File.read(@capture_file_path)
matches = file_content.scan(pattern).map(&.string)
Log.debug { "Printing out matching lines:\n#{matches}" }
end

matches
end
Log.info { "resp #{resp}" }
resp
end

# Checks if any line in the capture file matches the specified regex pattern.
# Returns true if a match is found, otherwise false.
def regex_match?(pattern : Regex) : Bool
if @capture_file_path.empty?
Log.warn { "Cannot find a match using a regular expression before a capture has been started." }
else
Log.info { "Finding a match for regular expression: #{pattern} in file: #{capture_file_path}." }
file_content = File.read(@capture_file_path)
if file_content.scan(pattern).any?
Log.debug { "Match found for regular expression: #{pattern}" }
return true
end
end

def self.log_of_tshark_by_node(command, node, duration="120") : String
Log.info { "log_of_tshark_by_node: command #{command}" }
#create a unique name for the log
rnd = Random.new
name_id = rnd.next_int.abs
tshark_log_name = "/tmp/tshark-#{name_id}.json"
Log.info { "log_of_tshark_by_node tshark_log_name #{tshark_log_name}" }

#tshark -ni any -Y nas_5gs.mm.type_id -T json 2>&1 | tee hi.log
#command= -ni any -Y nas_5gs.mm.type_id -T json
#todo check if tshark running already to keep from saturating network
ClusterTools.exec_by_node("tshark #{command} -a duration:#{duration} 2>&1 | tee #{tshark_log_name}", node)
Log.info { "after exec by node bg" }
tshark_log_name
end
false
end

def self.log_of_tshark_by_node_bg(command, node, duration="120") : String
Log.info { "log_of_tshark_by_node: command #{command}" }
#create a unique name for the log
rnd = Random.new
name_id = rnd.next_int.abs
tshark_log_name = "/tmp/tshark-#{name_id}.json"
Log.info { "log_of_tshark_by_node tshark_log_name #{tshark_log_name}" }

#tshark -ni any -Y nas_5gs.mm.type_id -T json 2>&1 | tee hi.log
#command= -ni any -Y nas_5gs.mm.type_id -T json
#todo check if tshark running already to keep from saturating network
ClusterTools.exec_by_node_bg("tshark #{command} -a duration:#{duration} 2>&1 | tee #{tshark_log_name}", node)
Log.info { "after exec by node bg" }
tshark_log_name
end
# Retrieves the file path where the capture is stored.
def get_capture_file_path : String
@capture_file_path
end

private def generate_capture_file_path : String
name_id = Random.new.next_int.abs

def self.regex_tshark_log_scan(regex, tshark_log_name)
Log.info { "regex_tshark_log regex tshark_log_name: #{regex} #{tshark_log_name}" }
resp = File.read("#{tshark_log_name}")
Log.debug { "tshark_log_name resp: #{resp}" }
if resp
Log.debug { "resp: #{resp}" }
ret = resp.scan(regex)
else
Log.info { "file empty" }
ret = nil
"/tmp/tshark-#{name_id}.pcap"
end
Log.info { "#{regex}: #{ret}" }
ret
end

def self.regex_tshark_log_match(regex, tshark_log_name)
Log.info { "regex_tshark_log regex tshark_log_name: #{regex} #{tshark_log_name}" }
resp = File.read("#{tshark_log_name}")
Log.info { "tshark_log_name resp: #{resp}" }
if resp
Log.info { "resp: #{resp}" }
ret = resp =~ regex
else
Log.info { "file empty" }
ret = nil
private def launch_capture(command : String)
begin
# Start tshark capture.
ClusterTools.exec_by_node_bg(command, @node_match.not_nil!)
rescue ex : Exception
error_message = "Could not start tshark capture process: #{ex.message}"
Log.error { error_message }
raise K8sTsharkError.new(error_message)
end
end
Log.info { "#{regex}: #{ret}" }
ret
end

def self.regex_tshark_log(regex, tshark_log_name)
Log.info { "regex_tshark_log regex tshark_log_name: #{regex} #{tshark_log_name}" }
regex_found : Bool | Nil
if regex_tshark_log_match(regex, tshark_log_name)
regex_found = true
else
regex_found = false
private def retrieve_pid(command : String, pid_file : String)
begin
# Store the pid of the tshark process.
ClusterTools.exec_by_node_bg(command, @node_match.not_nil!)

# Define the PID file check block
pid_check = -> do
File.exists?(pid_file) && File.size(pid_file) > 0
end

# Wait for pidfile to be readable using repeat_with_timeout
timeout = 10 # seconds
errormsg = "Timeout waiting for PID file."
if repeat_with_timeout(timeout, errormsg) { pid_check.call }
@pid = File.read(pid_file).strip.to_i
Log.info { "tshark process started with PID: #{@pid}" }
else
raise errormsg
end
rescue ex : Exception
error_message = "Could not retrieve the PID of the tshark process: #{ex.message}"
Log.error { error_message }

# Attempt to kill the capture process if it was started.
ClusterTools.exec_by_node_bg("pkill -15 tshark && sleep 1 && pkill -9 tshark", @node_match.not_nil!)
@pid = nil

raise K8sTsharkError.new(error_message)
ensure
File.delete(pid_file) if File.exists?(pid_file)
end
end
regex_found
end

class K8sTsharkError < Exception
def initialize(message : String)
super(message)
end
end
end
Loading

0 comments on commit 567bf60

Please sign in to comment.