From 017ac4d84f0b3608e050f6f556bc9bf76c400edf Mon Sep 17 00:00:00 2001 From: Jennifer Melot Date: Wed, 14 Aug 2024 15:24:31 -0400 Subject: [PATCH 1/4] Split merged ids if they lose orig ids Closes #45 --- .../test_create_match_keys/input/input.jsonl | 4 ++ tests/test_create_merge_ids.py | 47 ++++++++++++------- utils/create_merge_ids.py | 9 ++++ 3 files changed, 44 insertions(+), 16 deletions(-) diff --git a/tests/static/test_create_match_keys/input/input.jsonl b/tests/static/test_create_match_keys/input/input.jsonl index 1876580..8bcbf43 100644 --- a/tests/static/test_create_match_keys/input/input.jsonl +++ b/tests/static/test_create_match_keys/input/input.jsonl @@ -5,3 +5,7 @@ {"orig_id": "F", "merged_id": "carticle_0000000001"} {"orig_id": "I", "merged_id": "carticle_0000000003"} {"orig_id": "J", "merged_id": "carticle_0000000003"} +{"orig_id": "K", "merged_id": "carticle_0000000004"} +{"orig_id": "L", "merged_id": "carticle_0000000004"} +{"orig_id": "M", "merged_id": "carticle_0000000005"} +{"orig_id": "N", "merged_id": "carticle_0000000005"} diff --git a/tests/test_create_merge_ids.py b/tests/test_create_merge_ids.py index ee4b8cd..67be887 100644 --- a/tests/test_create_merge_ids.py +++ b/tests/test_create_merge_ids.py @@ -75,12 +75,15 @@ def test_skip_matches(self): ) def test_create_match_keys(self): - # The first set (A, B, C) contains two old elts from the same match set and one new elt; should keep its id. - # The next (D, E, F) contains one elt from one match set, two from another; should change ids. - # Another (G, H) contains only new ids; should get a new id. - # The last two (I and J) are two different match sets that share an old id and are in ids_to_drop; - # each should get a new id (this is in case of unlinking). - match_sets = [{"A", "B", "C"}, {"D", "E", "F"}, {"G", "H"}, {"I"}, {"J"}] + match_sets = [ + {"A", "B", "C"}, + {"D", "E", "F"}, + {"G", "H"}, + {"I"}, + {"J"}, + {"K", "L"}, + {"M", "N", "O"}, + ] out_dir = os.path.join(static_dir, "test_create_match_keys", "output") if os.path.exists(out_dir): shutil.rmtree(out_dir) @@ -89,16 +92,28 @@ def test_create_match_keys(self): id_mapping_dir = os.path.join(static_dir, "test_create_match_keys", "input") ids_to_drop = os.path.join(static_dir, "test_create_match_keys", "ids_to_drop") expected_output = [ - {"orig_id": "A", "merged_id": "carticle_0000000001"}, - {"orig_id": "B", "merged_id": "carticle_0000000001"}, - {"orig_id": "C", "merged_id": "carticle_0000000001"}, - {"orig_id": "D", "merged_id": "carticle_0000000004"}, - {"orig_id": "E", "merged_id": "carticle_0000000004"}, - {"orig_id": "F", "merged_id": "carticle_0000000004"}, - {"orig_id": "G", "merged_id": "carticle_0000000005"}, - {"orig_id": "H", "merged_id": "carticle_0000000005"}, - {"orig_id": "I", "merged_id": "carticle_0000000006"}, - {"orig_id": "J", "merged_id": "carticle_0000000007"}, + # F was removed from this match set so A B and C should get a new merged id + {"orig_id": "A", "merged_id": "carticle_0000000006"}, + {"orig_id": "B", "merged_id": "carticle_0000000006"}, + {"orig_id": "C", "merged_id": "carticle_0000000006"}, + # D, E, F contains one elt from one match set, two from another; should change ids + {"orig_id": "D", "merged_id": "carticle_0000000007"}, + {"orig_id": "E", "merged_id": "carticle_0000000007"}, + {"orig_id": "F", "merged_id": "carticle_0000000007"}, + # G, H is a completely new match set with new ids, should get a new id + {"orig_id": "G", "merged_id": "carticle_0000000008"}, + {"orig_id": "H", "merged_id": 
"carticle_0000000008"}, + # The last two (I and J) are two different match sets that share an old id and are in ids_to_drop; + # each should get a new id + {"orig_id": "I", "merged_id": "carticle_0000000009"}, + {"orig_id": "J", "merged_id": "carticle_0000000010"}, + # Nothing changed for this match set so the merged id stays the same + {"orig_id": "K", "merged_id": "carticle_0000000004"}, + {"orig_id": "L", "merged_id": "carticle_0000000004"}, + # This match set got one new article so the merged id stays the same + {"orig_id": "M", "merged_id": "carticle_0000000005"}, + {"orig_id": "N", "merged_id": "carticle_0000000005"}, + {"orig_id": "O", "merged_id": "carticle_0000000005"}, ] print(expected_output) create_match_keys(match_sets, out_fi, ids_to_drop, id_mapping_dir) diff --git a/utils/create_merge_ids.py b/utils/create_merge_ids.py index 299d133..06a5472 100644 --- a/utils/create_merge_ids.py +++ b/utils/create_merge_ids.py @@ -139,8 +139,10 @@ def create_match_keys( :param prev_id_mapping_dir: optional dir containing previous id mappings in jsonl form :return: None """ + print("Creating merged ids") with open(match_file, mode="w") as out: prev_orig_to_merg = {} + merg_to_orig = {} max_merg = "carticle_0" if prev_id_mapping_dir is not None: for fi in os.listdir(prev_id_mapping_dir): @@ -151,6 +153,9 @@ def create_match_keys( merg_id = js["merged_id"] assert orig_id not in prev_orig_to_merg prev_orig_to_merg[orig_id] = merg_id + if merg_id not in merg_to_orig: + merg_to_orig[merg_id] = set() + merg_to_orig[merg_id].add(orig_id) if merg_id > max_merg: max_merg = merg_id ignore_ids = set() @@ -171,6 +176,10 @@ def create_match_keys( ) if len(existing_ids) == 1 and list(existing_ids)[0] not in ignore_ids: cset_article_id = existing_ids.pop() + # In some cases, merged ids can "split apart", if their constituent articles no longer + # match. 
We'll detect this case by checking whether the old set of articles assigned to + # this merged id contain any entries missing from our current set + if cset_article_id and (len(merg_to_orig[cset_article_id] - set(ms)) == 0): num_old += 1 else: cset_article_id = create_cset_article_id(match_id) From 7635b223352547dc6323b3f8cd010da57a648089 Mon Sep 17 00:00:00 2001 From: Jennifer Melot Date: Fri, 23 Aug 2024 12:14:44 -0400 Subject: [PATCH 2/4] Speed up and simplify linking script Also update tests and add some uncommitted parts of the obsolete link breaking method --- linkage_dag.py | 6 +- tests/static/simhash_empty/empty.jsonl | 0 .../ids/ids1.jsonl | 3 - .../ids/ids2.jsonl | 3 - .../match_pairs/file1 | 1 + .../file2 => simhash_match_pairs/ids1.jsonl} | 0 .../file3 => simhash_match_pairs/ids2.jsonl} | 0 tests/test_create_merge_ids.py | 59 ++-- utils/create_merge_ids.py | 263 ++++++++++-------- utils/run_ids_scripts.sh | 4 +- utils/run_simhash_scripts.sh | 2 - 11 files changed, 188 insertions(+), 153 deletions(-) create mode 100644 tests/static/simhash_empty/empty.jsonl delete mode 100644 tests/static/test_get_match_sets_with_extra_id/ids/ids1.jsonl delete mode 100644 tests/static/test_get_match_sets_with_extra_id/ids/ids2.jsonl rename tests/static/test_get_match_sets_with_extra_id/{match_pairs/file2 => simhash_match_pairs/ids1.jsonl} (100%) rename tests/static/test_get_match_sets_with_extra_id/{match_pairs/file3 => simhash_match_pairs/ids2.jsonl} (100%) diff --git a/linkage_dag.py b/linkage_dag.py index 68497bf..9485326 100644 --- a/linkage_dag.py +++ b/linkage_dag.py @@ -315,7 +315,7 @@ BigQueryToGCSOperator( task_id="export_article_pairs", source_project_dataset_table=f"{staging_dataset}.all_match_pairs_with_um", - destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/article_pairs/article_pairs*.jsonl", + destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/exact_matches/article_pairs*.jsonl", export_format="NEWLINE_DELIMITED_JSON", ), BigQueryToGCSOperator( @@ -364,7 +364,7 @@ "rm -rf current_ids", "mkdir input_data", "mkdir current_ids", - f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/article_pairs .", + f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/exact_matches .", f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/simhash_input .", f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_indexes .", f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_results .", @@ -483,7 +483,7 @@ import_id_mapping = GCSToBigQueryOperator( task_id="import_id_mapping", bucket=bucket, - source_objects=[f"{tmp_dir}/id_mapping.jsonl"], + source_objects=[f"{tmp_dir}/new_id_mappings/*"], schema_object=f"{schema_dir}/id_mapping.json", destination_project_dataset_table=f"{staging_dataset}.id_mapping", source_format="NEWLINE_DELIMITED_JSON", diff --git a/tests/static/simhash_empty/empty.jsonl b/tests/static/simhash_empty/empty.jsonl new file mode 100644 index 0000000..e69de29 diff --git a/tests/static/test_get_match_sets_with_extra_id/ids/ids1.jsonl b/tests/static/test_get_match_sets_with_extra_id/ids/ids1.jsonl deleted file mode 100644 index f859622..0000000 --- a/tests/static/test_get_match_sets_with_extra_id/ids/ids1.jsonl +++ /dev/null @@ -1,3 +0,0 @@ -{"id1": "A"} -{"id1": "D"} -{"id1": "I"} diff --git a/tests/static/test_get_match_sets_with_extra_id/ids/ids2.jsonl b/tests/static/test_get_match_sets_with_extra_id/ids/ids2.jsonl deleted file mode 100644 index 30cedf7..0000000 --- a/tests/static/test_get_match_sets_with_extra_id/ids/ids2.jsonl +++ /dev/null @@ -1,3 
+0,0 @@ -{"id1": "B"} -{"id1": "C"} -{"id1": "J"} diff --git a/tests/static/test_get_match_sets_with_extra_id/match_pairs/file1 b/tests/static/test_get_match_sets_with_extra_id/match_pairs/file1 index 87e3912..61d3beb 100644 --- a/tests/static/test_get_match_sets_with_extra_id/match_pairs/file1 +++ b/tests/static/test_get_match_sets_with_extra_id/match_pairs/file1 @@ -1,3 +1,4 @@ {"id1": "A", "id2": "B"} {"id1": "B", "id2": "C"} {"id1": "J", "id2": "I"} +{"id1": "E", "id2": "E"} diff --git a/tests/static/test_get_match_sets_with_extra_id/match_pairs/file2 b/tests/static/test_get_match_sets_with_extra_id/simhash_match_pairs/ids1.jsonl similarity index 100% rename from tests/static/test_get_match_sets_with_extra_id/match_pairs/file2 rename to tests/static/test_get_match_sets_with_extra_id/simhash_match_pairs/ids1.jsonl diff --git a/tests/static/test_get_match_sets_with_extra_id/match_pairs/file3 b/tests/static/test_get_match_sets_with_extra_id/simhash_match_pairs/ids2.jsonl similarity index 100% rename from tests/static/test_get_match_sets_with_extra_id/match_pairs/file3 rename to tests/static/test_get_match_sets_with_extra_id/simhash_match_pairs/ids2.jsonl diff --git a/tests/test_create_merge_ids.py b/tests/test_create_merge_ids.py index 67be887..30a6c96 100644 --- a/tests/test_create_merge_ids.py +++ b/tests/test_create_merge_ids.py @@ -3,9 +3,10 @@ import shutil import unittest -from utils.create_merge_ids import create_match_keys, create_match_sets +from utils.create_merge_ids import create_match_sets, create_matches static_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static") +empty_simhash_dir = os.path.join(static_dir, "simhash_empty") class TestGetCombinedMap(unittest.TestCase): @@ -15,22 +16,30 @@ class TestGetCombinedMap(unittest.TestCase): def test_get_combined_map1(self): match_dir = os.path.join(static_dir, "test_get_combined_map1") expected_result = [{"A", "B", "C"}] - self.assertEqual(create_match_sets(match_dir), expected_result) + self.assertEqual( + create_match_sets(match_dir, empty_simhash_dir), expected_result + ) def test_get_combined_map2(self): match_dir = os.path.join(static_dir, "test_get_combined_map2") expected_result = [{"A", "B", "C", "D"}] - self.assertEqual(create_match_sets(match_dir), expected_result) + self.assertEqual( + create_match_sets(match_dir, empty_simhash_dir), expected_result + ) def test_get_combined_map3(self): match_dir = os.path.join(static_dir, "test_get_combined_map3") expected_result = [{"A", "B", "C", "D", "E"}] - self.assertEqual(create_match_sets(match_dir), expected_result) + self.assertEqual( + create_match_sets(match_dir, empty_simhash_dir), expected_result + ) def test_get_combined_map4(self): match_dir = os.path.join(static_dir, "test_get_combined_map4") expected_result = [{"A", "B", "C", "D", "E", "F", "G", "H"}] - self.assertEqual(create_match_sets(match_dir), expected_result) + self.assertEqual( + create_match_sets(match_dir, empty_simhash_dir), expected_result + ) def test_get_combined_map5(self): # test with two disconnected sets @@ -40,24 +49,28 @@ def test_get_combined_map5(self): expected_result = sorted( [result_set_small, result_set_large], key=lambda k: len(k) ) - actual_result = sorted(create_match_sets(match_dir), key=lambda k: len(k)) + actual_result = sorted( + create_match_sets(match_dir, empty_simhash_dir), key=lambda k: len(k) + ) self.assertEqual(actual_result, expected_result) def test_get_match_sets_with_extra_id(self): - # test with three disconnected sets. 
The set A - E will have one extra id (E) that should get filtered, and - # the "small" set F-H will all be extra ids that should be filtered. The other small set I-J will have ids - # distributed across two id files, but the set should be included. + # test with three disconnected sets. The set A - E will have one simhash match (E-A) that should be included, + # and matches involving the obsolete id D that should be filtered. The "small" set F-H contains simhash-only + # ids and should be filtered. The other small set I-J should be included. match_dir = os.path.join( static_dir, "test_get_match_sets_with_extra_id", "match_pairs" ) - ids_dir = os.path.join(static_dir, "test_get_match_sets_with_extra_id", "ids") - result_set_large = {"A", "B", "C", "D"} + simhash_match_dir = os.path.join( + static_dir, "test_get_match_sets_with_extra_id", "simhash_match_pairs" + ) + result_set_large = {"A", "B", "C", "E"} result_set_small = {"I", "J"} expected_result = sorted( [result_set_small, result_set_large], key=lambda k: len(k) ) actual_result = sorted( - create_match_sets(match_dir, ids_dir), key=lambda k: len(k) + create_match_sets(match_dir, simhash_match_dir), key=lambda k: len(k) ) self.assertEqual(actual_result, expected_result) @@ -65,16 +78,18 @@ def test_skip_matches(self): # test without matches excluded match_dir = os.path.join(static_dir, "test_skip_matches_ids") expected_result_no_excludes = [{"A", "B", "C"}, {"D", "E"}] - self.assertEqual(create_match_sets(match_dir), expected_result_no_excludes) + self.assertEqual( + create_match_sets(match_dir, empty_simhash_dir), expected_result_no_excludes + ) # test with matches excluded exclude_dir = os.path.join(static_dir, "test_skip_matches_ids_to_skip") expected_result_excludes = [{"A", "B"}, {"C"}, {"D"}, {"E"}] self.assertEqual( - create_match_sets(match_dir, exclude_dir=exclude_dir), + create_match_sets(match_dir, empty_simhash_dir, exclude_dir=exclude_dir), expected_result_excludes, ) - def test_create_match_keys(self): + def test_create_matches(self): match_sets = [ {"A", "B", "C"}, {"D", "E", "F"}, @@ -84,11 +99,6 @@ def test_create_match_keys(self): {"K", "L"}, {"M", "N", "O"}, ] - out_dir = os.path.join(static_dir, "test_create_match_keys", "output") - if os.path.exists(out_dir): - shutil.rmtree(out_dir) - os.mkdir(out_dir) - out_fi = os.path.join(out_dir, "output.jsonl") id_mapping_dir = os.path.join(static_dir, "test_create_match_keys", "input") ids_to_drop = os.path.join(static_dir, "test_create_match_keys", "ids_to_drop") expected_output = [ @@ -116,6 +126,9 @@ def test_create_match_keys(self): {"orig_id": "O", "merged_id": "carticle_0000000005"}, ] print(expected_output) - create_match_keys(match_sets, out_fi, ids_to_drop, id_mapping_dir) - out = [json.loads(x) for x in open(out_fi).readlines()] - self.assertEqual(expected_output, sorted(out, key=lambda x: x["orig_id"])) + match_batches = create_matches(match_sets, ids_to_drop, id_mapping_dir) + matches = [] + for match_batch, batch_id in match_batches: + matches.extend(match_batch) + print(sorted(matches, key=lambda x: x["orig_id"])) + self.assertEqual(expected_output, sorted(matches, key=lambda x: x["orig_id"])) diff --git a/utils/create_merge_ids.py b/utils/create_merge_ids.py index 06a5472..fd4bcaa 100644 --- a/utils/create_merge_ids.py +++ b/utils/create_merge_ids.py @@ -1,12 +1,13 @@ -import argparse -import json -import os - """ Creates match sets from pairs of linked articles, assigns each match set an id, and writes out a mapping from each id to each article in its match 
set. """ +import argparse +import json +import multiprocessing +import os + def create_cset_article_id(idx: int): """ @@ -38,24 +39,6 @@ def get_connected_edges(adj_list: dict, key: str) -> set: return conn_edges -def get_usable_ids(ids_dir: str) -> set: - """ - Get the set of usable ids from a directory of jsonl, or returns None if ids_dir is None - :param ids_dir: None or name of a directory containing jsonl with one key, "id", which are the ids we want to keep - :return: the set of usable ids, or None - """ - if ids_dir is None: - return None - usable_ids = set() - for fi in os.listdir(ids_dir): - print("reading " + fi) - with open(os.path.join(ids_dir, fi)) as f: - for line in f: - js = json.loads(line) - usable_ids.add(js["id1"]) - return usable_ids - - def get_exclude_matches(exclude_dir: str) -> dict: """ Build dict mapping ids to sets of other ids they should not be matched to @@ -79,13 +62,14 @@ def get_exclude_matches(exclude_dir: str) -> dict: def create_match_sets( - match_dir: str, current_ids_dir: str = None, exclude_dir: str = None + exact_match_dir: str, simhash_match_dir: str, exclude_dir: str = None ) -> list: """ Given a directory of exported jsonl files containing article matches, generates a list of sets of matched articles, - including "transitive matches". - :param match_dir: directory of exported jsonl files containing article matches - :param current_ids_dir: optional dir containing the current set of ids to use in jsonl form. If None, all ids will be used + including "transitive matches". We will use the ids present in the exact matches to filter the simhash matches, + since it's possible for obsolete ids to live on in the simhash index + :param exact_match_dir: directory of jsonls containing matched orig_ids from exact metadata match + :param simhash_match_dir: directory of jsonls containing matched orig_ids from simhash :param exclude_dir: directory of jsonl files containing article pairs that should not be matched together :return: list of sets of matched articles """ @@ -93,27 +77,30 @@ def create_match_sets( dont_match = get_exclude_matches(exclude_dir) print("getting adjacency lists") adj_list = {} - usable_ids = get_usable_ids(current_ids_dir) - for fi in os.listdir(match_dir): - with open(os.path.join(match_dir, fi)) as f: - for line in f: - js = json.loads(line) - key1 = js["id1"] - key2 = js["id2"] - if (usable_ids is not None) and ( - (key1 not in usable_ids) or (key2 not in usable_ids) - ): - continue - if key1 not in adj_list: - adj_list[key1] = set() - if key2 not in dont_match.get(key1, set()): - adj_list[key1].add(key2) - # even if we're in a scenario where (according to a changed metric) A matches B but B doesn't match A, - # this will ensure they get added to the same match set - if key2 not in adj_list: - adj_list[key2] = set() - if key1 not in dont_match.get(key2, set()): - adj_list[key2].add(key1) + usable_ids = set() + for match_dir, is_simhash in [(exact_match_dir, False), (simhash_match_dir, True)]: + for fi in os.listdir(match_dir): + with open(os.path.join(match_dir, fi)) as f: + for line in f: + js = json.loads(line) + key1 = js["id1"] + key2 = js["id2"] + if is_simhash: + if (key1 not in usable_ids) or (key2 not in usable_ids): + continue + else: + usable_ids.add(key1) + usable_ids.add(key2) + if key1 not in adj_list: + adj_list[key1] = set() + if key2 not in dont_match.get(key1, set()): + adj_list[key1].add(key2) + # even if we're in a scenario where (according to a changed metric) A matches B but B doesn't match A, + # this will 
ensure they get added to the same match set + if key2 not in adj_list: + adj_list[key2] = set() + if key1 not in dont_match.get(key2, set()): + adj_list[key2].add(key1) seen_ids = set() match_sets = [] for k in adj_list.keys(): @@ -127,9 +114,9 @@ def create_match_sets( return match_sets -def create_match_keys( - match_sets: list, match_file: str, ids_to_drop: str, prev_id_mapping_dir: str = None -): +def create_matches( + match_sets: list, ids_to_drop: str, prev_id_mapping_dir: str = None +) -> iter: """ Given a match set, creates an id for that match set, and writes out a jsonl mapping each article in the match set to that id @@ -137,68 +124,113 @@ def create_match_keys( :param match_file: file where id mapping should be written :param ids_to_drop: directory containing merged ids that should not be used in jsonl form :param prev_id_mapping_dir: optional dir containing previous id mappings in jsonl form - :return: None + :return: a generator of tuples with two elements: a list of jsons containing orig_id, merged_id matches to be + written, and an identifier for the batch """ - print("Creating merged ids") - with open(match_file, mode="w") as out: - prev_orig_to_merg = {} - merg_to_orig = {} - max_merg = "carticle_0" - if prev_id_mapping_dir is not None: - for fi in os.listdir(prev_id_mapping_dir): - with open(os.path.join(prev_id_mapping_dir, fi)) as f: - for line in f: - js = json.loads(line.strip()) - orig_id = js["orig_id"] - merg_id = js["merged_id"] - assert orig_id not in prev_orig_to_merg - prev_orig_to_merg[orig_id] = merg_id - if merg_id not in merg_to_orig: - merg_to_orig[merg_id] = set() - merg_to_orig[merg_id].add(orig_id) - if merg_id > max_merg: - max_merg = merg_id - ignore_ids = set() - for fi in os.listdir(ids_to_drop): - with open(os.path.join(ids_to_drop, fi)) as f: + prev_orig_to_merg = {} + merg_to_orig = {} + max_merg = "carticle_0" + if prev_id_mapping_dir is not None: + for fi in os.listdir(prev_id_mapping_dir): + with open(os.path.join(prev_id_mapping_dir, fi)) as f: for line in f: js = json.loads(line.strip()) - ignore_ids.add(js["merged_id"]) - match_id = int(max_merg.split("carticle_")[1]) + 1 - num_new, num_old = 0, 0 - for ms in match_sets: - cset_article_id = None - # if we have exactly one existing id, reuse it, even if new articles are matched to it. - # if two articles that previously had different carticle ids are now in the same match set, - # create a new carticle id - existing_ids = set( - [prev_orig_to_merg[m] for m in ms if m in prev_orig_to_merg] - ) - if len(existing_ids) == 1 and list(existing_ids)[0] not in ignore_ids: - cset_article_id = existing_ids.pop() - # In some cases, merged ids can "split apart", if their constituent articles no longer - # match. 
We'll detect this case by checking whether the old set of articles assigned to - # this merged id contain any entries missing from our current set - if cset_article_id and (len(merg_to_orig[cset_article_id] - set(ms)) == 0): - num_old += 1 + orig_id = js["orig_id"] + merg_id = js["merged_id"] + assert orig_id not in prev_orig_to_merg + prev_orig_to_merg[orig_id] = merg_id + if merg_id not in merg_to_orig: + merg_to_orig[merg_id] = set() + merg_to_orig[merg_id].add(orig_id) + if merg_id > max_merg: + max_merg = merg_id + ignore_ids = set() + for fi in os.listdir(ids_to_drop): + with open(os.path.join(ids_to_drop, fi)) as f: + for line in f: + js = json.loads(line.strip()) + ignore_ids.add(js["merged_id"]) + match_id = int(max_merg.split("carticle_")[1]) + 1 + batch_size = 1_000_000 + batch_count = 0 + batch = [] + for ms in match_sets: + cset_article_id = None + # if we have exactly one existing id, reuse it, even if new articles are matched to it. + # if two articles that previously had different carticle ids are now in the same match set, + # create a new carticle id + existing_ids = set([prev_orig_to_merg[m] for m in ms if m in prev_orig_to_merg]) + if len(existing_ids) == 1 and list(existing_ids)[0] not in ignore_ids: + cset_article_id = existing_ids.pop() + # In some cases, merged ids can "split apart", if their constituent articles no longer + # match. We'll detect this case by checking whether the old set of articles assigned to + # this merged id contain any entries missing from our current set + if (cset_article_id and (len(merg_to_orig[cset_article_id] - set(ms)) > 0)) or ( + not cset_article_id + ): + cset_article_id = create_cset_article_id(match_id) + match_id += 1 + for article in ms: + match = {"merged_id": cset_article_id, "orig_id": article} + if len(batch) == batch_size: + yield batch, batch_count + batch = [match] + batch_count += 1 else: - cset_article_id = create_cset_article_id(match_id) - num_new += 1 - match_id += 1 - for article in ms: - out.write( - json.dumps({"merged_id": cset_article_id, "orig_id": article}) - + "\n" - ) - print(f"wrote {num_new} new ids and reused {num_old} ids") + batch.append(match) + yield batch, batch_count + + +def write_batch(match_batch: tuple, output_dir: str) -> None: + """ + Write a batch of matches to disk + :param match_batch: tuple of a list of jsons containing a merged id and orig id, and an identifier for the batch + :param output_dir: directory where matches should be written + :return: None + """ + matches, batch_id = match_batch + with open(os.path.join(output_dir, f"matches_{batch_id}.jsonl"), "w") as f: + for match in matches: + f.write(json.dumps(match) + "\n") + + +def write_matches( + exact_match_dir, + simhash_match_dir, + exclude_dir, + ids_to_drop, + prev_id_mapping_dir, + output_dir, +) -> None: + """ + Generate merged id-orig id pairs and write them out as a directory of jsonls + :param exact_match_dir: directory of jsonls containing matched orig_ids from exact metadata match + :param simhash_match_dir: directory of jsonls containing matched orig_ids from simhash + :param exclude_dir: directory of article pairs that should not be matched + :param ids_to_drop: file containing ids that should not be used + :param prev_id_mapping_dir: directory of jsonl containing previous mapping between orig ids and merged ids + :param output_dir: directory where jsonls containing new mappings between orig ids and merged ids should be written + :return: None + """ + match_sets = create_match_sets(exact_match_dir, simhash_match_dir, 
exclude_dir) + match_batches = create_matches(match_sets, ids_to_drop, prev_id_mapping_dir) + if not os.path.exists(output_dir): + os.makedirs(output_dir) + with multiprocessing.Pool() as p: + p.starmap(write_batch, ((mb, output_dir) for mb in match_batches)) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( - "--match_dir", + "--exact_match_dir", + required=True, + help="directory of jsonls containing matched orig_ids from exact metadata match", + ) + parser.add_argument( + "--simhash_match_dir", required=True, - help="directory of exported jsonl from bigquery containing pairs of article matches", + help="directory of jsonls containing matched orig_ids from simhash", ) parser.add_argument( "--exclude_dir", @@ -210,26 +242,23 @@ def create_match_keys( required=True, help="file containing ids that should not be used", ) - parser.add_argument( - "--merge_file", required=True, help="file where merged ids should be written" - ) parser.add_argument( "--prev_id_mapping_dir", - help="directory of exported jsonl from bigquery containing pairs of article matches", + help="directory of jsonl containing previous mapping between orig ids and merged ids", ) parser.add_argument( - "--current_ids_dir", - help=( - "directory containing jsonl with one key, 'id'. " - "These are the ids that should be included in output. " - "If None, no ids will be filtered." - ), + "--output_dir", + required=True, + help="directory where jsonls containing new mappings between orig ids and " + "merged ids should be written", ) args = parser.parse_args() - match_sets = create_match_sets( - args.match_dir, args.current_ids_dir, args.exclude_dir - ) - create_match_keys( - match_sets, args.merge_file, args.ids_to_drop, args.prev_id_mapping_dir + write_matches( + args.exact_match_dir, + args.simhash_match_dir, + args.exclude_dir, + args.ids_to_drop, + args.prev_id_mapping_dir, + args.output_dir, ) diff --git a/utils/run_ids_scripts.sh b/utils/run_ids_scripts.sh index 59a0db8..e1751c4 100644 --- a/utils/run_ids_scripts.sh +++ b/utils/run_ids_scripts.sh @@ -1,7 +1,7 @@ cd /mnt/disks/data/run gsutil rm gs://airflow-data-exchange/article_linkage/tmp/done_files/ids_are_done -python3 create_merge_ids.py --match_dir usable_ids --exclude_dir unlink --ids_to_drop ids_to_drop --prev_id_mapping_dir prev_id_mapping --merge_file id_mapping.jsonl --current_ids_dir article_pairs -/snap/bin/gsutil -m cp id_mapping.jsonl gs://airflow-data-exchange/article_linkage/tmp/ +python3 create_merge_ids.py --exact_match_dir exact_matches --simhash_match_dir simhash_results --exclude_dir unlink --ids_to_drop ids_to_drop --prev_id_mapping_dir prev_id_mapping --output_dir new_id_mappings +/snap/bin/gsutil -m cp -r new_id_mappings gs://airflow-data-exchange/article_linkage/tmp/ /snap/bin/gsutil -m cp simhash_results/* gs://airflow-data-exchange/article_linkage/simhash_results/ /snap/bin/gsutil -m cp new_simhash_indexes/* gs://airflow-data-exchange/article_linkage/simhash_indexes/ /snap/bin/gsutil -m cp new_simhash_indexes/* gs://airflow-data-exchange/article_linkage/simhash_indexes_archive/$(date +%F)/ diff --git a/utils/run_simhash_scripts.sh b/utils/run_simhash_scripts.sh index 7b76579..95650e9 100644 --- a/utils/run_simhash_scripts.sh +++ b/utils/run_simhash_scripts.sh @@ -1,7 +1,5 @@ cd /mnt/disks/data/run gsutil rm gs://airflow-data-exchange/article_linkage/tmp/done_files/simhash_is_done python3 run_simhash.py simhash_input simhash_results --simhash_indexes simhash_indexes --new_simhash_indexes new_simhash_indexes -cp -r 
article_pairs usable_ids -cp simhash_results/* article_pairs/ touch simhash_is_done gsutil cp simhash_is_done gs://airflow-data-exchange/article_linkage/tmp/done_files/ From 125e966886604bbbe8ddf42454299b9c3c927c34 Mon Sep 17 00:00:00 2001 From: Jennifer Melot Date: Wed, 11 Sep 2024 21:27:32 -0700 Subject: [PATCH 3/4] Simplify logic, improve naming --- utils/create_merge_ids.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/utils/create_merge_ids.py b/utils/create_merge_ids.py index fd4bcaa..deed02d 100644 --- a/utils/create_merge_ids.py +++ b/utils/create_merge_ids.py @@ -19,7 +19,7 @@ def create_cset_article_id(idx: int): return f"carticle_{zero_padding}{idx}" -def get_connected_edges(adj_list: dict, key: str) -> set: +def get_connected_articles(adj_list: dict, key: str) -> set: """ Given a dict where a key-value pair corresponds to an article match and a particular article `key`, returns a set of articles matched to `key`. @@ -27,16 +27,16 @@ def get_connected_edges(adj_list: dict, key: str) -> set: :param key: an article to match in `adj_list` :return: a set of matched articles """ - conn_edges = {key} + conn_articles = {key} to_explore = adj_list[key] while len(to_explore) > 0: v = to_explore.pop() - if v not in conn_edges: - conn_edges.add(v) + if v not in conn_articles: + conn_articles.add(v) to_explore = to_explore.union( - {k for k in adj_list[v] if k not in conn_edges} + {k for k in adj_list[v] if k not in conn_articles} ) - return conn_edges + return conn_articles def get_exclude_matches(exclude_dir: str) -> dict: @@ -77,7 +77,6 @@ def create_match_sets( dont_match = get_exclude_matches(exclude_dir) print("getting adjacency lists") adj_list = {} - usable_ids = set() for match_dir, is_simhash in [(exact_match_dir, False), (simhash_match_dir, True)]: for fi in os.listdir(match_dir): with open(os.path.join(match_dir, fi)) as f: @@ -86,11 +85,8 @@ def create_match_sets( key1 = js["id1"] key2 = js["id2"] if is_simhash: - if (key1 not in usable_ids) or (key2 not in usable_ids): + if (key1 not in adj_list) or (key2 not in adj_list): continue - else: - usable_ids.add(key1) - usable_ids.add(key2) if key1 not in adj_list: adj_list[key1] = set() if key2 not in dont_match.get(key1, set()): @@ -101,13 +97,14 @@ def create_match_sets( adj_list[key2] = set() if key1 not in dont_match.get(key2, set()): adj_list[key2].add(key1) + print("getting connected articles") seen_ids = set() match_sets = [] for k in adj_list.keys(): if k in seen_ids: continue # grab every connected article - match_set = get_connected_edges(adj_list, k) + match_set = get_connected_articles(adj_list, k) for matched_key in match_set: seen_ids.add(matched_key) match_sets.append(match_set) From c132c8210ce9ea9242181326ee973e7ec1697cf1 Mon Sep 17 00:00:00 2001 From: Jennifer Melot Date: Wed, 11 Sep 2024 21:41:06 -0700 Subject: [PATCH 4/4] Remove simhash --- linkage_dag.py | 37 +-- sequences/merge_combined_metadata.tsv | 1 - sql/simhash_input.sql | 18 -- tests/static/simhash_empty/empty.jsonl | 0 .../match_pairs/file1 | 4 - .../simhash_match_pairs/ids1.jsonl | 4 - .../simhash_match_pairs/ids2.jsonl | 2 - tests/test_create_merge_ids.py | 47 +--- utils/create_merge_ids.py | 67 +++--- utils/my_simhash.py | 210 ------------------ utils/run_ids_scripts.sh | 5 +- utils/run_simhash.py | 121 ---------- utils/run_simhash_scripts.sh | 5 - 13 files changed, 36 insertions(+), 485 deletions(-) delete mode 100644 sql/simhash_input.sql delete mode 100644 tests/static/simhash_empty/empty.jsonl delete 
mode 100644 tests/static/test_get_match_sets_with_extra_id/match_pairs/file1 delete mode 100644 tests/static/test_get_match_sets_with_extra_id/simhash_match_pairs/ids1.jsonl delete mode 100644 tests/static/test_get_match_sets_with_extra_id/simhash_match_pairs/ids2.jsonl delete mode 100644 utils/my_simhash.py delete mode 100644 utils/run_simhash.py delete mode 100644 utils/run_simhash_scripts.sh diff --git a/linkage_dag.py b/linkage_dag.py index 9485326..64c7dbf 100644 --- a/linkage_dag.py +++ b/linkage_dag.py @@ -318,12 +318,6 @@ destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/exact_matches/article_pairs*.jsonl", export_format="NEWLINE_DELIMITED_JSON", ), - BigQueryToGCSOperator( - task_id="export_simhash_input", - source_project_dataset_table=f"{staging_dataset}.simhash_input", - destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/simhash_input/simhash_input*.jsonl", - export_format="NEWLINE_DELIMITED_JSON", - ), BigQueryToGCSOperator( task_id="export_lid_input", source_project_dataset_table=f"{staging_dataset}.lid_input", @@ -344,8 +338,7 @@ ), ] - # Start up godzilla of article linkage, update simhash indexes of title+abstract, run simhash, then create the - # merge ids + # Start up godzilla of article linkage, create the merged ids gce_instance_start = ComputeEngineStartInstanceOperator( project_id=project_id, zone=gce_zone, @@ -365,13 +358,9 @@ "mkdir input_data", "mkdir current_ids", f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/exact_matches .", - f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/simhash_input .", - f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_indexes .", - f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_results .", f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/unlink .", f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/ids_to_drop .", f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/prev_id_mapping .", - "mkdir new_simhash_indexes", ] prep_environment_vm_script = " && ".join(prep_environment_script_sequence) @@ -380,24 +369,9 @@ bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "{prep_environment_vm_script}"', ) - update_simhash_index = BashOperator( - task_id="update_simhash_index", - bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "bash run_simhash_scripts.sh &> log &"', - ) - - wait_for_simhash_index = GCSObjectExistenceSensor( - task_id="wait_for_simhash_index", - bucket=DATA_BUCKET, - object=f"{tmp_dir}/done_files/simhash_is_done", - deferrable=True, - ) - create_cset_ids = BashOperator( task_id="create_cset_ids", bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "bash run_ids_scripts.sh &> log &"', - # These are also inputs to `update_simhash_index` - in fact some of them are only directly used in that task - - # but I think just reporting them all on this task is ok since `update_simhash_index` doesn't update any files - # outside the VM it runs on and this task depends on all of these things, directly or indirectly inlets=[ BigQueryTable( project_id=project_id, dataset_id=production_dataset, table_id="sources" @@ -407,11 +381,6 @@ dataset_id=staging_dataset, table_id="all_match_pairs_with_um", ), - BigQueryTable( - project_id=project_id, - dataset_id=staging_dataset, - table_id="simhash_input", - ), BigQueryTable( project_id=project_id, dataset_id=staging_dataset, table_id="unlink" ), @@ -577,7 +546,7 @@ BigQueryCheckOperator( task_id="all_trivial_matches_survived", sql=f""" - -- 
check that all article pairs generated by exact matches make it through the simhash and + -- check that all article pairs generated by exact matches make it through the -- merged id assignment, except ones we've deliberately unlinked select count(0) = 0 @@ -696,8 +665,6 @@ >> heavy_compute_inputs >> gce_instance_start >> prep_environment - >> update_simhash_index - >> wait_for_simhash_index >> create_cset_ids >> wait_for_cset_ids >> gce_instance_stop diff --git a/sequences/merge_combined_metadata.tsv b/sequences/merge_combined_metadata.tsv index 73b4071..09bf778 100644 --- a/sequences/merge_combined_metadata.tsv +++ b/sequences/merge_combined_metadata.tsv @@ -1,6 +1,5 @@ arxiv_id_match metadata_match all_match_pairs_with_um -simhash_input lid_input ids_to_drop diff --git a/sql/simhash_input.sql b/sql/simhash_input.sql deleted file mode 100644 index 3a04e6f..0000000 --- a/sql/simhash_input.sql +++ /dev/null @@ -1,18 +0,0 @@ --- get input for simhash, which is articles with not null titles and abstracts that have not already been matched -SELECT - id, - year, - concat(title_norm, abstract_norm) AS normalized_text -FROM {{ staging_dataset }}.all_metadata_norm_filt -WHERE - (year IS NOT NULL) - AND (title_norm IS NOT NULL) AND (title_norm != "") - AND (abstract_norm IS NOT NULL) AND (abstract_norm != "") - AND id NOT IN ( - SELECT a.id FROM {{ staging_dataset }}.all_metadata_norm_filt AS a - LEFT JOIN - {{ staging_dataset }}.all_metadata_with_cld2_lid_last_run AS b - ON a.id = b.id - WHERE (a.title = b.title) AND (a.abstract = b.abstract) AND (a.year = b.year) AND (a.title != "") - AND (a.title IS NOT NULL) AND (a.abstract != "") AND (a.abstract IS NOT NULL) AND (a.year IS NOT NULL) - ) diff --git a/tests/static/simhash_empty/empty.jsonl b/tests/static/simhash_empty/empty.jsonl deleted file mode 100644 index e69de29..0000000 diff --git a/tests/static/test_get_match_sets_with_extra_id/match_pairs/file1 b/tests/static/test_get_match_sets_with_extra_id/match_pairs/file1 deleted file mode 100644 index 61d3beb..0000000 --- a/tests/static/test_get_match_sets_with_extra_id/match_pairs/file1 +++ /dev/null @@ -1,4 +0,0 @@ -{"id1": "A", "id2": "B"} -{"id1": "B", "id2": "C"} -{"id1": "J", "id2": "I"} -{"id1": "E", "id2": "E"} diff --git a/tests/static/test_get_match_sets_with_extra_id/simhash_match_pairs/ids1.jsonl b/tests/static/test_get_match_sets_with_extra_id/simhash_match_pairs/ids1.jsonl deleted file mode 100644 index 12baa68..0000000 --- a/tests/static/test_get_match_sets_with_extra_id/simhash_match_pairs/ids1.jsonl +++ /dev/null @@ -1,4 +0,0 @@ -{"id1": "D", "id2": "B"} -{"id1": "D", "id2": "E"} -{"id1": "E", "id2": "A"} -{"id1": "B", "id2": "C"} diff --git a/tests/static/test_get_match_sets_with_extra_id/simhash_match_pairs/ids2.jsonl b/tests/static/test_get_match_sets_with_extra_id/simhash_match_pairs/ids2.jsonl deleted file mode 100644 index 7ce7f16..0000000 --- a/tests/static/test_get_match_sets_with_extra_id/simhash_match_pairs/ids2.jsonl +++ /dev/null @@ -1,2 +0,0 @@ -{"id1": "F", "id2": "G"} -{"id1": "F", "id2": "H"} diff --git a/tests/test_create_merge_ids.py b/tests/test_create_merge_ids.py index 30a6c96..4dac11a 100644 --- a/tests/test_create_merge_ids.py +++ b/tests/test_create_merge_ids.py @@ -6,7 +6,6 @@ from utils.create_merge_ids import create_match_sets, create_matches static_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static") -empty_simhash_dir = os.path.join(static_dir, "simhash_empty") class TestGetCombinedMap(unittest.TestCase): @@ -16,30 +15,22 @@ class 
TestGetCombinedMap(unittest.TestCase): def test_get_combined_map1(self): match_dir = os.path.join(static_dir, "test_get_combined_map1") expected_result = [{"A", "B", "C"}] - self.assertEqual( - create_match_sets(match_dir, empty_simhash_dir), expected_result - ) + self.assertEqual(create_match_sets(match_dir), expected_result) def test_get_combined_map2(self): match_dir = os.path.join(static_dir, "test_get_combined_map2") expected_result = [{"A", "B", "C", "D"}] - self.assertEqual( - create_match_sets(match_dir, empty_simhash_dir), expected_result - ) + self.assertEqual(create_match_sets(match_dir), expected_result) def test_get_combined_map3(self): match_dir = os.path.join(static_dir, "test_get_combined_map3") expected_result = [{"A", "B", "C", "D", "E"}] - self.assertEqual( - create_match_sets(match_dir, empty_simhash_dir), expected_result - ) + self.assertEqual(create_match_sets(match_dir), expected_result) def test_get_combined_map4(self): match_dir = os.path.join(static_dir, "test_get_combined_map4") expected_result = [{"A", "B", "C", "D", "E", "F", "G", "H"}] - self.assertEqual( - create_match_sets(match_dir, empty_simhash_dir), expected_result - ) + self.assertEqual(create_match_sets(match_dir), expected_result) def test_get_combined_map5(self): # test with two disconnected sets @@ -49,43 +40,19 @@ def test_get_combined_map5(self): expected_result = sorted( [result_set_small, result_set_large], key=lambda k: len(k) ) - actual_result = sorted( - create_match_sets(match_dir, empty_simhash_dir), key=lambda k: len(k) - ) - self.assertEqual(actual_result, expected_result) - - def test_get_match_sets_with_extra_id(self): - # test with three disconnected sets. The set A - E will have one simhash match (E-A) that should be included, - # and matches involving the obsolete id D that should be filtered. The "small" set F-H contains simhash-only - # ids and should be filtered. The other small set I-J should be included. 
- match_dir = os.path.join( - static_dir, "test_get_match_sets_with_extra_id", "match_pairs" - ) - simhash_match_dir = os.path.join( - static_dir, "test_get_match_sets_with_extra_id", "simhash_match_pairs" - ) - result_set_large = {"A", "B", "C", "E"} - result_set_small = {"I", "J"} - expected_result = sorted( - [result_set_small, result_set_large], key=lambda k: len(k) - ) - actual_result = sorted( - create_match_sets(match_dir, simhash_match_dir), key=lambda k: len(k) - ) + actual_result = sorted(create_match_sets(match_dir), key=lambda k: len(k)) self.assertEqual(actual_result, expected_result) def test_skip_matches(self): # test without matches excluded match_dir = os.path.join(static_dir, "test_skip_matches_ids") expected_result_no_excludes = [{"A", "B", "C"}, {"D", "E"}] - self.assertEqual( - create_match_sets(match_dir, empty_simhash_dir), expected_result_no_excludes - ) + self.assertEqual(create_match_sets(match_dir), expected_result_no_excludes) # test with matches excluded exclude_dir = os.path.join(static_dir, "test_skip_matches_ids_to_skip") expected_result_excludes = [{"A", "B"}, {"C"}, {"D"}, {"E"}] self.assertEqual( - create_match_sets(match_dir, empty_simhash_dir, exclude_dir=exclude_dir), + create_match_sets(match_dir, exclude_dir=exclude_dir), expected_result_excludes, ) diff --git a/utils/create_merge_ids.py b/utils/create_merge_ids.py index deed02d..fd3d7a5 100644 --- a/utils/create_merge_ids.py +++ b/utils/create_merge_ids.py @@ -61,15 +61,11 @@ def get_exclude_matches(exclude_dir: str) -> dict: return dont_match -def create_match_sets( - exact_match_dir: str, simhash_match_dir: str, exclude_dir: str = None -) -> list: +def create_match_sets(match_dir: str, exclude_dir: str = None) -> list: """ Given a directory of exported jsonl files containing article matches, generates a list of sets of matched articles, - including "transitive matches". We will use the ids present in the exact matches to filter the simhash matches, - since it's possible for obsolete ids to live on in the simhash index - :param exact_match_dir: directory of jsonls containing matched orig_ids from exact metadata match - :param simhash_match_dir: directory of jsonls containing matched orig_ids from simhash + including "transitive matches". 
+    :param match_dir: directory of jsonls containing matched orig_ids from exact metadata match
     :param exclude_dir: directory of jsonl files containing article pairs that should not be matched together
     :return: list of sets of matched articles
     """
@@ -77,26 +73,22 @@ def create_match_sets(
     dont_match = get_exclude_matches(exclude_dir)
     print("getting adjacency lists")
     adj_list = {}
-    for match_dir, is_simhash in [(exact_match_dir, False), (simhash_match_dir, True)]:
-        for fi in os.listdir(match_dir):
-            with open(os.path.join(match_dir, fi)) as f:
-                for line in f:
-                    js = json.loads(line)
-                    key1 = js["id1"]
-                    key2 = js["id2"]
-                    if is_simhash:
-                        if (key1 not in adj_list) or (key2 not in adj_list):
-                            continue
-                    if key1 not in adj_list:
-                        adj_list[key1] = set()
-                    if key2 not in dont_match.get(key1, set()):
-                        adj_list[key1].add(key2)
-                    # even if we're in a scenario where (according to a changed metric) A matches B but B doesn't match A,
-                    # this will ensure they get added to the same match set
-                    if key2 not in adj_list:
-                        adj_list[key2] = set()
-                    if key1 not in dont_match.get(key2, set()):
-                        adj_list[key2].add(key1)
+    for fi in os.listdir(match_dir):
+        with open(os.path.join(match_dir, fi)) as f:
+            for line in f:
+                js = json.loads(line)
+                key1 = js["id1"]
+                key2 = js["id2"]
+                if key1 not in adj_list:
+                    adj_list[key1] = set()
+                if key2 not in dont_match.get(key1, set()):
+                    adj_list[key1].add(key2)
+                # even if we're in a scenario where (according to a changed metric) A matches B but B doesn't match A,
+                # this will ensure they get added to the same match set
+                if key2 not in adj_list:
+                    adj_list[key2] = set()
+                if key1 not in dont_match.get(key2, set()):
+                    adj_list[key2].add(key1)
     print("getting connected articles")
     seen_ids = set()
     match_sets = []
@@ -178,13 +170,13 @@ def create_matches(
     yield batch, batch_count


-def write_batch(match_batch: tuple, output_dir: str) -> None:
+def write_batch(match_batch_with_output_dir: tuple) -> None:
     """
     Write a batch of matches to disk
-    :param match_batch: tuple of a list of jsons containing a merged id and orig id, and an identifier for the batch
-    :param output_dir: directory where matches should be written
+    :param match_batch_with_output_dir: tuple of ((list of jsons containing a merged id and orig id, an identifier for the batch), directory where matches should be written)
     :return: None
     """
+    match_batch, output_dir = match_batch_with_output_dir
     matches, batch_id = match_batch
     with open(os.path.join(output_dir, f"matches_{batch_id}.jsonl"), "w") as f:
         for match in matches:
             f.write(json.dumps(match) + "\n")
@@ -193,7 +185,6 @@ def write_batch(match_batch: tuple, output_dir: str) -> None:

 def write_matches(
     exact_match_dir,
-    simhash_match_dir,
     exclude_dir,
     ids_to_drop,
     prev_id_mapping_dir,
@@ -202,19 +193,19 @@ def write_matches(
     """
     Generate merged id-orig id pairs and write them out as a directory of jsonls
     :param exact_match_dir: directory of jsonls containing matched orig_ids from exact metadata match
-    :param simhash_match_dir: directory of jsonls containing matched orig_ids from simhash
     :param exclude_dir: directory of article pairs that should not be matched
     :param ids_to_drop: file containing ids that should not be used
     :param prev_id_mapping_dir: directory of jsonl containing previous mapping between orig ids and merged ids
     :param output_dir: directory where jsonls containing new mappings between orig ids and merged ids should be written
     :return: None
     """
-    match_sets = create_match_sets(exact_match_dir, simhash_match_dir, exclude_dir)
-    match_batches = 
create_matches(match_sets, ids_to_drop, prev_id_mapping_dir) if not os.path.exists(output_dir): os.makedirs(output_dir) with multiprocessing.Pool() as p: - p.starmap(write_batch, ((mb, output_dir) for mb in match_batches)) + match_sets = create_match_sets(exact_match_dir, exclude_dir) + match_batches = create_matches(match_sets, ids_to_drop, prev_id_mapping_dir) + output_batches = ((mb, output_dir) for mb in match_batches) + list(p.imap(write_batch, output_batches)) if __name__ == "__main__": @@ -224,11 +215,6 @@ def write_matches( required=True, help="directory of jsonls containing matched orig_ids from exact metadata match", ) - parser.add_argument( - "--simhash_match_dir", - required=True, - help="directory of jsonls containing matched orig_ids from simhash", - ) parser.add_argument( "--exclude_dir", required=True, @@ -253,7 +239,6 @@ def write_matches( write_matches( args.exact_match_dir, - args.simhash_match_dir, args.exclude_dir, args.ids_to_drop, args.prev_id_mapping_dir, diff --git a/utils/my_simhash.py b/utils/my_simhash.py deleted file mode 100644 index 7a15493..0000000 --- a/utils/my_simhash.py +++ /dev/null @@ -1,210 +0,0 @@ -# Created by 1e0n in 2013 -# this file is identical to https://github.com/leonsim/simhash/commit/7337c9ae353dbdc32666e88ba07a75170c19b79c, except -# it has some logging which was overwhelming airflow removed -from __future__ import division, unicode_literals - -import collections -import hashlib -import logging -import numbers -import re -import sys -from itertools import groupby - -if sys.version_info[0] >= 3: - basestring = str - unicode = str - long = int - - -def _hashfunc(x): - return int(hashlib.md5(x).hexdigest(), 16) - - -class Simhash(object): - def __init__(self, value, f=64, reg=r"[\w\u4e00-\u9fcc]+", hashfunc=None, log=None): - """ - `f` is the dimensions of fingerprints - - `reg` is meaningful only when `value` is basestring and describes - what is considered to be a letter inside parsed string. Regexp - object can also be specified (some attempt to handle any letters - is to specify reg=re.compile(r'\w', re.UNICODE)) # noqa: W605 - - `hashfunc` accepts a utf-8 encoded string and returns a unsigned - integer in at least `f` bits. - """ - - self.f = f - self.reg = reg - self.value = None - - if hashfunc is None: - self.hashfunc = _hashfunc - else: - self.hashfunc = hashfunc - - if log is None: - self.log = logging.getLogger("simhash") - else: - self.log = log - - if isinstance(value, Simhash): - self.value = value.value - elif isinstance(value, basestring): - self.build_by_text(unicode(value)) - elif isinstance(value, collections.Iterable): - self.build_by_features(value) - elif isinstance(value, numbers.Integral): - self.value = value - else: - raise Exception("Bad parameter with type {}".format(type(value))) - - def __eq__(self, other): - """ - Compare two simhashes by their value. 
- - :param Simhash other: The Simhash object to compare to - """ - return self.value == other.value - - def _slide(self, content, width=4): - return [content[i : i + width] for i in range(max(len(content) - width + 1, 1))] - - def _tokenize(self, content): - content = content.lower() - content = "".join(re.findall(self.reg, content)) - ans = self._slide(content) - return ans - - def build_by_text(self, content): - features = self._tokenize(content) - features = {k: sum(1 for _ in g) for k, g in groupby(sorted(features))} - return self.build_by_features(features) - - def build_by_features(self, features): - """ - `features` might be a list of unweighted tokens (a weight of 1 - will be assumed), a list of (token, weight) tuples or - a token -> weight dict. - """ - v = [0] * self.f - masks = [1 << i for i in range(self.f)] - if isinstance(features, dict): - features = features.items() - for f in features: - if isinstance(f, basestring): - h = self.hashfunc(f.encode("utf-8")) - w = 1 - else: - assert isinstance(f, collections.Iterable) - h = self.hashfunc(f[0].encode("utf-8")) - w = f[1] - for i in range(self.f): - v[i] += w if h & masks[i] else -w - # use reversed binary str to keep the backward compatibility - binary_str = "".join(["0" if i <= 0 else "1" for i in v[::-1]]) - self.value = int(binary_str, 2) - - def distance(self, another): - assert self.f == another.f - x = (self.value ^ another.value) & ((1 << self.f) - 1) - ans = 0 - while x: - ans += 1 - x &= x - 1 - return ans - - -class SimhashIndex(object): - def __init__(self, objs, f=64, k=2, log=None): - """ - `objs` is a list of (obj_id, simhash) - obj_id is a string, simhash is an instance of Simhash - `f` is the same with the one for Simhash - `k` is the tolerance - """ - self.k = k - self.f = f - count = len(objs) - - if log is None: - self.log = logging.getLogger("simhash") - else: - self.log = log - - self.log.info("Initializing %s data.", count) - - self.bucket = collections.defaultdict(set) - - for i, q in enumerate(objs): - if i % 10000 == 0 or i == count - 1: - self.log.info("%s/%s", i + 1, count) - - self.add(*q) - - def get_near_dups(self, simhash): - """ - `simhash` is an instance of Simhash - return a list of obj_id, which is in type of str - """ - assert simhash.f == self.f - - ans = set() - - for key in self.get_keys(simhash): - dups = self.bucket[key] - self.log.debug("key:%s", key) - # if len(dups) > 200: - # self.log.warning('Big bucket found. 
key:%s, len:%s', key, len(dups)) - - for dup in dups: - sim2, obj_id = dup.split(",", 1) - sim2 = Simhash(long(sim2, 16), self.f) - - d = simhash.distance(sim2) - if d <= self.k: - ans.add(obj_id) - return list(ans) - - def add(self, obj_id, simhash): - """ - `obj_id` is a string - `simhash` is an instance of Simhash - """ - assert simhash.f == self.f - - for key in self.get_keys(simhash): - v = "%x,%s" % (simhash.value, obj_id) - self.bucket[key].add(v) - - def delete(self, obj_id, simhash): - """ - `obj_id` is a string - `simhash` is an instance of Simhash - """ - assert simhash.f == self.f - - for key in self.get_keys(simhash): - v = "%x,%s" % (simhash.value, obj_id) - if v in self.bucket[key]: - self.bucket[key].remove(v) - - @property - def offsets(self): - """ - You may optimize this method according to - """ - return [self.f // (self.k + 1) * i for i in range(self.k + 1)] - - def get_keys(self, simhash): - for i, offset in enumerate(self.offsets): - if i == (len(self.offsets) - 1): - m = 2 ** (self.f - offset) - 1 - else: - m = 2 ** (self.offsets[i + 1] - offset) - 1 - c = simhash.value >> offset & m - yield "%x:%x" % (c, i) - - def bucket_size(self): - return len(self.bucket) diff --git a/utils/run_ids_scripts.sh b/utils/run_ids_scripts.sh index e1751c4..6380930 100644 --- a/utils/run_ids_scripts.sh +++ b/utils/run_ids_scripts.sh @@ -1,9 +1,6 @@ cd /mnt/disks/data/run gsutil rm gs://airflow-data-exchange/article_linkage/tmp/done_files/ids_are_done -python3 create_merge_ids.py --exact_match_dir exact_matches --simhash_match_dir simhash_results --exclude_dir unlink --ids_to_drop ids_to_drop --prev_id_mapping_dir prev_id_mapping --output_dir new_id_mappings +python3 create_merge_ids.py --exact_match_dir exact_matches --exclude_dir unlink --ids_to_drop ids_to_drop --prev_id_mapping_dir prev_id_mapping --output_dir new_id_mappings /snap/bin/gsutil -m cp -r new_id_mappings gs://airflow-data-exchange/article_linkage/tmp/ -/snap/bin/gsutil -m cp simhash_results/* gs://airflow-data-exchange/article_linkage/simhash_results/ -/snap/bin/gsutil -m cp new_simhash_indexes/* gs://airflow-data-exchange/article_linkage/simhash_indexes/ -/snap/bin/gsutil -m cp new_simhash_indexes/* gs://airflow-data-exchange/article_linkage/simhash_indexes_archive/$(date +%F)/ touch ids_are_done gsutil cp ids_are_done gs://airflow-data-exchange/article_linkage/tmp/done_files/ diff --git a/utils/run_simhash.py b/utils/run_simhash.py deleted file mode 100644 index 3ca56c0..0000000 --- a/utils/run_simhash.py +++ /dev/null @@ -1,121 +0,0 @@ -import argparse -import json -import multiprocessing -import os -import pickle -import re -from datetime import datetime - -from my_simhash import Simhash, SimhashIndex - - -def get_features(s: str) -> list: - """ - The default feature extraction method, from https://github.com/leonsim/simhash - """ - width = 3 - s = s.lower() - s = re.sub(r"[^\w]+", "", s) - return [s[i : i + width] for i in range(max(len(s) - width + 1, 1))] - - -def write_sim_strings( - data_fi: str, output_fi: str, input_index: str = None, output_index: str = None -) -> None: - """ - Does the similarity matching and writes out the outputs. 
Basic method from from https://github.com/leonsim/simhash - """ - data_ids_and_values = [ - line.strip().split("\t") for line in open(data_fi).readlines() - ] - objs = [ - (article_id, Simhash(get_features(article_text))) - for article_id, article_text in data_ids_and_values - ] - index = None - if (input_index is None) or not os.path.exists(input_index): - index = SimhashIndex(objs, k=3) - else: - index = pickle.load(open(input_index, mode="rb")) - for obj_id, obj in objs: - index.add(obj_id, obj) - print("writing updated index to " + output_index) - pickle.dump(index, open(output_index, mode="wb")) - - with open(output_fi, mode="w") as out: - for article_id, article_text in data_ids_and_values: - feats = Simhash(get_features(article_text)) - dup_ids = index.get_near_dups(feats) - for dup_id in dup_ids: - if dup_id != article_id: - out.write(json.dumps({"id1": article_id, "id2": dup_id}) + "\n") - - -def get_year_partition(input_dir: str, output_dir: str) -> list: - """ - Takes an input directory of jsonl containing three fields: id, year, and normalized_text. Constructs a map - mapping year to tuples of id, normalized_text, and writes each year's data as a tsv - - Initially I tried passing the arrays of id, normalized text for each year around in memory. However, - the multiprocessing library pickles its inputs and some years' data exceeded the maximum pickle size. - For the same reason, we write to tsv instead of pickling here. - - :param input_dir: directory of jsonl - :param output_dir: dir where each year's worth of pairs should be written as pkl - :return: list of years - """ - print("getting year partition") - year_to_outfi = {} - if not os.path.exists(output_dir): - os.mkdir(output_dir) - for fi in os.listdir(input_dir): - for line in open(os.path.join(input_dir, fi)): - js = json.loads(line) - year = js["year"] - if year not in year_to_outfi: - year_to_outfi[year] = open( - os.path.join(output_dir, year + ".tsv"), mode="w" - ) - year_to_outfi[year].write(f"{js['id']}\t{js['normalized_text']}\n") - for year in year_to_outfi: - year_to_outfi[year].close() - return list(year_to_outfi.keys()) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("input_dir", help="directory of jsonl") - parser.add_argument("--tmp_dir", default="simhash-tmp") - parser.add_argument("--simhash_indexes", help="current simhash indexes") - parser.add_argument( - "--new_simhash_indexes", help="location where updated indexes should be written" - ) - parser.add_argument( - "output_dir", - help=( - "directory where output matches should be written. 
" - "Outputs will be in the form `year`.jsonl" - ), - ) - args = parser.parse_args() - - years = get_year_partition(args.input_dir, args.tmp_dir) - print("running simhash") - day = datetime.now().strftime("%Y-%m-%d") - with multiprocessing.Pool() as p: - p.starmap( - write_sim_strings, - [ - ( - os.path.join(args.tmp_dir, year + ".tsv"), - os.path.join(args.output_dir, f"{year}_{day}.jsonl"), - None - if args.simhash_indexes is None - else os.path.join(args.simhash_indexes, f"{year}.pkl"), - None - if args.new_simhash_indexes is None - else os.path.join(args.new_simhash_indexes, f"{year}.pkl"), - ) - for year in years - ], - ) diff --git a/utils/run_simhash_scripts.sh b/utils/run_simhash_scripts.sh deleted file mode 100644 index 95650e9..0000000 --- a/utils/run_simhash_scripts.sh +++ /dev/null @@ -1,5 +0,0 @@ -cd /mnt/disks/data/run -gsutil rm gs://airflow-data-exchange/article_linkage/tmp/done_files/simhash_is_done -python3 run_simhash.py simhash_input simhash_results --simhash_indexes simhash_indexes --new_simhash_indexes new_simhash_indexes -touch simhash_is_done -gsutil cp simhash_is_done gs://airflow-data-exchange/article_linkage/tmp/done_files/