-
Notifications
You must be signed in to change notification settings - Fork 1
/
scrape_crypto_top1000.py
166 lines (141 loc) · 6.25 KB
/
scrape_crypto_top1000.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.firefox.options import Options
from bs4 import BeautifulSoup
import pandas as pd
import time
from datetime import datetime
from pymongo import MongoClient
import traceback
import schedule
def initialize_driver():
    """Create and return a Firefox WebDriver configured with command-line flags.

    Returns:
        A ``webdriver.Firefox`` instance built from the options below.
    """
    # NOTE(review): "--no-sandbox" and "--disable-dev-shm-usage" are flags
    # usually associated with Chromium; confirm Firefox actually honors them.
    launch_flags = ("--headless", "--no-sandbox", "--disable-dev-shm-usage")

    browser_options = Options()
    for flag in launch_flags:
        browser_options.add_argument(flag)

    return webdriver.Firefox(options=browser_options)
def remove_rank_change_from_start(rank, rank_change):
    """Strip a leading rank-change number from a rank string.

    Args:
        rank: Text of the rank element, possibly prefixed with the rank change.
        rank_change: Unsigned rank-change text (e.g. "3"); may be empty or None.

    Returns:
        ``rank`` without the leading ``rank_change`` when the change is a
        positive integer AND ``rank`` really starts with it; otherwise
        ``rank`` unchanged.
    """
    try:
        has_positive_change = int(rank_change) > 0
    except (TypeError, ValueError):
        # Empty, None, or non-numeric change text: nothing to strip.
        return rank

    # Only strip when the prefix is really there and something remains;
    # slicing blindly would corrupt (or empty) a rank that never embedded
    # the change in the first place.
    if (
        has_positive_change
        and rank.startswith(rank_change)
        and len(rank) > len(rank_change)
    ):
        return rank[len(rank_change):]
    return rank
def scrape_data(driver, category):
    """Scrape one category's table from cryptobubbles.net into a DataFrame.

    Args:
        driver: Selenium WebDriver used to load and interact with the page.
        category: Exact text of the category button to select (e.g. "1 - 100").

    Returns:
        A pandas DataFrame with one row per table row that contains ``<td>``
        cells. Columns are 'Rank' and 'Rank Change' followed by the page's
        own header texts with the first header dropped.

    Raises:
        ValueError: If the loaded page contains no ``<table>`` element.
        Selenium exceptions from locating/clicking the category controls
        are propagated to the caller.
    """
    # Imported locally so this function does not rely on the file-level
    # import section being changed.
    from selenium.common.exceptions import WebDriverException

    driver.get("https://cryptobubbles.net")

    # Find and click the category button
    driver.find_element(By.CLASS_NAME, 'select-button').click()

    # Wait for the popup to appear and find the specific category button
    category_button = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.XPATH, f'//button[text()="{category}"]'))
    )
    category_button.click()

    # Wait for the table to load
    time.sleep(5)

    # Keep clicking the "Show More" button until it can no longer be found
    # or clicked. Only Selenium errors end the loop, so Ctrl-C and
    # programming errors are not silently swallowed.
    while True:
        try:
            driver.find_element(By.XPATH, '//button[text()="Show More"]').click()
        except WebDriverException:
            break
        time.sleep(2)  # wait for the table to expand

    # Parse the table with BeautifulSoup
    soup = BeautifulSoup(driver.page_source, 'html.parser')
    table = soup.find('table')
    if table is None:
        raise ValueError(f"No table found on page for category {category!r}")

    # The first page header is replaced by the two rank columns.
    page_headers = [th.text for th in table.find_all('th')]
    headers = ['Rank', 'Rank Change'] + page_headers[1:]

    rows = []
    for tr in table.find_all('tr'):
        cells = tr.find_all('td')
        if not cells:
            continue  # rows without <td> cells carry no data
        row = []
        for cell in cells:
            rank_tag = cell.find(class_='currency-rank')
            if rank_tag is None:
                row.append(cell.text)
            else:
                row.extend(_parse_rank_cell(cell, rank_tag))
        rows.append(row)

    # Create a pandas dataframe
    return pd.DataFrame(rows, columns=headers)


def _parse_rank_cell(cell, rank_tag):
    """Return ``(rank, signed_rank_change)`` for a cell holding a rank element."""
    change_tag = cell.find(class_='currency-rank-change')
    if change_tag is None:
        return remove_rank_change_from_start(rank_tag.text, ''), '0'

    raw_change = change_tag.text
    # An inline style containing this color yields a '-' sign, anything else
    # '+'. ``.get`` avoids a KeyError when the element has no style attribute.
    sign = '-' if 'rgb(255, 102, 102)' in change_tag.get('style', '') else '+'
    return remove_rank_change_from_start(rank_tag.text, raw_change), sign + raw_change
def convert_shorthand_notation(value):
    """Expand a K/M/B/T-suffixed number into a comma-grouped integer string.

    Args:
        value: String such as "1.5B" or "250M" (currency symbols and commas
            already removed by the caller).

    Returns:
        The expanded value formatted with thousands separators and no
        decimals (e.g. "1,500,000,000"). Strings without a recognized
        suffix, or whose numeric part cannot be parsed, are returned
        unchanged.
    """
    multipliers = {'K': 1e3, 'M': 1e6, 'B': 1e9, 'T': 1e12}

    # value[-1:] is '' for an empty string, so no IndexError is possible.
    multiplier = multipliers.get(value[-1:])
    if multiplier is None:
        return value

    try:
        return "{:,.0f}".format(float(value[:-1]) * multiplier)
    except ValueError:
        # Not a number before the suffix; leave the text as scraped.
        return value
