Skip to content

Commit

Permalink
Add parsing of a stream (#122)
Browse files Browse the repository at this point in the history
  • Loading branch information
dimus committed Jan 18, 2021
1 parent 983ad63 commit 2d68108
Show file tree
Hide file tree
Showing 12 changed files with 297 additions and 147 deletions.
11 changes: 11 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ type Config struct {
// BatchSize sets the maximum number of elements in names-strings slice.
BatchSize int

// WithStream switches from batch-by-batch parsing to parsing one name
// at a time. When WithStream is true, the BatchSize setting is ignored.
WithStream bool

// IgnoreHTMLTags can be set to true when it is desirable to clean up names from
// a few HTML tags often present in names-strings that were planned to be
// presented via an HTML page.
Expand Down Expand Up @@ -107,6 +111,13 @@ func OptBatchSize(i int) Option {
}
}

// OptWithStream sets the WithStream field.
func OptWithStream(b bool) Option {
return func(cfg *Config) {
cfg.WithStream = b
}
}

// OptPort sets a port for web-service.
func OptPort(i int) Option {
return func(cfg *Config) {
Expand Down
2 changes: 1 addition & 1 deletion entity/output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (p Parsed) Output(f format.Format) string {
}
}

func CSVHeader() string {
// HeaderCSV returns the comma-separated list of column names that is
// printed as the first line of CSV output.
func HeaderCSV() string {
return "Id,Verbatim,Cardinality,CanonicalStem,CanonicalSimple,CanonicalFull,Authorship,Year,Quality"
}

Expand Down
21 changes: 20 additions & 1 deletion entity/output/parsed_result.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,28 @@
package output

import "fmt"

// ParseResult structure contains parsing output, its place in the
// slice, and an unexpected error, if it happened during the parsing.
type ParseResult struct {
Index int
Idx int
Parsed Parsed
Error error
}

// Index returns the value of the Idx field of the result.
func (pr ParseResult) Index() int {
return pr.Idx
}

// Unpack copies the parsed output of the result into v, which must be a
// *Parsed. When the result carries an error, that error is returned and v is
// left untouched. Any other type of v produces an error naming that type.
func (pr ParseResult) Unpack(v interface{}) error {
	if err := pr.Error; err != nil {
		return err
	}

	target, ok := v.(*Parsed)
	if !ok {
		return fmt.Errorf("cannot use %T as Parsed", v)
	}

	*target = pr.Parsed
	return nil
}
4 changes: 2 additions & 2 deletions gnparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (gnp gnparser) ParseNames(names []string) []output.Parsed {
if !ok {
return
}
res[v.Index] = v.Parsed
res[v.Idx] = v.Parsed
}
}
}()
Expand Down Expand Up @@ -121,7 +121,7 @@ func (gnp gnparser) parseWorker(
select {
case <-ctx.Done():
return
case chOut <- output.ParseResult{Index: v.Index, Parsed: parsed}:
case chOut <- output.ParseResult{Idx: v.Index, Parsed: parsed}:
}
}
}
Expand Down
61 changes: 61 additions & 0 deletions gnparser/cmd/parse_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package cmd

import (
"bufio"
"fmt"
"io"
"log"
"sync"

"github.com/gnames/gnlib/format"
"github.com/gnames/gnparser"
"github.com/gnames/gnparser/entity/output"
)

// parseBatch reads f line by line, groups the lines into batches of
// batchSize name-strings, parses every batch with gnp.ParseNames and sends
// the results to processResults, which prints them in the format configured
// for gnp.
//
// The last, possibly partial, batch is parsed after the input is exhausted.
// If the scanner stopped because of a read error, the function panics via
// log.Panic, but only after the printing goroutine has finished, so results
// that were already parsed are not cut off in the middle of being written.
//
// NOTE(review): assumes the package-level batchSize is greater than zero;
// otherwise the first write into batch would be out of range. Confirm where
// batchSize is set.
func parseBatch(
	gnp gnparser.GNParser,
	f io.Reader,
) {
	batch := make([]string, batchSize)
	chOut := make(chan []output.Parsed)
	var wg sync.WaitGroup

	wg.Add(1)
	go processResults(chOut, &wg, gnp.Format())

	sc := bufio.NewScanner(f)
	var i, count int
	for sc.Scan() {
		batch[count] = sc.Text()
		count++
		if count == batchSize {
			i++
			log.Printf("Parsing %d-th line\n", count*i)
			chOut <- gnp.ParseNames(batch)
			// A fresh slice is allocated for the next batch instead of
			// reusing the one that was just handed to the parser.
			batch = make([]string, batchSize)
			count = 0
		}
	}
	// Parse whatever is left in the last, partially filled batch.
	chOut <- gnp.ParseNames(batch[:count])
	close(chOut)

	// Let the printer drain everything before a scan error aborts the run.
	wg.Wait()
	if err := sc.Err(); err != nil {
		log.Panic(err)
	}
}

