Skip to content

Commit

Permalink
use quantized vector cache during build
Browse files Browse the repository at this point in the history
  • Loading branch information
cevian committed Feb 16, 2024
1 parent b94ef69 commit ba13209
Showing 1 changed file with 13 additions and 21 deletions.
34 changes: 13 additions & 21 deletions timescale_vector/src/access_method/bq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::{
storage::{ArchivedData, NodeFullDistanceMeasure, Storage, StorageFullDistanceFromHeap},
storage_common::calculate_full_distance,
};
use std::{collections::HashMap, iter::once, marker::PhantomData, pin::Pin};
use std::{cell::RefCell, collections::HashMap, iter::once, marker::PhantomData, pin::Pin};

use pgrx::{
pg_sys::{InvalidBlockNumber, InvalidOffsetNumber},
Expand Down Expand Up @@ -105,6 +105,7 @@ impl BqQuantizer {
}

fn quantize(&self, full_vector: &[f32]) -> Vec<BqVectorElement> {
assert!(!self.training);
if self.use_mean {
let mut res_vector = vec![0; Self::quantized_size(full_vector.len())];

Expand Down Expand Up @@ -157,7 +158,6 @@ impl BqQuantizer {
meta_page: &super::meta_page::MetaPage,
full_vector: &[f32],
) -> Vec<BqVectorElement> {
assert!(!self.training);
self.quantize(&full_vector)

/*if self.use_mean && self.training {
Expand Down Expand Up @@ -293,10 +293,7 @@ impl QuantizedVectorCache {
self.quantized_vector_map
.entry(index_pointer)
.or_insert_with(|| {
let rn = unsafe { BqNode::read(storage.index, index_pointer, stats) };
let node = rn.get_archived_node();
node.bq_vector.as_slice().to_vec()
//storage.get_quantized_vector_from_index_pointer(index_pointer, stats)
storage.get_quantized_vector_from_index_pointer(index_pointer, stats)
})
}

Expand Down Expand Up @@ -325,7 +322,7 @@ pub struct BqSpeedupStorage<'a> {
quantizer: BqQuantizer,
heap_rel: Option<&'a PgRelation>,
heap_attr: Option<pgrx::pg_sys::AttrNumber>,
qv_cache: Option<QuantizedVectorCache>,
qv_cache: RefCell<QuantizedVectorCache>,
}

impl<'a> BqSpeedupStorage<'a> {
Expand All @@ -340,7 +337,7 @@ impl<'a> BqSpeedupStorage<'a> {
quantizer: BqQuantizer::new(),
heap_rel: Some(heap_rel),
heap_attr: Some(heap_attr),
qv_cache: None,
qv_cache: RefCell::new(QuantizedVectorCache::new(1000)),
}
}

Expand All @@ -365,7 +362,7 @@ impl<'a> BqSpeedupStorage<'a> {
quantizer: Self::load_quantizer(index_relation, meta_page, stats),
heap_rel: Some(heap_rel),
heap_attr: Some(heap_attr),
qv_cache: None,
qv_cache: RefCell::new(QuantizedVectorCache::new(1000)),
}
}

Expand All @@ -380,7 +377,7 @@ impl<'a> BqSpeedupStorage<'a> {
quantizer: quantizer.clone(),
heap_rel: None,
heap_attr: None,
qv_cache: None,
qv_cache: RefCell::new(QuantizedVectorCache::new(1000)),
}
}

Expand All @@ -389,9 +386,9 @@ impl<'a> BqSpeedupStorage<'a> {
index_pointer: IndexPointer,
stats: &mut S,
) -> Vec<BqVectorElement> {
let slot = unsafe { self.get_heap_table_slot_from_index_pointer(index_pointer, stats) };
let slice = unsafe { slot.get_pg_vector() };
self.quantizer.quantize(slice.to_slice())
let rn = unsafe { BqNode::read(self.index, index_pointer, stats) };
let node = rn.get_archived_node();
node.bq_vector.as_slice().to_vec()
}

fn write_quantizer_metadata<S: StatsNodeWrite>(&self, stats: &mut S) {
Expand Down Expand Up @@ -463,11 +460,8 @@ impl<'a> BqSpeedupStorage<'a> {
)
}
GraphNeighborStore::Builder(b) => {
let rn_neighbor = unsafe {
BqNode::read(self.index, neighbor_index_pointer, &mut lsr.stats)
};
let node_neighbor = rn_neighbor.get_archived_node();
let bq_vector = node_neighbor.bq_vector.as_slice();
let mut cache = self.qv_cache.borrow_mut();
let bq_vector = cache.get(neighbor_index_pointer, self, &mut lsr.stats);
let dist = BqSearchDistanceMeasure::calculate_bq_distance(
table,
bq_vector,
Expand Down Expand Up @@ -527,7 +521,6 @@ impl<'a> Storage for BqSpeedupStorage<'a> {
fn finish_training(&mut self, stats: &mut WriteStats) {
self.quantizer.finish_training();
self.write_quantizer_metadata(stats);
self.qv_cache = Some(QuantizedVectorCache::new(1000));
}

fn finalize_node_at_end_of_build<S: StatsNodeRead + StatsNodeModify>(
Expand All @@ -537,7 +530,7 @@ impl<'a> Storage for BqSpeedupStorage<'a> {
neighbors: &Vec<NeighborWithDistance>,
stats: &mut S,
) {
let mut cache = self.qv_cache.take().unwrap();
let mut cache = self.qv_cache.borrow_mut();
/* It's important to preload cache with all the items since you can run into deadlocks
if you try to fetch a quantized vector while holding the BqNode::modify lock */
let iter = neighbors
Expand All @@ -556,7 +549,6 @@ impl<'a> Storage for BqSpeedupStorage<'a> {
}

node.commit();
self.qv_cache = Some(cache);
}

unsafe fn get_full_vector_distance_state<'b, S: StatsNodeRead>(
Expand Down

0 comments on commit ba13209

Please sign in to comment.