Skip to content

ashwin-patil/blue-teaming-with-kql

Repository files navigation

blue-teaming-with-kql

Repository with Sample KQL Query examples for Threat Hunting

This folder has various KQL examples related to Threat Hunting/Blue Teaming presented at [Blue Team Village at GrayHat 2020] (https://grayhat.co/event/blue-teaming-with-kusto-query-language-kql/) and a presentation given at KQLCafe Podcast- Aug 2022.

Presentation:

GrayHat-BlueTeamingwithKQL

Blue Teaming with KQL – 2022 KQL|Café Edition

GrayHat Talk Recorded Video

IMAGE ALT TEXT HERE

KQLCafe-Aug2022 Recorded Video

[PendingforPublishing]

Jupyter Notebook :

Click on nbviewer Badge :: nbviewer

KQLCafe-2022

Structure of Basic KQL Query

  1. Variable Declaration
  2. Table Name
  3. Datetime Filtering
  4. Event Type Filtering
  5. Output Formatting/ Display selected Fields
  6. Limit Results

Image

KQL Basic Searches

Search for presence of keyword and output tables where it is present

  search "badaccount" 
  | where TimeGenerated > ago(4h) 
  | summarize count() by $tableName

Search for IP in multiple tables - irrespective of field names

  search "8.8.8.8" in ("AzureNetworkAnalytics_CL", "CommonSecurityLog") 
  | where TimeGenerated > ago(1h)
  | limit 100

Sort by time

  AzureActivity 
  | where TimeGenerated > ago(1h) 
  | sort by TimeGenerated desc

Filter by value

  SecurityEvent
  | where TimeGenerated > ago(1h)
  | where EventID == 4688
  | limit 100

Aggregation by Field name

  OfficeActivity
  | where TimeGenerated > ago(1h)
  | summarize count() by OperationName

Exploring Tables and Schemas

DataTypes ingested along with the Sizes

  Usage
  | where TimeGenerated > ago(1d)
  | summarize DataSizeinMB = sum(Quantity) by DataType
  | sort by DataSizeinMB desc 

Schema and datatypes for each field of Table

  AzureActivity
  | getschema 

Tables across Workspace Queries

  union workspace('WorkSpace01').Heartbeat, workspace('WorkSpace02').Heartbeat
  | where TimeGenerated > ago(1d)
  | where Computer == "CH-UBNTVM"
  | limit 100

Asset/Device Details

Asset Details

  Heartbeat
  | where ComputerIP == "40.71.227.249"
  | summarize LastReported = max(TimeGenerated) by Computer, ComputerIP, RemoteIPCountry, 
  ComputerEnvironment, OSType, OSMajorVersion, OSMinorVersion, SubscriptionId, TenantId

Microsoft 365 Defender - Device Information

  DeviceInfo
  | where DeviceName == "contosohost" and isnotempty(OSPlatform)
  | project TenantId, DeviceName, PublicIP, IsAzureADJoined, OSPlatform, OSBuild, OSArchitecture, LoggedOnUsers

Microsoft 365 Defender - Hostname based on Private IP addresses

  DeviceNetworkInfo
  | mv-expand IPAddresses
  | extend IPAddress = tostring(parse_json(IPAddresses).IPAddress)
  | where IPAddress== '10.0.0.100' 
  | project DeviceName, NetworkAdapterType, TunnelType, MacAddress

Query Parameterization

Query Parameterization - Dynamic List - in~ operator

  let timeframe = 1d;
  let EventNameList = dynamic(["UpdateTrail","DeleteTrail","StopLogging","DeleteFlowLogs","DeleteEventBus"]);
  AWSCloudTrail
  | where TimeGenerated > ago(timeframe)
  | where EventName in~ (EventNameList)
  | limit 100

Query Parameterization - Dynamic list - has_any operator

Expensive Computes
  let timeframe = 1d;
  let tokens = dynamic(["416","208","128","120","96","80","72","64","48","44","40","g5","gs5","g4","gs4","nc12","nc24","nv12"]);
  let operationList = dynamic(["Create or Update Virtual Machine", "Create Deployment"]);
  AzureActivity
  | where TimeGenerated >= ago(timeframe)
  | where OperationName in (operationList)
  | where ActivityStatus == "Accepted" 
  | where isnotempty(Properties)
  | extend vmSize = tolower(tostring(parse_json(tostring(parse_json(tostring(parse_json(tostring
  (parse_json(Properties).responseBody)).properties)).hardwareProfile)).vmSize))
  | where isnotempty(vmSize)
  | where vmSize has_any (tokens) 
  | limit 100

Contains vs has demo

  let CustomLogs = datatable(Username:string)
  [
      "abcadmin123",
      "admin123",
      "admin",
      "samadmin"
  ];
  CustomLogs 
  //| where Username has "admin"
  | where Username contains "admin"

Dynamic DataTypes

Datetime

Todatetime demo

  let CustomLogs = datatable(TimeGenerated:string)
  [
      "2020-10-23 01:00:00",
      "2020-10-24 02:00:00"
  ];
  CustomLogs 
  | extend TimeGenerated1 = todatetime(TimeGenerated)
  | getschema

Datetime conversion demo

  let CustomLogs = datatable(TimeGenerated:string)
  [
      "2020-10-23 01:00:00",
      "2020-10-24 02:00:00"
  ];
  CustomLogs 
  | extend TimeGenerated1 = todatetime(TimeGenerated)
  | extend Day = format_datetime(TimeGenerated1, "yyyy-MM-dd")

Regex Extraction

Matches regex demo

  let PrivateIPregex = @'^127\.|^10\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[0-1]\.|^192\.168\.';
  let endtime = 1d;
  CommonSecurityLog
  | where TimeGenerated >= ago(endtime) 
  | where DeviceVendor =~ "Cisco"
  | where DeviceAction =~ "denied"
  | extend SourceIPType = iff(SourceIP matches regex PrivateIPregex,"private" ,"public" )
  | where SourceIPType == "public"
  | summarize count() by SourceIP
  | join (
    // Successful signins from IPs blocked by the firewall solution are suspect
    // Include fully successful sign-ins, but also ones that failed only at MFA stage
    // as that supposes the password was sucessfully guessed.
      SigninLogs
      | where ResultType in ("0", "50074", "50076") 
      ) on $left.SourceIP == $right.IPAddress
  | limit 100

Extract Key value pair from AdditionalExtension field in CommonSecurityLog

  let CommonSecurityLog = datatable (DeviceVendor: string, AdditionalExtensions: string)
  [
  "ZScaler", "country=United States;sourceAddress=10.10.10.10;sourcehostname=http://abc.ac.com;deviceTranslatedPort=60095;tunnelType=IPSEC;dnat=No;stateful=Yes;reason=Allow DNS;cs6label=threatname;destCountry=Italy;avgduration=143",
  "Fortinet", "FortinetFortiGatelogid=1059028704;cat=utm:app-ctrl;FortinetFortiGatesubtype=app-ctrl;FortinetFortiGateeventtype=signature;FortinetFortiGatevd=root",
  "Palo Alto Networks", "cat=general;PanOSDGl1=0;PanOSDGl2=0;PanOSDGl3=0;PanOSDGl4=0;PanOSVsysName=;PanOSActionFlags=0x0"
  ];
  CommonSecurityLog
  | extend AdditionalExtensions = extract_all(@"(?P<key>\w+)=(?P<value>[a-zA-Z0-9-_:/@. ]+)", dynamic(["key","value"]), AdditionalExtensions)
  | mv-apply AdditionalExtensions on (
  summarize AdditionalExtensionsParsed = make_bag(pack(tostring(AdditionalExtensions[0]), AdditionalExtensions[1]))
  )

Functions

User Defined

Function Demo - GetAllAlertsOnHost

Source - https://github.com/Azure/Azure-Sentinel/blob/master/Exploration%20Queries/InputEntity_Host/AlertsOnHost.txt

  let GetAllAlertsOnHost = (suspiciousEventTime:datetime, v_Host:string){
  //-3d and +6h as some alerts fire after accumulation of events
  let v_StartTime = suspiciousEventTime-3d;
  let v_EndTime = suspiciousEventTime+6h;
  SecurityAlert
  | where TimeGenerated between (v_StartTime .. v_EndTime)
  // expand JSON properties
  | extend Extprop = parsejson(ExtendedProperties)
  | extend Computer = toupper(tostring(Extprop["Compromised Host"]))
  | where Computer contains v_Host
  | project TimeGenerated, AlertName, Computer, ExtendedProperties
  };
  // change datetime value and hostname value below
  GetAllAlertsOnHost(datetime('2020-10-23T00:00:00.000'), toupper("VICTIM00"))

Built-in Functions

Parse_path demo

  let SecurityEvent = datatable (EventID: string, ​ShareLocalPath: string)
  [
  "5145",@"\\shared\users\temp\file.txt.gz",
  "5145",@"\\shared\users\temp\bad.exe",
  "5145",@"\\shared\users\temp\script.ps1"
  ];
  SecurityEvent
  | where EventID == 5145
  | extend ShareLocalPathParsed = parse_path(ShareLocalPath)
  | extend extension = tostring(parse_json(ShareLocalPathParsed).Extension),
   FileName = tostring(parse_json(ShareLocalPathParsed).Filename), 
   DirName = tostring(parse_json(ShareLocalPathParsed).DirectoryName)

ip4_is_match with lookup demo

  let lookup = dynamic (["13.66.60.119/32","13.66.143.220/30","13.66.202.14/32"]);
  let AzureSubnetMatchedIPs=materialize(
  CommonSecurityLog
  | where TimeGenerated > ago(4h)
  | mv-apply l=lookup to typeof(string) on
  (
  where ipv4_is_match (DestinationIP, l)
  )
  | project-away l);
  AzureSubnetMatchedIPs
  | limit 100 

Windows XML Parsing of Dynamic Field - EventData

   Event
   | where TimeGenerated > ago(4h)
   | extend EventData = parse_xml(EventData).DataItem.EventData.Data
   | mv-expand bagexpansion=array EventData
   | evaluate bag_unpack(EventData)
   | extend Key=tostring(['@Name']), Value=['#text']
   | evaluate pivot(Key, any(Value), TimeGenerated, EventLog, Computer, EventID)

Externaldata Demo

KQL Blog - Using External data sources to enrich network logs using Azure storage and KQL

  let covidIndicators = (externaldata(TimeGenerated:datetime, FileHashValue:string, FileHashType: string )
  [@"https://raw.githubusercontent.com/Azure/Azure-Sentinel/master/Sample%20Data/Feeds/Microsoft.Covid19.Indicators.csv"]
  with (format="csv"));
  covidIndicators

Externaldata - Azure IP ranges feed.

Link is not static and gets expired as new content arrives

  let AzureIPRangesPublicCloud = (externaldata(changeNumber:string, cloud:string, values: dynamic)
  [@"https://download.microsoft.com/download/7/1/D/71D86715-5596-4529-9B13-DA13A5DE5B63/ServiceTags_Public_20201019.json"] 
  with (format="multijson"));
  let AzureSubnetRangeAllowlist = AzureIPRangesPublicCloud 
  | mv-expand values 
  | extend addressPrefixes = parse_json(parse_json(values).properties).addressPrefixes; 
  AzureSubnetRangeAllowlist

Time Series Analysis

KQL Blog - Time Series Analysis and it`s applications in Security

Time Series Analysis - Process Execution Anomaly

  let starttime = 14d;
  let endtime = 1d;
  let timeframe = 1h;
  let TotalEventsThreshold = 5;
  let ExeList = dynamic(["powershell.exe","cmd.exe","wmic.exe","psexec.exe","cacls.exe","rundll.exe"]);
  let TimeSeriesData = 
  SecurityEvent
  | where EventID == 4688 | extend Process = tolower(Process)
  | where TimeGenerated between (startofday(ago(starttime))..startofday(ago(endtime)))
  | where Process in (ExeList)
  | project TimeGenerated, Computer, AccountType, Account, Process
  | make-series Total=count() on TimeGenerated from ago(starttime) to ago(endtime) step timeframe by Process;
  let TimeSeriesAlerts = TimeSeriesData
  | extend (anomalies, score, baseline) = series_decompose_anomalies(Total, 1.5, -1, 'linefit')
  | mv-expand Total to typeof(double), TimeGenerated to typeof(datetime), anomalies to typeof(double), score to typeof(double), baseline to typeof(long)
  | where anomalies > 0
  | project Process, TimeGenerated, Total, baseline, anomalies, score
  | where Total > TotalEventsThreshold;
  TimeSeriesAlerts
  | join (
  SecurityEvent
  | where EventID == 4688 | extend Process = tolower(Process)
  | summarize CommandlineCount = count() by bin(TimeGenerated, 1h), Process, CommandLine, Computer, Account
  ) on Process, TimeGenerated 
  | project AnomalyHour = TimeGenerated, Computer, Account, Process, CommandLine, CommandlineCount, Total, baseline, anomalies, score 
  | extend timestamp = AnomalyHour, AccountCustomEntity = Account, HostCustomEntity = Computer

Network Beaconing

Reference Work:

KQL Blog - Detect Network Beaconing via Intr-Request time delta patterns in Azure Sentinel

  let starttime = 2d;
  let endtime = 1d;
  let TimeDeltaThreshold = 10;
  let TotalEventsThreshold = 15;
  let PercentBeaconThreshold = 80;
  let PrivateIPregex = @'^127\.|^10\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[0-1]\.|^192\.168\.';
  let DestIPList = CommonSecurityLog
  | where DeviceVendor == "Palo Alto Networks" and Activity == "TRAFFIC"
  | where TimeGenerated between (ago(starttime)..ago(endtime))
  | extend DestinationIPType = iff(DestinationIP matches regex PrivateIPregex,"private" ,"public" )
  | where DestinationIPType == "public"
  | summarize dcount(SourceIP) by DestinationIP
  | where dcount_SourceIP < 5
  | distinct DestinationIP;
  CommonSecurityLog
  | where DeviceVendor == "Palo Alto Networks" and Activity == "TRAFFIC"
  | where TimeGenerated between (ago(starttime)..ago(endtime))
  | where DestinationIP in ((DestIPList))
  | project TimeGenerated, DeviceName, SourceUserID, SourceIP, SourcePort, DestinationIP, DestinationPort, ReceivedBytes, SentBytes
  | sort by SourceIP asc,TimeGenerated asc, DestinationIP asc, DestinationPort asc
  | serialize
  | extend nextTimeGenerated = next(TimeGenerated, 1), nextSourceIP = next(SourceIP, 1)
  | extend TimeDeltainSeconds = datetime_diff('second',nextTimeGenerated,TimeGenerated)
  | where SourceIP == nextSourceIP
  //Whitelisting criteria/ threshold criteria
  | where TimeDeltainSeconds > TimeDeltaThreshold 
  | project TimeGenerated, TimeDeltainSeconds, DeviceName, SourceUserID, SourceIP, SourcePort, DestinationIP, DestinationPort, ReceivedBytes, SentBytes
  | summarize count(), sum(ReceivedBytes), sum(SentBytes), make_list(TimeDeltainSeconds) 
  by TimeDeltainSeconds, bin(TimeGenerated, 1h), DeviceName, SourceUserID, SourceIP, DestinationIP, DestinationPort
  | summarize (MostFrequentTimeDeltaCount, MostFrequentTimeDeltainSeconds) = arg_max(count_, TimeDeltainSeconds), TotalEvents=sum(count_), TotalSentBytes = sum(sum_SentBytes), TotalReceivedBytes = sum(sum_ReceivedBytes) 
  by bin(TimeGenerated, 1h), DeviceName, SourceUserID, SourceIP, DestinationIP, DestinationPort
  | where TotalEvents > TotalEventsThreshold 
  | extend BeaconPercent = MostFrequentTimeDeltaCount/toreal(TotalEvents) * 100
  | where BeaconPercent > PercentBeaconThreshold
  | extend timestamp = TimeGenerated, IPCustomEntity = DestinationIP, AccountCustomEntity = SourceUserID, HostCustomEntity = DeviceName

KQL Programmatic Interfaces

QueryProvider Demo

Demo

KQL Gallery

Curated list of KQL queries worth highlighting

One-stop KQL query for most regex (IP, URL, API Access tokens, Crypto Wallets) needs.
Regex Credits - bee-san/PyWhat Github

let pywhatregex = (externaldata(Name:string, Regex:string, plural_name: string, Description: string, Rarity:string, URL: string, Tags: dynamic )
[@"https://raw.githubusercontent.com/bee-san/pyWhat/main/pywhat/Data/regex.json"] with (format="multijson"));
pywhatregex

subquery to select a specific regex

let pywhatregex = (externaldata(Name:string, Regex:string, plural_name: string, Description: string, Rarity:string, URL: string, Tags: dynamic )
[@"https://raw.githubusercontent.com/bee-san/pyWhat/main/pywhat/Data/regex.json"] with (format="multijson"));
let githubaccesstokenregex = pywhatregex | where Name == "GitHub Access Token" | project Regex;
githubaccesstokenregex

KQLCafe-2022

Practical Detection Engineering/Hunting with KQL

Simple aggregation and threshold-based query

Suspicious enumeration using Adfind tool

let lookupwindow = 2m;
  let threshold = 3; //number of commandlines in the set below
  let DCADFSServersList = dynamic (["DCServer01", "DCServer02", "ADFSServer01"]); // Enter a reference list of hostnames for your DC/ADFS servers
  let tokens = dynamic(["objectcategory","domainlist","dcmodes","adinfo","trustdmp","computers_pwdnotreqd","Domain Admins", "objectcategory=person", "objectcategory=computer", "objectcategory=*"]);
  SecurityEvent
  //| where Computer in (DCADFSServersList) // Uncomment to limit it to your DC/ADFS servers list if specified above or any pattern in hostnames (startswith, matches regex, etc).
  | where EventID == 4688
  | where CommandLine has_any (tokens)
  | where CommandLine matches regex "(.*)>(.*)"
  | summarize Commandlines = make_set(CommandLine), LastObserved=max(TimeGenerated) by bin(TimeGenerated, lookupwindow), Account, Computer, ParentProcessName, NewProcessName
  | extend Count = array_length(Commandlines)
  | where Count > threshold

Bringing context from other data sources

Privileged Accounts - Failed MFA Populating Privileged accounts dynamically via IdentityInfo table.

let starttime = 2d;
  let endtime = 1d;
  let aadFunc = (tableName:string){
  IdentityInfo
  | where AssignedRoles contains "Admin"
  | mv-expand AssignedRoles
  | extend Roles = tostring(AssignedRoles), AccountUPN = tolower(AccountUPN)
  | where Roles contains "Admin"
  | distinct Roles, AccountUPN
  | join kind=inner (
    // Failed Signins attempts with reasoning related to MFA.
    table(tableName)
    | where TimeGenerated between(ago(starttime)..ago(endtime))
    | where ResultDescription has_any ("MFA", "second factor", "multi-factor", "second factor") or ResultType in (50074, 50076, 50079, 50072, 53004, 500121)
  ) on $left.AccountUPN == $right.UserPrincipalName
  | extend timestamp = TimeGenerated, IPCustomEntity = IPAddress, AccountCustomEntity = UserPrincipalName
  };
  let aadSignin = aadFunc("SigninLogs");
  let aadNonInt = aadFunc("AADNonInteractiveUserSignInLogs");
  union isfuzzy=true aadSignin, aadNonInt

Rare events – Not historically seen events

Palo Alto Threat signatures from Unusual IP addresses

let starttime = 7d;
  let endtime = 1d;
  let timeframe = 1h;
  let HistThreshold = 25; 
  let CurrThreshold = 10; 
  let HistoricalThreats = CommonSecurityLog
  | where isnotempty(SourceIP)
  | where TimeGenerated between (startofday(ago(starttime))..startofday(ago(endtime)))
  | where DeviceVendor =~ "Palo Alto Networks"
  | where Activity =~ "THREAT" and SimplifiedDeviceAction =~ "alert" 
  | where DeviceEventClassID in ('spyware', 'scan', 'file', 'vulnerability', 'flood', 'packet', 'virus','wildfire', 'wildfire-virus')
  | summarize TotalEvents = count(), ThreatTypes = make_set(DeviceEventClassID), DestinationIpList = make_set(DestinationIP), FirstSeen = min(TimeGenerated) , LastSeen = max(TimeGenerated) by SourceIP, DeviceAction, DeviceVendor;
  let CurrentHourThreats =  CommonSecurityLog
  | where isnotempty(SourceIP)
  | where TimeGenerated > ago(timeframe)
  | where DeviceVendor =~ "Palo Alto Networks"
  | where Activity =~ "THREAT" and SimplifiedDeviceAction =~ "alert" 
  | where DeviceEventClassID in ('spyware', 'scan', 'file', 'vulnerability', 'flood', 'packet', 'virus','wildfire', 'wildfire-virus')
  | summarize TotalEvents = count(), ThreatTypes = make_set(DeviceEventClassID), DestinationIpList = make_set(DestinationIP), FirstSeen = min(TimeGenerated) , LastSeen = max(TimeGenerated) by SourceIP, DeviceAction, DeviceProduct, DeviceVendor;
  CurrentHourThreats 
  | where TotalEvents < CurrThreshold
  | join kind = leftanti (HistoricalThreats 
  | where TotalEvents > HistThreshold) on SourceIP

Pivot- To create heatmap like data structure to identify hourly spikes

Use granny-asc option with project-reorder to sort columns with numbers as name.

let end = now();
let start = end - 7d;
SecurityEvent
| where EventID == 4625
| where TimeGenerated >= startofday(start)
| where TimeGenerated <= startofday(end)
| extend
    HourOfLogin = toint(hourofday(TimeGenerated)),
    DayNumberofWeek = dayofweek(TimeGenerated),
    Date = format_datetime(TimeGenerated, "yyyy-MM-dd")
| extend DayofWeek = case(DayNumberofWeek == "00:00:00", "Sunday", DayNumberofWeek == "1.00:00:00", "Monday", DayNumberofWeek == "2.00:00:00", "Tuesday", DayNumberofWeek == "3.00:00:00", "Wednesday", DayNumberofWeek == "4.00:00:00", "Thursday", DayNumberofWeek == "5.00:00:00", "Friday", DayNumberofWeek == "6.00:00:00", "Saturday", "InvalidTimeStamp")
| evaluate pivot(HourOfLogin, count(), DayofWeek, Date)
| project-reorder Date, DayofWeek, * granny-asc 
| sort by Date asc

Extending KQL

Github Action for dynamic TI Feeds

  • Allows to connect external data sources.
  • Limited to static sites or blob storage data sources.

Use case - Nord VPN API not accessible via externaldata

Threat Essentials - Signins from Nord VPN Providers

Github Actions:

ADX/LA Interoperability

  • KQL has varying support in Azure Data Explorer (ADX) and Azure Log Analytics(LA)/Sentinel.
  • You can connect both products from each other and can run native KQL against it.
  • Connect additional data sources without duplicating data.
  • Use Kusto explorer client with rich features on LA data.
  • Extend support of missing KQL operators in LA/Sentinel.

Connect ADX via LA: Cross-resource query Azure Data Explorer by using Azure Monitor - Azure Monitor | Microsoft Docs

Connect LA via ADX: Query data in Azure Monitor with Azure Data Explorer | Microsoft Docs