Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge staging to master and bump to v0.0.5 #15

Merged
merged 2 commits into from
Sep 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.0.4
0.0.5
14 changes: 7 additions & 7 deletions ffsEvent/ffsEvent.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"time"
)

func FFSQuery (configuration config.Config, query config.FFSQuery, wg sync.WaitGroup) {
func FFSQuery (configuration config.Config, query config.FFSQuery) {
//Initialize query waitGroup
var wgQuery sync.WaitGroup

Expand Down Expand Up @@ -125,7 +125,7 @@ func FFSQuery (configuration config.Config, query config.FFSQuery, wg sync.WaitG
info, code, err := elasticClient.Ping(query.Elasticsearch.ElasticURL).Do(ctx)

if err != nil {
//TODO hanlde error
//TODO handle error
log.Println("error reaching elastic server")
panic(err)
}
Expand All @@ -141,7 +141,7 @@ func FFSQuery (configuration config.Config, query config.FFSQuery, wg sync.WaitG
go func() {
for _, inProgressQuery := range inProgressQueries {
query = setOnOrBeforeAndAfter(query,inProgressQuery.OnOrBefore,inProgressQuery.OnOrAfter)
queryFetcher(query, &inProgressQueries, authData, configuration, &lastCompletedQuery, maxTime, nil, wg, wgQuery, true, elasticClient, ctx, processor)
queryFetcher(query, &inProgressQueries, authData, configuration, &lastCompletedQuery, maxTime, nil, true, elasticClient, ctx, processor)
}
}()
}
Expand Down Expand Up @@ -177,8 +177,9 @@ func FFSQuery (configuration config.Config, query config.FFSQuery, wg sync.WaitG
for {
select {
case <- queryIntervalTimeTicker.C:
go queryFetcher(query, &inProgressQueries, authData, configuration, &lastCompletedQuery, maxTime, queryIntervalTimeTicker, wg, wgQuery, false, elasticClient, ctx, processor)
go queryFetcher(query, &inProgressQueries, authData, configuration, &lastCompletedQuery, maxTime, queryIntervalTimeTicker,false, elasticClient, ctx, processor)
}
defer wgQuery.Done()
}
}()
wgQuery.Wait()
Expand All @@ -194,7 +195,7 @@ func FFSQuery (configuration config.Config, query config.FFSQuery, wg sync.WaitG
}
}

func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProgressQuery, authData ffs.AuthData, configuration config.Config, lastCompletedQuery *eventOutput.InProgressQuery, maxTime time.Time, queryIntervalTimeTicker *time.Ticker, wg sync.WaitGroup, wgQuery sync.WaitGroup, cleanUpQuery bool, client *elastic.Client, ctx context.Context, process *elastic.BulkProcessor) {
func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProgressQuery, authData ffs.AuthData, configuration config.Config, lastCompletedQuery *eventOutput.InProgressQuery, maxTime time.Time, queryIntervalTimeTicker *time.Ticker, cleanUpQuery bool, client *elastic.Client, ctx context.Context, process *elastic.BulkProcessor) {
var done bool
var err error
//Increment time
Expand All @@ -208,11 +209,10 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg

//Stop the goroutine if the max time is past
if done {
wg.Done()
wgQuery.Done()
if queryIntervalTimeTicker != nil {
queryIntervalTimeTicker.Stop()
}
return
}
}

Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func main() {
wg.Add(len(configuration.FFSQueries))
go func() {
for _, query := range configuration.FFSQueries {
go ffsEvent.FFSQuery(configuration, query, wg)
go ffsEvent.FFSQuery(configuration, query)
wg.Done()
}
}()

Expand Down