Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Robo OOP #6

Open
wants to merge 9 commits into
base: robo
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 61 additions & 40 deletions app/robo.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

import os
import json
from functools import lru_cache
from dotenv import load_dotenv
import requests
from pandas import DataFrame
Expand All @@ -10,59 +11,80 @@

API_KEY = os.getenv("ALPHAVANTAGE_API_KEY", default="abc123")

def fetch_data(symbol):
    """Requests daily price data for the given stock symbol from the
    Alpha Vantage API and returns the parsed JSON response as a dict."""
    query_params = f"function=TIME_SERIES_DAILY&symbol={symbol}&apikey={API_KEY}"
    request_url = f"https://www.alphavantage.co/query?{query_params}"
    response = requests.get(request_url)
    return json.loads(response.text)

def process_data(parsed_response):
    """
    Converts a parsed Alpha Vantage API response into a DataFrame of daily prices.

    Param: parsed_response (dict) the parsed JSON from the TIME_SERIES_DAILY endpoint.

    Returns a pandas DataFrame with columns: date, open, high, low, close, volume —
    or None when the response lacks the expected time series (e.g. an error
    message for an invalid symbol, or a rate-limit notice).
    """
    # .get() avoids the un-idiomatic `not in list(d.keys())` double lookup
    time_series = parsed_response.get("Time Series (Daily)")
    if time_series is None:
        return None  # gracefully handle API error responses

    records = [
        {
            "date": date,
            "open": float(daily_data["1. open"]),
            "high": float(daily_data["2. high"]),
            "low": float(daily_data["3. low"]),
            "close": float(daily_data["4. close"]),
            "volume": int(daily_data["5. volume"]),
        }
        for date, daily_data in time_series.items()
    ]
    return DataFrame(records)

def summarize_data(prices_df):
    """Computes summary stats from a prices DataFrame: the latest closing
    price (first row) plus the recent high and low across all rows.

    Param : prices_df (pandas.DataFrame)
    """
    latest_row = prices_df.iloc[0]
    highs = prices_df["high"]
    lows = prices_df["low"]
    return {
        "latest_close": latest_row["close"],
        "recent_high": highs.max(),
        "recent_low": lows.min(),
    }

def prepare_data_for_charting(prices_df):
    """
    Sorts the data by date ascending, so it can be charted left-to-right over time.

    Param : prices_df (pandas.DataFrame) with a "date" column.

    Returns a new DataFrame sorted by date ascending; the input is not mutated.
    """
    chart_df = prices_df.copy()  # copy so the caller's DataFrame is untouched
    chart_df.sort_values(by="date", ascending=True, inplace=True)
    return chart_df
class RoboAdvisor:
    """
    Fetches, processes, and summarizes daily stock price data from the
    Alpha Vantage API for a single stock symbol.

    Params:
        symbol (str) the stock ticker symbol (e.g. "MSFT")
        api_key (str) an Alpha Vantage API key (defaults to the env-configured key)

    Network and processing results are memoized per instance. NOTE: the
    previous `@property` + `@lru_cache` combination stored results in a
    class-level cache keyed on `self`, which keeps every instance alive for
    the life of the process (ruff B019) and can serve a stale value if the
    `parsed_response` setter is used after the first access. Plain instance
    attributes avoid both problems while preserving the caching behavior.
    """

    def __init__(self, symbol, api_key=API_KEY):
        self.symbol = symbol
        self.api_key = api_key
        self._parsed_response = None  # fetched lazily, or injected via the setter (e.g. in tests)
        self._prices_df = None        # per-instance caches for processed data
        self._summary = None
        self._chart_df = None

    def fetch_data(self):
        """Issues the HTTP request and returns the parsed JSON response (dict).

        A private method of sorts - can alternatively be called _fetch_data() or something.
        """
        request_url = f"https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol={self.symbol}&apikey={self.api_key}"
        response = requests.get(request_url)
        return json.loads(response.text)

    @property
    def parsed_response(self):
        # Only make a real network request if we have not overridden / set
        # this value (like we do when testing); cache the result per instance.
        if self._parsed_response is None:
            self._parsed_response = self.fetch_data()
        return self._parsed_response

    @parsed_response.setter
    def parsed_response(self, value):
        # Setter exists mainly for testing purposes.
        # See: https://docs.python.org/3/library/functions.html#property
        self._parsed_response = value

    @property
    def prices_df(self):
        """Returns a DataFrame of daily prices (date/open/high/low/close/volume),
        or None when the response lacks the expected time series (e.g. bad symbol)."""
        if self._prices_df is None:
            time_series = self.parsed_response.get("Time Series (Daily)")
            if time_series is None:
                return None  # gracefully handle API error responses
            self._prices_df = DataFrame([
                {
                    "date": date,
                    "open": float(daily_data["1. open"]),
                    "high": float(daily_data["2. high"]),
                    "low": float(daily_data["3. low"]),
                    "close": float(daily_data["4. close"]),
                    "volume": int(daily_data["5. volume"]),
                }
                for date, daily_data in time_series.items()
            ])
        return self._prices_df

    @property
    def summary(self):
        """Returns a dict with the latest close, recent high, and recent low."""
        if self._summary is None:
            self._summary = {
                "latest_close": self.prices_df.iloc[0]["close"],
                "recent_high": self.prices_df["high"].max(),
                "recent_low": self.prices_df["low"].min(),
            }
        return self._summary

    @property
    def chart_df(self):
        """Returns a copy of prices_df sorted by date ascending, for charting."""
        if self._chart_df is None:
            chart_df = self.prices_df.copy()
            chart_df.sort_values(by="date", ascending=True, inplace=True)
            self._chart_df = chart_df
        return self._chart_df


