Skip to content

Commit

Permalink
Add parsing of a stream (#122)
Browse files Browse the repository at this point in the history
  • Loading branch information
dimus committed Jan 16, 2021
1 parent 983ad63 commit 6c6d5da
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 52 deletions.
11 changes: 11 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ type Config struct {
// BatchSize sets the maximum number of elements in names-strings slice.
BatchSize int

// WithStream changes from parsing a batch by batch, to parsing one name
// at a time. When WithStream is true, BatchSize setting is ignored.
WithStream bool

// IgnoreHTMLTags can be set to true when it is desirable to clean up names from
// a few HTML tags often present in names-strings that were planned to be
// presented via an HTML page.
Expand Down Expand Up @@ -107,6 +111,13 @@ func OptBatchSize(i int) Option {
}
}

// OptWithDetails sets the WithDetails field.
func OptWithStream(b bool) Option {
return func(cfg *Config) {
cfg.WithStream = b
}
}

// OptPort sets a port for web-service.
func OptPort(i int) Option {
return func(cfg *Config) {
Expand Down
58 changes: 58 additions & 0 deletions gnparser/cmd/parse_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package cmd

import (
"bufio"
"fmt"
"io"
"log"
"sync"

"github.com/gnames/gnlib/format"
"github.com/gnames/gnparser"
"github.com/gnames/gnparser/entity/output"
)

func parseFile(
gnp gnparser.GNParser,
f io.Reader,
) {
batch := make([]string, batchSize)
chOut := make(chan []output.Parsed)
var wg sync.WaitGroup

wg.Add(1)
go processResults(chOut, &wg, gnp.Format())

sc := bufio.NewScanner(f)
var i, count int
for sc.Scan() {
batch[count] = sc.Text()
count++
if count == batchSize {
i++
log.Printf("Parsing %d-th line\n", count*i)
chOut <- gnp.ParseNames(batch)
batch = make([]string, batchSize)
count = 0
}
}
chOut <- gnp.ParseNames(batch[:count])
close(chOut)
wg.Wait()
}

func processResults(
out <-chan []output.Parsed,
wg *sync.WaitGroup,
f format.Format,
) {
defer wg.Done()
if f == format.CSV {
fmt.Println(output.CSVHeader())
}
for pr := range out {
for i := range pr {
fmt.Println(pr[i].Output(f))
}
}
}
14 changes: 14 additions & 0 deletions gnparser/cmd/parse_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package cmd

func getNames(
f io.Reader,
) <-chan string {
chIn = make(chan string)
}

func parseStream(
gnp gnparser.GNParser),
f io.Reader,
) {
chIn = getNames(f)
}
84 changes: 32 additions & 52 deletions gnparser/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@
package cmd

import (
"bufio"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
"sync"

"github.com/gnames/gnlib/format"
"github.com/gnames/gnlib/sys"
Expand All @@ -55,6 +52,10 @@ interfaces. There are 3 possible settings: 'csv', 'compact', 'pretty'.
# BatchSize to 1.
# BatchSize 50000
# WithStream switches parsing of a large number of name-strings to a
# one-at-a-time stream. When WithStream is true, BatchSize is ignored.
# WithStream false
# IgnoreHTMLTags can be set to true if it is desirable to not try to remove from
# a few HTML tags often present in names-strings that were planned to be
# presented via an HTML page.
Expand All @@ -81,6 +82,7 @@ type cfgData struct {
JobsNum int
BatchSize int
IgnoreHTMLTags bool
WithStream bool
WithDetails bool
Port int
}
Expand Down Expand Up @@ -127,6 +129,7 @@ gnparser -j 5 -p 8080
jobsNumFlag(cmd)
ignoreHTMLTagsFlag(cmd)
withDetailsFlag(cmd)
withStreamFlag(cmd)
batchSizeFlag(cmd)
port := portFlag(cmd)
cfg := config.NewConfig(opts...)
Expand Down Expand Up @@ -173,6 +176,9 @@ func init() {
rootCmd.Flags().IntP("batch_size", "b", 0,
"maximum number of names in a batch send for processing.")

rootCmd.Flags().BoolP("stream", "s", false,
"parse one name at a time in a stream instead of a batch parsing")

rootCmd.Flags().BoolP("ignore_tags", "i", false,
"ignore HTML entities and tags when parsing.")

Expand All @@ -199,6 +205,7 @@ func initConfig() {
// config file settings
viper.BindEnv("Format", "GNPARSER_FORMAT")
viper.BindEnv("JobsNum", "GNPARSER_JOBS_NUM")
viper.BindEnv("WithStream", "GNPARSER_WITH_STREAM")
viper.BindEnv("IgnoreHTMLTags", "GNPARSER_IGNORE_HTML_TAGS")
viper.BindEnv("WithDetails", "GNPARSER_WITH_DETAILS")
viper.BindEnv("Port", "GNPARSER_PORT")
Expand Down Expand Up @@ -230,10 +237,13 @@ func getOpts() []config.Option {
if cfg.BatchSize > 0 {
opts = append(opts, config.OptBatchSize(cfg.BatchSize))
}
if cfg.IgnoreHTMLTags != false {
if cfg.WithStream {
opts = append(opts, config.OptWithStream(cfg.WithStream))
}
if cfg.IgnoreHTMLTags {
opts = append(opts, config.OptIgnoreHTMLTags(cfg.IgnoreHTMLTags))
}
if cfg.WithDetails != false {
if cfg.WithDetails {
opts = append(opts, config.OptWithDetails(cfg.WithDetails))
}
if cfg.Port != 0 {
Expand Down Expand Up @@ -322,6 +332,17 @@ func withDetailsFlag(cmd *cobra.Command) {
}
}

func withStreamFlag(cmd *cobra.Command) {
withDet, err := cmd.Flags().GetBool("stream")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if withDet {
opts = append(opts, config.OptWithStream(true))
}
}

func batchSizeFlag(cmd *cobra.Command) {
bs, err := cmd.Flags().GetInt("batch_size")
if err != nil {
Expand Down Expand Up @@ -351,7 +372,7 @@ func processStdin(cmd *cobra.Command, cfg config.Config) {
return
}
gnp := gnparser.NewGNParser(cfg)
parseFile(gnp, os.Stdin)
parseBatch(gnp, os.Stdin)
}

func checkStdin() bool {
Expand Down Expand Up @@ -388,7 +409,11 @@ func parse(
log.Fatal(err)
os.Exit(1)
}
parseFile(gnp, f)
if cfg.WithStream {
parseStream(gnp, f)
} else {
parseBatch(gnp, f)
}
f.Close()
} else {
parseString(gnp, data)
Expand All @@ -404,51 +429,6 @@ func fileExists(path string) bool {
return false
}

func parseFile(
gnp gnparser.GNParser,
f io.Reader,
) {
batch := make([]string, batchSize)
chOut := make(chan []output.Parsed)
var wg sync.WaitGroup

wg.Add(1)
go processResults(chOut, &wg, gnp.Format())

sc := bufio.NewScanner(f)
var i, count int
for sc.Scan() {
batch[count] = sc.Text()
count++
if count == batchSize {
i++
log.Printf("Parsing %d-th line\n", count*i)
chOut <- gnp.ParseNames(batch)
batch = make([]string, batchSize)
count = 0
}
}
chOut <- gnp.ParseNames(batch[:count])
close(chOut)
wg.Wait()
}

func processResults(
out <-chan []output.Parsed,
wg *sync.WaitGroup,
f format.Format,
) {
defer wg.Done()
if f == format.CSV {
fmt.Println(output.CSVHeader())
}
for pr := range out {
for i := range pr {
fmt.Println(pr[i].Output(f))
}
}
}

func parseString(gnp gnparser.GNParser, name string) {
res := gnp.ParseName(name)
f := gnp.Format()
Expand Down

0 comments on commit 6c6d5da

Please sign in to comment.