Changed and added files according to Telegram Updated Repo #4
Merged
@@ -1,3 +1,15 @@
API_KEY=87623783487347845737
KERNEL_PLANCKSTER_HOST=localhost
KERNEL_PLANCKSTER_PORT=8000
KERNEL_PLANCKSTER_AUTH_TOKEN=test123
KERNEL_PLANCKSTER_PROTOCOL=http
HOST=localhost
PORT=8000
MINIO_ACCESS_KEY=minio
MINIO_SECRET_KEY=minio123
MINIO_HOST=localhost
MINIO_PORT=9091
MINIO_BUCKET=sda
STORAGE_PROTOCOL=s3
OPENAI_API_KEY="Your OpenAI key"
SCRAPERAPI_KEY="Your Scraper API Key"
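For context, a minimal sketch of how a consumer might read these variables at startup with plain os.environ. The variable names mirror the file above; load_settings itself and its defaults are illustrative assumptions, not code from this PR:

import os

# Hypothetical helper: pull the scraper's settings out of the environment.
# Names mirror the .env file above; the fallback defaults are illustrative only.
def load_settings() -> dict:
    return {
        "kernel_host": os.environ.get("KERNEL_PLANCKSTER_HOST", "localhost"),
        "kernel_port": int(os.environ.get("KERNEL_PLANCKSTER_PORT", "8000")),
        "kernel_auth_token": os.environ["KERNEL_PLANCKSTER_AUTH_TOKEN"],
        "kernel_protocol": os.environ.get("KERNEL_PLANCKSTER_PROTOCOL", "http"),
        "storage_protocol": os.environ.get("STORAGE_PROTOCOL", "s3"),
        "openai_api_key": os.environ["OPENAI_API_KEY"],
        "scraperapi_key": os.environ["SCRAPERAPI_KEY"],
    }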
@@ -0,0 +1,12 @@
{
    "python.testing.pytestArgs": [
        "tests"
    ],
    "python.testing.unittestEnabled": false,
    "python.testing.pytestEnabled": true,
    "mypy.runUsingActiveInterpreter": true,
    "[python]": {
        "editor.defaultFormatter": "ms-python.black-formatter",
        "editor.formatOnSave": true
    }
}
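These settings point the VS Code test explorer at the tests directory via pytest. For reference, the equivalent programmatic invocation (a hypothetical snippet, not part of this PR) would be:

import pytest

# Runs the same suite the editor is configured to discover.
exit_code = pytest.main(["tests"])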
@@ -0,0 +1,287 @@
import logging
import requests
import pandas as pd
import json
import time
from app.sdk.models import KernelPlancksterSourceData, BaseJobState, JobOutput
from app.sdk.scraped_data_repository import ScrapedDataRepository
import os
from pydantic import BaseModel
from typing import Literal
import instructor
from instructor import Instructor
from openai import OpenAI
from geopy.geocoders import Nominatim


class messageData(BaseModel):
    city: str
    country: str
    year: int
    month: Literal['January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December']
    day: Literal['01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31']
    disaster_type: Literal['Wildfire', 'Other']


# Potential alternate prompting, with an explicit 'Unsure' escape hatch:
# class messageDataAlternate(BaseModel):
#     city: str
#     country: str
#     year: int
#     month: Literal['January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December', 'Unsure']
#     day: Literal['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', 'Unsure']
#     disaster_type: Literal['Wildfire', 'Other']


class filterData(BaseModel):
    relevant: bool


class TwitterScrapeRequestModel(BaseModel):
    query: str
    outfile: str
    api_key: str


def scrape(
    job_id: int,
    tracer_id: str,
    query: str,
    start_date: str,
    end_date: str,
    api_key: str,
    scraped_data_repository: ScrapedDataRepository,
    work_dir: str,
    log_level: int,
) -> JobOutput:
    try:
        logger = logging.getLogger(__name__)
        logging.basicConfig(level=log_level)

        job_state = BaseJobState.CREATED
        current_data: KernelPlancksterSourceData | None = None
        last_successful_data: KernelPlancksterSourceData | None = None

        protocol = scraped_data_repository.protocol

        output_data_list: list[KernelPlancksterSourceData] = []

        filter = "forest wildfire"
        # Enables `response_model` on chat completions
        client = instructor.from_openai(OpenAI())

        logger.info(f"{job_id}: Starting Job")
        job_state = BaseJobState.RUNNING
        results = []
        augmented_results = []
        page = 1
        tweet_count = 0
        while True:
            payload = {
                'api_key': api_key,
                'query': query,
                'date_range_start': start_date,
                'date_range_end': end_date,
                'page': page,
                'format': 'json'
            }
            try:
                response = requests.get('https://api.scraperapi.com/structured/twitter/search', params=payload)
                response.raise_for_status()
                data = response.json()
                if 'organic_results' in data:
                    new_tweets = data['organic_results']
                    results.extend(new_tweets)

                    augmented_tweet = None
                    for tweet in new_tweets:
                        if tweet is not None:
                            augmented_tweet = augment_tweet(client, tweet, filter)
                            if augmented_tweet is not None:
                                augmented_results.append(augmented_tweet)

                    tweet_count += len(new_tweets)
                    logger.info(f"{job_id}: Fetched {tweet_count} tweets so far...")

                    current_data = KernelPlancksterSourceData(
                        name=f"tweet_{page}",
                        protocol=protocol,
                        relative_path=f"twitter/{tracer_id}/{job_id}/scraped/tweet_{page}.json",
                    )
                    output_data_list.append(current_data)
                    lfp = f"{work_dir}/twitter/tweet_{page}.json"

                    save_tweets(new_tweets, lfp)
                    scraped_data_repository.register_scraped_json(current_data, job_id, lfp)

                    last_successful_data = current_data
                else:
                    # A response without 'organic_results' means there are no further pages.
                    logger.info(f"{job_id}: No more tweets found for this query. Scraping completed.")

                    save_tweets(results, f"{work_dir}/twitter/tweet_all.json")

                    final_data = KernelPlancksterSourceData(
                        name="tweet_all",
                        protocol=protocol,
                        relative_path=f"twitter/{tracer_id}/{job_id}/scraped/tweet_all.json",
                    )
                    scraped_data_repository.register_scraped_json(final_data, job_id, f"{work_dir}/twitter/tweet_all.json")

                    # Write augmented data to file: title, content, extracted_location,
                    # latitude, longitude, month, day, year, disaster_type
                    df = pd.DataFrame(augmented_results, columns=["Title", "Tweet", "Extracted_Location", "Resolved_Latitude", "Resolved_Longitude", "Month", "Day", "Year", "Disaster_Type"])
                    df.to_json(f"{work_dir}/twitter/augmented_twitter_scrape.json", orient='index', indent=4)

                    final_augmented_data = KernelPlancksterSourceData(
                        name="tweet_all_augmented",
                        protocol=protocol,
                        relative_path=f"twitter/{tracer_id}/{job_id}/augmented/data.json",
                    )
                    scraped_data_repository.register_scraped_json(final_augmented_data, job_id, f"{work_dir}/twitter/augmented_twitter_scrape.json")

                    break
            except requests.exceptions.HTTPError as e:
                job_state = BaseJobState.FAILED
                logger.error(f"{job_id}: HTTP Error: {e}")
                # `continue` skips the `page += 1` below, so the same page is retried.
                continue
            except requests.exceptions.JSONDecodeError as e:
                job_state = BaseJobState.FAILED
                logger.error(f"{job_id}: JSON Decode Error: {e}")
                logger.info("Retrying request after a short delay...")
                time.sleep(5)
                continue
            except Exception as e:
                job_state = BaseJobState.FAILED
                logger.error(f"{job_id}: Unable to scrape data. Error:\n{e}\nJob with tracer_id {job_id} failed.\nLast successful data: {last_successful_data}\nCurrent data: \"{current_data}\", job_state: \"{job_state}\"")
                continue

            page += 1
            time.sleep(1)

        job_state = BaseJobState.FINISHED
        logger.info(f"{job_id}: Job finished")

        return JobOutput(
            job_state=job_state,
            tracer_id=str(job_id),
            source_data_list=output_data_list,
        )

    except Exception as error:
        logger.error(f"{job_id}: Unable to scrape data. Job with tracer_id {job_id} failed. Error:\n{error}")
        job_state = BaseJobState.FAILED
        return JobOutput(
            job_state=job_state,
            tracer_id=str(job_id),
            source_data_list=[],
        )


def save_tweets(tweets, file_path):
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    with open(file_path, 'w+') as f:
        tweet_data = [{"tweet": tweet, "tweet_number": i + 1} for i, tweet in enumerate(tweets)]
        json.dump(tweet_data, f)


def load_tweets(file_path):
    out = None
    with open(file_path, 'r') as f:
        out = json.load(f)
    return out


def augment_tweet(client: Instructor, tweet: dict, filter: str):
    if len(tweet) > 5:
        # extract aspects of the tweet
        title = tweet["title"]
        content = tweet["snippet"]
        link = tweet["link"]  # extracted but not yet used downstream

        formatted_tweet_str = "User " + title + " tweets: " + content.strip("...").strip(",").strip() + "."

        # relevancy filter with gpt-4
        filter_data = client.chat.completions.create(
            model="gpt-4",
            response_model=filterData,
            messages=[
                {
                    "role": "user",
                    "content": f"Examine this tweet: {formatted_tweet_str}. Is this tweet describing {filter}?"
                },
            ]
        )

        if filter_data.relevant:
            aug_data = None
            try:
                # structured field extraction with gpt-4-turbo
                aug_data = client.chat.completions.create(
                    model="gpt-4-turbo",
                    response_model=messageData,
                    messages=[
                        {
                            "role": "user",
                            "content": f"Extract: {formatted_tweet_str}"
                        },
                    ]
                )
            except Exception as e:
                logging.getLogger(__name__).info("Could not augment tweet, trying with alternate prompt")
                # Potential alternate prompting:
                # try:
                #     aug_data = client.chat.completions.create(
                #         model="gpt-4-turbo",
                #         response_model=messageDataAlternate,
                #         messages=[
                #             {
                #                 "role": "user",
                #                 "content": f"Extract: {formatted_tweet_str}"
                #             },
                #         ]
                #     )
                # except Exception as e2:
                return None
            city = aug_data.city
            country = aug_data.country
            extracted_location = city + "," + country
            year = aug_data.year
            month = aug_data.month
            day = aug_data.day
            disaster_type = aug_data.disaster_type

            # NLP-informed geolocation
            try:
                coordinates = get_lat_long(extracted_location)
            except Exception as e:
                coordinates = None
            if coordinates:
                latitude = coordinates[0]
                longitude = coordinates[1]
            else:
                latitude = "no latitude"
                longitude = "no longitude"

            # TODO: format date

            return [title, content, extracted_location, latitude, longitude, month, day, year, disaster_type]


# utility function for augmenting tweets with geolocation
def get_lat_long(location_name):
    geolocator = Nominatim(user_agent="location_to_lat_long")
    try:
        location = geolocator.geocode(location_name)
        if location:
            latitude = location.latitude
            longitude = location.longitude
            return latitude, longitude
        else:
            return None
    except Exception as e:
        print(f"Error: {e}")
        return None
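To help review, here is a rough, hypothetical driver for the scraper above. Every argument value is illustrative, and repo is assumed to be a ScrapedDataRepository constructed elsewhere, since its constructor is not part of this diff:

import logging

def run_example(repo) -> None:
    # repo: a ScrapedDataRepository instance, assumed constructed elsewhere.
    output = scrape(
        job_id=1,
        tracer_id="example-tracer",
        query="forest wildfire",
        start_date="2023-08-01",
        end_date="2023-08-31",
        api_key="<SCRAPERAPI_KEY>",
        scraped_data_repository=repo,
        work_dir="./data",
        log_level=logging.INFO,
    )
    print(output.job_state, len(output.source_data_list))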
@@ -0,0 +1,74 @@
import logging
import os
import shutil

import requests
from app.sdk.models import KernelPlancksterSourceData, ProtocolEnum


class FileRepository:
    def __init__(
        self,
        protocol: ProtocolEnum,
        data_dir: str = "data",  # can be used for config
    ) -> None:
        self._protocol = protocol
        self._data_dir = data_dir
        self._logger = logging.getLogger(__name__)

    @property
    def protocol(self) -> ProtocolEnum:
        return self._protocol

    @property
    def data_dir(self) -> str:
        return self._data_dir

    @property
    def logger(self) -> logging.Logger:
        return self._logger

    def file_name_to_pfn(self, file_name: str) -> str:
        return f"{self.protocol}://{file_name}"

    def pfn_to_file_name(self, pfn: str) -> str:
        return pfn.split("://")[1]

    def source_data_to_file_name(self, source_data: KernelPlancksterSourceData) -> str:
        return f"{self.data_dir}/{source_data.relative_path}"

    def save_file_locally(self, file_to_save: str, source_data: KernelPlancksterSourceData, file_type: str) -> str:
        """
        Save a file to a local directory.

        :param file_to_save: The path to the file to save.
        :param source_data: The source data to save.
        :param file_type: The type of file to save.
        """

        file_name = self.source_data_to_file_name(source_data)
        self.logger.info(f"Saving {file_type} '{source_data}' to '{file_name}'.")

        os.makedirs(os.path.dirname(file_name), exist_ok=True)
        shutil.copy(file_to_save, file_name)

        self.logger.info(f"Saved {file_type} '{source_data}' to '{file_name}'.")

        pfn = self.file_name_to_pfn(file_name)

        return pfn

    def public_upload(self, signed_url: str, file_path: str) -> None:
        """
        Upload a file to a signed url.

        :param signed_url: The signed url to upload to.
        :param file_path: The path to the file to upload.
        """

        with open(file_path, "rb") as f:
            upload_res = requests.put(signed_url, data=f)

        if upload_res.status_code != 200:
            raise ValueError(f"Failed to upload file to signed url: {upload_res.text}")
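A short usage sketch for FileRepository follows. ProtocolEnum.S3 is an assumption about the enum's members, and the exact pfn string depends on how ProtocolEnum formats; neither is shown in this diff:

from app.sdk.models import KernelPlancksterSourceData, ProtocolEnum

repo = FileRepository(protocol=ProtocolEnum.S3, data_dir="data")  # S3 member assumed

source = KernelPlancksterSourceData(
    name="tweet_all",
    protocol=repo.protocol,
    relative_path="twitter/example-tracer/1/scraped/tweet_all.json",
)

# Copies the local file under data_dir and returns a pfn of the form "<protocol>://<local path>".
pfn = repo.save_file_locally("/tmp/tweet_all.json", source, file_type="json")

# The pfn round-trips back to the local file name.
assert repo.pfn_to_file_name(pfn) == repo.source_data_to_file_name(source)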
You need to include instructor, pandas, openai, pydantic, and requests in your requirements.txt.
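Concretely, the added requirements.txt lines might look like the following (left unpinned here for illustration; geopy is also imported by the new scraper and likely belongs in the list as well):

instructor
openai
pandas
pydantic
requests
geopy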