import concurrent.futures
import datetime
import json
import locale
import pathlib
import shutil
import tempfile
import time
import urllib.parse
from itertools import groupby

import dateutil.parser
import jinja2
import yt_dlp
from bs4 import BeautifulSoup
from kiwixstorage import KiwixStorage
from pif import get_public_ip
from slugify import slugify
from zimscraperlib.download import BestMp4, BestWebm, YoutubeDownloader, save_large_file
from zimscraperlib.i18n import _, setlocale
from zimscraperlib.image.optimization import optimize_image
from zimscraperlib.image.presets import WebpMedium
from zimscraperlib.image.transformation import resize_image
from zimscraperlib.inputs import compute_descriptions
from zimscraperlib.video.presets import VideoMp4Low, VideoWebmLow
from zimscraperlib.zim import make_zim_file
from zimscraperlib.zim.metadata import (
    validate_description,
    validate_language,
    validate_longdescription,
    validate_tags,
    validate_title,
)

from ted2zim import languages as tedlang
from ted2zim.constants import (
    ALL,
    BASE_URL,
    MATCHING,
    NONE,
    ROOT_DIR,
    SCRAPER,
    SEARCH_URL,
    get_logger,
)
from ted2zim.processing import post_process_video
from ted2zim.utils import WebVTT, get_main_title, request_url, update_subtitles_list

logger = get_logger()


class Ted2Zim:
    def __init__(
        self,
        topics,
        debug,
        name,
        video_format,
        low_quality,
        output_dir,
        no_zim,
        fname,
        languages,
        locale_name,
        title,
        description,
        long_description,
        creator,
        publisher,
        tags,
        keep_build_dir,
        autoplay,
        use_any_optimized_version,
        s3_url_with_credentials,
        playlist,
        subtitles_enough,
        subtitles_setting,
        tmp_dir,
        threads,
        disable_metadata_checks,
    ):
        # video-encoding info
        self.video_format = video_format
        self.low_quality = low_quality

        # zim params
        self.fname = fname
        self.languages = (
            [] if languages is None else [lang.strip() for lang in languages.split(",")]
        )
        self.tags = [] if tags is None else [tag.strip() for tag in tags.split(",")]
        self.tags = [*self.tags, "_category:ted", "ted", "_videos:yes"]
        self.title = title
        self.description = description
        self.long_description = long_description
        self.creator = creator
        self.publisher = publisher
        self.name = name
        self.disable_metadata_checks = disable_metadata_checks

        if not self.disable_metadata_checks:
            # Validate ZIM metadata early so that we do not waste time doing operations
            # for a scraper which will fail anyway in the end; language is not
            # validated here since it is dynamically built based on languages found in
            # videos that will be added to the ZIM
            validate_tags("Tags", self.tags)
            if self.title:
                validate_title("Title", self.title)
            if self.description:
                validate_description("Description", self.description)
            if self.long_description:
                validate_longdescription("LongDescription", self.long_description)

        # directory setup
        self.output_dir = pathlib.Path(output_dir).expanduser().resolve()
        if tmp_dir:
            pathlib.Path(tmp_dir).mkdir(parents=True, exist_ok=True)
        self.build_dir = pathlib.Path(tempfile.mkdtemp(dir=tmp_dir))

        # scraper options
        self.topics = [] if not topics else topics.split(",")
        self.autoplay = autoplay
        self.playlist = playlist
        self.subtitles_enough = subtitles_enough
        self.subtitles_setting = (
            subtitles_setting
            if subtitles_setting in (ALL, MATCHING, NONE)
            else tedlang.to_ted_langcodes(
                [lang.strip() for lang in subtitles_setting.split(",")]
            )
        )
        self.threads = threads
        self.yt_downloader = None

        # optimization cache
        self.s3_url_with_credentials = s3_url_with_credentials
        self.use_any_optimized_version = use_any_optimized_version
        self.s3_storage = None
        self.video_quality = "low" if self.low_quality else "high"

        # debug/developer options
        self.no_zim = no_zim
        self.keep_build_dir = keep_build_dir
        self.debug = debug

        # class members
        self.videos = []
        self.playlist_title = None
        self.playlist_description = None
        self.source_languages = (
            [] if not self.languages else tedlang.to_ted_langcodes(self.languages)
        )
        self.already_visited = set()

        # set and record locale for translations
        locale_details = tedlang.get_language_details(locale_name)
        if locale_details["querytype"] != "locale":
            locale_name = locale_details["iso-639-1"]
        try:
            self.locale = setlocale(ROOT_DIR, locale_name)
        except locale.Error:
            logger.error(
                f"No locale for {locale_name}. Use --locale to specify it. "
                "Defaulting to en_US"
            )
            self.locale = setlocale(ROOT_DIR, "en")

        # locale's language code
        self.locale_ted_codes = tedlang.to_ted_langcodes(locale_name)

    @property
    def templates_dir(self):
        return ROOT_DIR.joinpath("templates")

    @property
    def videos_dir(self):
        return self.build_dir.joinpath("videos")

    @property
    def ted_videos_json(self):
        return self.build_dir.joinpath("ted_videos.json")

    @property
    def ted_topics_json(self):
        return self.build_dir.joinpath("ted_topics.json")

    @property
    def talks_base_url(self):
        return BASE_URL + "talks/"

    @property
    def playlists_base_url(self):
        return BASE_URL + "playlists"
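
    # Illustrative note (not executed): assuming BASE_URL is the TED site root,
    # e.g. "https://www.ted.com/", the two properties above would yield
    #   talks_base_url      -> "https://www.ted.com/talks/"
    #   playlists_base_url  -> "https://www.ted.com/playlists"
    # Relative hrefs scraped below are joined onto these with
    # urllib.parse.urljoin.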

    def extract_videos_from_playlist(self, playlist):
        """extracts metadata for all videos in the given playlist

        calls extract_info_from_video_page on all links to get this data
        """
        playlist_url = f"{self.playlists_base_url}/{playlist}"
        logger.debug(f"extract_videos_from_playlist: {playlist_url}")
        soup = BeautifulSoup(request_url(playlist_url).text, features="html.parser")
        video_elements = soup.find_all("a", attrs={"class": "group"})
        self.playlist_title = soup.find("h1").string  # pyright: ignore
        self.playlist_description = soup.find(
            "p", attrs={"class": "text-base"}
        ).string  # pyright: ignore

        for element in video_elements:
            relative_path = element.get("href")
            url = urllib.parse.urljoin(self.talks_base_url, relative_path)
            json_data = self.extract_info_from_video_page(url)
            if json_data is not None:
                player_data = json_data["playerData"]
                lang_code = json_data["language"]
                if self.source_languages:
                    # If the first video which was fetched is in source_languages,
                    # save it.
                    if lang_code in self.source_languages:
                        self.update_videos_list_from_info(json_data)
                    # Determine the next languages to fetch from source_languages
                    other_languages = [
                        code for code in self.source_languages if code != lang_code
                    ]
                else:
                    # No languages were specified. Save the first video
                    self.update_videos_list_from_info(json_data)
                    # We use the languages returned from the first
                    # video to generate other language urls.
                    other_languages = [
                        language["languageCode"]
                        for language in player_data["languages"]
                        if language["languageCode"] != lang_code
                    ]
                if not other_languages:
                    # No need to generate urls for other languages as the list
                    # is empty
                    self.already_visited.add(urllib.parse.urlparse(url).path)
                    continue

                other_lang_urls = self.generate_urls_for_other_languages(
                    url, other_languages
                )
                logger.debug(
                    f"Searching info for the video in {len(other_lang_urls)} "
                    "other language(s)"
                )
                for lang_url in other_lang_urls:
                    data = self.extract_info_from_video_page(lang_url)
                    if data is not None:
                        self.update_videos_list_from_info(data)
                self.already_visited.add(urllib.parse.urlparse(url).path)
            logger.debug(f"Seen {relative_path}")
        logger.debug(f"Total videos found on playlist: {len(video_elements)}")
        if not video_elements:
            raise ValueError("Wrong playlist ID supplied. No videos found")

    def generate_search_results(self, topic):
        """generates search results and returns the total number of videos scraped"""
        total_videos_scraped = 0
        page = 0
        while True:
            result = self.query_search_engine(topic, page)
            result_json = result.json()
            (
                nb_videos_extracted,
                nb_videos_on_page,
            ) = self.extract_videos_in_search_results(result_json)
            if nb_videos_on_page == 0:
                break
            total_videos_scraped += nb_videos_extracted
            page += 1
        return total_videos_scraped

    def query_search_engine(self, topic, page):
        logger.debug(f"Fetching page {page} of topic {topic}")
        data = [
            {
                "indexName": "relevance",
                "params": {
                    "attributeForDistinct": "objectID",
                    "distinct": 1,
                    "facetFilters": [[f"tags:{topic}"]],
                    "facets": ["subtitle_languages", "tags"],
                    "highlightPostTag": "__/ais-highlight__",
                    "highlightPreTag": "__ais-highlight__",
                    "hitsPerPage": 24,
                    "maxValuesPerFacet": 500,
                    "page": page,
                    "query": "",
                    "tagFilters": "",
                },
            },
        ]
        return request_url(SEARCH_URL, data)
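
    # Illustrative note (not executed): the Algolia-style payload above is
    # expected to come back shaped roughly like
    #   {"results": [{"hits": [{"slug": "<talk_slug>", ...}, ...]}]}
    # which is what extract_videos_in_search_results() unpacks below; paging
    # stops once a page returns zero hits.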

    def extract_videos_from_topics(self, topic):
        """extracts metadata for required number of videos on different topics"""
        logger.debug(f"Fetching video links for topic: {topic}")
        total_videos_scraped = self.generate_search_results(topic)
        logger.info(f"Total video links found in {topic}: {total_videos_scraped}")
        if total_videos_scraped == 0:
            return False
        return True

    def update_zim_metadata(self):
        if self.playlist:
            if not self.title:
                self.title = self.playlist_title.strip()  # pyright: ignore
            default_description = self.playlist_description.strip()  # pyright: ignore
        elif len(self.topics) > 1:
            if not self.title:
                self.title = "TED Collection"
            default_description = "A selection of TED videos from several topics"
        else:
            topic_str = self.topics[0].replace("+", " ")
            if not self.title:
                self.title = f"{topic_str.capitalize()} from TED"
            default_description = f"A selection of {topic_str} videos from TED"

        # update description and long_description if not already set by user input,
        # based on default_description potentially retrieved from playlist / topics;
        # compute_descriptions always returns valid description and long description
        # when based on default_description
        self.description, self.long_description = compute_descriptions(
            default_description=default_description,
            user_description=self.description,
            user_long_description=self.long_description,
        )

        # Compute ZIM language (first call, approximation since few videos might
        # fail to download and finally not be added to the ZIM; this however helps to
        # ensure that ZIM metadata is OK)
        self.compute_zim_languages()

    def compute_zim_languages(self):
        """Compute the ZIM language metadata based on expected videos"""

        # count the number of videos per audio language
        audio_lang_counts = {
            lang: len(list(group))
            for lang, group in groupby(
                sorted(video["native_talk_language"] for video in self.videos)
            )
        }

        # count the number of videos per subtitle language
        subtitle_lang_counts = {
            lang: len(list(group))
            for lang, group in groupby(
                sorted(
                    subtitle["languageCode"]
                    for video in self.videos
                    for subtitle in video["subtitles"]
                )
            )
        }

        # Attribute a 10-point score to a language used in a video's audio and a
        # 1-point score to a language used in a video's subtitles
        scored_languages = {
            k: 10 * audio_lang_counts.get(k, 0) + subtitle_lang_counts.get(k, 0)
            for k in list(audio_lang_counts.keys()) + list(subtitle_lang_counts.keys())
        }
        sorted_ted_languages = [
            lang_code
            for lang_code, _ in sorted(
                scored_languages.items(), key=lambda item: -item[1]
            )
        ]

        # compute the mappings from TED to ISO639-3 code and set ZIM language
        mapping = tedlang.ted_to_iso639_3_langcodes(sorted_ted_languages)
        self.zim_languages = ",".join(
            [mapping[code] for code in sorted_ted_languages if mapping[code]]
        )

        # Display a clear warning on languages which have been ignored due to missing
        # ISO639-3 codes
        ignored_ted_codes = [code for code in sorted_ted_languages if not mapping[code]]
        if len(ignored_ted_codes):
            logger.warning(
                "Some languages have not been added to ZIM metadata due to missing "
                f"ISO639-3 code: {ignored_ted_codes}"
            )

        if not self.disable_metadata_checks:
            # Validate ZIM languages
            validate_language("Language", self.zim_languages)
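
    # Worked example (illustrative): with three videos whose audio language is
    # "en", one of which also carries "fr" and "ja" subtitles, the scores are
    #   en: 3 * 10 = 30, fr: 1, ja: 1
    # so the ZIM Language metadata would list "en" mapped to its ISO639-3 code
    # first, then "fr" and "ja" (the exact mapped codes, e.g. "eng", "fra",
    # "jpn", depend on tedlang's tables).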

    def get_subtitle_dict(self, lang):
        """dict of language name and code from a larger dict lang

        Example:
        {
            'languageCode': 'en',
            'languageName': 'English'
        }
        """
        return {
            "languageName": tedlang.get_display_name(
                lang["languageCode"], lang["languageName"]
            ),
            "languageCode": lang["languageCode"],
        }

    def generate_subtitle_list(self, video_id, langs, page_lang, audio_lang):
        """List of all subtitle languages with link to their pages"""
        subtitles = []
        if self.subtitles_setting == ALL or (
            not self.source_languages and self.topics and self.subtitles_setting != NONE
        ):
            subtitles = [self.get_subtitle_dict(lang) for lang in langs]
        elif self.subtitles_setting == MATCHING or (
            self.subtitles_enough
            and self.subtitles_setting == NONE
            and page_lang != audio_lang
        ):
            subtitles = [
                self.get_subtitle_dict(lang)
                for lang in langs
                if lang["languageCode"] == page_lang
            ]
        elif self.subtitles_setting and self.subtitles_setting != NONE:
            if not self.subtitles_enough and self.topics:
                subtitles = [
                    self.get_subtitle_dict(lang)
                    for lang in langs
                    if lang["languageCode"] in self.subtitles_setting
                ]
            else:
                subtitles = [
                    self.get_subtitle_dict(lang)
                    for lang in langs
                    if lang["languageCode"] in self.subtitles_setting
                    or lang["languageCode"] in self.source_languages
                ]
        return update_subtitles_list(video_id, subtitles)

    def generate_urls_for_other_languages(self, url, languages):
        """Possible URLs for other requested languages based on a video url"""
        urls = []
        page_lang, query = self.get_lang_code_from_url(
            url, with_full_query=True
        )  # pyright: ignore[reportGeneralTypeIssues]
        url_parts = list(urllib.parse.urlparse(url))

        # update the language query field value with other languages and form URLs
        for language in languages:
            if language != page_lang:
                query.update({"language": language})
                url_parts[4] = urllib.parse.urlencode(query)
                urls.append(urllib.parse.urlunparse(url_parts))
        return urls
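
    # Illustrative example (not executed): for
    #   url = "https://www.ted.com/talks/some_talk?language=ja"
    #   languages = ["fr", "de", "ja"]
    # the method above returns
    #   ["https://www.ted.com/talks/some_talk?language=fr",
    #    "https://www.ted.com/talks/some_talk?language=de"]
    # since "ja" matches the page's own language and is skipped.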

    def extract_videos_in_search_results(self, result_json):
        hits = result_json["results"][0]["hits"]
        nb_extracted = 0
        nb_listed = len(hits)
        logger.debug(f"{nb_listed} video(s) found on current page")
        for hit in hits:
            url = urllib.parse.urljoin(self.talks_base_url, hit["slug"])
            json_data = self.extract_info_from_video_page(url)
            if json_data is None:
                continue
            lang_code = json_data["language"]
            player_data = json_data["playerData"]

            # we need to filter videos since this has not been done
            # before for topics with the "new" search page (2023)
            if self.source_languages:
                # If the first video which was fetched is in self.source_languages,
                # save it and increment the counter.
                if (
                    lang_code in self.source_languages
                    and self.update_videos_list_from_info(json_data)
                ):
                    nb_extracted += 1

                # Determine the next languages to fetch from source_languages
                other_languages = [
                    code for code in self.source_languages if code != lang_code
                ]

                # If there are any valid language codes which can be fetched, fetch
                # them and save accordingly
                if other_languages:
                    other_lang_urls = self.generate_urls_for_other_languages(
                        url, other_languages
                    )
                    logger.debug(
                        f"Searching info for the video in {len(other_lang_urls)} "
                        "other language(s)"
                    )
                    for lang_url in other_lang_urls:
                        data = self.extract_info_from_video_page(lang_url)
                        if data is not None and self.update_videos_list_from_info(data):
                            # It is possible that this is the first time we
                            # are saving this video as the first video might
                            # not necessarily be in the source_languages.
                            # We increment the counter relying on the fact that
                            # update_videos_list returns True only if this
                            # is the first time we are saving the video.
                            nb_extracted += 1

                if lang_code not in self.source_languages:
                    # Video language fetched is not among the selected ones; we have
                    # to check whether subtitles are enough
                    if not self.subtitles_enough:
                        logger.debug(
                            f"Ignoring video in non-selected language {lang_code}"
                        )
                    else:
                        matching_languages = [
                            lang
                            for lang in player_data["languages"]
                            if lang["languageCode"] in self.source_languages
                        ]
                        if len(matching_languages) == 0:
                            logger.debug(
                                "Ignoring video without a selected language "
                                "in audio or subtitles"
                            )
            else:
                # Since we are searching for all languages, first update the
                # videos list with the data we just scraped.
                if self.update_videos_list_from_info(json_data):
                    nb_extracted += 1

                # We use the languages returned from the json_data of
                # this video to generate other language urls
                other_languages = []
                for language in player_data["languages"]:
                    # Do not include the language of the video that was just scraped
                    if language["languageCode"] == lang_code:
                        continue
                    other_languages.append(language["languageCode"])
                if other_languages:
                    other_lang_urls = self.generate_urls_for_other_languages(
                        url, other_languages
                    )
                    logger.debug(
                        f"Searching info for the video in {len(other_lang_urls)} "
                        "other language(s)"
                    )
                    for lang_url in other_lang_urls:
                        data = self.extract_info_from_video_page(lang_url)
                        if data is not None:
                            self.update_videos_list_from_info(data)
            logger.debug(f"Seen {hit['slug']}")
            self.already_visited.add(urllib.parse.urlparse(url).path)
        return nb_extracted, nb_listed

    def get_lang_code_from_url(self, url, *, with_full_query=False):
        """gets the queried language code from a ted talk url"""
        # sample - https://www.ted.com/talks/alex_rosenthal_the_gauntlet_think_like_a_coder_ep_8?language=ja
        url_parts = list(urllib.parse.urlparse(url))

        # explode url to extract `language` query field value
        query = dict(urllib.parse.parse_qsl(url_parts[4]))
        current_lang = query.get("language")
        if with_full_query:
            return current_lang, query
        return current_lang
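
    # Illustrative sketch (not executed): on the sample URL above,
    #   urllib.parse.urlparse(url)[4] == "language=ja", and
    #   dict(urllib.parse.parse_qsl("language=ja")) == {"language": "ja"}
    # so the method returns "ja" (or ("ja", {"language": "ja"}) when called
    # with with_full_query=True); a URL without the query returns None.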

    def extract_download_link(self, talk_data):
        """Returns download link / youtube video ID for a TED video"""
        if (
            isinstance(talk_data.get("resources", {}).get("h264"), list)
            and len(talk_data["resources"]["h264"])
            and talk_data["resources"]["h264"][0].get("file")
        ):
            logger.debug(
                "Using h264 resource link for bitrate="
                f"{talk_data['resources']['h264'][0].get('bitrate')}"
            )
            download_link = talk_data["resources"]["h264"][0]["file"]
        else:
            download_link = None

        if (
            talk_data.get("external", {}).get("service")
            and talk_data["external"]["service"] == "YouTube"
            and talk_data["external"].get("code")
        ):
            logger.debug(f"Found Youtube ID {talk_data['external']['code']}")
            youtube_id = talk_data["external"]["code"]
        else:
            youtube_id = None
        return download_link, youtube_id
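
    # Illustrative note (not executed): the two player-data shapes handled
    # above are, roughly,
    #   {"resources": {"h264": [{"file": "https://...mp4", "bitrate": 320}]}}
    # for directly hosted talks, and
    #   {"external": {"service": "YouTube", "code": "<video_id>"}}
    # for externally hosted ones; either field may be missing, hence the
    # (None, None) fallback handled by the caller.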

    def update_videos_list(
        self,
        video_id,
        lang_code,
        lang_name,
        title,
        description,
        speaker,
        speaker_profession,
        speaker_bio,
        speaker_picture,
        date,
        thumbnail,
        video_link,
        youtube_id,
        length,
        subtitles,
        metadata_link,
        native_talk_language,
    ):
        # append to self.videos and return if not present
        if not [video for video in self.videos if video.get("id", None) == video_id]:
            # Fetch metadata and compute subtitles offset (sum up all domains
            # durations up till the primary domain) - we do it only once per video
            # since this information is the same for all languages
            subtitles_offset = 0
            if metadata_link:
                metadatas = request_url(metadata_link).json()
                if "domains" in metadatas:
                    for domain in metadatas["domains"]:
                        if domain["primaryDomain"]:
                            break
                        subtitles_offset += int(domain["duration"] * 1000)
            self.videos.append(
                {
                    "id": video_id,
                    "languages": [
                        {
                            "languageCode": lang_code,
                            "languageName": tedlang.get_display_name(
                                lang_code, lang_name
                            ),
                        }
                    ],
                    "title": [{"lang": lang_code, "text": title}],
                    "description": [{"lang": lang_code, "text": description}],
                    "speaker": speaker,
                    "speaker_profession": speaker_profession,
                    "speaker_bio": speaker_bio,
                    "speaker_picture": speaker_picture,
                    "date": date,
                    "thumbnail": thumbnail,
                    "video_link": video_link,
                    "youtube_id": youtube_id,
                    "length": length,
                    "subtitles": subtitles,
                    "subtitles_offset": subtitles_offset,
                    "native_talk_language": native_talk_language,
                }
            )
            logger.debug(f"Successfully inserted video {video_id} into video list")
            return True

        # update localized meta for video if already in self.videos
        # based on --subtitles=matching
        logger.debug(f"Video {video_id} already present in video list")
        for index, video in enumerate(self.videos):
            if video.get("failed", False):
                continue
            if video.get("id", None) == video_id:
                if {"lang": lang_code, "text": title} not in video["title"]:
                    self.videos[index]["title"].append(
                        {"lang": lang_code, "text": title}
                    )
                    self.videos[index]["description"].append(
                        {"lang": lang_code, "text": description}
                    )
                    self.videos[index]["languages"].append(
                        {
                            "languageCode": lang_code,
                            "languageName": tedlang.get_display_name(
                                lang_code, lang_name
                            ),
                        }
                    )
                if self.subtitles_setting in (MATCHING, NONE) and len(subtitles) == 1:
                    self.videos[index]["subtitles"] += subtitles
        return False
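
    # Illustrative note (not executed): the subtitles offset above sums the
    # durations of every domain preceding the primary one, assuming durations
    # are in seconds; e.g. two non-primary domains of 5.0s and 3.2s before
    # primaryDomain shift subtitle timestamps by 8200 ms so they line up with
    # the final video.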

    def get_lang_code_and_name(self, json_data):
        player_data = json_data["playerData"]
        lang_code = json_data["language"]
        try:
            lang_name = [
                lang["languageName"]
                for lang in player_data["languages"]
                if lang["languageCode"] == lang_code
            ][-1]
        except Exception as exc:
            logger.warning(f"player data has no entry for {lang_code}: {exc}")
            lang_name = lang_code
        return lang_code, lang_name

    def update_videos_list_from_info(self, json_data):
        player_data = json_data["playerData"]
        lang_code, lang_name = self.get_lang_code_and_name(json_data)
        native_talk_language = player_data["nativeLanguage"]

        # Extract the speaker of the TED talk
        if len(json_data["speakers"]):
            if isinstance(json_data["speakers"], dict):
                speaker_info = (
                    json_data["speakers"]["nodes"][0]
                    if json_data["speakers"].get("nodes", [])
                    else {}
                )
            elif isinstance(json_data["speakers"], list):
                speaker_info = json_data["speakers"][0]
            else:
                raise OSError(f"Unexpected speaker JSON format: {json_data}")
            speaker = " ".join(
                [
                    speaker_info.get("firstname", ""),
                    speaker_info.get("middlename", ""),
                    speaker_info.get("lastname", ""),
                ]
            )
        else:
            speaker_info = {
                "description": "None",
                "whotheyare": "None",
                "photo_url": "",
            }
            if "presenterDisplayName" in json_data:
                speaker = json_data["presenterDisplayName"]
            else:
                speaker = "None"

        # Extract the ted talk details from json
        video_id = json_data["id"]
        speaker_profession = speaker_info.get("description")
        speaker_bio = speaker_info.get("whoTheyAre", "-")
        speaker_picture = speaker_info.get("photoUrl", "-")
        title = json_data.get("title", "n/a")
        description = json_data.get("description", "n/a")
        date = (
            dateutil.parser.parse(json_data["recordedOn"]).strftime("%d %B %Y")
            if json_data.get("recordedOn")
            else "Unknown"
        )
        length = int(json_data["duration"]) // 60
        thumbnail = player_data["thumb"]
        video_link, youtube_id = self.extract_download_link(player_data)
        if not video_link and not youtube_id:
            logger.error(
                "No suitable download link or Youtube ID found. Skipping video"
            )
            return False

        langs = player_data["languages"]
        metadata_link = player_data["resources"]["hls"]["metadata"]
        subtitles = self.generate_subtitle_list(
            video_id, langs, lang_code, native_talk_language
        )
        return self.update_videos_list(
            video_id=video_id,
            lang_code=lang_code,
            lang_name=lang_name,
            title=title,
            description=description,
            speaker=speaker,
            speaker_profession=speaker_profession,
            speaker_bio=speaker_bio,
            speaker_picture=speaker_picture,
            date=date,
            thumbnail=thumbnail,
            video_link=video_link,
            youtube_id=youtube_id,
            length=length,
            subtitles=subtitles,
            metadata_link=metadata_link,
            native_talk_language=native_talk_language,
        )

    def extract_info_from_video_page(
        self, url: str, retry_count: int = 0
    ) -> dict | None:
        """extract all info from a TED video page url.

        Returns a dict containing the video information if the search was
        successful, else None.
        """
        # Every TED video page has a <script> tag holding a JavaScript
        # object with JSON in it. We just strip away the object
        # signature and load the JSON to extract metadata out of it.

        # don't scrape if URL already visited
        if urllib.parse.urlparse(url).path in self.already_visited:
            return None

        # don't scrape if maximum retry count is reached
        if retry_count > 5:  # noqa: PLR2004
            logger.error("Max retries exceeded. Skipping video")
            return None

        logger.debug(f"extract_info_from_video_page: {url}")
        html_content = request_url(url).text
        try:
            soup = BeautifulSoup(html_content, features="html.parser")
            json_data = json.loads(
                soup.find(
                    "script", attrs={"id": "__NEXT_DATA__"}
                ).string  # pyright: ignore
            )["props"]["pageProps"]["videoData"]
            requested_lang_code = self.get_lang_code_from_url(url)
            if requested_lang_code and json_data["language"] != requested_lang_code:
                logger.error(
                    f"Video has not yet been translated into {requested_lang_code}"
                )
                return None

            # Deserialize the data at json_data["playerData"] into a dict
            # and overwrite it accordingly
            json_data["playerData"] = json.loads(json_data["playerData"])
            return json_data
        except Exception:
            logger.error(
                f"Problem occurred while parsing {url}. HTML content was:\n"
                f"{html_content}"
            )
            raise
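
    # Illustrative note (not executed): the Next.js payload parsed above is
    # expected to look roughly like
    #   {"props": {"pageProps": {"videoData": {
    #       "id": ..., "language": "en", "title": ...,
    #       "playerData": "{... JSON-encoded string ...}"}}}}
    # playerData arrives as a JSON string nested inside JSON, hence the second
    # json.loads() call.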

    def add_default_language(self):
        """add metadata in default language (English or first available) on videos"""
        for video in self.videos:
            if video.get("failed", False):
                continue
            en_found = False
            for index, lang in enumerate(video["languages"]):
                if lang["languageCode"] == "en":
                    en_found = True
                    video["title"] = [
                        {"lang": "default", "text": video["title"][index]["text"]}
                    ] + video["title"]
                    video["description"] = [
                        {"lang": "default", "text": video["description"][index]["text"]}
                    ] + video["description"]
            if not en_found:
                video["title"] = [
                    {"lang": "default", "text": video["title"][0]["text"]}
                ] + video["title"]
                video["description"] = [
                    {"lang": "default", "text": video["description"][0]["text"]}
                ] + video["description"]

            # update video slug
            video["slug"] = slugify(video["title"][0]["text"], separator="-")

    def render_video_pages(self):
        # Render static html pages from the scraped video data and
        # save each page in build_dir under the video's slug
        env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(str(self.templates_dir)), autoescape=True
        )
        for video in self.videos:
            if video.get("failed", False):
                continue
            titles = video["title"]
            html = env.get_template("article.html").render(
                speaker=video["speaker"],
                languages=video["subtitles"],
                speaker_bio=video["speaker_bio"].replace("Full bio", ""),
                speaker_img=video["speaker_picture"],
                date=video["date"],
                profession=video["speaker_profession"],
                video_format=self.video_format,
                autoplay=self.autoplay,
                video_id=str(video["id"]),
                title=get_main_title(titles, self.locale_ted_codes),
                titles=titles,
                descriptions=video["description"],
                back_to_list=_("Back to the list"),
            )
            html_path = self.build_dir.joinpath(video["slug"])
            with open(html_path, "w", encoding="utf-8") as html_page:
                html_page.write(html)  # pyright: ignore[reportGeneralTypeIssues]

    def render_home_page(self):
        # Render the homepage
        env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(str(self.templates_dir)), autoescape=True
        )
        all_langs = {
            language["languageCode"]: language["languageName"]
            for video in self.videos
            if not video.get("failed", False)
            for language in video["subtitles"] + video["languages"]
        }
        languages = [
            {"languageName": value, "languageCode": key}
            for key, value in all_langs.items()
        ]
        languages = sorted(languages, key=lambda x: x["languageName"])
        html = env.get_template("home.html").render(
            languages=languages,
            page_title=_("TED Talks"),
            language_filter_text=_("Filter by language"),
            back_to_top=_("Back to the top"),
            pagination_text=_("Page"),
        )
        home_page_path = self.build_dir.joinpath("index")
        with open(home_page_path, "w", encoding="utf-8") as html_page:
            html_page.write(html)  # pyright: ignore[reportGeneralTypeIssues]

    def copy_files_to_build_directory(self):
        # Copy files from templates_dir to build_dir
        assets_dir = self.templates_dir.joinpath("assets")
        if assets_dir.exists():
            shutil.copytree(
                assets_dir, self.build_dir.joinpath("assets"), dirs_exist_ok=True
            )
        shutil.copy(
            self.templates_dir.joinpath("favicon.png"),
            self.build_dir.joinpath("favicon.png"),
        )

    def generate_datafile(self):
        """Generate data.js inside assets folder"""
        video_list = []
        for video in self.videos:
            if video.get("failed", False):
                continue
            lang_codes = [lang["languageCode"] for lang in video["subtitles"]] + [
                lang["languageCode"] for lang in video["languages"]
            ]
            json_data = {
                "languages": list(set(lang_codes)),
                "id": video["id"],
                "description": video["description"],
                "title": video["title"],
                "speaker": video["speaker"],
                "slug": video["slug"],
            }
            video_list.append(json_data)
        assets_path = self.build_dir.joinpath("assets")
        if not assets_path.exists():
            assets_path.mkdir(parents=True)
        with open(assets_path.joinpath("data.js"), "w") as data_file:
            data_file.write("json_data = " + json.dumps(video_list, indent=4))
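
    # Illustrative note (not executed): the generated assets/data.js assigns a
    # plain JS global consumed by the homepage, e.g.
    #   json_data = [
    #       {"languages": ["en", "fr"], "id": 1234, "title": [...],
    #        "description": [...], "speaker": "...", "slug": "some-talk"}
    #   ]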

    def download_jpeg_image_and_convert(self, url, fpath, preset_options, resize=None):
        """downloads a JPEG image and converts it to the proper format

        The image is automatically converted and optimized into the desired
        format detected from fpath
        """
        org_jpeg_path = pathlib.Path(
            tempfile.NamedTemporaryFile(delete=False, suffix=".jpg").name
        )
        save_large_file(url, org_jpeg_path)
        if resize is not None:
            resize_image(
                org_jpeg_path,
                width=resize[0],
                height=resize[1],
                method="cover",
            )
        optimize_image(
            org_jpeg_path, fpath, convert=True, delete_src=True, **preset_options
        )
        logger.debug(f"Converted {org_jpeg_path} to {fpath} and optimized")

    def download_speaker_image(
        self, video_id, video_title, video_speaker, speaker_path
    ):
        """downloads the speaker image"""
        downloaded_from_cache = False
        preset = WebpMedium()
        s3_key = f"speaker_image/{video_id}" if self.s3_storage else None
        if self.s3_storage: