[Filebeat] Azure module - activity logs #13776

Merged: 24 commits, merged on Oct 10, 2019.
Changes from 7 commits.
27 changes: 27 additions & 0 deletions filebeat/docs/fields.asciidoc
@@ -15,6 +15,8 @@ grouped in the following categories:
* <<exported-fields-apache>>
* <<exported-fields-auditd>>
* <<exported-fields-aws>>
* <<exported-fields-azure>>
* <<exported-fields-azure>>
* <<exported-fields-beat-common>>
* <<exported-fields-cef>>
* <<exported-fields-cef-module>>
@@ -1241,6 +1243,31 @@ type: keyword

--

[[exported-fields-azure]]
== azure fields

azure Module



[float]
=== azure




[float]
=== activitylogs

Fields for azure Activity logs.


[[exported-fields-azure]]
== Decode azure processor fields fields

Common Event Format (azure) data.


[[exported-fields-beat-common]]
== Beat fields

60 changes: 60 additions & 0 deletions filebeat/docs/modules/azure.asciidoc
@@ -0,0 +1,60 @@
////
This file is generated! See scripts/docs_collector.py
////

[[filebeat-module-azure]]
:modulename: azure
:has-dashboards: true

== azure module

This is the azure module.

include::../include/what-happens.asciidoc[]

[float]
=== Compatibility

TODO: document with what versions of the software is this tested


include::../include/running-modules.asciidoc[]

[float]
=== Example dashboard

This module comes with a sample dashboard. For example:

TODO: include an image of a sample dashboard. If you do not include a dashboard,
remove this section and set `:has-dashboards: false` at the top of this file.

include::../include/configuring-intro.asciidoc[]

TODO: provide an example configuration

:fileset_ex: {fileset}

include::../include/config-option-intro.asciidoc[]

TODO: document the variables from each fileset. If you're describing a variable
that's common to other modules, you can reuse shared descriptions by including
the relevant file. For example:

[float]
==== `{fileset}` log fileset settings

include::../include/var-paths.asciidoc[]

:has-dashboards!:

:fileset_ex!:

:modulename!:


[float]
=== Fields

For a description of each field in the module, see the
<<exported-fields-azure,exported fields>> section.

2 changes: 2 additions & 0 deletions filebeat/docs/modules_list.asciidoc
@@ -6,6 +6,7 @@ This file is generated! See scripts/docs_collector.py
* <<filebeat-module-apache>>
* <<filebeat-module-auditd>>
* <<filebeat-module-aws>>
* <<filebeat-module-azure>>
* <<filebeat-module-cef>>
* <<filebeat-module-cisco>>
* <<filebeat-module-coredns>>
@@ -44,6 +45,7 @@ include::modules-overview.asciidoc[]
include::modules/apache.asciidoc[]
include::modules/auditd.asciidoc[]
include::modules/aws.asciidoc[]
include::modules/azure.asciidoc[]
include::modules/cef.asciidoc[]
include::modules/cisco.asciidoc[]
include::modules/coredns.asciidoc[]
62 changes: 62 additions & 0 deletions filebeat/input/kafka/azure_log_patterns.go
@@ -0,0 +1,62 @@
package kafka

import "time"

const (
ActivityLogs = "ActivityLogs"
AuditLogs = "AuditLogs"
)

// ActivityLog structure matches the azure activity log format
type ActivityLog struct {
Time time.Time `json:"time"`
ResourceID string `json:"resourceId"`
OperationName string `json:"operationName"`
Category string `json:"category"`
ResultType string `json:"resultType"`
ResultSignature string `json:"resultSignature"`
DurationMs int `json:"durationMs"`
CallerIPAddress string `json:"callerIpAddress"`
CorrelationID string `json:"correlationId"`
Identity struct {
Authorization struct {
Scope string `json:"scope"`
Action string `json:"action"`
Evidence struct {
Role string `json:"role"`
RoleAssignmentScope string `json:"roleAssignmentScope"`
RoleAssignmentID string `json:"roleAssignmentId"`
RoleDefinitionID string `json:"roleDefinitionId"`
PrincipalID string `json:"principalId"`
PrincipalType string `json:"principalType"`
} `json:"evidence"`
} `json:"authorization"`
Claims struct {
Aud string `json:"aud"`
Iss string `json:"iss"`
Iat string `json:"iat"`
Nbf string `json:"nbf"`
Exp string `json:"exp"`
Aio string `json:"aio"`
Appid string `json:"appid"`
Appidacr string `json:"appidacr"`
HTTPSchemasMicrosoftComIdentityClaimsIdentityprovider string `json:"http://schemas.microsoft.com/identity/claims/identityprovider"`
HTTPSchemasMicrosoftComIdentityClaimsObjectidentifier string `json:"http://schemas.microsoft.com/identity/claims/objectidentifier"`
HTTPSchemasXmlsoapOrgWs200505IdentityClaimsNameidentifier string `json:"http://schemas.xmlsoap.org/ws/2005/05/identity/claims/nameidentifier"`
HTTPSchemasMicrosoftComIdentityClaimsTenantid string `json:"http://schemas.microsoft.com/identity/claims/tenantid"`
Uti string `json:"uti"`
Ver string `json:"ver"`
} `json:"claims"`
} `json:"identity"`
Level string `json:"level"`
Location string `json:"location"`
Properties struct {
StatusCode string `json:"statusCode"`
ServiceRequestID interface{} `json:"serviceRequestId"`
StatusMessage string `json:"statusMessage"`
} `json:"properties,omitempty"`
}

type AzureActivityLogs struct {
Records []ActivityLog `json:"records"`
}
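
The following is not part of the PR: a small, test-style sketch of how a raw Event Hub batch would decode into the structures above. It assumes the file sits next to azure_log_patterns.go in the same kafka package; the sample payload and all of its field values are invented for illustration.

package kafka

import (
	"encoding/json"
	"testing"
)

// sampleActivityLogBatch is an illustrative payload only; the values are made up.
const sampleActivityLogBatch = `{
  "records": [
    {"time": "2019-10-01T10:00:00Z", "operationName": "MICROSOFT.RESOURCES/DEPLOYMENTS/WRITE", "category": "Administrative", "level": "Information"},
    {"time": "2019-10-01T10:00:05Z", "operationName": "MICROSOFT.AUTHORIZATION/POLICIES/AUDIT/ACTION", "category": "Policy", "level": "Warning"}
  ]
}`

func TestActivityLogBatchDecoding(t *testing.T) {
	// an Event Hub message wraps the individual activity log records in a "records" array
	var batch AzureActivityLogs
	if err := json.Unmarshal([]byte(sampleActivityLogBatch), &batch); err != nil {
		t.Fatalf("decoding sample batch: %v", err)
	}
	if len(batch.Records) != 2 {
		t.Fatalf("expected 2 records, got %d", len(batch.Records))
	}
	// each record can be re-marshalled on its own, which is how the input later
	// turns one Event Hub message into several Filebeat events
	for _, record := range batch.Records {
		if _, err := json.Marshal(record); err != nil {
			t.Errorf("re-encoding record: %v", err)
		}
	}
}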
37 changes: 19 additions & 18 deletions filebeat/input/kafka/config.go
@@ -34,22 +34,23 @@ import (

type kafkaInputConfig struct {
// Kafka hosts with port, e.g. "localhost:9092"
Hosts []string `config:"hosts" validate:"required"`
Topics []string `config:"topics" validate:"required"`
GroupID string `config:"group_id" validate:"required"`
ClientID string `config:"client_id"`
Version kafka.Version `config:"version"`
InitialOffset initialOffset `config:"initial_offset"`
ConnectBackoff time.Duration `config:"connect_backoff" validate:"min=0"`
ConsumeBackoff time.Duration `config:"consume_backoff" validate:"min=0"`
WaitClose time.Duration `config:"wait_close" validate:"min=0"`
MaxWaitTime time.Duration `config:"max_wait_time"`
IsolationLevel isolationLevel `config:"isolation_level"`
Fetch kafkaFetch `config:"fetch"`
Rebalance kafkaRebalance `config:"rebalance"`
TLS *tlscommon.Config `config:"ssl"`
Username string `config:"username"`
Password string `config:"password"`
Hosts []string `config:"hosts" validate:"required"`
Topics []string `config:"topics" validate:"required"`
GroupID string `config:"group_id" validate:"required"`
ClientID string `config:"client_id"`
Version kafka.Version `config:"version"`
InitialOffset initialOffset `config:"initial_offset"`
ConnectBackoff time.Duration `config:"connect_backoff" validate:"min=0"`
ConsumeBackoff time.Duration `config:"consume_backoff" validate:"min=0"`
WaitClose time.Duration `config:"wait_close" validate:"min=0"`
MaxWaitTime time.Duration `config:"max_wait_time"`
IsolationLevel isolationLevel `config:"isolation_level"`
Fetch kafkaFetch `config:"fetch"`
Rebalance kafkaRebalance `config:"rebalance"`
TLS *tlscommon.Config `config:"ssl"`
Username string `config:"username"`
Password string `config:"password"`
AzureLogs string `config:"azure_logs"`
}

type kafkaFetch struct {
@@ -103,7 +104,7 @@ var (

// The default config for the kafka input. When in doubt, default values
// were chosen to match sarama's defaults.
func defaultConfig() kafkaInputConfig {
func DefaultConfig() kafkaInputConfig {
return kafkaInputConfig{
Version: kafka.Version("1.0.0"),
InitialOffset: initialOffsetOldest,
@@ -143,7 +144,7 @@ func (c *kafkaInputConfig) Validate() error {
return nil
}

func newSaramaConfig(config kafkaInputConfig) (*sarama.Config, error) {
func NewSaramaConfig(config kafkaInputConfig) (*sarama.Config, error) {
k := sarama.NewConfig()

version, ok := config.Version.Get()
74 changes: 57 additions & 17 deletions filebeat/input/kafka/input.go
@@ -19,6 +19,7 @@ package kafka

import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
@@ -62,7 +63,7 @@ func NewInput(
inputContext input.Context,
) (input.Input, error) {

config := defaultConfig()
config := DefaultConfig()
if err := cfg.Unpack(&config); err != nil {
return nil, errors.Wrap(err, "reading kafka input config")
}
@@ -85,7 +86,7 @@ func NewInput(
return nil, err
}

saramaConfig, err := newSaramaConfig(config)
saramaConfig, err := NewSaramaConfig(config)
if err != nil {
return nil, errors.Wrap(err, "initializing Sarama config")
}
@@ -104,9 +105,11 @@ func NewInput(
func (input *kafkaInput) runConsumerGroup(
context context.Context, consumerGroup sarama.ConsumerGroup,
) {

handler := &groupHandler{
version: input.config.Version,
outlet: input.outlet,
logType: input.config.AzureLogs,
Contributor: I think I follow what's happening here (in the generic kafka config there is no AzureLogs field, so this gets set to the empty string and no special processing happens), but a comment would be nice to clarify that this is a no-op if there's no Azure-specific configuration.

narph (author): AzureLogs has been replaced with a more generic option, "YieldEventsFromField", and some comments have been added there.
}

input.saramaWaitGroup.Add(1)
@@ -234,6 +237,7 @@ type groupHandler struct {
version kafka.Version
session sarama.ConsumerGroupSession
outlet channel.Outleter
logType string
}

// The metadata attached to incoming events so they can be ACKed once they've
@@ -247,7 +251,7 @@ func (h *groupHandler) createEvent(
sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim,
message *sarama.ConsumerMessage,
) beat.Event {
) []beat.Event {
timestamp := time.Now()
kafkaFields := common.MapStr{
"topic": claim.Topic(),
@@ -266,19 +270,27 @@ if versionOk && version.IsAtLeast(sarama.V0_11_0_0) {
if versionOk && version.IsAtLeast(sarama.V0_11_0_0) {
kafkaFields["headers"] = arrayForKafkaHeaders(message.Headers)
}
event := beat.Event{
Timestamp: timestamp,
Fields: common.MapStr{
"message": string(message.Value),
"kafka": kafkaFields,
},
Private: eventMeta{
handler: h,
message: message,
},
}

return event
// if this is an azure input, the message is checked for a list of events and split accordingly

var events []beat.Event
messages := h.parseMultipleMessages(message.Value)
Contributor: I understand that you need azure-specific code in order to parse a message into multiple events. The rest could be done with a json processor in a pipeline, couldn't it? If that's the case, perhaps it would be better to have a generic way to say: spawn events from the list under this JSON field: "records".

narph (author): Unfortunately that is not possible: I could separate the json elements, but they would still be generated as one event. The workaround would have been to create a new processor and a new interface for processors that can return multiple events.

for _, msg := range messages {
event := beat.Event{
Timestamp: timestamp,
Fields: common.MapStr{
"message": msg,
Contributor: Does this work OK? My recollection is that the explicit string conversion (string(message.Value) in the old code) was necessary for a lot of things to work, since otherwise it got interpreted as raw bytes by the backend, which messed up the logs and the indexing.

narph (author): I have had no issues with it so far; let me know if you know a safer option, happy to change it.

"kafka": kafkaFields,
},
Private: eventMeta{
handler: h,
message: message,
},
}
events = append(events, event)

}
return events
}

func (h *groupHandler) Setup(session sarama.ConsumerGroupSession) error {
Expand Down Expand Up @@ -307,8 +319,36 @@ func (h *groupHandler) ack(message *sarama.ConsumerMessage) {

func (h *groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
event := h.createEvent(sess, claim, msg)
h.outlet.OnEvent(event)
events := h.createEvent(sess, claim, msg)
for _, event := range events {
h.outlet.OnEvent(event)
}
}
return nil
}

func (h *groupHandler) parseMultipleMessages(bMessage []byte) []string {
var messages []string
// check if logType has been set
switch h.logType {
// if no log type has been set, the original message is returned as a single event
case "":
return []string{string(bMessage)}
case ActivityLogs:
// if the fileset is activity logs, the messages are filtered, as the eventhub can return different types of messages
var obj AzureActivityLogs
err := json.Unmarshal(bMessage, &obj)
if err != nil {
return nil
}
for _, ms := range obj.Records {
js, err := json.Marshal(ms)
if err == nil {
messages = append(messages, string(js))
}
}
return messages
default:
// unknown log types are passed through unchanged
return []string{string(bMessage)}
}
}
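
The review thread above suggests a generic option for spawning events from a list under a configurable JSON field, and the author notes that AzureLogs was later replaced by a more generic "YieldEventsFromField" option. The following standalone sketch, not part of this PR, shows one way such a generic split could look; the function name, signature, and behaviour are illustrative assumptions.

package main

import (
	"encoding/json"
	"fmt"
)

// splitEventsFromField returns one JSON document per element of the array stored
// under fieldName. If fieldName is empty, or the payload does not contain such an
// array, the original message is returned unchanged as a single entry.
func splitEventsFromField(raw []byte, fieldName string) []string {
	if fieldName == "" {
		return []string{string(raw)}
	}
	var envelope map[string]json.RawMessage
	if err := json.Unmarshal(raw, &envelope); err != nil {
		return []string{string(raw)}
	}
	var items []json.RawMessage
	if err := json.Unmarshal(envelope[fieldName], &items); err != nil {
		return []string{string(raw)}
	}
	messages := make([]string, 0, len(items))
	for _, item := range items {
		messages = append(messages, string(item))
	}
	return messages
}

func main() {
	batch := []byte(`{"records": [{"operationName": "A"}, {"operationName": "B"}]}`)
	// prints each record as its own message, mirroring how parseMultipleMessages
	// fans one Kafka/Event Hub message out into several events
	for _, msg := range splitEventsFromField(batch, "records") {
		fmt.Println(msg)
	}
}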
15 changes: 15 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
@@ -87,6 +87,21 @@ filebeat.modules:
# Profile name for aws credential
#var.credential_profile_name: fb-aws

#-------------------------------- Azure Module --------------------------------
- module: azure
# All logs
activitylogs:
enabled: true
var:
eventhubs_namespace: ""
topics: ["insights-operational-logs"]
consumer_group: "$Default"
connection_string: ""

# Set custom paths for the log files. If left empty,
# Filebeat will choose the paths depending on your OS.
#var.paths:

#--------------------------------- CEF Module ---------------------------------
- module: cef
log: