add identity search workflow #491

Open
wants to merge 1 commit into
base: main
2 changes: 2 additions & 0 deletions .github/workflows/main.yml
@@ -90,4 +90,6 @@ jobs:
check-latest: true
- name: run e2e test
run: ./pkg/test/e2e/e2e_test.sh
- name: run identity monitor test
run: ./pkg/test/identity_workflow/identity_workflow_e2e_test.sh

37 changes: 37 additions & 0 deletions cmd/monitor/config.go
@@ -0,0 +1,37 @@
//
// Copyright 2024 The Sigstore Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"time"

"github.com/sigstore/rekor-monitor/pkg/identity"
"github.com/sigstore/rekor-monitor/pkg/notifications"
)

type IdentityMonitorConfiguration struct {
Contributor

Should we also include `once`, so that this either runs indefinitely or only for one iteration?

If so, this would need to be restructured a bit, so that we continually pull in the start index from the last checkpoint that was recorded.

Contributor

Related: I think that if you specify start/end, that should effectively be the same as `once`, i.e. a one-off search across indices (useful if you are running the search for the first time, for example, or catching up).

Collaborator Author

I was considering how to include `once`. Continually pulling in the last checkpoint makes sense for that; will do.

Contributor

We should also decide whether the indefinite loop lives in the cmd package or in the inner API. Right now the functionality is embedded in the API, and I wouldn't expect API users to need to loop indefinitely. Also, an endless loop can't be tested, so keeping it in the cmd package, which isn't expected to have coverage, lets us maintain good coverage in the API.

Collaborator Author

@linus-sun, Oct 17, 2024

Looping in the cmd package definitely makes more sense. I will implement it that way in this PR and refactor the consistency check accordingly in #492 (edit: #492 seems a bit too big, so I will refactor this in a future PR).

StartIndex *int `yaml:"startIndex"`
EndIndex *int `yaml:"endIndex"`
MonitoredValues identity.MonitoredValues `yaml:"monitoredValues"`
ServerURL string `yaml:"serverURL"`
OutputIdentitiesFile string `yaml:"outputIdentities"`
LogInfoFile string `yaml:"logInfoFile"`
GitHubIssue notifications.GitHubIssueInput `yaml:"githubIssue"`
EmailNotificationSMTP notifications.EmailNotificationInput `yaml:"emailNotificationSMTP"`
EmailNotificationMailgun notifications.MailgunNotificationInput `yaml:"emailNotificationMailgun"`
EmailNotificationSendGrid notifications.SendGridNotificationInput `yaml:"emailNotificationSendGrid"`
Interval *time.Duration `yaml:"interval"`
}
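
The pointer-typed `StartIndex` and `EndIndex` fields make it possible to tell an unset value apart from an explicit 0, which is what allows a configured start/end to act as a one-off search as discussed in the review thread. A minimal sketch of that fallback, assuming a hypothetical helper `resolveRange` (not part of this PR) and plain integers in place of checkpoint data:

```go
package main

import "fmt"

// resolveRange is a hypothetical helper, not part of this PR. It keeps a
// configured index when one is set and otherwise falls back to the index
// derived from the checkpoints.
func resolveRange(start, end *int, checkpointStart, checkpointEnd int) (int, int) {
	s, e := checkpointStart, checkpointEnd
	if start != nil {
		s = *start
	}
	if end != nil {
		e = *end
	}
	return s, e
}

func main() {
	zero := 0
	// An explicit startIndex of 0 is kept; the missing endIndex falls back
	// to the checkpoint-derived value.
	s, e := resolveRange(&zero, nil, 100, 250)
	fmt.Println(s, e) // prints "0 250"
}
```

With a plain `int` field, a configured `startIndex: 0` would be indistinguishable from a missing key, so the fallback would silently override it.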
143 changes: 143 additions & 0 deletions cmd/monitor/main.go
@@ -0,0 +1,143 @@
//
// Copyright 2021 The Sigstore Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"flag"
"fmt"
"log"
"os"
"runtime"
"strings"
"time"

"github.com/sigstore/rekor-monitor/pkg/rekor"
"github.com/sigstore/rekor-monitor/pkg/util/file"
"github.com/sigstore/rekor/pkg/client"
"github.com/sigstore/rekor/pkg/util"
"gopkg.in/yaml.v2"
"sigs.k8s.io/release-utils/version"
)

// Default values for monitoring job parameters
const (
publicRekorServerURL = "https://rekor.sigstore.dev"
logInfoFileName = "logInfo.txt"
outputIdentitiesFileName = "identities.txt"
)

// This main function performs an identity search.
// Upon starting, any existing latest checkpoint data is loaded. Depending on the -once flag,
// the function either runs a single search or repeats the search at the configured interval.
func main() {
// Command-line flags that are parameters to the identity monitor job
configFilePath := flag.String("config-file", "", "path to yaml configuration file containing identity monitor settings")
once := flag.Bool("once", true, "whether to run the monitor once (true) or on a repeated interval (false)")
flag.Parse()

// flag.String never returns nil, so check the parsed value instead.
if *configFilePath == "" {
log.Fatalf("empty configuration file path")
}

readConfig, err := os.ReadFile(*configFilePath)
if err != nil {
log.Fatalf("error reading from identity monitor configuration file: %v", err)
}

configString := string(readConfig)
var config IdentityMonitorConfiguration
if err := yaml.Unmarshal([]byte(configString), &config); err != nil {
log.Fatalf("error parsing identities: %v", err)
}

// Apply defaults first so that the client and checkpoint lookups below use them.
if config.ServerURL == "" {
config.ServerURL = publicRekorServerURL
}
if config.LogInfoFile == "" {
config.LogInfoFile = logInfoFileName
}
if config.OutputIdentitiesFile == "" {
config.OutputIdentitiesFile = outputIdentitiesFileName
}
if config.Interval == nil {
defaultInterval := time.Hour
config.Interval = &defaultInterval
}

rekorClient, err := client.GetRekorClient(config.ServerURL, client.WithUserAgent(strings.TrimSpace(fmt.Sprintf("rekor-monitor/%s (%s; %s)", version.GetVersionInfo().GitVersion, runtime.GOOS, runtime.GOARCH))))
if err != nil {
log.Fatalf("getting Rekor client: %v", err)
}

if config.StartIndex == nil || config.EndIndex == nil {
logInfo, err := rekor.GetLogInfo(context.Background(), rekorClient)
if err != nil {
log.Fatalf("error getting log info: %v", err)
}

checkpoint, err := rekor.ReadLatestCheckpoint(logInfo)
if err != nil {
log.Fatalf("error reading checkpoint: %v", err)
}

var prevCheckpoint *util.SignedCheckpoint
prevCheckpoint, err = file.ReadLatestCheckpoint(config.LogInfoFile)
if err != nil {
log.Fatalf("reading checkpoint log: %v", err)
}

checkpointStartIndex, checkpointEndIndex := rekor.GetCheckpointIndices(logInfo, prevCheckpoint, checkpoint)
if config.StartIndex == nil {
config.StartIndex = &checkpointStartIndex
}
if config.EndIndex == nil {
config.EndIndex = &checkpointEndIndex
}
}

ticker := time.NewTicker(*config.Interval)
defer ticker.Stop()

// To get an immediate first tick
for ; ; <-ticker.C {
err = rekor.IdentitySearch(*config.StartIndex, *config.EndIndex, rekorClient, config.MonitoredValues, config.OutputIdentitiesFile)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to successfully complete identity search: %v", err)
return
}

if *once {
return
}

prevCheckpoint, checkpoint, err := rekor.GetPrevCurrentCheckpoints(rekorClient, config.LogInfoFile)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to fetch previous and current checkpoints: %v", err)
return
}

logInfo, err := rekor.GetLogInfo(context.Background(), rekorClient)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to fetch log info: %v", err)
return
}

checkpointStartIndex, checkpointEndIndex := rekor.GetCheckpointIndices(logInfo, prevCheckpoint, checkpoint)
config.StartIndex = &checkpointStartIndex
config.EndIndex = &checkpointEndIndex
}
}
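
The `for ; ; <-ticker.C` form runs the body immediately and only then waits for each tick, and the review thread asks for the endless loop to live in the cmd package so that a single iteration stays testable. A minimal sketch of that separation, assuming a hypothetical `runLoop` helper and `errStop` sentinel (neither is part of this PR) and a callback standing in for one identity search:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errStop is a hypothetical sentinel used here only to end the demo loop.
var errStop = errors.New("stop")

// runLoop is a hypothetical helper, not part of this PR. It calls iterate
// immediately and then once per interval. It returns after the first call
// when once is true, or as soon as iterate returns an error.
func runLoop(interval time.Duration, once bool, iterate func() error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// The post statement blocks on the ticker, so the first iteration
	// runs without waiting for a tick.
	for ; ; <-ticker.C {
		if err := iterate(); err != nil {
			return err
		}
		if once {
			return nil
		}
	}
}

func main() {
	runs := 0
	err := runLoop(10*time.Millisecond, false, func() error {
		runs++
		if runs == 3 {
			return errStop
		}
		return nil
	})
	fmt.Println(runs, err) // prints "3 stop"
}
```

Because the per-iteration work is passed in as a function, it can be unit tested on its own, while the loop itself stays a thin wrapper in the command.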
12 changes: 6 additions & 6 deletions pkg/notifications/email.go
@@ -24,12 +24,12 @@ import (
// EmailNotificationInput extends the NotificationPlatform interface to support
// found identity notification by sending emails to a specified user.
type EmailNotificationInput struct {
RecipientEmailAddress string
SenderEmailAddress string
SenderSMTPUsername string
SenderSMTPPassword string
SMTPHostURL string
SMTPCustomOptions []mail.Option
RecipientEmailAddress string `yaml:"recipientEmailAddress"`
SenderEmailAddress string `yaml:"senderEmailAddress"`
SenderSMTPUsername string `yaml:"senderSMTPUsername"`
SenderSMTPPassword string `yaml:"senderSMTPPassword"`
SMTPHostURL string `yaml:"SMTPHostURL"`
SMTPCustomOptions []mail.Option `yaml:"SMTPCustomOptions"`
}

func GenerateEmailBody(monitoredIdentities []identity.MonitoredIdentity) (string, error) {
10 changes: 5 additions & 5 deletions pkg/notifications/github_issues.go
@@ -30,15 +30,15 @@ var (
// GitHubIssueInput extends the NotificationPlatform interface to support found identity
// notification via creating new GitHub issues in a given repo.
type GitHubIssueInput struct {
AssigneeUsername string
RepositoryOwner string
RepositoryName string
AssigneeUsername string `yaml:"assigneeUsername"`
RepositoryOwner string `yaml:"repositoryOwner"`
RepositoryName string `yaml:"repositoryName"`
// The PAT or other access token to authenticate creating an issue.
// The authentication token requires repo write and push access.
AuthenticationToken string
AuthenticationToken string `yaml:"authenticationToken"`
// For users who want to pass in a custom client.
// If nil, a default client with the given authentication token will be instantiated.
GitHubClient *github.Client
GitHubClient *github.Client `yaml:"githubClient"`
}

func generateGitHubIssueBody(monitoredIdentities []identity.MonitoredIdentity) (string, error) {
8 changes: 4 additions & 4 deletions pkg/notifications/mailgun.go
@@ -24,10 +24,10 @@ import (
// MailgunNotificationInput extends the NotificationPlatform interface to support
// found identity notification by sending emails to a specified user via Mailgun.
type MailgunNotificationInput struct {
RecipientEmailAddress string
SenderEmailAddress string
MailgunAPIKey string
MailgunDomainName string
RecipientEmailAddress string `yaml:"recipientEmailAddress"`
SenderEmailAddress string `yaml:"senderEmailAddress"`
MailgunAPIKey string `yaml:"mailgunAPIKey"`
MailgunDomainName string `yaml:"mailgunDomainName"`
}

// Send takes in a MailgunNotificationInput and attempts to send the
10 changes: 5 additions & 5 deletions pkg/notifications/sendgrid.go
@@ -25,11 +25,11 @@ import (
// SendGridNotificationInput extends the NotificationPlatform interface to support
// found identity notification by sending emails to a specified user via SendGrid.
type SendGridNotificationInput struct {
RecipientName string
RecipientEmailAddress string
SenderName string
SenderEmailAddress string
SendGridAPIKey string
RecipientName string `yaml:"recipientName"`
RecipientEmailAddress string `yaml:"recipientEmailAddress"`
SenderName string `yaml:"senderName"`
SenderEmailAddress string `yaml:"senderEmailAddress"`
SendGridAPIKey string `yaml:"sendGridAPIKey"`
}

// Send takes in a SendGridNotificationInput and attempts to send the
73 changes: 55 additions & 18 deletions pkg/rekor/identity.go
@@ -341,8 +341,28 @@ func oidMatchesPolicy(cert *x509.Certificate, oid asn1.ObjectIdentifier, extensi
return false, nil, "", nil
}

// writeIdentitiesBetweenCheckpoints monitors for given identities between two checkpoints and writes any found identities to file.
func writeIdentitiesBetweenCheckpoints(logInfo *models.LogInfo, prevCheckpoint *util.SignedCheckpoint, checkpoint *util.SignedCheckpoint, monitoredValues identity.MonitoredValues, rekorClient *client.Rekor, outputIdentitiesFile string) error {
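// GetPrevCurrentCheckpoints reads the previous checkpoint from the log info file and
// the latest checkpoint from Rekor, and returns both.
func GetPrevCurrentCheckpoints(client *client.Rekor, logInfoFile string) (*util.SignedCheckpoint, *util.SignedCheckpoint, error) {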
logInfo, err := GetLogInfo(context.Background(), client)
if err != nil {
return nil, nil, fmt.Errorf("error getting log info: %v", err)
}

checkpoint, err := ReadLatestCheckpoint(logInfo)
if err != nil {
return nil, nil, fmt.Errorf("error reading checkpoint: %v", err)
}

var prevCheckpoint *util.SignedCheckpoint
prevCheckpoint, err = file.ReadLatestCheckpoint(logInfoFile)
if err != nil {
return nil, nil, fmt.Errorf("reading checkpoint log: %v", err)
}

return prevCheckpoint, checkpoint, nil
}

// GetCheckpointIndices fetches the start and end indexes between two checkpoints and returns them.
func GetCheckpointIndices(logInfo *models.LogInfo, prevCheckpoint *util.SignedCheckpoint, checkpoint *util.SignedCheckpoint) (int, int) {
// Get log size of inactive shards
totalSize := 0
for _, s := range logInfo.InactiveShards {
@@ -351,26 +371,43 @@ func writeIdentitiesBetweenCheckpoints(logInfo *models.LogInfo, prevCheckpoint *
startIndex := int(prevCheckpoint.Size) + totalSize - 1 //nolint: gosec // G115, log will never be large enough to overflow
endIndex := int(checkpoint.Size) + totalSize - 1 //nolint: gosec // G115

// Search for identities in the log range
if identity.MonitoredValuesExist(monitoredValues) {
entries, err := GetEntriesByIndexRange(context.Background(), rekorClient, startIndex, endIndex)
if err != nil {
return fmt.Errorf("error getting entries by index range: %v", err)
}
idEntries, err := MatchedIndices(entries, monitoredValues)
if err != nil {
return fmt.Errorf("error finding log indices: %v", err)
}
return startIndex, endIndex
}

if len(idEntries) > 0 {
for _, idEntry := range idEntries {
fmt.Fprintf(os.Stderr, "Found %s\n", idEntry.String())
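// IdentitySearch searches the entries in the given index range for the monitored values
// and writes any matching identities to file.
func IdentitySearch(startIndex int, endIndex int, rekorClient *client.Rekor, monitoredValues identity.MonitoredValues, outputIdentitiesFile string) error {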
if err := file.WriteIdentity(outputIdentitiesFile, idEntry); err != nil {
return fmt.Errorf("failed to write entry: %v", err)
}
entries, err := GetEntriesByIndexRange(context.Background(), rekorClient, startIndex, endIndex)
if err != nil {
return fmt.Errorf("error getting entries by index range: %v", err)
}
idEntries, err := MatchedIndices(entries, monitoredValues)
if err != nil {
return fmt.Errorf("error finding log indices: %v", err)
}

if len(idEntries) > 0 {
for _, idEntry := range idEntries {
fmt.Fprintf(os.Stderr, "Found %s\n", idEntry.String())

if err := file.WriteIdentity(outputIdentitiesFile, idEntry); err != nil {
return fmt.Errorf("failed to write entry: %v", err)
}
}
}
return nil
}

// writeIdentitiesBetweenCheckpoints monitors for given identities between two checkpoints and writes any found identities to file.
func writeIdentitiesBetweenCheckpoints(logInfo *models.LogInfo, prevCheckpoint *util.SignedCheckpoint, checkpoint *util.SignedCheckpoint, monitoredValues identity.MonitoredValues, rekorClient *client.Rekor, outputIdentitiesFile string) error {
// Compute the index range between the two checkpoints
startIndex, endIndex := GetCheckpointIndices(logInfo, prevCheckpoint, checkpoint)

// Search for identities in the log range
if identity.MonitoredValuesExist(monitoredValues) {
err := IdentitySearch(startIndex, endIndex, rekorClient, monitoredValues, outputIdentitiesFile)
if err != nil {
return fmt.Errorf("error monitoring for identities: %v", err)
}
}
return nil
}
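
The index arithmetic in `GetCheckpointIndices` offsets each checkpoint size by the combined size of the inactive shards and subtracts one to get a zero-based log index. A worked sketch with plain integers follows. The loop body is elided in the diff above, so this assumes it sums the shard sizes; `checkpointIndices` and the sample numbers are hypothetical:

```go
package main

import "fmt"

// checkpointIndices is a hypothetical stand-in that mirrors the arithmetic in
// GetCheckpointIndices. inactiveShardSizes stands in for the sizes of
// logInfo.InactiveShards (assumed to be summed), and prevSize/currentSize
// stand in for the Size fields of the two checkpoints.
func checkpointIndices(inactiveShardSizes []int, prevSize, currentSize int) (int, int) {
	totalSize := 0
	for _, s := range inactiveShardSizes {
		totalSize += s
	}
	startIndex := prevSize + totalSize - 1
	endIndex := currentSize + totalSize - 1
	return startIndex, endIndex
}

func main() {
	// Two inactive shards holding 1000 and 500 entries; the active shard
	// grew from 20 to 35 entries between the two checkpoints.
	start, end := checkpointIndices([]int{1000, 500}, 20, 35)
	fmt.Println(start, end) // prints "1519 1534"
}
```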
14 changes: 12 additions & 2 deletions pkg/rekor/verifier.go
@@ -51,11 +51,21 @@ func GetLogVerifier(ctx context.Context, rekorClient *client.Rekor) (signature.V
return verifier, nil
}

// ReadLatestCheckpoint fetches the latest checkpoint from log info fetched from Rekor.
// It returns the checkpoint if it successfully fetches one; otherwise, it returns an error.
func ReadLatestCheckpoint(logInfo *models.LogInfo) (*util.SignedCheckpoint, error) {
checkpoint := &util.SignedCheckpoint{}
if err := checkpoint.UnmarshalText([]byte(*logInfo.SignedTreeHead)); err != nil {
return nil, fmt.Errorf("unmarshalling logInfo.SignedTreeHead to Checkpoint: %v", err)
}
return checkpoint, nil
}

// verifyLatestCheckpointSignature fetches and verifies the signature of the latest checkpoint from log info fetched from Rekor.
// If it successfully verifies the checkpoint's signature, it returns the checkpoint; otherwise, it returns an error.
func verifyLatestCheckpointSignature(logInfo *models.LogInfo, verifier signature.Verifier) (*util.SignedCheckpoint, error) {
checkpoint := &util.SignedCheckpoint{}
if err := checkpoint.UnmarshalText([]byte(*logInfo.SignedTreeHead)); err != nil {
checkpoint, err := ReadLatestCheckpoint(logInfo)
if err != nil {
return nil, fmt.Errorf("unmarshalling logInfo.SignedTreeHead to Checkpoint: %v", err)
}
if !checkpoint.Verify(verifier) {