Changed and added files according to Telegram Updated Repo #4

Merged: 4 commits, Apr 14, 2024
Changes from all commits

16 changes: 14 additions & 2 deletions .env.template
@@ -1,3 +1,15 @@
API_KEY=87623783487347845737
KERNEL_PLANCKSTER_HOST=localhost
KERNEL_PLANCKSTER_PORT=8000
KERNEL_PLANCKSTER_AUTH_TOKEN=test123
KERNEL_PLANCKSTER_PROTOCOL=http
HOST=localhost
PORT=8000
PORT=8000
MINIO_ACCESS_KEY=minio
MINIO_SECRET_KEY=minio123
MINIO_HOST=localhost
MINIO_PORT=9091
MINIO_BUCKET=sda

STORAGE_PROTOCOL=s3
OPENAI_API_KEY = "Your OpenAI key"
SCRAPERAPI_KEY = "Your Scraper API Key"
12 changes: 12 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,12 @@
{
"python.testing.pytestArgs": [
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"mypy.runUsingActiveInterpreter": true,
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter",
"editor.formatOnSave": true
},
}
287 changes: 287 additions & 0 deletions app/scraper.py
@@ -0,0 +1,287 @@
from logging import Logger
Contributor: you need to include instructor, pandas, openai, pydantic, and requests in your requirements.txt
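
For reference, the requested additions would look roughly like the list below (package names only; versions are not specified in the thread, and geopy, which app/scraper.py also imports, would likely belong here too):

instructor
pandas
openai
pydantic
requests
geopy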

import logging
import requests
import pandas as pd
import json
import time
import sys
from app.sdk.models import KernelPlancksterSourceData, BaseJobState, JobOutput
from app.sdk.scraped_data_repository import ScrapedDataRepository
import os
from pydantic import BaseModel
from typing import Literal
import instructor
from instructor import Instructor
from openai import OpenAI
from geopy.geocoders import Nominatim

class messageData(BaseModel):
    city: str
    country: str
    year: int
    month: Literal['January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December']
    day: Literal['01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31']
    disaster_type: Literal['Wildfire', 'Other']

#Potential alternate prompting
# class messageDataAlternate(BaseModel):
# city: str
# country: str
# year: int
# month: Literal['January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December', 'Unsure']
# day: Literal['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', 'Unsure']
# disaster_type: Literal['Wildfire', 'Other']

class filterData(BaseModel):
relevant: bool

class TwitterScrapeRequestModel(BaseModel):
query: str
outfile: str
api_key: str



def scrape(
job_id: int,
tracer_id:str,
query: str,
start_date: str,
end_date: str,
api_key: str,
scraped_data_repository: ScrapedDataRepository,
work_dir: str,
    log_level: int | str,  # level value accepted by logging.basicConfig
) -> JobOutput:
try:
logger = logging.getLogger(__name__)
logging.basicConfig(level=log_level)

job_state = BaseJobState.CREATED
current_data: KernelPlancksterSourceData | None = None
last_successful_data: KernelPlancksterSourceData | None = None

protocol = scraped_data_repository.protocol

output_data_list: list[KernelPlancksterSourceData] = []

filter = "forest wildfire"
# Enables `response_model`
client = instructor.from_openai(OpenAI())

logger.info(f"{job_id}: Starting Job")
job_state = BaseJobState.RUNNING
results = []
augmented_results = []
page = 1
tweet_count = 0
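        # Page through ScraperAPI's structured Twitter search; the loop finalizes the output and breaks once a response arrives without 'organic_results'.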
while True:
payload = {
'api_key': api_key,
'query': query,
'date_range_start': start_date,
'date_range_end': end_date,
'page': page,
'format': 'json'
}
try:
response = requests.get('https://api.scraperapi.com/structured/twitter/search', params=payload)
response.raise_for_status()
data = response.json()
if 'organic_results' in data:
new_tweets = data['organic_results']
results.extend(new_tweets)

augmented_tweet = None
                    for tweet in new_tweets:
                        if tweet is not None:
                            augmented_tweet = augment_tweet(client, tweet, filter)
                        if augmented_tweet is not None:
                            augmented_results.append(augmented_tweet)

tweet_count += len(new_tweets)
logger.info(f"{job_id}: Fetched {tweet_count} tweets so far...")

current_data = KernelPlancksterSourceData(
name=f"tweet_{page}",
protocol=protocol,
relative_path=f"twitter/{tracer_id}/{job_id}/scraped/tweet_{page}.json",
)
output_data_list.append(current_data)
lfp = f"{work_dir}/twitter/tweet_{page}.json"

save_tweets(new_tweets, lfp)
scraped_data_repository.register_scraped_json(current_data, job_id, lfp )

last_successful_data = current_data
else:
logger.error(f"{job_id}: Error: {response.status_code} - {response.text}")
logger.info("No more tweets found for this query. Scraping completed.")

save_tweets(results, f"{work_dir}/twitter/tweet_all.json")

final_data = KernelPlancksterSourceData(
name=f"tweet_all",
protocol=protocol,
relative_path=f"twitter/{tracer_id}/{job_id}/scraped/tweet_all.json",
)
scraped_data_repository.register_scraped_json(final_data, job_id, f"{work_dir}/twitter/tweet_all.json" )

                    # write augmented data to file: --> title, content, extracted_location, latitude, longitude, month, day, year, disaster_type

df = pd.DataFrame(augmented_results, columns=["Title", "Tweet", "Extracted_Location", "Resolved_Latitude", "Resolved_Longitude", "Month", "Day", "Year", "Disaster_Type"])
df.to_json(f"{work_dir}/twitter/augmented_twitter_scrape.json", orient='index', indent=4)

final_augmented_data = KernelPlancksterSourceData(
name=f"tweet_all_augmented",
protocol=protocol,
relative_path=f"twitter/{tracer_id}/{job_id}/augmented/data.json",
)
scraped_data_repository.register_scraped_json(final_augmented_data, job_id, f"{work_dir}/twitter/augmented_twitter_scrape.json" )

break
except requests.exceptions.HTTPError as e:
job_state = BaseJobState.FAILED
logger.error(f"{job_id}: HTTP Error: {e}")
continue
except requests.exceptions.JSONDecodeError as e:
job_state = BaseJobState.FAILED
logger.error(f"{job_id}: JSON Decode Error: {e}")
logger.info("Retrying request after a short delay...")
time.sleep(5)
continue
except Exception as e:
job_state = BaseJobState.FAILED
logger.error(f"{job_id}: Unable to scrape data. Error:\n{e}\nJob with tracer_id {job_id} failed.\nLast successful data: {last_successful_data}\nCurrent data: \"{current_data}\", job_state: \"{job_state}\"")
continue

page += 1
time.sleep(1)



job_state = BaseJobState.FINISHED
logger.info(f"{job_id}: Job finished")

return JobOutput(
job_state=job_state,
tracer_id=str(job_id),
source_data_list=output_data_list,
)

except Exception as error:
logger.error(f"{job_id}: Unable to scrape data. Job with tracer_id {job_id} failed. Error:\n{error}")
job_state = BaseJobState.FAILED
return JobOutput(
job_state=job_state,
tracer_id=str(job_id),
source_data_list=[],
)

def save_tweets(tweets, file_path):

os.makedirs(os.path.dirname(file_path), exist_ok=True )

with open(file_path, 'w+') as f:
tweet_data = [{"tweet": tweet, "tweet_number": i + 1} for i, tweet in enumerate(tweets)]
json.dump(tweet_data, f)


def load_tweets(file_path):
out = None
with open(file_path, 'r') as f:
out = json.load(f)
return out

def augment_tweet(client: Instructor, tweet: dict, filter: str):
    if len(tweet) > 5:
        # extract aspects of the tweet
        title = tweet["title"]
        content = tweet["snippet"]
        link = tweet["link"]

        formatted_tweet_str = "User " + title + " tweets: " + content.strip("...").strip(",").strip() + "."

        # relevancy filter with gpt-4
        filter_data = client.chat.completions.create(
            model="gpt-4",
            response_model=filterData,
            messages=[
                {
                    "role": "user",
                    "content": f"Examine this tweet: {formatted_tweet_str}. Is this tweet describing {filter}? "
                },
            ]
        )

        if filter_data.relevant:
            aug_data = None
            try:
                # structured location/date extraction with gpt-4-turbo
                aug_data = client.chat.completions.create(
                    model="gpt-4-turbo",
                    response_model=messageData,
                    messages=[
                        {
                            "role": "user",
                            "content": f"Extract: {formatted_tweet_str}"
                        },
                    ]
                )
            except Exception as e:
                logging.getLogger(__name__).info("Could not augment tweet, trying with alternate prompt")
                # Potential alternate prompting

                # try:
                #     # extraction with the alternate response model
                #     aug_data = client.chat.completions.create(
                #         model="gpt-4-turbo",
                #         response_model=messageDataAlternate,
                #         messages=[
                #             {
                #                 "role": "user",
                #                 "content": f"Extract: {formatted_tweet_str}"
                #             },
                #         ]
                #     )
                # except Exception as e2:
                return None
            city = aug_data.city
            country = aug_data.country
            extracted_location = city + "," + country
            year = aug_data.year
            month = aug_data.month
            day = aug_data.day
            disaster_type = aug_data.disaster_type

            # NLP-informed geolocation
            try:
                coordinates = get_lat_long(extracted_location)
            except Exception as e:
                coordinates = None
            if coordinates:
                latitude = coordinates[0]
                longitude = coordinates[1]
            else:
                latitude = "no latitude"
                longitude = "no longitude"

            # TODO: format date

            return [title, content, extracted_location, latitude, longitude, month, day, year, disaster_type]

# utility function for augmenting tweets with geolocation
def get_lat_long(location_name):
geolocator = Nominatim(user_agent="location_to_lat_long")
try:
location = geolocator.geocode(location_name)
if location:
latitude = location.latitude
longitude = location.longitude
return latitude, longitude
else:
return None
except Exception as e:
print(f"Error: {e}")
return None
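
For quick local verification, a small sketch that exercises the self-contained helpers added in this file; the file path and location string are illustrative, and get_lat_long needs network access plus the geopy dependency.

# Illustrative smoke test for the standalone helpers in app/scraper.py.
from app.scraper import get_lat_long, load_tweets, save_tweets

# Geocode a "city,country" string the way augment_tweet() does; returns (lat, lon) or None.
print(get_lat_long("Athens,Greece"))

# Round-trip a tiny tweet list through save_tweets/load_tweets.
demo_path = "./data/twitter/tweet_demo.json"  # hypothetical path
save_tweets([{"title": "user1", "snippet": "example tweet", "link": "https://example.com"}], demo_path)
print(load_tweets(demo_path))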

74 changes: 74 additions & 0 deletions app/sdk/file_repository.py
@@ -0,0 +1,74 @@
import logging
import os
import shutil

import requests
from app.sdk.models import KernelPlancksterSourceData, ProtocolEnum


class FileRepository:
def __init__(
self,
protocol: ProtocolEnum,
data_dir: str = "data", # can be used for config
) -> None:
self._protocol = protocol
self._data_dir = data_dir
self._logger = logging.getLogger(__name__)

@property
def protocol(self) -> ProtocolEnum:
return self._protocol

@property
def data_dir(self) -> str:
return self._data_dir

@property
def logger(self) -> logging.Logger:
return self._logger

def file_name_to_pfn(self, file_name: str) -> str:
return f"{self.protocol}://{file_name}"

def pfn_to_file_name(self, pfn: str) -> str:
return pfn.split("://")[1]

def source_data_to_file_name(self, source_data: KernelPlancksterSourceData) -> str:
return f"{self.data_dir}/{source_data.relative_path}"

def save_file_locally(self, file_to_save: str, source_data: KernelPlancksterSourceData, file_type: str) -> str:
"""
Save a file to a local directory.

:param file_to_save: The path to the file to save.
:param source_data: The source data to save.
:param file_type: The type of file to save.
"""

file_name = self.source_data_to_file_name(source_data)
self.logger.info(f"Saving {file_type} '{source_data}' to '{file_name}'.")

os.makedirs(os.path.dirname(file_name), exist_ok=True)
shutil.copy(file_to_save, file_name)

self.logger.info(f"Saved {file_type} '{source_data}' to '{file_name}'.")

pfn = self.file_name_to_pfn(file_name)

return pfn


def public_upload(self, signed_url: str, file_path: str) -> None:
"""
Upload a file to a signed url.

:param signed_url: The signed url to upload to.
:param file_path: The path to the file to upload.
"""

with open(file_path, "rb") as f:
upload_res = requests.put(signed_url, data=f)

if upload_res.status_code != 200:
raise ValueError(f"Failed to upload file to signed url: {upload_res.text}")