Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete term set #2284

Merged
merged 24 commits into from
Nov 18, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
53f3180
wire new type for text or set query through the codebase
trinity-1686a Nov 4, 2022
7154510
add tags to SetQuery and convert api call to protobuf
trinity-1686a Nov 7, 2022
194fb00
add search per TermSet
trinity-1686a Nov 7, 2022
c502cd1
allow most kind of term in TermSetQuery
trinity-1686a Nov 8, 2022
0dfdd66
handle errors on delete api endpoint
trinity-1686a Nov 9, 2022
2ff6349
handle errors when building query
trinity-1686a Nov 9, 2022
6fcc14c
add warming up cache and basic test for TermSetQuery
trinity-1686a Nov 10, 2022
2237c82
add new byterange cache
trinity-1686a Nov 12, 2022
7cb0cdf
update tantivy dep to support warming up full postings
trinity-1686a Nov 12, 2022
e4268d4
add proptest on bytesrange cache
trinity-1686a Nov 14, 2022
3719762
address minor comments
trinity-1686a Nov 14, 2022
22f08a1
cargo fmt
trinity-1686a Nov 14, 2022
e3e18d6
small refactoring
trinity-1686a Nov 14, 2022
5f65307
fix todos in warmup
trinity-1686a Nov 14, 2022
2392b68
fix clippy
trinity-1686a Nov 14, 2022
10ebfa9
Revert "add tags to SetQuery and convert api call to protobuf"
trinity-1686a Nov 16, 2022
d4188d0
Revert "wire new type for text or set query through the codebase"
trinity-1686a Nov 16, 2022
6b97dd2
add support for TermSetQuery through query parser
trinity-1686a Nov 16, 2022
5e5ab21
test delete with TermSet
trinity-1686a Nov 16, 2022
8e248ca
add test for query builder with TermSet
trinity-1686a Nov 17, 2022
0842ebb
fix nits and test ByteRangeCache counters
trinity-1686a Nov 17, 2022
0fc1e54
update tantivy and change some doc comments
trinity-1686a Nov 17, 2022
dde32b1
Merge branch 'main' into delete-term-set
trinity-1686a Nov 18, 2022
56d27d1
Merge branch 'main' into delete-term-set
trinity-1686a Nov 18, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,14 @@ quickwit-serve = { version = "0.3.1", path = "./quickwit-serve" }
quickwit-storage = { version = "0.3.1", path = "./quickwit-storage" }
quickwit-telemetry = { version = "0.3.1", path = "./quickwit-telemetry" }

fastfield_codecs = { git = "https://github.com/quickwit-oss/tantivy/", rev = "55a9d80" }
tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "55a9d80", default-features = false, features = [
fastfield_codecs = { git = "https://github.com/quickwit-oss/tantivy/", rev = "9a090ed" }
tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "9a090ed", default-features = false, features = [
"mmap",
"lz4-compression",
"zstd-compression",
"quickwit",
] }
tantivy-query-grammar = { git = "https://github.com/quickwit-oss/tantivy/", rev = "55a9d80" }
tantivy-query-grammar = { git = "https://github.com/quickwit-oss/tantivy/", rev = "9a090ed" }