def remove_percentage_sign(value):
    """Convert percentage text such as "+1,234.5%" to a float.

    Args:
        value: Text to convert; '%', ',' and '+' characters are dropped
            before parsing.

    Returns:
        The parsed float, or ``value`` itself when it cannot be cleaned or
        parsed (for example when it is not a string).
    """
    try:
        cleaned = value
        for symbol in ('%', ',', '+'):
            cleaned = cleaned.replace(symbol, '')
        return float(cleaned)
    except Exception:
        # Return the original value if conversion fails
        return value
def format_dataframe(dff):
    """Return a cleaned copy of the scraped table with normalized columns.

    The last column of ``dff`` is dropped, the remaining eleven columns are
    renamed, and the money and percentage columns are converted.

    Args:
        dff: Scraped DataFrame; must have exactly twelve columns. It is not
            modified.

    Returns:
        A new DataFrame with columns 'Rank', 'Rank Change', 'Name', 'Price',
        'Market Cap', '24h Volume' and the five '... Change (%)' columns.

    Raises:
        ValueError: From pandas if the column count does not match.
    """
    percent_columns = [
        'Hour Change (%)',
        'Day Change (%)',
        'Week Change (%)',
        'Month Change (%)',
        'Year Change (%)',
    ]

    def strip_currency(text):
        return text.replace('$', '').replace(',', '')

    def format_price(text):
        cleaned = strip_currency(text)
        try:
            # NOTE(review): two-decimal formatting turns a price such as
            # 0.00001 into "0.00" - confirm that precision loss is intended.
            return "{:,.2f}".format(float(cleaned))
        except ValueError:
            # One unparseable price should not abort the whole run.
            return cleaned

    # Drop the last column, then copy so the assignments below act on an
    # independent frame rather than a slice of the caller's data.
    df = dff.iloc[:, :-1].copy()

    # Update column names to reflect percentage where needed
    df.columns = [
        'Rank', 'Rank Change', 'Name', 'Price', 'Market Cap', '24h Volume',
    ] + percent_columns

    # Convert shorthand notations and percentages
    for column in ['Market Cap', '24h Volume']:
        df[column] = df[column].apply(strip_currency).apply(convert_shorthand_notation)

    # A single pass is enough: re-applying the cleaner to its own output
    # returns every value unchanged.
    for column in percent_columns:
        df[column] = df[column].apply(remove_percentage_sign)

    df['Price'] = df['Price'].apply(format_price)
    return df
def insert_into_mongodb(df):
    """Insert the rows of ``df`` into the 'CryptoTop1000' collection.

    Connects to ``mongodb://localhost:27017/``, creates the 'CryptoTop1000'
    time-series collection in the 'Crypto' database when it does not exist,
    stamps every row with the current time and inserts all rows.

    Args:
        df: DataFrame whose rows become documents. It is not modified.

    Returns:
        None. Errors are reported with ``print`` rather than raised.
    """
    client = None
    try:
        client = MongoClient('mongodb://localhost:27017/')
        db = client['Crypto']
        if "CryptoTop1000" not in db.list_collection_names():
            db.create_collection(
                "CryptoTop1000",
                timeseries={'timeField': 'timestamp', 'metaField': 'metadata', 'granularity': 'hours'},
            )

        # assign() builds a new frame, so the caller's DataFrame is left
        # without the extra column.
        # NOTE(review): datetime.now() is a naive local time - confirm the
        # stored timestamps are meant to be local rather than UTC.
        data_dict = df.assign(timestamp=datetime.now()).to_dict("records")
        if not data_dict:
            print("No rows to insert into MongoDB.")
            return

        db['CryptoTop1000'].insert_many(data_dict)
        print("Data inserted into MongoDB.")
    except Exception as e:
        print(f"Error inserting into MongoDB: {e}")
    finally:
        # Close the connection so scheduled runs do not accumulate clients.
        if client is not None:
            client.close()
def run_scraping_process():
    """Scrape every rank category, format the combined table and store it.

    A driver is created for the run and always quit afterwards. Any error
    raised while scraping, formatting or inserting is printed with its
    traceback instead of being propagated.

    Returns:
        None.
    """
    driver = initialize_driver()
    try:
        categories = ["1 - 100", "101 - 200", "201 - 300", "301 - 400", "401 - 500", "501 - 600", "601 - 700", "701 - 800", "801 - 900", "901 - 1000"]

        # Collect the per-category frames and concatenate once, instead of
        # growing a DataFrame (and re-copying it) on every iteration.
        frames = []
        for category in categories:
            category_data = scrape_data(driver, category)
            if not category_data.empty:
                frames.append(category_data)

        if not frames:
            # Formatting expects a fixed column layout and would fail on an
            # empty, column-less frame.
            print("No data scraped; skipping formatting and MongoDB insert.")
            return

        all_data = pd.concat(frames, ignore_index=True)
        formatted_df = format_dataframe(all_data)
        insert_into_mongodb(formatted_df)
    except Exception:
        print(f"Unhandled error in run_scraping_process: {traceback.format_exc()}")
    finally:
        driver.quit()
def main():
    """Register the scraping job on a 24-hour schedule and poll forever.

    This function does not return; it checks for pending jobs once per
    second.
    """
    every_24_hours = schedule.every(24).hours
    every_24_hours.do(run_scraping_process)

    while True:
        schedule.run_pending()
        time.sleep(1)
def main2():
    """Run the scraping process once, immediately, without scheduling."""
    run_scraping_process()
# Script entry point: start the scheduled loop only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()