Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sort, use_pagination and closest #158

Merged
merged 8 commits into from
Feb 17, 2022
52 changes: 51 additions & 1 deletion tests/test_cdx_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,61 @@ def test_b() -> None:
url = "https://www.google.com"

wayback = WaybackMachineCDXServerAPI(
url=url, user_agent=user_agent, start_timestamp="202101", end_timestamp="202112"
url=url,
user_agent=user_agent,
start_timestamp="202101",
end_timestamp="202112",
collapses=["urlkey"],
)
# timeframe bound prefix matching enabled along with active urlkey based collapsing

snapshots = wayback.snapshots() # <class 'generator'>

for snapshot in snapshots:
assert snapshot.timestamp.startswith("2021")


def test_c() -> None:
    """
    The closest="201010101010" + sort="closest" combination must return
    a snapshot taken near 2010-10-10, and its archive URL must point at
    the requested site.
    """
    user_agent = (
        "Mozilla/5.0 (MacBook Air; M1 Mac OS X 11_4) "
        "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/604.1"
    )
    url = "https://www.google.com"

    cdx = WaybackMachineCDXServerAPI(
        url=url,
        user_agent=user_agent,
        closest="201010101010",
        sort="closest",
        limit="1",
    )
    snapshots = cdx.snapshots()
    for snapshot in snapshots:
        archive_url = snapshot.archive_url
        timestamp = snapshot.timestamp
        break

    # NOTE: str.find() returns -1 (truthy) on a miss, so the old
    # `assert str(archive_url).find("google.com")` could never fail
    # and would fail spuriously at index 0; use `in` instead.
    assert "google.com" in str(archive_url)
    assert "20101010" in timestamp


def test_d() -> None:
    """
    prefix match_type plus use_pagination should iterate many snapshots
    (more than 50) for akamhy.github.io, all with status code 200.
    """
    user_agent = (
        "Mozilla/5.0 (MacBook Air; M1 Mac OS X 11_4) "
        "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/604.1"
    )

    cdx = WaybackMachineCDXServerAPI(
        url="akamhy.github.io",
        user_agent=user_agent,
        match_type="prefix",
        use_pagination=True,
        filters=["statuscode:200"],
    )
    snapshots = cdx.snapshots()

    count = 0
    for snapshot in snapshots:
        count += 1
        # str.find() returns -1 (truthy) on a miss, so the old bare
        # `assert ...find(...)` could not catch a wrong URL; use `in`.
        assert "akamhy.github.io" in str(snapshot.archive_url)
    # The total-count check belongs outside the loop: inside it would
    # fail on the very first iteration (1 > 50 is False).
    assert count > 50
10 changes: 10 additions & 0 deletions tests/test_cdx_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
check_collapses,
check_filters,
check_match_type,
check_sort,
full_url,
get_response,
get_total_pages,
Expand Down Expand Up @@ -101,3 +102,12 @@ def test_check_match_type() -> None:

with pytest.raises(WaybackError):
check_match_type("not a valid type", "url")


def test_check_sort() -> None:
    """
    check_sort must accept the three legal sort modes (and the absence
    of a sort value) and raise WaybackError for anything else.
    """
    # No sort requested at all is also valid — the CDX API default order.
    assert check_sort(None)
    assert check_sort("default")
    assert check_sort("closest")
    assert check_sort("reverse")

    with pytest.raises(WaybackError):
        # No `assert` here: if check_sort failed to raise, pytest.raises
        # itself fails the test; a bare assert inside was misleading.
        check_sort("random crap")
1 change: 1 addition & 0 deletions tests/test_save_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,5 @@ def test_archive_url() -> None:
save_api.saved_archive = (
"https://web.archive.org/web/20220124063056/https://example.com/"
)
save_api._archive_url = save_api.saved_archive
assert save_api.archive_url == save_api.saved_archive
9 changes: 8 additions & 1 deletion tests/test_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,11 @@ def test_total_archives() -> None:

def test_known_urls() -> None:
    """
    known_urls(subdomain=True) for akamhy.github.io should yield a
    substantial number of archived URLs.
    """
    wayback = Url("akamhy.github.io")
    # Keep only the updated assertion: the diff artifact duplicated the
    # call with and without subdomain=True, hitting the network twice.
    assert len(list(wayback.known_urls(subdomain=True))) > 40


def test_Save() -> None:
    """
    Saving a Wikipedia page must produce an archive URL that still
    contains the page's title slug.
    """
    page = "https://en.wikipedia.org/wiki/Asymptotic_equipartition_property"
    wayback = Url(page)
    wayback.save()
    # A hit anywhere in the archive URL is enough; find() == -1 means missing.
    assert str(wayback.archive_url).find("Asymptotic_equipartition_property") != -1
82 changes: 43 additions & 39 deletions waybackpy/cdx_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
check_collapses,
check_filters,
check_match_type,
check_sort,
full_url,
get_response,
get_total_pages,
Expand Down Expand Up @@ -44,10 +45,13 @@ def __init__(
end_timestamp: Optional[str] = None,
filters: Optional[List[str]] = None,
match_type: Optional[str] = None,
sort: Optional[str] = None,
gzip: Optional[str] = None,
collapses: Optional[List[str]] = None,
limit: Optional[str] = None,
max_tries: int = 3,
use_pagination: bool = False,
closest: Optional[str] = None,
) -> None:
self.url = str(url).strip().replace(" ", "%20")
self.user_agent = user_agent
Expand All @@ -57,65 +61,65 @@ def __init__(
check_filters(self.filters)
self.match_type = None if match_type is None else str(match_type).strip()
check_match_type(self.match_type, self.url)
self.sort = None if sort is None else str(sort).strip()
check_sort(self.sort)
self.gzip = gzip
self.collapses = [] if collapses is None else collapses
check_collapses(self.collapses)
self.limit = 25000 if limit is None else limit
self.max_tries = max_tries
self.use_pagination = use_pagination
self.closest = None if closest is None else str(closest)
self.last_api_request_url: Optional[str] = None
self.use_page = False
self.endpoint = "https://web.archive.org/cdx/search/cdx"

def cdx_api_manager(
self, payload: Dict[str, str], headers: Dict[str, str], use_page: bool = False
self, payload: Dict[str, str], headers: Dict[str, str]
) -> Generator[str, None, None]:
"""
Manages the API calls for the instance, it automatically selects the best
parameters by looking as the query of the end-user. For bigger queries
automatically use the CDX pagination API and for smaller queries use the
normal API.

CDX Server API is a complex API and to make it easy for the end user to
consume it the CDX manager(this method) handles the selection of the
API output, whether to use the pagination API or not.

For doing large/bulk queries, the use of the Pagination API is
recommended by the Wayback Machine authors. And it determines if the
query would be large or not by using the showNumPages=true parameter,
this tells the number of pages of CDX DATA that the pagination API
will return.

If the number of page is less than 2 we use the normal non-pagination
API as the pagination API is known to lag and for big queries it should
not matter but for queries where the number of pages are less this
method chooses accuracy over the pagination API.
This method uses the pagination API of the CDX server if
use_pagination attribute is True else uses the standard
CDX server response data.
"""
# number of pages that will returned by the pagination API.
# get_total_pages adds the showNumPages=true param to pagination API
# requests.
# This is a special query that will return a single number indicating
# the number of pages.
total_pages = get_total_pages(self.url, self.user_agent)

if use_page is True and total_pages >= 2:
blank_pages = 0

# When using the pagination API of the CDX server.
if self.use_pagination is True:

total_pages = get_total_pages(self.url, self.user_agent)
successive_blank_pages = 0

for i in range(total_pages):
payload["page"] = str(i)

url = full_url(self.endpoint, params=payload)
res = get_response(url, headers=headers)

if isinstance(res, Exception):
raise res

self.last_api_request_url = url
text = res.text

# Reset the counter if the last page was blank
# but the current page is not.
if successive_blank_pages == 1:
if len(text) != 0:
successive_blank_pages = 0

# Increase the succesive page counter on encountering
# blank page.
if len(text) == 0:
blank_pages += 1
successive_blank_pages += 1

if blank_pages >= 2:
# If two succesive pages are blank
# then we don't have any more pages left to
# iterate.
if successive_blank_pages >= 2:
break

yield text

# When not using the pagination API of the CDX server
else:
payload["showResumeKey"] = "true"
payload["limit"] = str(self.limit)
Expand Down Expand Up @@ -162,9 +166,15 @@ def add_payload(self, payload: Dict[str, str]) -> None:
if self.gzip is None:
payload["gzip"] = "false"

if self.closest:
payload["closest"] = self.closest

if self.match_type:
payload["matchType"] = self.match_type

if self.sort:
payload["sort"] = self.sort

if self.filters and len(self.filters) > 0:
for i, _filter in enumerate(self.filters):
payload["filter" + str(i)] = _filter
Expand Down Expand Up @@ -199,13 +209,7 @@ def snapshots(self) -> Generator[CDXSnapshot, None, None]:

self.add_payload(payload)

if not self.start_timestamp or self.end_timestamp:
self.use_page = True

if self.collapses != []:
self.use_page = False

entries = self.cdx_api_manager(payload, headers, use_page=self.use_page)
entries = self.cdx_api_manager(payload, headers)

for entry in entries:

Expand Down
52 changes: 50 additions & 2 deletions waybackpy/cdx_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from .exceptions import WaybackError
from .exceptions import BlockedSiteError, WaybackError
from .utils import DEFAULT_USER_AGENT


Expand All @@ -28,12 +28,38 @@ def get_total_pages(url: str, user_agent: str = DEFAULT_USER_AGENT) -> int:
headers = {"User-Agent": user_agent}
request_url = full_url(endpoint, params=payload)
response = get_response(request_url, headers=headers)

check_for_blocked_site(response, url)
if isinstance(response, requests.Response):
return int(response.text.strip())
raise response


def check_for_blocked_site(
    response: Union[requests.Response, Exception], url: Optional[str] = None
) -> None:
    """
    Raise BlockedSiteError when the Wayback Machine response indicates
    that the site is excluded by its robots.txt policy.

    A transport-level failure (an Exception instead of a Response) is
    simply re-raised. ``url`` is only used to build the error message;
    when omitted, a generic placeholder is substituted.
    """
    # see https://github.com/akamhy/waybackpy/issues/157

    # mypy narrowing: anything that is not a real Response is re-raised.
    if isinstance(response, Exception):
        raise response

    display_url = url if url else "The requested content"

    blocked_marker = (
        "org.archive.util.io.RuntimeIOException: "
        "org.archive.wayback.exception.AdministrativeAccessControlException: "
        "Blocked Site Error"
    )

    if blocked_marker in response.text.strip():
        raise BlockedSiteError(
            f"{display_url} is excluded from Wayback Machine by the site's robots.txt policy."
        )


def full_url(endpoint: str, params: Dict[str, Any]) -> str:
"""
As the function's name already implies that it returns
Expand Down Expand Up @@ -76,6 +102,7 @@ def get_response(
session.mount("https://", HTTPAdapter(max_retries=retries_))
response = session.get(url, headers=headers)
session.close()
check_for_blocked_site(response)
return response


Expand Down Expand Up @@ -151,3 +178,24 @@ def check_match_type(match_type: Optional[str], url: str) -> bool:
raise WaybackError(exc_message)

return True


def check_sort(sort: Optional[str]) -> bool:
    """
    Validate the sort argument supplied to the CDX server API.

    A falsy value (None or the empty string) means no explicit sorting
    was requested and is accepted. Otherwise the value must be one of
    the sort modes the CDX server understands; anything else raises
    WaybackError.
    """
    if not sort:
        return True

    if sort in ("default", "closest", "reverse"):
        return True

    raise WaybackError(
        f"{sort} is not an allowed argument for sort.\n"
        "Use one from 'default', 'closest' or 'reverse'"
    )
Loading