if __name__ == '__main__':

# FETCH DATA

symbol = input("Please input a stock symbol (e.g. 'MSFT'): ")
parsed_response = fetch_data(symbol)
advisor = RoboAdvisor(symbol=symbol)

# PROCESS DATA

df = process_data(parsed_response)
df = advisor.prices_df

if isinstance(df, DataFrame):

# DISPLAY RESULTS

summary = summarize_data(df)
summary = advisor.summary

print("LATEST CLOSING PRICE: ", summary["latest_close"])
print("RECENT HIGH: ", summary["recent_high"])
Expand All @@ -75,6 +97,5 @@ def prepare_data_for_charting(prices_df):

# CHART PRICES OVER TIME

chart_df = prepare_data_for_charting(df)
fig = px.line(chart_df, x="date", y="close", title=f"Closing Prices for {symbol.upper()}") # see: https://plotly.com/python-api-reference/generated/plotly.express.line
fig = px.line(advisor.chart_df, x="date", y="close", title=f"Closing Prices for {symbol.upper()}") # see: https://plotly.com/python-api-reference/generated/plotly.express.line
fig.show()
28 changes: 23 additions & 5 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,33 @@
#

import pytest
from app.robo import fetch_data
from app.robo import RoboAdvisor

@pytest.fixture(scope="module")
def parsed_googl_response():
return fetch_data("GOOGL")
def googl_advisor():
    # FOR TESTING REAL REQUESTS (REMEMBER TO SKIP CI)
    # FIX: the fixture is named googl_advisor, so construct a GOOGL advisor
    # (it previously used symbol="AMZN", contradicting its own name).
    return RoboAdvisor(symbol="GOOGL")

@pytest.fixture(scope="module")
def parsed_oops_response():
return fetch_data("OOPS")
def oops_advisor():
    # An advisor with an invalid symbol, to exercise the API error path.
    # FOR TESTING REAL REQUESTS (REMEMBER TO SKIP CI)
    return RoboAdvisor(symbol="OOPS")

@pytest.fixture(scope="module")
def mock_msft_advisor():
    # Build an MSFT advisor, then OVERRIDE WITH MOCK DATA so no
    # real network request is ever issued by the tests.
    mock_advisor = RoboAdvisor(symbol="MSFT")
    mock_advisor.parsed_response = mock_msft_response
    return mock_advisor

@pytest.fixture(scope="module")
def mock_amzn_advisor():
    # Build an AMZN advisor, then OVERRIDE WITH MOCK DATA so no
    # real network request is ever issued by the tests.
    mock_advisor = RoboAdvisor(symbol="AMZN")
    mock_advisor.parsed_response = mock_amzn_response
    return mock_advisor

@pytest.fixture(scope="module")
def mock_oops_advisor():
    # Build an advisor with an invalid symbol, then OVERRIDE WITH MOCK
    # error-response DATA so no real network request is ever issued.
    mock_advisor = RoboAdvisor(symbol="OOPS")
    mock_advisor.parsed_response = mock_error_response
    return mock_advisor