// processResults prints every parsed name received from out to standard
// output using format f, one name per line. For the CSV format a header line
// is printed first. It returns when out is closed and signals completion
// through wg.
func processResults(
	out <-chan []output.Parsed,
	wg *sync.WaitGroup,
	f format.Format,
) {
	defer wg.Done()

	if f == format.CSV {
		fmt.Println(output.HeaderCSV())
	}

	for batch := range out {
		for _, parsed := range batch {
			fmt.Println(parsed.Output(f))
		}
	}
}
81 changes: 81 additions & 0 deletions gnparser/cmd/parse_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package cmd

import (
"bufio"
"context"
"fmt"
"io"
"log"
"sync"

"github.com/gnames/gnlib/format"
"github.com/gnames/gnparser"
"github.com/gnames/gnparser/entity/input"
"github.com/gnames/gnparser/entity/output"
)

// getNames reads f line by line in a background goroutine and sends every
// line to the returned channel as an input.Name whose Index is the zero-based
// position of the line in the input.
//
// The channel is closed when the input is exhausted, when reading fails, or
// when ctx is cancelled. A read error reported by the scanner causes
// log.Panic in the reading goroutine.
func getNames(
	ctx context.Context,
	f io.Reader,
) <-chan input.Name {
	chIn := make(chan input.Name)

	go func() {
		defer close(chIn)

		sc := bufio.NewScanner(f)
		for count := 0; sc.Scan(); count++ {
			select {
			case <-ctx.Done():
				return
			case chIn <- input.Name{Index: count, NameString: sc.Text()}:
			}
		}
		// Scanner.Err is only meaningful once Scan has returned false, and
		// the scanner must not be touched from two goroutines, so the check
		// lives here rather than in the caller right after the goroutine
		// has been started.
		if err := sc.Err(); err != nil {
			log.Panic(err)
		}
	}()

	return chIn
}

// parseStream parses the name-strings read from f one at a time: lines are
// fed through getNames into gnp.ParseNameStream, and every parsed result is
// printed to standard output in the format configured for gnp. For the CSV
// format a header line is printed before the first result.
//
// The function returns when the output channel is closed or the context is
// cancelled.
// NOTE(review): this relies on ParseNameStream closing chOut once its input
// is exhausted; confirm against its implementation.
func parseStream(
	gnp gnparser.GNParser,
	f io.Reader,
) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	chIn := getNames(ctx, f)
	chOut := make(chan output.Parsed)
	var wg sync.WaitGroup
	wg.Add(1)

	// The format does not change while streaming, so read it once.
	outFormat := gnp.Format()
	if outFormat == format.CSV {
		// The header has to be printed, not merely computed.
		fmt.Println(output.HeaderCSV())
	}

	go gnp.ParseNameStream(ctx, chIn, chOut)

	go func() {
		defer cancel()
		defer wg.Done()
		var count int
		for {
			count++
			// Progress goes to the log so it does not mix with results.
			if count%50_000 == 0 {
				log.Printf("Processing %d-th name", count)
			}
			select {
			case <-ctx.Done():
				return
			case v, ok := <-chOut:
				if !ok {
					return
				}
				fmt.Println(v.Output(outFormat))
			}
		}
	}()
	wg.Wait()
}
86 changes: 33 additions & 53 deletions gnparser/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@
package cmd

