Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Rework k8s_tshark functionality and apply the changes in tests #2097

Merged
merged 1 commit into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/tasks/utils/cnf_manager.cr
Original file line number Diff line number Diff line change
Expand Up @@ -857,8 +857,9 @@ module CNFManager
baselines = JaegerManager.unique_services_total
Log.info { "baselines: #{baselines}" }
end

# todo start tshark monitoring the e2 traffic
tshark_log_name = ORANMonitor.start_e2_capture?(config.cnf_config)
capture = ORANMonitor.start_e2_capture?(config.cnf_config)

# todo separate out install methods into a module/function that accepts a block
liveness_time = 0
Expand Down Expand Up @@ -1053,9 +1054,10 @@ module CNFManager
else
tracing_used = false
end

if ORANMonitor.isCNFaRIC?(config.cnf_config)
sleep 30
e2_found = ORANMonitor.e2_session_established?(tshark_log_name)
e2_found = ORANMonitor.e2_session_established?(capture)
else
e2_found = false
end
Expand Down
316 changes: 190 additions & 126 deletions src/tasks/utils/k8s_tshark.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,148 +5,212 @@ require "halite"


module K8sTshark
class TsharkPacketCapture
property capture_file_path : String
property pid : Int32?
private property node_match : JSON::Any?
svteb marked this conversation as resolved.
Show resolved Hide resolved

def self.log_of_tshark_by_label(command, label_key, label_value, duration="120") : String
Log.info { "log_of_tshark_by_label command label_key label value: #{command} #{label_key} #{label_value}" }
def initialize
@capture_file_path = ""
end

def finalize
if @pid
terminate_capture
end
end

# Method to provide a block context for capture by label.
def self.begin_capture_by_label(label_key : String, label_value : String, command : String = "", &block : TsharkPacketCapture ->)
capture = new
begin
capture.begin_capture_by_label(label_key, label_value, command)
yield capture
ensure
capture.terminate_capture
end
end

# Method to provide a block context for capture by node.
def self.begin_capture_by_node(node : JSON::Any, command : String = "", &block : TsharkPacketCapture ->)
capture = new
begin
capture.begin_capture_by_node(node, command)
yield capture
ensure
capture.terminate_capture
end
end

# Starts a tshark packet capture on the node where the pod with the specified label is running.
# label_key and label_value: Used to identify the pod's label.
# command: Parameters to be passed to tshark.
def begin_capture_by_label(label_key : String, label_value : String, command : String = "")
Log.info { "Searching for the pod matching the label '#{label_key}:#{label_value}'."}
all_pods = KubectlClient::Get.pods_by_nodes(KubectlClient::Get.schedulable_nodes_list)
pods = KubectlClient::Get.pods_by_label(all_pods, label_key, label_value)
first_labeled_pod = pods[0]?
Log.info { "first_labeled_pod: #{first_labeled_pod}" }
if first_labeled_pod && first_labeled_pod.dig?("metadata", "name")
Log.info { "first_labeled_pod #{first_labeled_pod} metadata name: #{first_labeled_pod.dig?("metadata", "name")}" }
pod_name = first_labeled_pod.dig("metadata", "name")
Log.info { "pod_name: #{pod_name}" }
nodes = KubectlClient::Get.nodes_by_pod(first_labeled_pod)
node = nodes.first
#create a unique name for the log
# rnd = Random.new
# name_id = rnd.next_int
# tshark_log_name = "/tmp/tshark-#{name_id}.json"
# Log.info { "tshark_log_name #{tshark_log_name}" }
#
# #tshark -ni any -Y nas_5gs.mm.type_id -T json 2>&1 | tee hi.log
# #command= -ni any -Y nas_5gs.mm.type_id -T json
# #todo check if tshark running already to keep from saturating network
# #todo play with reducing default duration
# ClusterTools.exec_by_node_bg("tshark #{command} -a duration:#{duration} 2>&1 | tee #{tshark_log_name}", node)
# ClusterTools.exec_by_node_bg("tshark -ni any -a duration:120 -Y nas_5gs.mm.type_id -T json 2>&1 | tee #{tshark_log_name}", node)
# Log.info { "after exec by node bg" }
# resp = tshark_log_name
resp = log_of_tshark_by_node(command, node, duration)
pod_match = KubectlClient::Get.pods_by_label(all_pods, label_key, label_value).first?

unless pod_match && pod_match.dig?("metadata", "name")
error_message = "Pod with label '#{label_key}:#{label_value}' could not be found."
Log.error { error_message }
raise K8sTsharkError.new(error_message)
end

pod_name = pod_match.dig("metadata", "name")
Log.info { "Pod '#{pod_name}'' matches the label '#{label_key}:#{label_value}'." }

Log.info { "Searching for the node running the pod '#{pod_name}'." }
@node_match = KubectlClient::Get.nodes_by_pod(pod_match).first

