Skip to content

Commit

Permalink
Added option to test WITHSUFFIXTRIE on redis (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecosta90 authored Sep 29, 2022
1 parent e95391b commit f5fbaca
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 40 deletions.
25 changes: 13 additions & 12 deletions index/redisearch/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,20 @@ type Index struct {
clientClient *goredis.ClusterClient
standaloneClient *goredis.Client
cluster bool
withSuffixTrie bool
}

// NewIndex creates a new index connecting to the redis host, and using the given name as key prefix
func NewIndex(addrs []string, pass string, temporary int, name string, md *index.Metadata, mode string) *Index {
func NewIndex(addrs []string, pass string, temporary int, name string, md *index.Metadata, mode string, withSuffixTrie bool) *Index {
ret := &Index{
hosts: addrs,

md: md,
password: pass,
temporary: temporary,

name: name,
commandPrefix: "FT",
cluster: false,
hosts: addrs,
md: md,
password: pass,
temporary: temporary,
name: name,
commandPrefix: "FT",
cluster: false,
withSuffixTrie: withSuffixTrie,
}
switch mode {
case "cluster":
Expand Down Expand Up @@ -160,8 +160,9 @@ func (i *Index) Create() error {
if opts.Sortable {
args = append(args, "SORTABLE")
}

// stemming per field not supported yet
if i.withSuffixTrie {
args = append(args, "WITHSUFFIXTRIE")
}

case index.NumericField:
args = append(args, f.Name, "NUMERIC")
Expand Down
63 changes: 35 additions & 28 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,23 @@ import (

const (
// IndexNamePrefix is the prefix of the index name used on all engines; a numeric suffix is appended to form the full name
IndexNamePrefix = "rd"
EN_WIKI_DATASET = "enwiki"
PMC_DATASET = "pmc"
REDDIT_DATASET = "reddit"
DEFAULT_DATASET = EN_WIKI_DATASET
BENCHMARK_SEARCH = "search"
BENCHMARK_PREFIX = "prefix"
BENCHMARK_WILDCARD = "wildcard"
BENCHMARK_DEFAULT = BENCHMARK_SEARCH
ENGINE_REDIS = "redis"
ENGINE_ELASTIC = "elastic"
ENGINE_SOLR = "solr"
TERM_QUERY_MAX_LEN = "term-query-prefix-max-len"
ENGINE_DEFAULT = ENGINE_REDIS
DEFAULT_STOPWORDS = "a,an,and,are,as,at,be,but,by,for,if,in,into,is,it,no,not,of,on,or,such,that,the,their,then,there,these,they,this,to,was,will,with"
redisMode = "redis.mode"
redisModeDefault = "single"
IndexNamePrefix = "rd"
EN_WIKI_DATASET = "enwiki"
PMC_DATASET = "pmc"
REDDIT_DATASET = "reddit"
DEFAULT_DATASET = EN_WIKI_DATASET
BENCHMARK_SEARCH = "search"
BENCHMARK_PREFIX = "prefix"
BENCHMARK_WILDCARD = "wildcard"
BENCHMARK_DEFAULT = BENCHMARK_SEARCH
ENGINE_REDIS = "redis"
ENGINE_ELASTIC = "elastic"
TERM_QUERY_MAX_LEN = "term-query-prefix-max-len"
ENGINE_DEFAULT = ENGINE_REDIS
DEFAULT_STOPWORDS = "a,an,and,are,as,at,be,but,by,for,if,in,into,is,it,no,not,of,on,or,such,that,the,their,then,there,these,they,this,to,was,will,with"
REDIS_MODE_SINGLE = "single"
REDIS_MODULE_OSS_CLUSTER = "cluster"
REDIS_MODE_SINGLE_DEFAULT = REDIS_MODE_SINGLE
)

// This mutex does not affect any of the client goroutines; it only synchronizes the main thread with the datapoints-processor goroutines.
Expand All @@ -63,12 +63,12 @@ var indexMetadataPMC = index.NewMetadata().
AddField(index.NewTextField("issue", 1))

// selectIndex selects and configures the index we are now running based on the engine name, hosts and number of shards
func selectIndex(indexMetadata *index.Metadata, engine string, hosts []string, user, pass string, temporary int, disableCache bool, name string, cmdPrefix string, shardCount, replicaCount, indexerNumCPUs int, tlsSkipVerify bool, bulkIndexerFlushIntervalSeconds int, bulkIndexerRefresh string, redisMode string) (index.Index, interface{}) {
func selectIndex(indexMetadata *index.Metadata, engine string, hosts []string, user, pass string, temporary int, disableCache bool, name string, cmdPrefix string, shardCount, replicaCount, indexerNumCPUs int, tlsSkipVerify bool, bulkIndexerFlushIntervalSeconds int, bulkIndexerRefresh string, redisMode string, withSuffixTrie bool) (index.Index, interface{}) {

switch engine {
case ENGINE_REDIS:
indexMetadata.Options = redisearch.IndexingOptions{Prefix: cmdPrefix}
idx := redisearch.NewIndex(hosts, pass, temporary, name, indexMetadata, redisMode)
idx := redisearch.NewIndex(hosts, pass, temporary, name, indexMetadata, redisMode, withSuffixTrie)
return idx, query.QueryVerbatim
case ENGINE_ELASTIC:
idx, err := elastic.NewIndex(hosts[0], name, "doc", disableCache, indexMetadata, user, pass, shardCount, replicaCount, indexerNumCPUs, tlsSkipVerify, bulkIndexerFlushIntervalSeconds, bulkIndexerRefresh)
Expand All @@ -94,27 +94,34 @@ func main() {
termStopWords := flag.String("stopwords", DEFAULT_STOPWORDS, "filtered stopwords for term creation")
dataset := flag.String("dataset", DEFAULT_DATASET, fmt.Sprintf("The dataset tp process. One of: [%s]", strings.Join([]string{EN_WIKI_DATASET, REDDIT_DATASET, PMC_DATASET}, "|")))
benchmark := flag.String("benchmark", "", fmt.Sprintf("The benchmark to run. One of: [%s]. If empty will not run.", strings.Join([]string{BENCHMARK_SEARCH, BENCHMARK_PREFIX, BENCHMARK_WILDCARD}, "|")))
elasticShardCount := flag.Int("es.number_of_shards", 1, "elastic shard count")
elasticReplicaCount := flag.Int("es.number_of_replicas", 0, "elastic replica count")
elasticEnableCache := flag.Bool("es.requests.cache.enable", true, "for elastic only. enable query cache.")

tlsSkipVerify := flag.Bool("tls-skip-verify", true, "Skip verification of server certificate.")
verbatimEnabled := flag.Bool("verbatim", false, "for redisearch only. does not try to use stemming for query expansion but searches the query terms verbatim.")
seconds := flag.Int("duration", 60, "number of seconds to run the benchmark")
temporary := flag.Int("temporary", -1, "for redisearch only, create a temporary index that will expire after the given amount of seconds, -1 mean no temporary")
conc := flag.Int("c", runtimeCPUs, "benchmark concurrency")
debugLevel := flag.Int("debug-level", 0, "print debug info according to debug level. If 0 disabled.")
bulkIndexerFlushIntervalSeconds := flag.Int("es.bulk.flush_interval_secs", 1, "ES bulk indexer flush interval.")
maxDocPerIndex := flag.Int64("maxdocs", -1, "specify the number of max docs per index, -1 for no limit")
outfile := flag.String("o", "benchmark.json", "results output file. set to - for stdout")
cmdPrefix := flag.String("redis.cmd.prefix", "FT", "Command prefix for FT module")
redisMode := flag.String("redis.mode", redisModeDefault, "redis connection mode. one if 'sigle'")

password := flag.String("password", "", "database password")
bulkIndexerRefresh := flag.String("es.refresh", "true", "If true, Elasticsearch refreshes the affected\n\t\t// shards to make this operation visible to search\n\t\t// if wait_for then wait for a refresh to make this operation visible to search,\n\t\t// if false do nothing with refreshes. Valid values: true, false, wait_for. Default: false.")
user := flag.String("user", "", "database username. If empty will use the default for each of the databases")
reportingPeriod := flag.Duration("reporting-period", 1*time.Second, "Period to report runtime stats")
bulkIndexingSizeDocs := flag.Int("bulk.indexer.ndocs", 100, "Groups the documents into chunks to index.")
dropData := flag.Bool("drop-data-start", true, "Drop data at start.")

// redis
cmdPrefix := flag.String("redis.cmd.prefix", "FT", "Command prefix for FT module")
redisMode := flag.String("redis.mode", REDIS_MODE_SINGLE_DEFAULT, fmt.Sprintf("Redis connection mode. One of: [%s]", strings.Join([]string{REDIS_MODE_SINGLE, REDIS_MODULE_OSS_CLUSTER}, "|")))
verbatimEnabled := flag.Bool("redis.verbatim", false, "for redisearch only. does not try to use stemming for query expansion but searches the query terms verbatim.")
withsuffixtrieEnabled := flag.Bool("redis.withsuffixtrie", false, "It is used to optimize contains (*foo*) and suffix (*foo) queries.")

// elastic
elasticShardCount := flag.Int("es.number_of_shards", 1, "elastic shard count")
elasticReplicaCount := flag.Int("es.number_of_replicas", 0, "elastic replica count")
elasticEnableCache := flag.Bool("es.requests.cache.enable", true, "for elastic only. enable query cache.")
bulkIndexerRefresh := flag.String("es.refresh", "true", "If true, Elasticsearch refreshes the affected\n\t\t// shards to make this operation visible to search\n\t\t// if wait_for then wait for a refresh to make this operation visible to search,\n\t\t// if false do nothing with refreshes. Valid values: true, false, wait_for. Default: false.")
bulkIndexerFlushIntervalSeconds := flag.Int("es.bulk.flush_interval_secs", 1, "ES bulk indexer flush interval.")

nIdx := 1
benchmarkQueryField := *queryField
if benchmarkQueryField == "" {
Expand Down Expand Up @@ -166,7 +173,7 @@ func main() {
}
// select index to run
name := IndexNamePrefix + strconv.Itoa(0)
idx, _ := selectIndex(indexMetadata, *engine, servers, username, *password, *temporary, !*elasticEnableCache, name, *cmdPrefix, *elasticShardCount, *elasticReplicaCount, *conc, *tlsSkipVerify, *bulkIndexerFlushIntervalSeconds, *bulkIndexerRefresh, *redisMode)
idx, _ := selectIndex(indexMetadata, *engine, servers, username, *password, *temporary, !*elasticEnableCache, name, *cmdPrefix, *elasticShardCount, *elasticReplicaCount, *conc, *tlsSkipVerify, *bulkIndexerFlushIntervalSeconds, *bulkIndexerRefresh, *redisMode, *withsuffixtrieEnabled)
indexes[0] = idx

if *benchmark != "" {
Expand Down

0 comments on commit f5fbaca

Please sign in to comment.