Skip to content

Commit

Permalink
Merge pull request #15 from BenB196/staging
Browse files Browse the repository at this point in the history
Merge staging to master and bump to v0.0.5
  • Loading branch information
BenB196 committed Sep 22, 2019
2 parents 95e8878 + 400b08d commit 5b39c83
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 9 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.0.4
0.0.5
14 changes: 7 additions & 7 deletions ffsEvent/ffsEvent.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"time"
)

func FFSQuery (configuration config.Config, query config.FFSQuery, wg sync.WaitGroup) {
func FFSQuery (configuration config.Config, query config.FFSQuery) {
//Initialize query waitGroup
var wgQuery sync.WaitGroup

Expand Down Expand Up @@ -125,7 +125,7 @@ func FFSQuery (configuration config.Config, query config.FFSQuery, wg sync.WaitG
info, code, err := elasticClient.Ping(query.Elasticsearch.ElasticURL).Do(ctx)

if err != nil {
//TODO hanlde error
//TODO handle error
log.Println("error reaching elastic server")
panic(err)
}
Expand All @@ -141,7 +141,7 @@ func FFSQuery (configuration config.Config, query config.FFSQuery, wg sync.WaitG
go func() {
for _, inProgressQuery := range inProgressQueries {
query = setOnOrBeforeAndAfter(query,inProgressQuery.OnOrBefore,inProgressQuery.OnOrAfter)
queryFetcher(query, &inProgressQueries, authData, configuration, &lastCompletedQuery, maxTime, nil, wg, wgQuery, true, elasticClient, ctx, processor)
queryFetcher(query, &inProgressQueries, authData, configuration, &lastCompletedQuery, maxTime, nil, true, elasticClient, ctx, processor)
}
}()
}
Expand Down Expand Up @@ -177,8 +177,9 @@ func FFSQuery (configuration config.Config, query config.FFSQuery, wg sync.WaitG
for {
select {
case <- queryIntervalTimeTicker.C:
go queryFetcher(query, &inProgressQueries, authData, configuration, &lastCompletedQuery, maxTime, queryIntervalTimeTicker, wg, wgQuery, false, elasticClient, ctx, processor)
go queryFetcher(query, &inProgressQueries, authData, configuration, &lastCompletedQuery, maxTime, queryIntervalTimeTicker,false, elasticClient, ctx, processor)
}
defer wgQuery.Done()
}
}()
wgQuery.Wait()
Expand All @@ -194,7 +195,7 @@ func FFSQuery (configuration config.Config, query config.FFSQuery, wg sync.WaitG
}
}

func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProgressQuery, authData ffs.AuthData, configuration config.Config, lastCompletedQuery *eventOutput.InProgressQuery, maxTime time.Time, queryIntervalTimeTicker *time.Ticker, wg sync.WaitGroup, wgQuery sync.WaitGroup, cleanUpQuery bool, client *elastic.Client, ctx context.Context, process *elastic.BulkProcessor) {
func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProgressQuery, authData ffs.AuthData, configuration config.Config, lastCompletedQuery *eventOutput.InProgressQuery, maxTime time.Time, queryIntervalTimeTicker *time.Ticker, cleanUpQuery bool, client *elastic.Client, ctx context.Context, process *elastic.BulkProcessor) {
var done bool
var err error
//Increment time
Expand All @@ -208,11 +209,10 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg

//Stop the goroutine if the max time is past
if done {
wg.Done()
wgQuery.Done()
if queryIntervalTimeTicker != nil {
queryIntervalTimeTicker.Stop()
}
return
}
}

Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func main() {
wg.Add(len(configuration.FFSQueries))
go func() {
for _, query := range configuration.FFSQueries {
go ffsEvent.FFSQuery(configuration, query, wg)
go ffsEvent.FFSQuery(configuration, query)
wg.Done()
}
}()

Expand Down

0 comments on commit 5b39c83

Please sign in to comment.