diff --git a/index/redisearch/index.go b/index/redisearch/index.go
index 6118bce..59b92e5 100644
--- a/index/redisearch/index.go
+++ b/index/redisearch/index.go
@@ -56,20 +56,20 @@ type Index struct {
     clientClient     *goredis.ClusterClient
     standaloneClient *goredis.Client
     cluster          bool
+    withSuffixTrie   bool
 }
 
 // NewIndex creates a new index connecting to the redis host, and using the given name as key prefix
-func NewIndex(addrs []string, pass string, temporary int, name string, md *index.Metadata, mode string) *Index {
+func NewIndex(addrs []string, pass string, temporary int, name string, md *index.Metadata, mode string, withSuffixTrie bool) *Index {
     ret := &Index{
-        hosts: addrs,
-
-        md:        md,
-        password:  pass,
-        temporary: temporary,
-
-        name:          name,
-        commandPrefix: "FT",
-        cluster:       false,
+        hosts:          addrs,
+        md:             md,
+        password:       pass,
+        temporary:      temporary,
+        name:           name,
+        commandPrefix:  "FT",
+        cluster:        false,
+        withSuffixTrie: withSuffixTrie,
     }
     switch mode {
     case "cluster":
@@ -160,8 +160,9 @@ func (i *Index) Create() error {
             if opts.Sortable {
                 args = append(args, "SORTABLE")
             }
-
-            // stemming per field not supported yet
+            if i.withSuffixTrie {
+                args = append(args, "WITHSUFFIXTRIE")
+            }
         case index.NumericField:
             args = append(args, f.Name, "NUMERIC")
 
diff --git a/main.go b/main.go
index 2a0271e..31ec789 100644
--- a/main.go
+++ b/main.go
@@ -24,23 +24,23 @@ import (
 
 const (
     // IndexName is the name of our index on all engines
-    IndexNamePrefix    = "rd"
-    EN_WIKI_DATASET    = "enwiki"
-    PMC_DATASET        = "pmc"
-    REDDIT_DATASET     = "reddit"
-    DEFAULT_DATASET    = EN_WIKI_DATASET
-    BENCHMARK_SEARCH   = "search"
-    BENCHMARK_PREFIX   = "prefix"
-    BENCHMARK_WILDCARD = "wildcard"
-    BENCHMARK_DEFAULT  = BENCHMARK_SEARCH
-    ENGINE_REDIS       = "redis"
-    ENGINE_ELASTIC     = "elastic"
-    ENGINE_SOLR        = "solr"
-    TERM_QUERY_MAX_LEN = "term-query-prefix-max-len"
-    ENGINE_DEFAULT     = ENGINE_REDIS
-    DEFAULT_STOPWORDS  = "a,an,and,are,as,at,be,but,by,for,if,in,into,is,it,no,not,of,on,or,such,that,the,their,then,there,these,they,this,to,was,will,with"
-    redisMode          = "redis.mode"
-    redisModeDefault   = "single"
+    IndexNamePrefix           = "rd"
+    EN_WIKI_DATASET           = "enwiki"
+    PMC_DATASET               = "pmc"
+    REDDIT_DATASET            = "reddit"
+    DEFAULT_DATASET           = EN_WIKI_DATASET
+    BENCHMARK_SEARCH          = "search"
+    BENCHMARK_PREFIX          = "prefix"
+    BENCHMARK_WILDCARD        = "wildcard"
+    BENCHMARK_DEFAULT         = BENCHMARK_SEARCH
+    ENGINE_REDIS              = "redis"
+    ENGINE_ELASTIC            = "elastic"
+    TERM_QUERY_MAX_LEN        = "term-query-prefix-max-len"
+    ENGINE_DEFAULT            = ENGINE_REDIS
+    DEFAULT_STOPWORDS         = "a,an,and,are,as,at,be,but,by,for,if,in,into,is,it,no,not,of,on,or,such,that,the,their,then,there,these,they,this,to,was,will,with"
+    REDIS_MODE_SINGLE         = "single"
+    REDIS_MODULE_OSS_CLUSTER  = "cluster"
+    REDIS_MODE_SINGLE_DEFAULT = REDIS_MODE_SINGLE
 )
 
 // this mutex does not affect any of the client go-routines ( it's only to sync between main thread and datapoints processer go-routines )
@@ -63,12 +63,12 @@ var indexMetadataPMC = index.NewMetadata().
     AddField(index.NewTextField("issue", 1))
 
 // selectIndex selects and configures the index we are now running based on the engine name, hosts and number of shards
-func selectIndex(indexMetadata *index.Metadata, engine string, hosts []string, user, pass string, temporary int, disableCache bool, name string, cmdPrefix string, shardCount, replicaCount, indexerNumCPUs int, tlsSkipVerify bool, bulkIndexerFlushIntervalSeconds int, bulkIndexerRefresh string, redisMode string) (index.Index, interface{}) {
+func selectIndex(indexMetadata *index.Metadata, engine string, hosts []string, user, pass string, temporary int, disableCache bool, name string, cmdPrefix string, shardCount, replicaCount, indexerNumCPUs int, tlsSkipVerify bool, bulkIndexerFlushIntervalSeconds int, bulkIndexerRefresh string, redisMode string, withSuffixTrie bool) (index.Index, interface{}) {
     switch engine {
     case ENGINE_REDIS:
         indexMetadata.Options = redisearch.IndexingOptions{Prefix: cmdPrefix}
-        idx := redisearch.NewIndex(hosts, pass, temporary, name, indexMetadata, redisMode)
+        idx := redisearch.NewIndex(hosts, pass, temporary, name, indexMetadata, redisMode, withSuffixTrie)
         return idx, query.QueryVerbatim
     case ENGINE_ELASTIC:
         idx, err := elastic.NewIndex(hosts[0], name, "doc", disableCache, indexMetadata, user, pass, shardCount, replicaCount, indexerNumCPUs, tlsSkipVerify, bulkIndexerFlushIntervalSeconds, bulkIndexerRefresh)
@@ -94,27 +94,34 @@ func main() {
     termStopWords := flag.String("stopwords", DEFAULT_STOPWORDS, "filtered stopwords for term creation")
     dataset := flag.String("dataset", DEFAULT_DATASET, fmt.Sprintf("The dataset tp process. One of: [%s]", strings.Join([]string{EN_WIKI_DATASET, REDDIT_DATASET, PMC_DATASET}, "|")))
     benchmark := flag.String("benchmark", "", fmt.Sprintf("The benchmark to run. One of: [%s]. If empty will not run.", strings.Join([]string{BENCHMARK_SEARCH, BENCHMARK_PREFIX, BENCHMARK_WILDCARD}, "|")))
-    elasticShardCount := flag.Int("es.number_of_shards", 1, "elastic shard count")
-    elasticReplicaCount := flag.Int("es.number_of_replicas", 0, "elastic replica count")
-    elasticEnableCache := flag.Bool("es.requests.cache.enable", true, "for elastic only. enable query cache.")
+
     tlsSkipVerify := flag.Bool("tls-skip-verify", true, "Skip verification of server certificate.")
-    verbatimEnabled := flag.Bool("verbatim", false, "for redisearch only. does not try to use stemming for query expansion but searches the query terms verbatim.")
     seconds := flag.Int("duration", 60, "number of seconds to run the benchmark")
     temporary := flag.Int("temporary", -1, "for redisearch only, create a temporary index that will expire after the given amount of seconds, -1 mean no temporary")
     conc := flag.Int("c", runtimeCPUs, "benchmark concurrency")
     debugLevel := flag.Int("debug-level", 0, "print debug info according to debug level. If 0 disabled.")
-    bulkIndexerFlushIntervalSeconds := flag.Int("es.bulk.flush_interval_secs", 1, "ES bulk indexer flush interval.")
     maxDocPerIndex := flag.Int64("maxdocs", -1, "specify the number of max docs per index, -1 for no limit")
     outfile := flag.String("o", "benchmark.json", "results output file. set to - for stdout")
-    cmdPrefix := flag.String("redis.cmd.prefix", "FT", "Command prefix for FT module")
-    redisMode := flag.String("redis.mode", redisModeDefault, "redis connection mode. one if 'sigle'")
+
     password := flag.String("password", "", "database password")
-    bulkIndexerRefresh := flag.String("es.refresh", "true", "If true, Elasticsearch refreshes the affected\n\t\t// shards to make this operation visible to search\n\t\t// if wait_for then wait for a refresh to make this operation visible to search,\n\t\t// if false do nothing with refreshes. Valid values: true, false, wait_for. Default: false.")
     user := flag.String("user", "", "database username. If empty will use the default for each of the databases")
     reportingPeriod := flag.Duration("reporting-period", 1*time.Second, "Period to report runtime stats")
     bulkIndexingSizeDocs := flag.Int("bulk.indexer.ndocs", 100, "Groups the documents into chunks to index.")
     dropData := flag.Bool("drop-data-start", true, "Drop data at start.")
 
+    // redis
+    cmdPrefix := flag.String("redis.cmd.prefix", "FT", "Command prefix for FT module")
+    redisMode := flag.String("redis.mode", REDIS_MODE_SINGLE_DEFAULT, fmt.Sprintf("Redis connection mode. One of: [%s]", strings.Join([]string{REDIS_MODE_SINGLE, REDIS_MODULE_OSS_CLUSTER}, "|")))
+    verbatimEnabled := flag.Bool("redis.verbatim", false, "for redisearch only. does not try to use stemming for query expansion but searches the query terms verbatim.")
+    withsuffixtrieEnabled := flag.Bool("redis.withsuffixtrie", false, "for redisearch only. keeps a suffix trie with all terms to optimize contains (*foo*) and suffix (*foo) queries.")
+
+    // elastic
+    elasticShardCount := flag.Int("es.number_of_shards", 1, "elastic shard count")
+    elasticReplicaCount := flag.Int("es.number_of_replicas", 0, "elastic replica count")
+    elasticEnableCache := flag.Bool("es.requests.cache.enable", true, "for elastic only. enable query cache.")
+    bulkIndexerRefresh := flag.String("es.refresh", "true", "If true, Elasticsearch refreshes the affected\n\t\t// shards to make this operation visible to search\n\t\t// if wait_for then wait for a refresh to make this operation visible to search,\n\t\t// if false do nothing with refreshes. Valid values: true, false, wait_for. Default: false.")
+    bulkIndexerFlushIntervalSeconds := flag.Int("es.bulk.flush_interval_secs", 1, "ES bulk indexer flush interval.")
+
     nIdx := 1
     benchmarkQueryField := *queryField
     if benchmarkQueryField == "" {
@@ -166,7 +173,7 @@ func main() {
     }
     // select index to run
     name := IndexNamePrefix + strconv.Itoa(0)
-    idx, _ := selectIndex(indexMetadata, *engine, servers, username, *password, *temporary, !*elasticEnableCache, name, *cmdPrefix, *elasticShardCount, *elasticReplicaCount, *conc, *tlsSkipVerify, *bulkIndexerFlushIntervalSeconds, *bulkIndexerRefresh, *redisMode)
+    idx, _ := selectIndex(indexMetadata, *engine, servers, username, *password, *temporary, !*elasticEnableCache, name, *cmdPrefix, *elasticShardCount, *elasticReplicaCount, *conc, *tlsSkipVerify, *bulkIndexerFlushIntervalSeconds, *bulkIndexerRefresh, *redisMode, *withsuffixtrieEnabled)
     indexes[0] = idx
 
     if *benchmark != "" {
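
Illustrative sketch (not part of the patch): a hypothetical fieldArgs helper showing how a text field's schema arguments are expected to come out once -redis.withsuffixtrie is threaded through NewIndex and Index.Create; the helper and the "body" field name are assumptions for illustration only, but the SORTABLE/WITHSUFFIXTRIE ordering follows the Create() hunk above.

package main

import "fmt"

// fieldArgs loosely mirrors the per-text-field argument construction in
// Index.Create after this change: WITHSUFFIXTRIE is appended after the
// optional SORTABLE flag when the index was created with withSuffixTrie=true.
// This helper does not exist in the repository; it is illustrative only.
func fieldArgs(name string, sortable, withSuffixTrie bool) []interface{} {
    args := []interface{}{name, "TEXT"}
    if sortable {
        args = append(args, "SORTABLE")
    }
    if withSuffixTrie {
        args = append(args, "WITHSUFFIXTRIE")
    }
    return args
}

func main() {
    // With -redis.withsuffixtrie set, a text field's schema arguments become:
    // [body TEXT WITHSUFFIXTRIE]
    fmt.Println(fieldArgs("body", false, true))
}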