import (
"bufio"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
"sync"

"github.com/gnames/gnlib/format"
"github.com/gnames/gnlib/sys"
Expand All @@ -55,6 +52,10 @@ interfaces. There are 3 possible settings: 'csv', 'compact', 'pretty'.
# BatchSize to 1.
# BatchSize 50000
# WithStream switches parsing of a large number of name-strings to a
# one-at-a-time stream. When WithStream is true, BatchSize is ignored.
# WithStream false
# IgnoreHTMLTags can be set to true if it is desirable to not try to remove from
# a few HTML tags often present in names-strings that were planned to be
# presented via an HTML page.
Expand All @@ -81,6 +82,7 @@ type cfgData struct {
JobsNum int
BatchSize int
IgnoreHTMLTags bool
WithStream bool
WithDetails bool
Port int
}
Expand Down Expand Up @@ -127,6 +129,7 @@ gnparser -j 5 -p 8080
jobsNumFlag(cmd)
ignoreHTMLTagsFlag(cmd)
withDetailsFlag(cmd)
withStreamFlag(cmd)
batchSizeFlag(cmd)
port := portFlag(cmd)
cfg := config.NewConfig(opts...)
Expand Down Expand Up @@ -173,6 +176,9 @@ func init() {
rootCmd.Flags().IntP("batch_size", "b", 0,
"maximum number of names in a batch send for processing.")

rootCmd.Flags().BoolP("stream", "s", false,
"parse one name at a time in a stream instead of a batch parsing")

rootCmd.Flags().BoolP("ignore_tags", "i", false,
"ignore HTML entities and tags when parsing.")

Expand All @@ -199,6 +205,7 @@ func initConfig() {
// config file settings
viper.BindEnv("Format", "GNPARSER_FORMAT")
viper.BindEnv("JobsNum", "GNPARSER_JOBS_NUM")
viper.BindEnv("WithStream", "GNPARSER_WITH_STREAM")
viper.BindEnv("IgnoreHTMLTags", "GNPARSER_IGNORE_HTML_TAGS")
viper.BindEnv("WithDetails", "GNPARSER_WITH_DETAILS")
viper.BindEnv("Port", "GNPARSER_PORT")
Expand Down Expand Up @@ -230,10 +237,13 @@ func getOpts() []config.Option {
if cfg.BatchSize > 0 {
opts = append(opts, config.OptBatchSize(cfg.BatchSize))
}
if cfg.IgnoreHTMLTags != false {
if cfg.WithStream {
opts = append(opts, config.OptWithStream(cfg.WithStream))
}
if cfg.IgnoreHTMLTags {
opts = append(opts, config.OptIgnoreHTMLTags(cfg.IgnoreHTMLTags))
}
if cfg.WithDetails != false {
if cfg.WithDetails {
opts = append(opts, config.OptWithDetails(cfg.WithDetails))
}
if cfg.Port != 0 {
Expand Down Expand Up @@ -322,6 +332,17 @@ func withDetailsFlag(cmd *cobra.Command) {
}
}

// withStreamFlag reads the "stream" command-line flag and, when it is set,
// appends the option that enables stream parsing to opts. If the flag cannot
// be read, the error is printed and the program exits with status 1.
func withStreamFlag(cmd *cobra.Command) {
	stream, err := cmd.Flags().GetBool("stream")
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	if !stream {
		return
	}
	opts = append(opts, config.OptWithStream(true))
}

func batchSizeFlag(cmd *cobra.Command) {
bs, err := cmd.Flags().GetInt("batch_size")
if err != nil {
Expand Down Expand Up @@ -351,7 +372,7 @@ func processStdin(cmd *cobra.Command, cfg config.Config) {
return
}
gnp := gnparser.NewGNParser(cfg)
parseFile(gnp, os.Stdin)
parseBatch(gnp, os.Stdin)
}

func checkStdin() bool {
Expand Down Expand Up @@ -388,7 +409,11 @@ func parse(
log.Fatal(err)
os.Exit(1)
}
parseFile(gnp, f)
if cfg.WithStream {
parseStream(gnp, f)
} else {
parseBatch(gnp, f)
}
f.Close()
} else {
parseString(gnp, data)
Expand All @@ -404,56 +429,11 @@ func fileExists(path string) bool {
return false
}

// parseFile reads f line by line, groups the lines into batches of
// batchSize name-strings, parses every batch with gnp.ParseNames and sends
// the results to processResults for printing.
func parseFile(
gnp gnparser.GNParser,
f io.Reader,
) {
batch := make([]string, batchSize)
chOut := make(chan []output.Parsed)
var wg sync.WaitGroup

wg.Add(1)
go processResults(chOut, &wg, gnp.Format())

sc := bufio.NewScanner(f)
var i, count int
for sc.Scan() {
batch[count] = sc.Text()
count++
if count == batchSize {
i++
log.Printf("Parsing %d-th line\n", count*i)
chOut <- gnp.ParseNames(batch)
batch = make([]string, batchSize)
count = 0
}
}
// Parse whatever is left in the last, partially filled batch.
chOut <- gnp.ParseNames(batch[:count])
close(chOut)
// NOTE(review): sc.Err() is not checked here, so a read error is
// indistinguishable from the end of input.
wg.Wait()
}

// processResults prints every parsed name received from out to standard
// output using format f, one name per line. For the CSV format a header line
// is printed first. It returns when out is closed and signals completion
// through wg.
func processResults(
out <-chan []output.Parsed,
wg *sync.WaitGroup,
f format.Format,
) {
defer wg.Done()
if f == format.CSV {
fmt.Println(output.CSVHeader())
}
for pr := range out {
for i := range pr {
fmt.Println(pr[i].Output(f))
}
}
}

func parseString(gnp gnparser.GNParser, name string) {
res := gnp.ParseName(name)
f := gnp.Format()
if f == format.CSV {
fmt.Println(output.CSVHeader())
fmt.Println(output.HeaderCSV())
}
fmt.Println(res.Output(f))
}
Loading

0 comments on commit 2d68108

Please sign in to comment.