unless @node_match && @node_match.not_nil!.dig?("metadata", "name")
error_message = "Node for pod '#{pod_name}' could not be found."
Log.error { error_message }
raise K8sTsharkError.new(error_message)
end

node_name = node_match.not_nil!.dig("metadata", "name")
Log.info { "Pod '#{pod_name}' is running on node '#{node_name}'." }

begin_capture_common(command)
end

# Starts a tshark packet capture on the specified node.
# node: The node where the capture should be performed.
# command: Parameters to be passed to tshark.
# duration: Optional; specifies the capture duration, eliminating the need to call terminate_capture manually.
def begin_capture_by_node(node : JSON::Any, command : String = "")
@node_match = node
begin_capture_common(command)
end

# Common method to unify capture by label and node.
private def begin_capture_common(command : String)
if @pid
Log.warn { "Ongoing capture process exists, terminate it or create a new capture." }
return
end

Log.info { "Starting tshark capture with command: #{command}." }

@capture_file_path = generate_capture_file_path()
Log.info { "Capturing packets on path: #{@capture_file_path}." }

# Other possible options to resolve the pid conundrum:
# 1. pgrep tshark -f -x "tshark #{command}"
# 2. ...; echo $! > /tmp/pidfile
# 3. bake in 'echo $$!', retrieve from some file
# 4. fix kubectl_client to return the pid of the process that is launched, not the shell
pid_file = "/tmp/pidfile"
pid_command = "ps -eo pid,cmd,start --sort=start_time | grep '[t]shark' | tail -1 | awk '{print $1}' > #{pid_file}"
capture_command = "tshark #{command} > #{@capture_file_path} 2>&1"

launch_capture(capture_command)
retrieve_pid(pid_command, pid_file)
end

# Terminates the tshark packet capture process.
def terminate_capture
if @pid
Log.info { "Terminating packet capture with PID: #{@pid}." }
Log.info { "Capture collected on path: #{@capture_file_path}." }

# Some tshark captures were left in zombie states if only kill/kill -9 was invoked.
ClusterTools.exec_by_node_bg("kill -15 #{@pid}", @node_match.not_nil!)
sleep 1
ClusterTools.exec_by_node_bg("kill -9 #{@pid}", @node_match.not_nil!)

@pid = nil
@node_match = nil
else
resp = "label key:#{label_key} value: #{label_value} not found"
Log.warn { "No active capture process to terminate." }
end
Log.info { "resp #{resp}" }
resp
end
end

def self.log_of_tshark_by_label_bg(command, label_key, label_value, duration="120") : String
Log.info { "log_of_tshark_by_label command label_key label value: #{command} #{label_key} #{label_value}" }
all_pods = KubectlClient::Get.pods_by_nodes(KubectlClient::Get.schedulable_nodes_list)
pods = KubectlClient::Get.pods_by_label(all_pods, label_key, label_value)
first_labeled_pod = pods[0]?
Log.info { "first_labeled_pod: #{first_labeled_pod}" }
if first_labeled_pod && first_labeled_pod.dig?("metadata", "name")
Log.info { "first_labeled_pod #{first_labeled_pod} metadata name: #{first_labeled_pod.dig?("metadata", "name")}" }
pod_name = first_labeled_pod.dig("metadata", "name")
Log.info { "pod_name: #{pod_name}" }
nodes = KubectlClient::Get.nodes_by_pod(first_labeled_pod)
node = nodes.first
#create a unique name for the log
# rnd = Random.new
# name_id = rnd.next_int
# tshark_log_name = "/tmp/tshark-#{name_id}.json"
# Log.info { "tshark_log_name #{tshark_log_name}" }
#
# #tshark -ni any -Y nas_5gs.mm.type_id -T json 2>&1 | tee hi.log
# #command= -ni any -Y nas_5gs.mm.type_id -T json
# #todo check if tshark running already to keep from saturating network
# #todo play with reducing default duration
# ClusterTools.exec_by_node_bg("tshark #{command} -a duration:#{duration} 2>&1 | tee #{tshark_log_name}", node)
# ClusterTools.exec_by_node_bg("tshark -ni any -a duration:120 -Y nas_5gs.mm.type_id -T json 2>&1 | tee #{tshark_log_name}", node)
# Log.info { "after exec by node bg" }
# resp = tshark_log_name
resp = log_of_tshark_by_node_bg(command, node, duration: "120")
else
resp = "label key:#{label_key} value: #{label_value} not found"
# Searches the capture file for lines matching the specified regex pattern.
# Returns an array of matching lines.
def regex_search(pattern : Regex) : Array(String)
matches = [] of String

if @capture_file_path.empty?
Log.warn { "Cannot find a match using a regular expression before a capture has been started." }
else
Log.info { "Collecting lines matching the regular expression #{pattern}." }
file_content = File.read(@capture_file_path)
matches = file_content.scan(pattern).map(&.string)
Log.debug { "Printing out matching lines:\n#{matches}" }
end

matches
end
Log.info { "resp #{resp}" }
resp
end

