Merge pull request #46 from georgetown-cset/45-unlink-split-merged-ids
Split merged ids if they lose orig ids
brianlove authored Oct 3, 2024
2 parents c046654 + c132c82 commit fe795c0
Showing 15 changed files with 170 additions and 559 deletions.
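The commit message describes the new rule: a match set keeps its previous merged id only if that merged id did not lose any of its orig ids; otherwise the set is split off under a fresh merged id. Below is a minimal sketch of that rule as suggested by the updated tests in this diff — the function name, signature, and counter handling are illustrative assumptions, not the repository's actual `create_matches` implementation in `utils/create_merge_ids.py`.

```python
def assign_merged_ids(match_sets, prev_mapping, ids_to_drop, next_counter):
    """Sketch only: reuse a previous merged id for a match set only if that
    merged id lost no orig ids and is not being dropped; otherwise mint a new id."""
    # Invert the previous mapping: merged_id -> set of orig ids it used to cover.
    prev_members = {}
    for orig_id, merged_id in prev_mapping.items():
        prev_members.setdefault(merged_id, set()).add(orig_id)

    matches = []
    for match_set in match_sets:
        old_ids = {prev_mapping[o] for o in match_set if o in prev_mapping}
        keep_old_id = (
            len(old_ids) == 1  # the set maps back to exactly one old merged id
            and next(iter(old_ids)) not in ids_to_drop  # not deliberately unlinked
            and prev_members[next(iter(old_ids))] <= match_set  # old id lost no orig ids
        )
        if keep_old_id:
            merged_id = next(iter(old_ids))
        else:
            merged_id = f"carticle_{next_counter:010d}"  # split or new set gets a fresh id
            next_counter += 1
        matches.extend({"orig_id": o, "merged_id": merged_id} for o in sorted(match_set))
    return matches, next_counter
```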
43 changes: 5 additions & 38 deletions linkage_dag.py
@@ -315,13 +315,7 @@
BigQueryToGCSOperator(
task_id="export_article_pairs",
source_project_dataset_table=f"{staging_dataset}.all_match_pairs_with_um",
destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/article_pairs/article_pairs*.jsonl",
export_format="NEWLINE_DELIMITED_JSON",
),
BigQueryToGCSOperator(
task_id="export_simhash_input",
source_project_dataset_table=f"{staging_dataset}.simhash_input",
destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/simhash_input/simhash_input*.jsonl",
destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/exact_matches/article_pairs*.jsonl",
export_format="NEWLINE_DELIMITED_JSON",
),
BigQueryToGCSOperator(
@@ -344,8 +338,7 @@
),
]

# Start up godzilla of article linkage, update simhash indexes of title+abstract, run simhash, then create the
# merge ids
# Start up godzilla of article linkage, create the merged ids
gce_instance_start = ComputeEngineStartInstanceOperator(
project_id=project_id,
zone=gce_zone,
@@ -364,14 +357,10 @@
"rm -rf current_ids",
"mkdir input_data",
"mkdir current_ids",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/article_pairs .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/simhash_input .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_indexes .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_results .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/exact_matches .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/unlink .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/ids_to_drop .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/prev_id_mapping .",
"mkdir new_simhash_indexes",
]
prep_environment_vm_script = " && ".join(prep_environment_script_sequence)

@@ -380,24 +369,9 @@
bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "{prep_environment_vm_script}"',
)

update_simhash_index = BashOperator(
task_id="update_simhash_index",
bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "bash run_simhash_scripts.sh &> log &"',
)

wait_for_simhash_index = GCSObjectExistenceSensor(
task_id="wait_for_simhash_index",
bucket=DATA_BUCKET,
object=f"{tmp_dir}/done_files/simhash_is_done",
deferrable=True,
)

create_cset_ids = BashOperator(
task_id="create_cset_ids",
bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "bash run_ids_scripts.sh &> log &"',
# These are also inputs to `update_simhash_index` - in fact some of them are only directly used in that task -
# but I think just reporting them all on this task is ok since `update_simhash_index` doesn't update any files
# outside the VM it runs on and this task depends on all of these things, directly or indirectly
inlets=[
BigQueryTable(
project_id=project_id, dataset_id=production_dataset, table_id="sources"
@@ -407,11 +381,6 @@
dataset_id=staging_dataset,
table_id="all_match_pairs_with_um",
),
BigQueryTable(
project_id=project_id,
dataset_id=staging_dataset,
table_id="simhash_input",
),
BigQueryTable(
project_id=project_id, dataset_id=staging_dataset, table_id="unlink"
),
@@ -483,7 +452,7 @@
import_id_mapping = GCSToBigQueryOperator(
task_id="import_id_mapping",
bucket=bucket,
source_objects=[f"{tmp_dir}/id_mapping.jsonl"],
source_objects=[f"{tmp_dir}/new_id_mappings/*"],
schema_object=f"{schema_dir}/id_mapping.json",
destination_project_dataset_table=f"{staging_dataset}.id_mapping",
source_format="NEWLINE_DELIMITED_JSON",
@@ -577,7 +546,7 @@
BigQueryCheckOperator(
task_id="all_trivial_matches_survived",
sql=f"""
-- check that all article pairs generated by exact matches make it through the simhash and
-- check that all article pairs generated by exact matches make it through the
-- merged id assignment, except ones we've deliberately unlinked
select
count(0) = 0
@@ -696,8 +665,6 @@
>> heavy_compute_inputs
>> gce_instance_start
>> prep_environment
>> update_simhash_index
>> wait_for_simhash_index
>> create_cset_ids
>> wait_for_cset_ids
>> gce_instance_stop
1 change: 0 additions & 1 deletion sequences/merge_combined_metadata.tsv
@@ -1,6 +1,5 @@
arxiv_id_match
metadata_match
all_match_pairs_with_um
simhash_input
lid_input
ids_to_drop
18 changes: 0 additions & 18 deletions sql/simhash_input.sql

This file was deleted.

4 changes: 4 additions & 0 deletions tests/static/test_create_match_keys/input/input.jsonl
@@ -5,3 +5,7 @@
{"orig_id": "F", "merged_id": "carticle_0000000001"}
{"orig_id": "I", "merged_id": "carticle_0000000003"}
{"orig_id": "J", "merged_id": "carticle_0000000003"}
{"orig_id": "K", "merged_id": "carticle_0000000004"}
{"orig_id": "L", "merged_id": "carticle_0000000004"}
{"orig_id": "M", "merged_id": "carticle_0000000005"}
{"orig_id": "N", "merged_id": "carticle_0000000005"}
3 changes: 0 additions & 3 deletions tests/static/test_get_match_sets_with_extra_id/ids/ids1.jsonl

This file was deleted.

3 changes: 0 additions & 3 deletions tests/static/test_get_match_sets_with_extra_id/ids/ids2.jsonl

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

83 changes: 39 additions & 44 deletions tests/test_create_merge_ids.py
@@ -3,7 +3,7 @@
import shutil
import unittest

from utils.create_merge_ids import create_match_keys, create_match_sets
from utils.create_merge_ids import create_match_sets, create_matches

static_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static")

@@ -43,24 +43,6 @@ def test_get_combined_map5(self):
actual_result = sorted(create_match_sets(match_dir), key=lambda k: len(k))
self.assertEqual(actual_result, expected_result)

def test_get_match_sets_with_extra_id(self):
# test with three disconnected sets. The set A - E will have one extra id (E) that should get filtered, and
# the "small" set F-H will all be extra ids that should be filtered. The other small set I-J will have ids
# distributed across two id files, but the set should be included.
match_dir = os.path.join(
static_dir, "test_get_match_sets_with_extra_id", "match_pairs"
)
ids_dir = os.path.join(static_dir, "test_get_match_sets_with_extra_id", "ids")
result_set_large = {"A", "B", "C", "D"}
result_set_small = {"I", "J"}
expected_result = sorted(
[result_set_small, result_set_large], key=lambda k: len(k)
)
actual_result = sorted(
create_match_sets(match_dir, ids_dir), key=lambda k: len(k)
)
self.assertEqual(actual_result, expected_result)

def test_skip_matches(self):
# test without matches excluded
match_dir = os.path.join(static_dir, "test_skip_matches_ids")
@@ -74,33 +56,46 @@ def test_skip_matches(self):
expected_result_excludes,
)

def test_create_match_keys(self):
# The first set (A, B, C) contains two old elts from the same match set and one new elt; should keep its id.
# The next (D, E, F) contains one elt from one match set, two from another; should change ids.
# Another (G, H) contains only new ids; should get a new id.
# The last two (I and J) are two different match sets that share an old id and are in ids_to_drop;
# each should get a new id (this is in case of unlinking).
match_sets = [{"A", "B", "C"}, {"D", "E", "F"}, {"G", "H"}, {"I"}, {"J"}]
out_dir = os.path.join(static_dir, "test_create_match_keys", "output")
if os.path.exists(out_dir):
shutil.rmtree(out_dir)
os.mkdir(out_dir)
out_fi = os.path.join(out_dir, "output.jsonl")
def test_create_matches(self):
match_sets = [
{"A", "B", "C"},
{"D", "E", "F"},
{"G", "H"},
{"I"},
{"J"},
{"K", "L"},
{"M", "N", "O"},
]
id_mapping_dir = os.path.join(static_dir, "test_create_match_keys", "input")
ids_to_drop = os.path.join(static_dir, "test_create_match_keys", "ids_to_drop")
expected_output = [
{"orig_id": "A", "merged_id": "carticle_0000000001"},
{"orig_id": "B", "merged_id": "carticle_0000000001"},
{"orig_id": "C", "merged_id": "carticle_0000000001"},
{"orig_id": "D", "merged_id": "carticle_0000000004"},
{"orig_id": "E", "merged_id": "carticle_0000000004"},
{"orig_id": "F", "merged_id": "carticle_0000000004"},
{"orig_id": "G", "merged_id": "carticle_0000000005"},
{"orig_id": "H", "merged_id": "carticle_0000000005"},
{"orig_id": "I", "merged_id": "carticle_0000000006"},
{"orig_id": "J", "merged_id": "carticle_0000000007"},
# F was removed from this match set so A B and C should get a new merged id
{"orig_id": "A", "merged_id": "carticle_0000000006"},
{"orig_id": "B", "merged_id": "carticle_0000000006"},
{"orig_id": "C", "merged_id": "carticle_0000000006"},
# D, E, F contains one elt from one match set, two from another; should change ids
{"orig_id": "D", "merged_id": "carticle_0000000007"},
{"orig_id": "E", "merged_id": "carticle_0000000007"},
{"orig_id": "F", "merged_id": "carticle_0000000007"},
# G, H is a completely new match set with new ids, should get a new id
{"orig_id": "G", "merged_id": "carticle_0000000008"},
{"orig_id": "H", "merged_id": "carticle_0000000008"},
# The last two (I and J) are two different match sets that share an old id and are in ids_to_drop;
# each should get a new id
{"orig_id": "I", "merged_id": "carticle_0000000009"},
{"orig_id": "J", "merged_id": "carticle_0000000010"},
# Nothing changed for this match set so the merged id stays the same
{"orig_id": "K", "merged_id": "carticle_0000000004"},
{"orig_id": "L", "merged_id": "carticle_0000000004"},
# This match set got one new article so the merged id stays the same
{"orig_id": "M", "merged_id": "carticle_0000000005"},
{"orig_id": "N", "merged_id": "carticle_0000000005"},
{"orig_id": "O", "merged_id": "carticle_0000000005"},
]
print(expected_output)
create_match_keys(match_sets, out_fi, ids_to_drop, id_mapping_dir)
out = [json.loads(x) for x in open(out_fi).readlines()]
self.assertEqual(expected_output, sorted(out, key=lambda x: x["orig_id"]))
match_batches = create_matches(match_sets, ids_to_drop, id_mapping_dir)
matches = []
for match_batch, batch_id in match_batches:
matches.extend(match_batch)
print(sorted(matches, key=lambda x: x["orig_id"]))
self.assertEqual(expected_output, sorted(matches, key=lambda x: x["orig_id"]))
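As the updated test shows, `create_matches` now yields `(match_batch, batch_id)` pairs instead of writing a single `id_mapping.jsonl`, which is why `import_id_mapping` above now loads `{tmp_dir}/new_id_mappings/*`. A hedged sketch of how such batches might be serialized for that wildcard load — the output directory layout and file-naming pattern are assumptions, since the VM-side `run_ids_scripts.sh` is not part of this diff:

```python
import json
import os


def write_match_batches(match_batches, out_dir="new_id_mappings"):
    """Write each (match_batch, batch_id) pair from create_matches to its own
    newline-delimited JSON file, ready for a wildcard GCS -> BigQuery load."""
    os.makedirs(out_dir, exist_ok=True)
    for match_batch, batch_id in match_batches:
        out_path = os.path.join(out_dir, f"id_mapping_{batch_id}.jsonl")
        with open(out_path, mode="w") as f:
            for row in match_batch:  # each row is {"orig_id": ..., "merged_id": ...}
                f.write(json.dumps(row) + "\n")
```

Each output file would then conform to the `id_mapping.json` schema referenced by the `GCSToBigQueryOperator` in the DAG.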