#
# MOCK DATA
Expand Down
63 changes: 34 additions & 29 deletions test/robo_test.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,52 @@

from conftest import mock_msft_response, mock_amzn_response #, mock_error_response, mock_rate_limit_response
import os
import pytest
from pandas import DataFrame

from app.robo import process_data, summarize_data, prepare_data_for_charting
from app.robo import RoboAdvisor

# expect default environment variable setting of "CI=true" on Travis CI. see: https://docs.travis-ci.com/user/environment-variables/#default-environment-variables
CI_ENV = os.getenv("CI") == "true"

# SKIP CI
def test_fetch(parsed_googl_response):
# it should fetch data containing certain expected characteristics:
response_keys = list(parsed_googl_response.keys()) # we are testing the fetch_data function indirectly through our fixture (see conftest.py)
@pytest.mark.skipif(CI_ENV==True, reason="avoid issuing HTTP requests on the CI server") # skips this test on CI
def test_parsed_response(googl_advisor, oops_advisor):
# with valid symbol, should containing certain expected characteristics (time series data with daily prices):
parsed_response = googl_advisor.parsed_response
response_keys = list(parsed_response.keys()) # we are testing the fetch_data function indirectly through our fixture (see conftest.py)
assert "Meta Data" in response_keys
assert "Time Series (Daily)" in response_keys
# ... including time series data with daily prices:
daily_prices = list(parsed_googl_response["Time Series (Daily)"].values())[0] #> {'1. open': '2068.4700', '2. high': '2099.0000', '3. low': '2044.1218', '4. close': '2082.2200', '5. volume': '1319126'}
daily_prices = list(parsed_response["Time Series (Daily)"].values())[0] #> {'1. open': '2068.4700', '2. high': '2099.0000', '3. low': '2044.1218', '4. close': '2082.2200', '5. volume': '1319126'}
price_keys = list(daily_prices.keys())
assert price_keys == ["1. open", "2. high", "3. low", "4. close", "5. volume"]

# SKIP CI
def test_process(parsed_googl_response, parsed_oops_response):
# it should process the nested response data:
googl_df = process_data(parsed_googl_response)
assert isinstance(googl_df, DataFrame)
assert len(googl_df) == 100
assert list(googl_df.columns) == ["date", "open", "high", "low", "close", "volume"]

# it should gracefully handle response errors:
assert process_data(parsed_oops_response) is None


def test_summarize():
# it should summarize and aggregate the data:
assert summarize_data(process_data(mock_msft_response)) == {
# with invalid symbol, should contain an error message / not the expected data:
parsed_response = oops_advisor.parsed_response
response_keys = list(parsed_response.keys()) # we are testing the fetch_data function indirectly through our fixture (see conftest.py)
assert "Meta Data" not in response_keys
assert "Time Series (Daily)" not in response_keys

@pytest.mark.skipif(CI_ENV==True, reason="avoid issuing HTTP requests on the CI server") # skips this test on CI
def test_prices_df(googl_advisor, oops_advisor):
    # with valid symbol, should provide a dataframe with expected headers:
    prices_df = googl_advisor.prices_df
    assert isinstance(prices_df, DataFrame)
    assert len(prices_df) == 100
    expected_columns = ["date", "open", "high", "low", "close", "volume"]
    assert list(prices_df.columns) == expected_columns

    # with invalid symbol, should gracefully handle response errors / be null:
    assert oops_advisor.prices_df is None

def test_summary(mock_msft_advisor, mock_amzn_advisor):
    # should summarize and aggregate the data, noting the latest close, recent high, and recent low:
    # (FIX: removed a leftover duplicate line referencing the deleted
    # summarize_data/process_data functions, which made this block invalid.)
    assert mock_msft_advisor.summary == {
        'latest_close': 237.71,
        'recent_high': 240.055,
        'recent_low': 231.81
    }
    assert mock_amzn_advisor.summary == {
        'latest_close': 3091.86,
        'recent_high': 3131.7843,
        'recent_low': 3030.05
    }

def test_charting(mock_amzn_advisor):
    # it should sort dates in the proper order (ascending) for charting:
    # (FIX: removed leftover lines from the old signature/body that referenced
    # the deleted process_data/prepare_data_for_charting functions.)
    expected_dates = ['2030-03-10', '2030-03-11', '2030-03-12', '2030-03-15', '2030-03-16']
    assert mock_amzn_advisor.chart_df["date"].tolist() == expected_dates