diff --git a/params.yaml b/params.yaml
index 66b7b47a..1c74828f 100644
--- a/params.yaml
+++ b/params.yaml
@@ -422,13 +422,7 @@ ratio_study:
 # in the interpret pipeline stage. Only used if comp_enable = true
 comp:
   # Number of comps to generate for each PIN/card
-  num_comps: 20
-
-  # Number of price bins to use when binning properties for the purpose of
-  # comparison. Corresponds to ntile of predicted FMV, e.g. 10 creates deciles
-  # of predicted values. Larger deciles = larger comparables search range
-  num_price_bins: 20
-
+  num_comps: 5
 
 
 # Export -----------------------------------------------------------------------
diff --git a/pipeline/04-interpret.R b/pipeline/04-interpret.R
index 3011c2aa..130f6376 100644
--- a/pipeline/04-interpret.R
+++ b/pipeline/04-interpret.R
@@ -122,14 +122,30 @@ lightgbm::lgb.importance(lgbm_final_full_fit$fit) %>%
 if (comp_enable) {
   message("Finding comparables")
 
+  # Filter target properties for only the current triad, to speed up the comps
+  # algorithm
+  comp_assessment_data <- assessment_data %>%
+    filter(
+      meta_township_code %in% (
+        ccao::town_dict %>%
+          filter(triad_name == tools::toTitleCase(params$assessment$triad)) %>%
+          pull(township_code)
+      )
+    )
+  comp_assessment_data_prepped <- recipes::bake(
+    object = lgbm_final_full_recipe,
+    new_data = comp_assessment_data,
+    all_predictors()
+  )
+
   # Calculate the leaf node assignments for every predicted value.
   # Due to integer overflow problems with leaf node assignment, we need to
   # chunk our data such that they are strictly less than the limit of 1073742
   # rows. More detail here: https://github.com/microsoft/LightGBM/issues/1884
   chunk_size <- 500000
   chunks <- split(
-    assessment_data_prepped,
-    ceiling(seq_along(assessment_data_prepped[[1]]) / chunk_size)
+    comp_assessment_data_prepped,
+    ceiling(seq_along(comp_assessment_data_prepped[[1]]) / chunk_size)
   )
   chunked_leaf_nodes <- chunks %>%
     map(\(chunk) {
@@ -184,20 +200,6 @@ if (comp_enable) {
       type = "leaf"
     ) %>%
     as_tibble()
-  training_leaf_nodes$predicted_value <- predict(
-    object = lgbm_final_full_fit$fit,
-    newdata = as.matrix(training_data_prepped)
-  ) %>%
-    # Round predicted values down for binning
-    floor()
-
-  # Get predicted values for the assessment set, which we already have in
-  # the assessment card set
-  leaf_nodes$predicted_value <- assessment_data %>%
-    left_join(assessment_card, by = c("meta_pin", "meta_card_num")) %>%
-    # Round predicted values down for binning
-    mutate(pred_card_initial_fmv = floor(pred_card_initial_fmv)) %>%
-    dplyr::pull(pred_card_initial_fmv)
 
   # Make sure that the leaf node tibbles are all integers, which is what
   # the comps algorithm expects
@@ -213,8 +215,7 @@ if (comp_enable) {
     {
       comps <- comps_module$get_comps(
         leaf_nodes, training_leaf_nodes, tree_weights,
-        num_comps = as.integer(params$comp$num_comps),
-        num_price_bins = as.integer(params$comp$num_price_bins)
+        num_comps = as.integer(params$comp$num_comps)
       )
     },
     error = function(e) {
@@ -223,32 +224,38 @@ if (comp_enable) {
       stop("Encountered error in python/comps.py")
     }
   )
-  # Correct for the fact that Python is 0-indexed by incrementing the
-  # comp indexes by 1
-  comps[[1]] <- comps[[1]] + 1
 
   # Translate comp indexes to PINs and document numbers
   comps[[1]] <- comps[[1]] %>%
     mutate(
+      # Correct for the fact that Python is 0-indexed by incrementing the
+      # comp indexes by 1, and cast null indicators (-1) to null
+      across(everything(), ~ ifelse(. == -1, NA, . + 1)),
+      # Map comp index to PIN
       across(
        starts_with("comp_idx_"),
        \(idx_row) {
-          training_data[idx_row, ]$meta_pin
+          ifelse(is.na(idx_row), NA, training_data[idx_row, ]$meta_pin)
        },
        .names = "comp_pin_{str_remove(col, 'comp_idx_')}"
      ),
+      # Map comp index to sale doc number
       across(
        starts_with("comp_idx_"),
        \(idx_row) {
-          training_data[idx_row, ]$meta_sale_document_num
+          ifelse(
+            is.na(idx_row),
+            NA,
+            training_data[idx_row, ]$meta_sale_document_num
+          )
        },
        .names = "comp_document_num_{str_remove(col, 'comp_idx_')}"
      )
    ) %>%
    select(-starts_with("comp_idx_")) %>%
    cbind(
-      pin = assessment_data$meta_pin,
-      card = assessment_data$meta_card_num
+      pin = comp_assessment_data$meta_pin,
+      card = comp_assessment_data$meta_card_num
    ) %>%
    relocate(pin, card)
 
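Note: the mutate() call above shifts the Python module's 0-based comp indexes to R's 1-based indexing and converts the module's -1 "no match" sentinel to NA before the indexes are joined back to PINs and document numbers. A minimal sketch of that translation on a made-up tibble (the column values below are illustrative, not pipeline output):

```r
library(dplyr)

# Hypothetical module output: 0-based row positions, where -1 means "no match"
toy_comps <- tibble(comp_idx_1 = c(0L, 4L, -1L), comp_idx_2 = c(2L, -1L, -1L))

toy_comps %>%
  mutate(across(everything(), ~ ifelse(. == -1, NA, . + 1)))
# comp_idx_1 becomes 1, 5, NA and comp_idx_2 becomes 3, NA, NA, which can then
# be used directly as row indexes into training_data
```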
diff --git a/pipeline/05-finalize.R b/pipeline/05-finalize.R
index 727d2baa..f2ac3fd3 100644
--- a/pipeline/05-finalize.R
+++ b/pipeline/05-finalize.R
@@ -91,7 +91,6 @@ metadata <- tibble::tibble(
   shap_enable = shap_enable,
   comp_enable = comp_enable,
   comp_num_comps = params$comp$num_comps,
-  comp_num_price_bins = params$comp$num_price_bins,
   cv_enable = cv_enable,
   cv_num_folds = params$cv$num_folds,
   cv_fold_overlap = params$cv$fold_overlap,
diff --git a/python/comps.py b/python/comps.py
index bfaacb29..76fb2e4c 100644
--- a/python/comps.py
+++ b/python/comps.py
@@ -1,5 +1,5 @@
-import numpy as np
 import numba as nb
+import numpy as np
 import pandas as pd
 
 
@@ -7,8 +7,7 @@ def get_comps(
     observation_df,
     comparison_df,
     weights,
-    num_comps=20,
-    num_price_bins=10,
+    num_comps=5,
 ):
     """Fast algorithm to get the top `num_comps` comps from a dataframe of lightgbm
     leaf node assignments (`observation_df`) compared to a second dataframe of
@@ -17,213 +16,69 @@ def get_comps(
     return two dataframes, one a set of indices and the other a set of scores
     for the `n` most similar comparables. More details on the underlying
     algorithm here: https://ccao-data.github.io/lightsnip/articles/finding-comps.html
-
-    The function expects that `observation_df` and `comparison_df` will both
-    have a column called `predicted_value`. These two columns represent the
-    (integer) predicted value for the observations and the comparisons.
-    These columns are then used along with the `num_price_bins` parameter to bin
-    the comparison data and only compare observations to comparisons that are in
-    the three closest bins to the observation.
     """
     # Convert the weights to a numpy array so that we can take advantage of
     # numba acceleration later on
     weights_matrix = np.asarray(weights, dtype=np.float32)
 
-    # Add ID columns so that we can keep track of the initial position of
-    # each row as we sort them. This is necessary to allow the caller to
-    # translate results back to parcels based on row index
-    observation_df["id"] = list(range(len(observation_df)))
-    comparison_df["id"] = list(range(len(comparison_df)))
-
-    # Sort the comparison data and extract the indexes of rows that represent
-    # boundaries between price bins. We'll use these bin indexes to
-    # reduce the number of comparison parcels that we need to search for each
-    # observation
-    sorted_comparison_df = comparison_df.sort_values(
-        ["predicted_value"]
-    ).reset_index(
-        drop=True
-    )
-    sorted_comparison_df["price_bin"] = pd.qcut(
-        sorted_comparison_df["predicted_value"],
-        num_price_bins,
-        labels=np.arange(0, num_price_bins),
-    )
-
-    # Extract price bin metadata from the binned comparison data
-    price_bin_indices = sorted_comparison_df.groupby(
-        "price_bin", observed=False  # Silence deprecated `observed` warning
-    ).apply(
-        lambda x: x.index.tolist()
-    ).apply(
-        lambda x: [x[0], x[-1]]
-    )
-    bin_argmin = [idx[0] for idx in price_bin_indices.values]
-    bin_argmax = [idx[1] for idx in price_bin_indices.values]
-    price_bin_indices = pd.DataFrame(
-        {
-            "id": price_bin_indices.index.astype(int),
-            "argmin": bin_argmin,
-            "argmax": bin_argmax,
-            "min": sorted_comparison_df["predicted_value"][bin_argmin].values,
-            "max": sorted_comparison_df["predicted_value"][bin_argmax].values
-        },
-    )
-
-    # Fudge the minimum/maximum values in the price bin range so that they
-    # approximate infinity. This is necessary because observation data may
-    # have observed values that are lower than the minimum value in the
-    # comparison data, or higher than the maximum value
-    price_bin_indices.at[0, "min"] = np.iinfo(np.int32).min + 1
-    price_bin_indices.at[num_price_bins - 1, "max"] = np.iinfo(np.int32).max - 1
-
-    # Update bins to ensure they have no gaps. This enables us to put values
-    # from the observation dataframe into these bins, since otherwise
-    # values in the observation dataframe might be in between the maximum
-    # and minimum values of two adjacent bins
-    for bin_idx in range(len(price_bin_indices)):
-        # Skip the first bin, since its min val should aways be the smallest
-        # possible int
-        if bin_idx > 0:
-            price_bin_indices.at[bin_idx, "min"] = (
-                price_bin_indices["max"][bin_idx - 1] + 1
-            )
-
-    # Place observations in bins. Do this in a numba-accelerated function so
-    # that we can make use of fast loops
-    observation_df["price_bin"] = _bin_by_price(
-        observation_df[["id", "predicted_value"]].values,
-        price_bin_indices.values
+    # Chunk the observations so that the script can periodically report progress
+    num_chunks = 10
+    observation_df["chunk"] = pd.cut(
+        observation_df.index, bins=num_chunks, labels=False
     )
 
     total_num_observations = len(observation_df)
-    total_num_possible_comps = len(sorted_comparison_df)
-    binned_ids, binned_scores = [], []
-    for bin_idx, bin in price_bin_indices.iterrows():
-        observations = observation_df[observation_df["price_bin"] == bin["id"]]
+    total_num_possible_comps = len(comparison_df)
+    chunked_ids, chunked_scores = [], []
+    for chunk_num in set(observation_df["chunk"]):
+        observations = observation_df[observation_df["chunk"] == chunk_num]
+        # Drop chunk column to produce a matrix that we can accelerate
+        # with numba
+        observation_matrix = observations.drop(columns=["chunk"]).values
 
-        if observations.empty:
-            print(
-                f"No observations in bin {bin['id'] + 1}; skipping",
-                # Flush statement to stdout so that reticulate will print it
-                # in real time
-                flush=True
-            )
-            continue
-
-        observation_matrix = observations.drop(
-            columns=["id", "predicted_value", "price_bin"]
-        ).values
-
-        # Add a 1-bin buffer on either side in case an observation is close to
-        # a bin edge. In addition, make sure that argmax is inclusive by
-        # expanding the range by 1
-        argmin, argmax = bin["argmin"], bin["argmax"] + 1
-        price_min, price_max = bin["min"], bin["max"]
-        if bin_idx > 0:
-            prev_bin = price_bin_indices.iloc[bin_idx - 1]
-            argmin, price_min = prev_bin["argmin"], prev_bin["min"]
-        if bin_idx < len(price_bin_indices) - 1:
-            next_bin = price_bin_indices.iloc[bin_idx + 1]
-            argmax, price_max = next_bin["argmax"] + 1, next_bin["max"]
-
-        possible_comps = sorted_comparison_df[argmin:argmax]
-        comp_idx_to_id = dict(
-            zip(
-                possible_comps.reset_index(drop=True).index,
-                possible_comps['id']
-            )
-        )
-        # Handle -1, which is an indicator of no match
-        comp_idx_to_id[-1] = -1
-        possible_comp_matrix = possible_comps.drop(
-            columns=["id", "predicted_value", "price_bin"]
-        ).values
+        # Produce a numba-compatible matrix for the comparisons
+        possible_comp_matrix = comparison_df.values
 
         print(
             (
-                f"Getting top {num_comps} comps for price bin {bin['id'] + 1}/"
-                f"{len(price_bin_indices)} (${price_min:,} to ${price_max:,}) - "
+                f"Getting top {num_comps} comps for chunk {chunk_num}/"
+                f"{num_chunks} - "
                 f"{len(observations)}/{total_num_observations} observations, "
-                f"{len(possible_comps)}/{total_num_possible_comps} possible comps"
+                f"{total_num_possible_comps} possible comps"
             ),
-            flush=True
+            # Flush so that we can print to the console in realtime when
+            # reticulate runs this function in an R context
+            flush=True,
         )
 
+        # Compute comps for each observation
         comp_ids, comp_scores = _get_top_n_comps(
             observation_matrix, possible_comp_matrix, weights_matrix, num_comps
         )
 
-        # Match comp and observation IDs back to the original dataframes since
-        # we have since rearranged them
-        matched_comp_ids = np.vectorize(comp_idx_to_id.get)(comp_ids)
-        observation_ids = observations["id"].values
+        observation_ids = observations.index.values
         for obs_idx, comp_idx, comp_score in zip(
-            observation_ids, matched_comp_ids, comp_scores
+            observation_ids, comp_ids, comp_scores
         ):
-            binned_ids.append((obs_idx, comp_idx))
-            binned_scores.append((obs_idx, comp_score))
-
-    # Sort the IDs and comps according to the original order of the input
-    # data so that the output data has the same order
-    sorted_binned_ids = [
-        binned_idx[1]
-        for binned_idx in sorted(
-            binned_ids, key=lambda binned_idx: binned_idx[0]
-        )
-    ]
-    sorted_binned_scores = [
-        binned_score[1]
-        for binned_score in sorted(
-            binned_scores, key=lambda binned_score: binned_score[0]
-        )
-    ]
+            chunked_ids.append((obs_idx, comp_idx))
+            chunked_scores.append((obs_idx, comp_score))
 
     # Turn the comps matrices into pandas dataframes to match the type of
     # the input data
     indexes_df = pd.DataFrame(
-        np.asarray(sorted_binned_ids),
-        columns=[f"comp_idx_{idx}" for idx in range(1, num_comps + 1)]
+        # We don't need the observation ID, since the output should be in the
+        # same order as the input
+        np.asarray([chunked_id[1] for chunked_id in chunked_ids]),
+        columns=[f"comp_idx_{idx}" for idx in range(1, num_comps + 1)],
     )
     scores_df = pd.DataFrame(
-        np.asarray(sorted_binned_scores),
-        columns=[f"comp_score_{idx}" for idx in range(1, num_comps + 1)]
+        np.asarray([chunked_score[1] for chunked_score in chunked_scores]),
+        columns=[f"comp_score_{idx}" for idx in range(1, num_comps + 1)],
     )
 
     return indexes_df, scores_df
 
 
-@nb.njit(fastmath=True, parallel=True)
-def _bin_by_price(observation_matrix, price_bin_matrix):
-    """Given a matrix of observations and a matrix of price bins, place the
-    observations in the closest price bin and return an array of bin IDs
-    with the same length as the observation matrix."""
-    num_observations = len(observation_matrix)
-    price_bin_idx, price_bin_min_idx, price_bin_max_idx = 0, 3, 4
-    observation_price_idx = 1
-    output_matrix = np.zeros(num_observations, dtype=np.int32)
-
-    for obs_idx in nb.prange(num_observations):
-        observation = observation_matrix[obs_idx]
-        observation_price = observation[observation_price_idx]
-        for bin in price_bin_matrix:
-            if (
-                # Since we expect the price bins to be non-overlapping with
-                # no gaps and an integer difference of 1 between ranges, the
-                # ranges can be treated as inclusive on both ends
-                observation_price >= bin[price_bin_min_idx] and
-                observation_price <= bin[price_bin_max_idx]
-            ):
-                output_matrix[obs_idx] = bin[price_bin_idx]
-                break
-        else:
-            raise ValueError(
-                f"Observation {obs_idx} did not match any price bins"
-            )
-
-    return output_matrix
-
-
 @nb.njit(fastmath=True, parallel=True)
 def _get_top_n_comps(
     leaf_node_matrix, comparison_leaf_node_matrix, weights_matrix, num_comps
@@ -246,50 +101,41 @@ def _get_top_n_comps(
     all_top_n_scores = np.zeros((num_observations, num_comps), dtype=score_dtype)
 
     for x_i in nb.prange(num_observations):
-        top_n_idxs = np.full(num_comps, -1, dtype=idx_dtype)
-        top_n_scores = np.zeros(num_comps, dtype=score_dtype)
-
         # TODO: We could probably speed this up by skipping comparisons we've
         # already made; we just need to do it in a way that will have a
        # low memory footprint
         for y_i in range(num_possible_comparisons):
             similarity_score = 0.0
             for tree_idx in range(len(leaf_node_matrix[x_i])):
-                similarity_score += (
-                    weights_matrix[x_i][tree_idx] * (
-                        leaf_node_matrix[x_i][tree_idx] ==
-                        comparison_leaf_node_matrix[y_i][tree_idx]
-                    )
-                )
+                if (
+                    leaf_node_matrix[x_i][tree_idx]
+                    == comparison_leaf_node_matrix[y_i][tree_idx]
+                ):
+                    similarity_score += weights_matrix[x_i][tree_idx]
 
             # See if the score is higher than any of the top N
             # comps, and store it in the sorted comps array if it is.
             # First check if the score is higher than the lowest score,
             # since otherwise we don't need to bother iterating the scores
-            if similarity_score > top_n_scores[-1]:
-                for idx, score in enumerate(top_n_scores):
-                    if similarity_score > score:
-                        top_n_idxs = _insert_at_idx_and_shift(
-                            top_n_idxs, y_i, idx
-                        )
-                        top_n_scores = _insert_at_idx_and_shift(
-                            top_n_scores, similarity_score, idx
-                        )
-                        break
-
-        all_top_n_idxs[x_i] = top_n_idxs
-        all_top_n_scores[x_i] = top_n_scores
+            if similarity_score > all_top_n_scores[x_i][-1]:
+                for idx, score in enumerate(all_top_n_scores[x_i]):
+                    if similarity_score > score:
+                        _insert_at_idx_and_shift(all_top_n_idxs[x_i], y_i, idx)
+                        _insert_at_idx_and_shift(
+                            all_top_n_scores[x_i], similarity_score, idx
+                        )
+                        break
 
     return all_top_n_idxs, all_top_n_scores
 
 
 @nb.njit(fastmath=True)
 def _insert_at_idx_and_shift(arr, elem, idx):
-    """Helper function to insert an element `elem` into a sorted numpy array `arr`
-    at a given index `idx` and shift the subsequent elements down one index."""
-    return np.concatenate((
-        arr[:idx], np.array([(elem)], dtype=arr.dtype), arr[idx:-1]
-    ))
+    """Helper function to insert an element `elem` into a sorted numpy array `arr`
+    at a given index `idx` and shift the subsequent elements down one index."""
+    arr[idx + 1 :] = arr[idx:-1]
+    arr[idx] = elem
+    return arr
 
 
 if __name__ == "__main__":
@@ -303,23 +149,13 @@ def _insert_at_idx_and_shift(arr, elem, idx):
     mean_sale_price = 350000
     std_deviation = 110000
 
-    leaf_nodes = pd.DataFrame(
-        np.random.randint(0, num_obs, size=[num_obs, num_trees])
-    )
-    leaf_nodes["predicted_value"] = np.random.normal(
-        mean_sale_price, std_deviation, size=num_obs
-    ).astype(int)
-
+    leaf_nodes = pd.DataFrame(np.random.randint(0, num_obs, size=[num_obs, num_trees]))
     training_leaf_nodes = pd.DataFrame(
         np.random.randint(0, num_comparisons, size=[num_comparisons, num_trees])
     )
-    training_leaf_nodes["predicted_value"] = np.random.normal(
-        mean_sale_price, std_deviation, size=num_comparisons
-    ).astype(int)
-    tree_weights = np.asarray([
-        np.random.dirichlet(np.ones(num_trees))
-        for _ in range(num_comparisons)
-    ])
+    tree_weights = np.asarray(
+        [np.random.dirichlet(np.ones(num_trees)) for _ in range(num_comparisons)]
+    )
 
     start = time.time()
     get_comps(leaf_nodes, training_leaf_nodes, tree_weights)
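A note on the rewritten `_insert_at_idx_and_shift`: it now shifts the tail of the array in place rather than allocating a new array with `np.concatenate`. A rough illustration of the intended behavior in plain NumPy, outside of numba (the values are made up):

```python
import numpy as np


def insert_at_idx_and_shift(arr, elem, idx):
    # Shift everything from `idx` onward one slot to the right (the last
    # element falls off the end), then write the new element into the gap
    arr[idx + 1:] = arr[idx:-1]
    arr[idx] = elem
    return arr


scores = np.array([0.9, 0.7, 0.4, 0.1], dtype=np.float32)
insert_at_idx_and_shift(scores, 0.8, 1)
print(scores)  # [0.9 0.8 0.7 0.4]
```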
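Finally, a minimal sketch of how the simplified `get_comps` entrypoint can be exercised end to end, modeled on the `__main__` block above. The sizes and random inputs are illustrative, the weights are given one row per observation (which is what `_get_top_n_comps` indexes into), and the import assumes the snippet runs from the repo's `python/` directory:

```python
import numpy as np
import pandas as pd

from comps import get_comps

num_trees, num_obs, num_comparisons = 50, 100, 1000

# Leaf node assignments: one row per property, one column per LightGBM tree
leaf_nodes = pd.DataFrame(np.random.randint(0, 500, size=[num_obs, num_trees]))
training_leaf_nodes = pd.DataFrame(
    np.random.randint(0, 500, size=[num_comparisons, num_trees])
)
# Per-observation tree weights; each row sums to 1
tree_weights = np.asarray(
    [np.random.dirichlet(np.ones(num_trees)) for _ in range(num_obs)]
)

indexes_df, scores_df = get_comps(
    leaf_nodes, training_leaf_nodes, tree_weights, num_comps=5
)
# indexes_df holds comp_idx_1..comp_idx_5 (0-based row positions into
# training_leaf_nodes, -1 = no match); scores_df holds comp_score_1..comp_score_5
print(indexes_df.head())
print(scores_df.head())
```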