Merge pull request #38 from georgetown-cset/135-remove-fp

Add ability to exclude pairs of ids for matching

jmelot authored Mar 1, 2024
2 parents b3d2590 + dfd8fe3 commit 4e053bf
Showing 13 changed files with 223 additions and 34 deletions.
18 changes: 17 additions & 1 deletion linkage_dag.py
@@ -41,10 +41,12 @@

production_dataset = "literature"
staging_dataset = f"staging_{production_dataset}"
args = get_default_args(pocs=["Jennifer"])
args["retries"] = 1

with DAG(
"article_linkage_updater",
default_args=get_default_args(),
default_args=args,
description="Links articles across our scholarly lit holdings.",
schedule_interval=None,
user_defined_macros={
@@ -327,6 +329,18 @@
destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/lid_input/lid_input*.jsonl",
export_format="NEWLINE_DELIMITED_JSON",
),
BigQueryToGCSOperator(
task_id="export_unlink",
source_project_dataset_table=f"{staging_dataset}.unlink",
destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/unlink/data*.jsonl",
export_format="NEWLINE_DELIMITED_JSON",
),
BigQueryToGCSOperator(
task_id="export_ids_to_drop",
source_project_dataset_table=f"{staging_dataset}.ids_to_drop",
destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/ids_to_drop/data*.jsonl",
export_format="NEWLINE_DELIMITED_JSON",
),
]

# Start up godzilla of article linkage, update simhash indexes of title+abstract, run simhash, then create the
@@ -353,6 +367,8 @@
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/simhash_input .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_indexes .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_results .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/unlink .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/ids_to_drop .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/prev_id_mapping .",
"mkdir new_simhash_indexes",
]
16 changes: 2 additions & 14 deletions metadata_merge_trigger.py
@@ -3,23 +3,11 @@
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from dataloader.airflow_utils.slack import task_fail_slack_alert

default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2022, 3, 5),
"email": ["jennifer.melot@georgetown.edu"],
"email_on_failure": True,
"email_on_retry": True,
"retries": 0,
"retry_delay": timedelta(minutes=5),
"on_failure_callback": task_fail_slack_alert,
}
from dataloader.airflow_utils.defaults import get_default_args

with DAG(
"org_er_and_metadata_merge",
default_args=default_args,
default_args=get_default_args(pocs=["Jennifer"]),
description="Triggers Org ER and metadata merge dags",
schedule_interval=None,
catchup=False,
1 change: 1 addition & 0 deletions sequences/merge_combined_metadata.tsv
@@ -3,3 +3,4 @@ metadata_match
all_match_pairs_with_um
simhash_input
lid_input
ids_to_drop
5 changes: 5 additions & 0 deletions sql/ids_to_drop.sql
@@ -0,0 +1,5 @@
SELECT DISTINCT merged_id
FROM
literature.sources
WHERE
orig_id IN (SELECT id1 FROM staging_literature.unlink)
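The query above marks every existing merged id whose cluster contains an orig_id that someone has asked to unlink; those merged ids are retired so their articles can be re-clustered on the next run. A minimal in-memory sketch of the same logic, using illustrative rows rather than production data:

# Python equivalent of sql/ids_to_drop.sql over in-memory rows (example data only).
sources = [
    {"merged_id": "carticle_0000000003", "orig_id": "I"},
    {"merged_id": "carticle_0000000003", "orig_id": "J"},
    {"merged_id": "carticle_0000000001", "orig_id": "A"},
]
unlink = [{"id1": "I", "id2": "J"}]

unlinked_orig_ids = {row["id1"] for row in unlink}
ids_to_drop = {row["merged_id"] for row in sources if row["orig_id"] in unlinked_orig_ids}
print(ids_to_drop)  # {'carticle_0000000003'}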
1 change: 1 addition & 0 deletions tests/static/test_create_match_keys/ids_to_drop/data.jsonl
@@ -0,0 +1 @@
{"merged_id": "carticle_0000000003"}
2 changes: 2 additions & 0 deletions tests/static/test_create_match_keys/input/input.jsonl
@@ -3,3 +3,5 @@
{"orig_id": "D", "merged_id": "carticle_0000000002"}
{"orig_id": "E", "merged_id": "carticle_0000000002"}
{"orig_id": "F", "merged_id": "carticle_0000000001"}
{"orig_id": "I", "merged_id": "carticle_0000000003"}
{"orig_id": "J", "merged_id": "carticle_0000000003"}
6 changes: 6 additions & 0 deletions tests/static/test_skip_matches_ids/data.jsonl
@@ -0,0 +1,6 @@
{"id1": "A", "id2": "B"}
{"id1": "B", "id2": "A"}
{"id1": "B", "id2": "C"}
{"id1": "C", "id2": "B"}
{"id1": "D", "id2": "E"}
{"id1": "E", "id2": "D"}
2 changes: 2 additions & 0 deletions tests/static/test_skip_matches_ids_to_skip/data.jsonl
@@ -0,0 +1,2 @@
{"id1": "B", "id2": "C"}
{"id1": "D", "id2": "E"}
39 changes: 29 additions & 10 deletions tests/test_create_merge_ids.py
@@ -61,27 +61,46 @@ def test_get_match_sets_with_extra_id(self):
)
self.assertEqual(actual_result, expected_result)

def test_skip_matches(self):
# test without matches excluded
match_dir = os.path.join(static_dir, "test_skip_matches_ids")
expected_result_no_excludes = [{"A", "B", "C"}, {"D", "E"}]
self.assertEqual(create_match_sets(match_dir), expected_result_no_excludes)
# test with matches excluded
exclude_dir = os.path.join(static_dir, "test_skip_matches_ids_to_skip")
expected_result_excludes = [{"A", "B"}, {"C"}, {"D"}, {"E"}]
self.assertEqual(
create_match_sets(match_dir, exclude_dir=exclude_dir),
expected_result_excludes,
)

def test_create_match_keys(self):
# the first set will contain two old elts from the same match set and one new elt; should keep its id
# the next will contain one elt from one match set, two from another; should change ids
# the last will contain only new ids; should get a new id
match_sets = [{"A", "B", "C"}, {"D", "E", "F"}, {"G", "H"}]
# The first set (A, B, C) contains two old elts from the same match set and one new elt; should keep its id.
# The next (D, E, F) contains one elt from one match set, two from another; should change ids.
# Another (G, H) contains only new ids; should get a new id.
# The last two (I and J) are two different match sets that share an old merged id which appears in ids_to_drop;
# each should get a new id (this is what happens after unlinking).
match_sets = [{"A", "B", "C"}, {"D", "E", "F"}, {"G", "H"}, {"I"}, {"J"}]
out_dir = os.path.join(static_dir, "test_create_match_keys", "output")
if os.path.exists(out_dir):
shutil.rmtree(out_dir)
os.mkdir(out_dir)
out_fi = os.path.join(out_dir, "output.jsonl")
id_mapping_dir = os.path.join(static_dir, "test_create_match_keys", "input")
ids_to_drop = os.path.join(static_dir, "test_create_match_keys", "ids_to_drop")
expected_output = [
{"orig_id": "A", "merged_id": "carticle_0000000001"},
{"orig_id": "B", "merged_id": "carticle_0000000001"},
{"orig_id": "C", "merged_id": "carticle_0000000001"},
{"orig_id": "D", "merged_id": "carticle_0000000003"},
{"orig_id": "E", "merged_id": "carticle_0000000003"},
{"orig_id": "F", "merged_id": "carticle_0000000003"},
{"orig_id": "G", "merged_id": "carticle_0000000004"},
{"orig_id": "H", "merged_id": "carticle_0000000004"},
{"orig_id": "D", "merged_id": "carticle_0000000004"},
{"orig_id": "E", "merged_id": "carticle_0000000004"},
{"orig_id": "F", "merged_id": "carticle_0000000004"},
{"orig_id": "G", "merged_id": "carticle_0000000005"},
{"orig_id": "H", "merged_id": "carticle_0000000005"},
{"orig_id": "I", "merged_id": "carticle_0000000006"},
{"orig_id": "J", "merged_id": "carticle_0000000007"},
]
create_match_keys(match_sets, out_fi, id_mapping_dir)
print(expected_output)
create_match_keys(match_sets, out_fi, ids_to_drop, id_mapping_dir)
out = [json.loads(x) for x in open(out_fi).readlines()]
self.assertEqual(expected_output, sorted(out, key=lambda x: x["orig_id"]))
41 changes: 41 additions & 0 deletions tests/test_make_unlink_rows.py
@@ -0,0 +1,41 @@
import json
import os
import shutil
import unittest

from utils.make_unlink_rows import make_pairs

static_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static")


class TestMakeUnlinkRows(unittest.TestCase):
@staticmethod
def gen_sort_key(pair: tuple) -> str:
return f"{pair[0]}-{pair[1]}"

def test_make_pairs(self):
manual_to_orig = {"1": {"a", "b"}, "2": {"d", "e"}, "3": {"f"}}
expected_output = sorted(
[
("a", "d"),
("a", "e"),
("a", "f"),
("b", "d"),
("b", "e"),
("b", "f"),
("d", "a"),
("d", "b"),
("d", "f"),
("e", "a"),
("e", "b"),
("e", "f"),
("f", "a"),
("f", "b"),
("f", "d"),
("f", "e"),
],
key=self.gen_sort_key,
)
self.assertEqual(
expected_output, sorted(make_pairs(manual_to_orig), key=self.gen_sort_key)
)
66 changes: 58 additions & 8 deletions utils/create_merge_ids.py
@@ -56,14 +56,41 @@ def get_usable_ids(ids_dir: str) -> set:
return usable_ids


def create_match_sets(match_dir: str, current_ids_dir: str = None) -> list:
def get_exclude_matches(exclude_dir: str) -> dict:
"""
Build dict mapping ids to sets of other ids they should not be matched to
:param exclude_dir: directory of jsonl files containing article pairs that should not be matched together
:return: dict mapping an id to a set of ids that are not valid matches
"""
dont_match = {}
if not exclude_dir:
return dont_match
for fi in os.listdir(exclude_dir):
with open(os.path.join(exclude_dir, fi)) as f:
for line in f:
js = json.loads(line)
if js["id1"] not in dont_match:
dont_match[js["id1"]] = set()
if js["id2"] not in dont_match:
dont_match[js["id2"]] = set()
dont_match[js["id1"]].add(js["id2"])
dont_match[js["id2"]].add(js["id1"])
return dont_match


def create_match_sets(
match_dir: str, current_ids_dir: str = None, exclude_dir: str = None
) -> list:
"""
Given a directory of exported jsonl files containing article matches, generates a list of sets of matched articles,
including "transitive matches".
:param match_dir: directory of exported jsonl files containing article matches, with keys "`dataset`1_id" and "`dataset`2_id"
:param match_dir: directory of exported jsonl files containing article matches
:param current_ids_dir: optional dir containing the current set of ids to use in jsonl form. If None, all ids will be used
:param exclude_dir: directory of jsonl files containing article pairs that should not be matched together
:return: list of sets of matched articles
"""
print("reading pairs to not match")
dont_match = get_exclude_matches(exclude_dir)
print("getting adjacency lists")
adj_list = {}
usable_ids = get_usable_ids(current_ids_dir)
@@ -79,12 +106,14 @@ def create_match_sets(match_dir: str, current_ids_dir: str = None) -> list:
continue
if key1 not in adj_list:
adj_list[key1] = set()
adj_list[key1].add(key2)
if key2 not in dont_match.get(key1, set()):
adj_list[key1].add(key2)
# even if we're in a scenario where (according to a changed metric) A matches B but B doesn't match A,
# this will ensure they get added to the same match set
if key2 not in adj_list:
adj_list[key2] = set()
adj_list[key2].add(key1)
if key1 not in dont_match.get(key2, set()):
adj_list[key2].add(key1)
seen_ids = set()
match_sets = []
for k in adj_list.keys():
@@ -99,13 +128,14 @@


def create_match_keys(
match_sets: list, match_file: str, prev_id_mapping_dir: str = None
match_sets: list, match_file: str, ids_to_drop: str, prev_id_mapping_dir: str = None
):
"""
Given a match set, creates an id for that match set, and writes out a jsonl mapping each article in the match
set to that id
:param match_sets: list of match sets
:param match_file: file where id mapping should be written
:param ids_to_drop: directory containing merged ids that should not be used in jsonl form
:param prev_id_mapping_dir: optional dir containing previous id mappings in jsonl form
:return: None
"""
@@ -123,6 +153,12 @@ def create_match_keys(
prev_orig_to_merg[orig_id] = merg_id
if merg_id > max_merg:
max_merg = merg_id
ignore_ids = set()
for fi in os.listdir(ids_to_drop):
with open(os.path.join(ids_to_drop, fi)) as f:
for line in f:
js = json.loads(line.strip())
ignore_ids.add(js["merged_id"])
match_id = int(max_merg.split("carticle_")[1]) + 1
num_new, num_old = 0, 0
for ms in match_sets:
@@ -133,7 +169,7 @@ def create_match_keys(
existing_ids = set(
[prev_orig_to_merg[m] for m in ms if m in prev_orig_to_merg]
)
if len(existing_ids) == 1:
if len(existing_ids) == 1 and list(existing_ids)[0] not in ignore_ids:
cset_article_id = existing_ids.pop()
num_old += 1
else:
@@ -155,6 +191,16 @@ def create_match_keys(
required=True,
help="directory of exported jsonl from bigquery containing pairs of article matches",
)
parser.add_argument(
"--exclude_dir",
required=True,
help="directory of article pairs that should not be matched",
)
parser.add_argument(
"--ids_to_drop",
required=True,
help="file containing ids that should not be used",
)
parser.add_argument(
"--merge_file", required=True, help="file where merged ids should be written"
)
@@ -172,5 +218,9 @@ def create_match_keys(
)
args = parser.parse_args()

match_sets = create_match_sets(args.match_dir, args.current_ids_dir)
create_match_keys(match_sets, args.merge_file, args.prev_id_mapping_dir)
match_sets = create_match_sets(
args.match_dir, args.current_ids_dir, args.exclude_dir
)
create_match_keys(
match_sets, args.merge_file, args.ids_to_drop, args.prev_id_mapping_dir
)
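To make the effect of the new exclude_dir argument concrete, here is a simplified, self-contained sketch of the grouping strategy create_match_sets uses, with in-memory pairs instead of jsonl directories (the function name and data below are illustrative): an edge only enters the adjacency list if the pair is not excluded, and connected components are then collected by breadth-first traversal, so excluding the B-C pair splits {A, B, C} into {A, B} and {C}.

from collections import deque


def match_sets_from_pairs(pairs: list, excluded: set) -> list:
    """Simplified sketch: group ids into match sets, skipping excluded pairs."""
    adj_list = {}
    for id1, id2 in pairs:
        adj_list.setdefault(id1, set())
        adj_list.setdefault(id2, set())
        if (id1, id2) not in excluded and (id2, id1) not in excluded:
            adj_list[id1].add(id2)
            adj_list[id2].add(id1)
    seen, match_sets = set(), []
    for start in adj_list:
        if start in seen:
            continue
        # breadth-first traversal collects one connected component
        component, queue = set(), deque([start])
        while queue:
            node = queue.popleft()
            if node in component:
                continue
            component.add(node)
            queue.extend(adj_list[node] - component)
        seen |= component
        match_sets.append(component)
    return match_sets


print(match_sets_from_pairs([("A", "B"), ("B", "C"), ("D", "E")], excluded=set()))
# e.g. [{'A', 'B', 'C'}, {'D', 'E'}]
print(match_sets_from_pairs([("A", "B"), ("B", "C"), ("D", "E")], excluded={("B", "C")}))
# e.g. [{'A', 'B'}, {'C'}, {'D', 'E'}]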
58 changes: 58 additions & 0 deletions utils/make_unlink_rows.py
@@ -0,0 +1,58 @@
import argparse
import csv


def make_pairs(manual_to_orig: dict) -> list:
"""
Make all pairs of ids that should be unlinked
:param manual_to_orig: Dict mapping manually assigned ids to original ids that we believe to be the same article
:return: A list of pairs of ids that should not be linked together
"""
pairs = []
for manual1 in manual_to_orig:
for orig1 in manual_to_orig[manual1]:
for manual2 in manual_to_orig:
if manual1 == manual2:
continue
for orig2 in manual_to_orig[manual2]:
pairs.append((orig1, orig2))
return pairs


def write_unlink_rows(unlinking_file: str, output_file: str) -> None:
"""
Write a sql file containing a query that adds new rows to the staging_literature.unlink table
:param unlinking_file: CSV containing two columns, `manual_id` (a manually assigned id marking articles that are the same),
and `orig_id`, the id for the article in its source corpus
:param output_file: SQL file containing a query that adds new rows to staging_literature.unlink
:return: None
"""
manual_to_orig = {}
with open(unlinking_file) as f:
for line in csv.DictReader(f):
if line["manual_id"] not in manual_to_orig:
manual_to_orig[line["manual_id"]] = set()
manual_to_orig[line["manual_id"]].add(line["orig_id"])
pairs = make_pairs(manual_to_orig)
with open(output_file, mode="w") as out:
out.write(
"create or replace table staging_literature.unlink as\nselect id1, id2 from staging_literature.unlink\nunion all\n"
)
out.write(
"\nunion all\n".join(
[f'select "{id1}" as id1, "{id2}" as id2' for id1, id2 in pairs]
)
)


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"unlinking_file", help="csv with two columns: manual_id and orig_id"
)
parser.add_argument(
"output_file", help="file where query adding new rows should be written"
)
args = parser.parse_args()

write_unlink_rows(args.unlinking_file, args.output_file)
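As a usage sketch for the new helper (assuming the repository root is on PYTHONPATH; all file names below are hypothetical), a small curation CSV becomes a query that appends every cross-group pair to staging_literature.unlink:

import csv
import os
import tempfile

from utils.make_unlink_rows import write_unlink_rows

with tempfile.TemporaryDirectory() as tmp:
    unlink_csv = os.path.join(tmp, "unlink.csv")        # hypothetical input path
    out_sql = os.path.join(tmp, "add_unlink_rows.sql")  # hypothetical output path
    with open(unlink_csv, mode="w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["manual_id", "orig_id"])
        writer.writeheader()
        # a and b were manually judged to be the same article; c belongs to a different
        # article, so every pairing of (a or b) with c should be unlinked
        writer.writerows(
            [
                {"manual_id": "1", "orig_id": "a"},
                {"manual_id": "1", "orig_id": "b"},
                {"manual_id": "2", "orig_id": "c"},
            ]
        )
    write_unlink_rows(unlink_csv, out_sql)
    print(open(out_sql).read())  # union-all query adding (a, c), (b, c), (c, a), (c, b)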
2 changes: 1 addition & 1 deletion utils/run_ids_scripts.sh
@@ -1,6 +1,6 @@
cd /mnt/disks/data/run
gsutil rm gs://airflow-data-exchange/article_linkage/tmp/done_files/ids_are_done
python3 create_merge_ids.py --match_dir usable_ids --prev_id_mapping_dir prev_id_mapping --merge_file id_mapping.jsonl --current_ids_dir article_pairs
python3 create_merge_ids.py --match_dir usable_ids --exclude_dir unlink --ids_to_drop ids_to_drop --prev_id_mapping_dir prev_id_mapping --merge_file id_mapping.jsonl --current_ids_dir article_pairs
/snap/bin/gsutil -m cp id_mapping.jsonl gs://airflow-data-exchange/article_linkage/tmp/
/snap/bin/gsutil -m cp simhash_results/* gs://airflow-data-exchange/article_linkage/simhash_results/
/snap/bin/gsutil -m cp new_simhash_indexes/* gs://airflow-data-exchange/article_linkage/simhash_indexes/