Class data_transfer added to objects.py and dockerfile modified
JC3008 committed Dec 12, 2023
1 parent a1f9b41 commit 8d5c8a6
Showing 10 changed files with 173 additions and 109 deletions.
5 changes: 3 additions & 2 deletions Dockerfile
@@ -13,8 +13,9 @@ RUN pip install --upgrade pip && \
RUN mkdir -p /workspaces/app/
COPY /src /workspaces/app
WORKDIR /workspaces/app

ENV PYTHONPATH=/usr/local/bin/python

ENV PYTHONPATH=/usr/local/lib/python3.9/dist-packages
# ENV PYTHONPATH=/usr/local/bin/python

CMD ["python"]
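Since the hunk above now points PYTHONPATH at the dist-packages directory, a quick sanity check inside the built image can confirm that the interpreter resolves the libraries the scripts import. A minimal sketch, assuming the python_fundamentus:latest tag from the readme and assuming bs4, pandas and boto3 are the packages installed by pip (based on the imports under src/):

    import sys

    # confirm the PYTHONPATH entry set in the Dockerfile is visible to the interpreter
    assert "/usr/local/lib/python3.9/dist-packages" in sys.path

    # confirm the third-party packages used by the scripts can be imported
    import bs4, pandas, boto3
    print("imports resolved:", bs4.__name__, pandas.__version__, boto3.__version__)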

20 changes: 14 additions & 6 deletions dadoseconomicos.log
@@ -1,6 +1,14 @@
URL and headers defined - Fundamentus_ELT - fundamentus_extract.py - 2023-12-11 19:10:56,576
Parsing file with BeautifulSoup - Fundamentus_ELT - fundamentus_extract.py - 2023-12-11 19:10:58,703
Iterating over php table rows - Fundamentus_ELT - fundamentus_extract.py - 2023-12-11 19:11:05,899
Variable bucket ready - Fundamentus_ELT - fundamentus_extract.py - 2023-12-11 19:11:29,718
Connection ready to be used - Fundamentus_ELT - fundamentus_extract.py - 2023-12-11 19:11:29,872
File was successfully uploaded to s3 bucket - Fundamentus_ELT - fundamentus_extract.py - 2023-12-11 19:11:33,821
ELT process started! Fundamentus_to_landing was triggered - <module> - fundamentus_extract.py - 2023-12-12 19:20:14,759
URL and headers defined - Fundamentus_to_landing - fundamentus_extract.py - 2023-12-12 19:20:14,759
Parsing file with BeautifulSoup - Fundamentus_to_landing - fundamentus_extract.py - 2023-12-12 19:20:16,121
Iterating over php table rows - Fundamentus_to_landing - fundamentus_extract.py - 2023-12-12 19:20:17,462
Variable bucket ready - Fundamentus_to_landing - fundamentus_extract.py - 2023-12-12 19:20:46,777
Connection ready to be used - Fundamentus_to_landing - fundamentus_extract.py - 2023-12-12 19:20:48,593
File was successfully uploaded to s3 bucket - Fundamentus_to_landing - fundamentus_extract.py - 2023-12-12 19:20:51,339
Fundamentus_to_landing finished its data upload to S3 landing bucket - <module> - fundamentus_extract.py - 2023-12-12 19:20:51,344
Fundamentus_to_processed was triggered - <module> - fundamentus_extract.py - 2023-12-12 19:20:51,344
AWS connection was establshed by using the profileadmin for aws_connection class - transfer - objects.py - 2023-12-12 19:20:51,351
Adding new metadata fields - transfer - objects.py - 2023-12-12 19:20:53,170
Dataframe was successfully buffered - transfer - objects.py - 2023-12-12 19:20:53,181
Done! Pipeline ran as ppl_fundamentus_5794e924dffe4638 using tags S3|Fundamentus|B3 - transfer - objects.py - 2023-12-12 19:20:56,114
Fundamentus_to_processed has transfered data from landing to process - <module> - fundamentus_extract.py - 2023-12-12 19:20:56,118
Empty file added fundamentus.log
Empty file.
3 changes: 2 additions & 1 deletion readme.md
@@ -2,14 +2,15 @@

This project aims to automate the extraction of fundamentals data for companies listed on B3. To standardize code integration, GitHub Actions is used to build CI/CD pipelines.

docker build -t python_fundamentus:latest .

docker run python_fundamentus:latest python3.9 /workspaces/app/fundamentus_extract.py
Continuous integration pipeline

CI (Continuous Integration)
code integration aimed at adding new features in a standardized, automated way.
stages involved (coding, commit, build, test, package generation); a test-stage sketch follows this list


Continuous delivery pipeline
CD (Continuous Deployment)
stages involved (release, test, acceptance, deploy)
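The test stage of such a CI pipeline could run a small smoke check against the committed modules. A minimal sketch, assuming pytest is available and assuming a hypothetical test_smoke.py at the repository root next to src/:

    # test_smoke.py - hypothetical smoke test for a CI "test" stage
    import importlib
    import sys

    sys.path.append("src")  # repository layout assumed from this commit

    def test_modules_import():
        # the modules committed under src/ should at least import cleanly
        for name in ("objects", "fundamentus_extract"):
            assert importlib.import_module(name) is not None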
Empty file added src/__init__.py
Empty file.
Binary file modified src/__pycache__/objects.cpython-39.pyc
Binary file not shown.
4 changes: 4 additions & 0 deletions src/dadoseconomicos.log
@@ -0,0 +1,4 @@
AWS connection was establshed by using the profileadmin for aws_connection class - transfer - <ipython-input-1-6c36e2c2e6e7> - 2023-12-12 19:01:39,015
Adding new metadata fields - transfer - <ipython-input-1-6c36e2c2e6e7> - 2023-12-12 19:01:41,410
Dataframe was successfully buffered - transfer - <ipython-input-1-6c36e2c2e6e7> - 2023-12-12 19:01:41,420
Done! Pipeline ran as ppl_fundamentus_fdf4b06b841e43da using tags S3|Fundamentus|B3 - transfer - <ipython-input-1-6c36e2c2e6e7> - 2023-12-12 19:01:43,872
6 changes: 6 additions & 0 deletions src/fundamentus.log
@@ -0,0 +1,6 @@
URL and headers defined - Fundamentus_ELT - <ipython-input-1-5be7b961a443> - 2023-12-12 15:04:06,617
Parsing file with BeautifulSoup - Fundamentus_ELT - <ipython-input-1-5be7b961a443> - 2023-12-12 15:04:07,487
Iterating over php table rows - Fundamentus_ELT - <ipython-input-1-5be7b961a443> - 2023-12-12 15:04:08,461
Variable bucket ready - Fundamentus_ELT - <ipython-input-1-5be7b961a443> - 2023-12-12 15:04:29,908
Connection ready to be used - Fundamentus_ELT - <ipython-input-1-5be7b961a443> - 2023-12-12 15:04:29,913
File was successfully uploaded to s3 bucket - Fundamentus_ELT - <ipython-input-1-5be7b961a443> - 2023-12-12 15:04:32,255
23 changes: 22 additions & 1 deletion src/fundamentus_extract.py
@@ -1,4 +1,6 @@
# importing libraries
import sys
sys.path.append("/usr/local/lib/python3.9/dist-packages")
from bs4 import *
import pandas as pd
from urllib.request import Request, urlopen
@@ -40,7 +42,7 @@
format="%(message)s - %(funcName)s - %(filename)s - %(asctime)s"
)

def Fundamentus_ELT():
def Fundamentus_to_landing():

'''
@@ -100,4 +102,23 @@ def Fundamentus_ELT():

return logging.info('File was successfully uploaded to s3 bucket')

def Fundamentus_to_processed():
    objects.data_transfer(source='landing',
                          target='processed',
                          provider='s3',
                          profile='admin',
                          file=pd.DataFrame(),
                          pipeline='ppl_fundamentus').transfer()

if __name__ == "__main__":
    logging.info("ELT process started! Fundamentus_to_landing was triggered")
    Fundamentus_to_landing()
    logging.info("Fundamentus_to_landing finished its data upload to S3 landing bucket")
    logging.info("Fundamentus_to_processed was triggered")
    Fundamentus_to_processed()
    logging.info("Fundamentus_to_processed has transferred data from landing to processed")





221 changes: 122 additions & 99 deletions src/objects.py
@@ -2,8 +2,8 @@
sys.path.append(r'C:\Users\SALA443\Desktop\Projetos\Dados_B3 - teste\CVM\airflow')
sys.path.append(r"C:\Users\SALA443\Desktop\Projetos\Dados_B3 - teste\Transform\CVM_env\Lib\site-packages")
# print (sys.path)
import datetime
from datetime import date
import datetime as dt
from datetime import datetime,date,time
import zipfile
import requests
from bs4 import BeautifulSoup
@@ -16,16 +16,26 @@
from pathlib import Path
from dotenv import load_dotenv
import io
import uuid


dotenv_path = Path(r'/workspaces/DadosAbertosGOV/src/.env')
dotenv_path = Path(r'/workspaces/app/.env')
load_dotenv(dotenv_path=dotenv_path)
print(dotenv_path)
# print(dotenv_path)

logging.basicConfig(
    level=logging.INFO,
    handlers=[logging.FileHandler("dadoseconomicos.log", mode='w'), logging.StreamHandler()],
    format="%(message)s - %(funcName)s - %(filename)s - %(asctime)s"
)

today = datetime.date.today()
today = dt.date.today()
'''output example: 2023/11/23/'''

sep = {'standard':';','altered':','}
encoding = {'standard':'utf-8','altered':'Windows-1252'}

tags = {'ppl_fundamentus':'S3|Fundamentus|B3'}

'''
This script aims to build the local environment to receive files from
@@ -54,7 +64,11 @@ def daily(self) -> str:
    def __str__(self) -> str:
        return f"{self.year}/{self.month}/{self.day}/"

YearMonthDateFolder = folderpath(year = str(today.year),month = str(today.month).zfill(2),day = str(today.day).zfill(2))
YearMonthDateFolder = folderpath(
    year = str(today.year),
    month = str(today.month).zfill(2),
    day = str(today.day).zfill(2)
)

class s3path():
"""This object builds folder structure of S3 datalake
@@ -117,8 +131,13 @@ def fullpath(self) -> str:
return "caminho inexistente"

class aws_connection():
    '''
    This class establishes the AWS connection by fetching
    the credentials stored in the .env file.
    '''
    def __init__(self, profile: str):
        self.profile = profile

    @property
    def account(self) -> dict:

@@ -133,11 +152,13 @@ def account(self) -> dict:

# The credentials variable receives the attributes of the aws_connection class,
# which reads the credentials from the .env file
credentials = aws_connection(profile='admin').account
client = boto3.client('s3',
aws_access_key_id=credentials['aws_access_key_id'],
aws_secret_access_key=credentials['aws_secret_access_key']
)
# credentials = aws_connection(profile='admin').account
# client = boto3.client('s3',
# aws_access_key_id=credentials['aws_access_key_id'],
# aws_secret_access_key=credentials['aws_secret_access_key']
# )

# credentials = aws_connection(profile='admin').account
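# Hedged sketch (an assumption, with placeholder values): the account property above is
# expected to read per-profile entries like these from the .env file, matching the keys
# consumed by boto3.client(...) further below:
#   aws_access_key_id=<access key id>
#   aws_secret_access_key=<secret access key>
#   aws_region=<region>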

class folder_builder():
'''
@@ -176,105 +197,107 @@ def storage_selector(self) -> dict:
                'target_bucket': f'de-okkus-{self.targetlayer}-dev-727477891012',
                'key': f'{YearMonthDateFolder}'}
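        # Illustrative result of storage_selector for source='landing', target='processed'
        # (the source_bucket entry is assumed to mirror the target pattern; the date is an example):
        #   {'source_bucket': 'de-okkus-landing-dev-727477891012',
        #    'target_bucket': 'de-okkus-processed-dev-727477891012',
        #    'key': '2023/12/12/'}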





# def transfer(self):

# bucket_x_key = files_IO(storageOption=self.storageOption,sourcelayer=self.sourcelayer,targetlayer=self.targetlayer).sourcelayerMode
# Bucket = bucket_x_key['Bucket']
# Key = bucket_x_key['Key']
# data = client.get_object(Bucket=Bucket,Key=Key)
class data_transfer():
    bucket: dict
    identity: str

    def __init__(self,
                 source: str,
                 target: str,
                 provider: str,
                 profile: str,
                 file: object,
                 pipeline: str,
                 pipeline_id=None,
                 tag=None):
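        # Parameter notes: source/target name the data-lake layers ('landing', 'processed'),
        # provider is the boto3 client name ('s3'), profile selects the credential set read
        # from the .env file, file is a placeholder DataFrame that transfer() replaces with
        # the object read from S3, pipeline keys into the module-level tags dict, and
        # pipeline_id is (re)generated inside transfer().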

# df = pd.read_csv(data['Body'],sep=';',encoding='utf-8')
# df[f'loaded_to{self.sourcelayer}_date'] = date.today()
# df[f'loaded_to{self.sourcelayer}_time'] = datetime.now().time()
# buffer = io.StringIO()
        self.source = source
        self.target = target
        self.provider = provider
        self.profile = profile
        self.file = file
        self.pipeline = pipeline
        self.pipeline_id = pipeline_id
        self.tag = tag

# bucket_x_key = files_IO(storageOption=self.storageOption,sourcelayer=self.sourcelayer,targetlayer=self.targetlayer).targetlayer
# Bucket = bucket_x_key['Bucket']
# Key = bucket_x_key['Key']
# df.to_csv(buffer,encoding='utf-8',sep=';')

# client.put_object(
# ACL='private',
# Body=buffer.getvalue(),
# Bucket=f'de-okkus-{self.targetlayer}-dev-727477891012',
# Key=f'{YearMonthDateFolder}{self.filename}')

# x = files_IO(
# sourcelayer='landing',
# targetlayer='processed',
# filename='cad_cia_aberta.csv',
# storageOption='s3').transfer

# files_IO.transfer()
# bucketKey = files_IO(storageOption='s3',layer='processed').layerMode
# print(bucketKey['Bucket'])
# print(bucketKey['Key'])


    @property
    def path(self) -> dict:

        bucket = folder_builder(
            sourcelayer=self.source,
            targetlayer=self.target,
            storageOption=self.provider).storage_selector

        return bucket


    def transfer(self):
        '''
        This method performs data transfer between S3 buckets and adds metadata fields
        to allow better management of the dataflow. The goal of that feature is to speed up
        debugging whenever it is needed.
        Four fields are added by default:
            identity: field which contains the pipeline name and pipeline_id
            loaded_{target bucket name}_date: date of the data upload
            loaded_{target bucket name}_time: time of the data upload
            tags: tags that are bound to the pipeline name
        To run the transfer, call the method as below:

            data_transfer(source='landing',
                          target='processed',
                          provider='s3',
                          profile='admin',
                          file=pd.DataFrame(),
                          pipeline='ppl_fundamentus').transfer()

        This call will move the fundamentus_historico.csv file from the source bucket to the
        target bucket and add the four metadata fields mentioned above.
        '''

        credentials = aws_connection(profile=self.profile).account
        client = boto3.client(self.provider,
                              aws_access_key_id=credentials['aws_access_key_id'],
                              aws_secret_access_key=credentials['aws_secret_access_key']
                              )

# self.aws_access_key_id = os.getenv("aws_access_key_id"),
# self.aws_secret_access_key = os.getenv("aws_secret_access_key"),
# self.aws_region = os.getenv("aws_region")
logging.info(f"AWS connection was establshed by using the profile{self.profile} for aws_connection class")

# return self.aws_access_key_id

# ends settings from folder structure and s3 buckets path
# credentials = aws_connection(profile="admin").account
# print (credentials['aws_access_key_id'])
# print (credentials['aws_region'])
        bucket = self.path

        key = f'{YearMonthDateFolder}fundamentus_historico.csv'

# def FindDREfiles(param:str) -> list:
# """
# Finds and Lists all files that matches with DRE
# """
# FileList = []
# for file in os.listdir(ProcessedZone):
        data = client.get_object(Bucket=bucket['source_bucket'], Key=key)
        self.file = pd.read_csv(data["Body"], sep=sep['standard'], encoding=encoding['standard'])

# if re.findall(param,file):

# FileList.append(file)

# return FileList
# adding new fields
# self.pipeline = 'ppl_intra_s3'
        self.pipeline_id = uuid.uuid4().hex[:16]
        identity = f"{self.pipeline}_{self.pipeline_id}"

logging.info(f"Adding new metadata fields")
self.file['identity'] = identity
self.file[f"loaded_{bucket['target_bucket']}_date"] = date.today()
self.file[f"loaded_{bucket['target_bucket']}_time"] = datetime.now().time()
self.file['tags'] = tags[self.pipeline]

        buffer = io.StringIO()
        self.file.to_csv(buffer, sep=sep['standard'], encoding=encoding['standard'], index=None)
        logging.info("Dataframe was successfully buffered")

        client.put_object(ACL='private',
                          Body=buffer.getvalue(),
                          Bucket=bucket['target_bucket'],
                          Key=f'{YearMonthDateFolder}fundamentus_historico.csv')
        logging.info(f"Done! Pipeline ran as {identity} using tags {tags[self.pipeline]}")

# def ReadDREFilesTransformAndSaveAsParquet():

# fileList = FindDREfiles('DRE')

# for file in fileList[:]:
# foldername = f"{str(file).strip('.csv')}/"
# schema = dre
# df = pd.read_csv(f'{ProcessedZone}{file}',delimiter=';',encoding='ISO-8859-1')
# df['Transform_Date'] = pd.to_datetime('today').to_datetime64()
# os.makedirs(f"{ConsumeZone}{foldername}",exist_ok=True)
# file = str(file).strip('.csv')
# df.to_parquet(f"{ConsumeZone}{foldername}{file}_{pd.to_datetime('today').date()}.parquet",engine='pyarrow',partition_cols='DT_INI_EXERC')
# file = None
# return None