# Checks if any line in the capture file matches the specified regex pattern.
# Returns true if a match is found, otherwise false.
def regex_match?(pattern : Regex) : Bool
if @capture_file_path.empty?
Log.warn { "Cannot find a match using a regular expression before a capture has been started." }
else
Log.info { "Finding a match for regular expression: #{pattern} in file: #{capture_file_path}." }
file_content = File.read(@capture_file_path)
if file_content.scan(pattern).any?
Log.debug { "Match found for regular expression: #{pattern}" }
return true
end
end

def self.log_of_tshark_by_node(command, node, duration="120") : String
Log.info { "log_of_tshark_by_node: command #{command}" }
#create a unique name for the log
rnd = Random.new
name_id = rnd.next_int.abs
tshark_log_name = "/tmp/tshark-#{name_id}.json"
Log.info { "log_of_tshark_by_node tshark_log_name #{tshark_log_name}" }

#tshark -ni any -Y nas_5gs.mm.type_id -T json 2>&1 | tee hi.log
#command= -ni any -Y nas_5gs.mm.type_id -T json
#todo check if tshark running already to keep from saturating network
ClusterTools.exec_by_node("tshark #{command} -a duration:#{duration} 2>&1 | tee #{tshark_log_name}", node)
Log.info { "after exec by node bg" }
tshark_log_name
end
false
end

def self.log_of_tshark_by_node_bg(command, node, duration="120") : String
Log.info { "log_of_tshark_by_node: command #{command}" }
#create a unique name for the log
rnd = Random.new
name_id = rnd.next_int.abs
tshark_log_name = "/tmp/tshark-#{name_id}.json"
Log.info { "log_of_tshark_by_node tshark_log_name #{tshark_log_name}" }

#tshark -ni any -Y nas_5gs.mm.type_id -T json 2>&1 | tee hi.log
#command= -ni any -Y nas_5gs.mm.type_id -T json
#todo check if tshark running already to keep from saturating network
ClusterTools.exec_by_node_bg("tshark #{command} -a duration:#{duration} 2>&1 | tee #{tshark_log_name}", node)
Log.info { "after exec by node bg" }
tshark_log_name
end
# Retrieves the file path where the capture is stored.
def get_capture_file_path : String
@capture_file_path
end

private def generate_capture_file_path : String
name_id = Random.new.next_int.abs

def self.regex_tshark_log_scan(regex, tshark_log_name)
Log.info { "regex_tshark_log regex tshark_log_name: #{regex} #{tshark_log_name}" }
resp = File.read("#{tshark_log_name}")
Log.debug { "tshark_log_name resp: #{resp}" }
if resp
Log.debug { "resp: #{resp}" }
ret = resp.scan(regex)
else
Log.info { "file empty" }
ret = nil
"/tmp/tshark-#{name_id}.pcap"
end
Log.info { "#{regex}: #{ret}" }
ret
end

def self.regex_tshark_log_match(regex, tshark_log_name)
Log.info { "regex_tshark_log regex tshark_log_name: #{regex} #{tshark_log_name}" }
resp = File.read("#{tshark_log_name}")
Log.info { "tshark_log_name resp: #{resp}" }
if resp
Log.info { "resp: #{resp}" }
ret = resp =~ regex
else
Log.info { "file empty" }
ret = nil
private def launch_capture(command : String)
begin
# Start tshark capture.
ClusterTools.exec_by_node_bg(command, @node_match.not_nil!)
rescue ex : Exception
error_message = "Could not start tshark capture process: #{ex.message}"
Log.error { error_message }
raise K8sTsharkError.new(error_message)
end
end
Log.info { "#{regex}: #{ret}" }
ret
end

def self.regex_tshark_log(regex, tshark_log_name)
Log.info { "regex_tshark_log regex tshark_log_name: #{regex} #{tshark_log_name}" }
regex_found : Bool | Nil
if regex_tshark_log_match(regex, tshark_log_name)
regex_found = true
else
regex_found = false
private def retrieve_pid(command : String, pid_file : String)
begin
# Store the pid of the tshark process.
ClusterTools.exec_by_node_bg(command, @node_match.not_nil!)

error_message = "Could not retrieve the PID of the tshark process"

# Wait for pidfile to be readable using repeat_with_timeout
pid_found = repeat_with_timeout(timeout: 10, errormsg: error_message) do
File.exists?(pid_file) && File.size(pid_file) > 0
end

if pid_found
@pid = File.read(pid_file).strip.to_i
Log.info { "tshark process started with PID: #{@pid}" }
else
# Attempt to kill the capture process if it was started.
ClusterTools.exec_by_node_bg("pkill -15 tshark && sleep 1 && pkill -9 tshark", @node_match.not_nil!)
raise K8sTsharkError.new(error_message)
end
ensure
File.delete(pid_file) if File.exists?(pid_file)
end
end
regex_found
end

class K8sTsharkError < Exception
def initialize(message : String)
super(message)
end
end
end
Loading
Loading