Skip to content

Commit

Permalink
refactor(parsing.utils): parse_entry: improve readability
Browse files Browse the repository at this point in the history
Signed-off-by: Rongrong <i@rong.moe>
  • Loading branch information
Rongronggg9 committed Dec 15, 2024
1 parent 32709d5 commit d252fe6
Showing 1 changed file with 56 additions and 44 deletions.
100 changes: 56 additions & 44 deletions src/parsing/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,59 +250,71 @@ class EntryParsed:
title: Optional[str] = None
enclosures: list[Enclosure] = None

# entry.summary returns summary(Atom) or description(RSS)
content = entry.get('content') or entry.get('summary', '')
content = (
entry.get('content') # Atom
or entry.get('summary', '') # Atom summary or RSS description
)

if isinstance(content, list): # Atom
if len(content) == 1:
content = content[0]
if isinstance(content, list) and len(content) > 0: # Atom
for _content in content:
content_type = _content.get('type', '')
if 'html' in content_type or 'xml' in content_type:
content = _content
break
else:
for _content in content:
content_type = _content.get('type', '')
if 'html' in content_type or 'xml' in content_type:
content = _content
break
else:
content = content[0]
content = content[0]
content = content.get('value', '')

EntryParsed.content = await html_validator(content)
EntryParsed.link = entry.get('link') or entry.get('guid')
author = entry['author'] if ('author' in entry and type(entry['author']) is str) else None
author = await ensure_plain(author) if author else None
EntryParsed.author = author or None # reject empty string
# hmm, some entries do have no title, should we really set up a feed hospital?
title = entry.get('title')
title = await ensure_plain(title, enable_emojify=True) if title else None
EntryParsed.title = title or None # reject empty string
if (tags := entry.get('tags')) and isinstance(tags, list):
EntryParsed.tags = list(filter(None, (tag.get('term') for tag in tags)))

if (author := entry.get('author')) and isinstance(author, str):
EntryParsed.author = await ensure_plain(author) or None
if (title := entry.get('title')) and isinstance(title, str):
EntryParsed.title = await ensure_plain(title, enable_emojify=True) or None
if (tags := entry.get('tags')) and isinstance(tags, list) and len(tags) > 0:
EntryParsed.tags = list(filter(
None,
(tag.get('term') for tag in tags)
))

# Collect enclosures (attachment in RSS entries)
enclosures = []

if isinstance(entry.get('links'), list):
for link in (link for link in entry['links'] if link.get('rel') == 'enclosure' and link.get('href')):
enclosures.append(Enclosure(url=resolve_relative_link(feed_link, link['href']),
length=link.get('length'),
_type=link.get('type')))
if enclosures and entry.get('itunes_duration'):
enclosures[0].duration = entry['itunes_duration']

if isinstance(entry.get('media_content'), list):
enclosures_media = []
for media in (media for media in entry['media_content'] if media.get('url')):
media_type = media.get('type') or media.get('medium')
if media_type and 'flash' in media_type: # application/x-shockwave-flash or so on
continue # false media
enclosures_media.append(Enclosure(url=resolve_relative_link(feed_link, media['url']),
length=media.get('fileSize'),
_type=media_type,
duration=media.get('duration')))
if enclosures_media:
if isinstance(entry.get('media_thumbnail'), list) and entry['media_thumbnail'] \
and isinstance(entry['media_thumbnail'][0], dict):
enclosures_media[0].thumbnail = entry['media_thumbnail'][0].get('url')
enclosures.extend(enclosures_media)
# RSS/Atom
if (links := entry.get('links')) and isinstance(links, list) and len(links) > 0:
for link in links:
if link.get('rel') == 'enclosure' and (link_href := link.get('href')):
enclosures.append(
Enclosure(
url=resolve_relative_link(feed_link, link_href),
length=link.get('length'),
_type=link.get('type'),
)
)

# Media RSS
if (media_content := entry.get('media_content')) and isinstance(media_content, list) and len(media_content) > 0:
for media in media_content:
if not (media_url := media.get('url')):
continue
if (media_type := media.get('type') or media.get('medium')) and 'flash' in media_type:
# Skip application/x-shockwave-flash or so on
continue
enclosures.append(
Enclosure(url=resolve_relative_link(feed_link, media_url),
length=media.get('fileSize'),
_type=media_type,
duration=media.get('duration')),
)

if len(enclosures) == 1:
single = enclosures[0]
if single.duration is None and (itunes_duration := entry.get('itunes_duration')):
single.duration = itunes_duration

if (media_thumbnail := entry.get('media_thumbnail', ({},))[0]) and isinstance(media_thumbnail, dict):
single.thumbnail = media_thumbnail.get('url')

EntryParsed.enclosures = enclosures or None

Expand Down

0 comments on commit d252fe6

Please sign in to comment.