# This is not actually used directly; the goal is to pin the version
# used by reqwest. 0.8.30 has an unclear license.
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-cli/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -995,7 +995,7 @@ pub async fn search_index(args: SearchIndexArgs) -> anyhow::Result<SearchRespons
.await?;
let search_request = SearchRequest {
index_id: args.index_id,
query: args.query.clone(),
query: Some(args.query.clone().into()),
search_fields: args.search_fields.unwrap_or_default(),
snippet_fields: args.snippet_fields.unwrap_or_default(),
start_timestamp: args.start_timestamp,
Expand Down
36 changes: 5 additions & 31 deletions quickwit/quickwit-directories/src/caching_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::Arc;
use std::{fmt, io};

use async_trait::async_trait;
use quickwit_storage::MemorySizedCache;
use quickwit_storage::ByteRangeCache;
use tantivy::directory::error::OpenReadError;
use tantivy::directory::{FileHandle, OwnedBytes};
use tantivy::{AsyncIoResult, Directory, HasLen};
Expand All @@ -33,43 +33,18 @@ use tantivy::{AsyncIoResult, Directory, HasLen};
pub struct CachingDirectory {
underlying: Arc<dyn Directory>,
// TODO fixme: that's a pretty ugly cache we have here.
cache: Arc<MemorySizedCache>,
cache: Arc<ByteRangeCache>,
}

impl CachingDirectory {
/// Creates a new CachingDirectory.
/// `capacity_in_bytes` acts as a memory budget for the directory.
///
/// The implementation is deliberately very naive, as it was designed solely to
/// address Quickwit's requirements.
/// Most notably, if two identical reads on the same path
/// happen concurrently, the read will be executed twice.
///
/// The overall number of bytes held in memory may exceed the capacity at one point
/// if a read request is larger than `capacity_in_bytes`.
/// In that case, the read payload will not be saved in the cache.
pub fn new_with_capacity_in_bytes(
underlying: Arc<dyn Directory>,
capacity_in_bytes: usize,
) -> CachingDirectory {
CachingDirectory {
underlying,
cache: Arc::new(MemorySizedCache::with_capacity_in_bytes(
capacity_in_bytes,
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
)),
}
}

/// Creates a new CachingDirectory.
///
/// Warning: The resulting CachingDirectory will cache all information without ever
/// removing any item from the cache.
/// Prefer using `.new_with_capacity_in_bytes(...)` in most cases.
pub fn new_unbounded(underlying: Arc<dyn Directory>) -> CachingDirectory {
CachingDirectory {
underlying,
cache: Arc::new(MemorySizedCache::with_infinite_capacity(
cache: Arc::new(ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
)),
}
Expand All @@ -84,7 +59,7 @@ impl fmt::Debug for CachingDirectory {

struct CachingFileHandle {
path: PathBuf,
cache: Arc<MemorySizedCache>,
cache: Arc<ByteRangeCache>,
underlying_filehandle: Arc<dyn FileHandle>,
}

Expand Down Expand Up @@ -179,8 +154,7 @@ mod tests {
let test_path = Path::new("test");
ram_directory.atomic_write(test_path, &b"test"[..])?;
let debug_proxy_directory = Arc::new(DebugProxyDirectory::wrap(ram_directory));
let caching_directory =
CachingDirectory::new_with_capacity_in_bytes(debug_proxy_directory.clone(), 10_000);
let caching_directory = CachingDirectory::new_unbounded(debug_proxy_directory.clone());
caching_directory.atomic_read(test_path)?;
caching_directory.atomic_read(test_path)?;
assert_eq!(debug_proxy_directory.drain_read_operations().count(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1191,7 +1191,7 @@ mod tests {
query: &str,
) -> Result<String, String> {
let search_request = SearchRequest {
query: query.to_string(),
query: Some(query.to_string().into()),
..Default::default()
};
let query = doc_mapper
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-doc-mapper/src/doc_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ mod tests {
let schema = doc_mapper.schema();
let search_request = SearchRequest {
index_id: "quickwit-index".to_string(),
query: "json_field.toto.titi:hello".to_string(),
query: Some("json_field.toto.titi:hello".to_string().into()),
search_fields: vec![],
snippet_fields: vec![],
start_timestamp: None,
Expand Down Expand Up @@ -262,7 +262,7 @@ mod tests {
let schema = doc_mapper.schema();
let search_request = SearchRequest {
index_id: "quickwit-index".to_string(),
query: "text_field:hello".to_string(),
query: Some("text_field:hello".to_string().into()),
search_fields: vec![],
snippet_fields: vec![],
start_timestamp: None,
Expand Down Expand Up @@ -297,7 +297,7 @@ mod tests {
let schema = doc_mapper.schema();
let search_request = SearchRequest {
index_id: "quickwit-index".to_string(),
query: "toto.titi:hello".to_string(),
query: Some("toto.titi:hello".to_string().into()),
search_fields: vec![],
snippet_fields: vec![],
start_timestamp: None,
Expand Down Expand Up @@ -332,7 +332,7 @@ mod tests {
let schema = doc_mapper.schema();
let search_request = SearchRequest {
index_id: "quickwit-index".to_string(),
query: "toto:5".to_string(),
query: Some("toto:5".to_string().into()),
search_fields: vec![],
snippet_fields: vec![],
start_timestamp: None,
Expand Down
128 changes: 97 additions & 31 deletions quickwit/quickwit-doc-mapper/src/query_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

use std::collections::HashSet;

use quickwit_proto::search_request::Query as SearchQuery;
use quickwit_proto::SearchRequest;
use tantivy::query::{Query, QueryParser, QueryParserError as TantivyQueryParserError};
use tantivy::schema::{Field, FieldType, Schema};
use tantivy::schema::{Field, FieldType, Schema, Term, Type, Value};
use tantivy_query_grammar::{UserInputAst, UserInputLeaf, UserInputLiteral};

use crate::sort_by::validate_sort_by_field_name;
Expand All @@ -33,38 +34,103 @@ pub(crate) fn build_query(
request: &SearchRequest,
default_field_names: &[String],
) -> Result<Box<dyn Query>, QueryParserError> {
let user_input_ast = tantivy_query_grammar::parse_query(&request.query)
.map_err(|_| TantivyQueryParserError::SyntaxError(request.query.to_string()))?;
let query = match &request.query {
Some(SearchQuery::Text(query)) => {
let user_input_ast = tantivy_query_grammar::parse_query(query)
.map_err(|_| TantivyQueryParserError::SyntaxError(query.clone()))?;

if has_range_clause(&user_input_ast) {
return Err(anyhow::anyhow!("Range queries are not currently allowed.").into());
}
if has_range_clause(&user_input_ast) {
return Err(anyhow::anyhow!("Range queries are not currently allowed.").into());
}

if needs_default_search_field(&user_input_ast)
&& request.search_fields.is_empty()
&& (default_field_names.is_empty() || default_field_names == [DYNAMIC_FIELD_NAME])
{
return Err(
anyhow::anyhow!("No default field declared and no field specified in query.").into(),
);
}
if needs_default_search_field(&user_input_ast)
&& request.search_fields.is_empty()
&& (default_field_names.is_empty() || default_field_names == [DYNAMIC_FIELD_NAME])
{
return Err(anyhow::anyhow!(
"No default field declared and no field specified in query."
)
.into());
}

validate_requested_snippet_fields(&schema, request, &user_input_ast, default_field_names)?;
validate_requested_snippet_fields(
&schema,
request,
&user_input_ast,
default_field_names,
)?;

let search_fields = if request.search_fields.is_empty() {
resolve_fields(&schema, default_field_names)?
} else {
resolve_fields(&schema, &request.search_fields)?
};

if let Some(sort_by_field) = request.sort_by_field.as_ref() {
validate_sort_by_field_name(sort_by_field, &schema, Some(&search_fields))?;
}

let search_fields = if request.search_fields.is_empty() {
resolve_fields(&schema, default_field_names)?
} else {
resolve_fields(&schema, &request.search_fields)?
};
let mut query_parser =
QueryParser::new(schema, search_fields, QUICKWIT_TOKENIZER_MANAGER.clone());
query_parser.set_conjunction_by_default();
query_parser.parse_query(query)?
}
Some(SearchQuery::SetQuery(set_query)) => {
let _ = (&set_query.terms, &set_query.field_name, &set_query.tags);
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
let field_name = &set_query.field_name;
let field = schema
.get_field(field_name)
.ok_or_else(|| TantivyQueryParserError::FieldDoesNotExist(field_name.clone()))?;

let field_type = schema.get_field_entry(field).field_type();
if !field_type.is_indexed() {
return Err(anyhow::anyhow!("Attempted to search on not indexed field.").into());
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
}

if let Some(sort_by_field) = request.sort_by_field.as_ref() {
validate_sort_by_field_name(sort_by_field, &schema, Some(&search_fields))?;
}
// TODO maybe Facet could be allowed?
if matches!(field_type.value_type(), Type::Json | Type::Facet) {
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
return Err(
anyhow::anyhow!("Attempted to search on unsuported field type.").into(),
);
}

// TODO error on any parse error instead??
let terms: Vec<_> = set_query
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
.terms
.iter()
// TODO not correct
.flat_map(|term| field_type.value_from_json(term.clone().into()).ok())
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
.map(|term| match term {
Value::Str(text) => Term::from_field_text(field, &text),
Value::PreTokStr(pre_tokenized_string) => {
Term::from_field_text(field, &pre_tokenized_string.text)
}
Value::U64(int) => Term::from_field_u64(field, int),
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
Value::I64(int) => Term::from_field_i64(field, int),
Value::F64(int) => Term::from_field_f64(field, int),
Value::Bool(b) => Term::from_field_bool(field, b),
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
Value::Date(date) => Term::from_field_date(field, date),
Value::Bytes(buf) => Term::from_field_bytes(field, &buf),
Value::IpAddr(ip) => Term::from_field_ip_addr(field, ip),
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
Value::JsonObject(_) | Value::Facet(_) => unreachable!(),
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
})
.collect();

if terms.is_empty() {
return Err(anyhow::anyhow!("No valid term to search for").into());
}

// TODO BooleanQuery(Must(Set(terms)), Must(Set(tag))) would probably be more coherent
Box::new(tantivy::query::TermSetQuery::new(terms))
}
None => {
return Err(anyhow::anyhow!(
"Invalid request state: neither a text query nor a set query"
)
.into())
}
};

let mut query_parser =
QueryParser::new(schema, search_fields, QUICKWIT_TOKENIZER_MANAGER.clone());
query_parser.set_conjunction_by_default();
let query = query_parser.parse_query(&request.query)?;
Ok(query)
}
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved

Expand Down Expand Up @@ -205,7 +271,7 @@ mod test {
let request = SearchRequest {
aggregation_request: None,
index_id: "test_index".to_string(),
query: query_str.to_string(),
query: Some(query_str.to_string().into()),
search_fields,
snippet_fields: vec![],
start_timestamp: None,
Expand Down Expand Up @@ -358,7 +424,7 @@ mod test {
let request = SearchRequest {
aggregation_request: None,
index_id: "test_index".to_string(),
query: query_str.to_string(),
query: Some(query_str.to_string().into()),
search_fields,
snippet_fields,
start_timestamp: None,
Expand All @@ -368,8 +434,8 @@ mod test {
sort_order: None,
sort_by_field: None,
};
let user_input_ast = tantivy_query_grammar::parse_query(&request.query)
.map_err(|_| QueryParserError::SyntaxError(request.query.clone()))
let user_input_ast = tantivy_query_grammar::parse_query(query_str)
.map_err(|_| QueryParserError::SyntaxError(query_str.to_owned()))
.unwrap();
let default_field_names =
default_search_fields.unwrap_or_else(|| vec!["title".to_string(), "desc".to_string()]);
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-indexing/src/actors/merge_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ impl MergeExecutor {
.expect("A delete task must have a delete query.");
let search_request = SearchRequest {
index_id: delete_query.index_id,
query: delete_query.query,
query: delete_query.query.map(Into::into),
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
start_timestamp: delete_query.start_timestamp,
end_timestamp: delete_query.end_timestamp,
search_fields: delete_query.search_fields,
Expand Down Expand Up @@ -675,7 +675,7 @@ mod tests {
index_id: index_id.to_string(),
start_timestamp: None,
end_timestamp: None,
query: "body:delete".to_string(),
query: Some("body:delete".to_string().into()),
search_fields: Vec::new(),
})
.await?;
Expand Down
Loading