-
Notifications
You must be signed in to change notification settings - Fork 0
/
hybrid_(con_attivazione_ponderata).py
325 lines (254 loc) · 13.4 KB
/
hybrid_(con_attivazione_ponderata).py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# -*- coding: utf-8 -*-
"""Hybrid_(con attivazione ponderata).ipynb
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1Qxf9tmoudBlOXg8XJ65lMIBx_EOK0M9J
"""
import pandas as pd
import datetime
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Activation, Attention, Concatenate, Conv1D, Dense, Dropout, BatchNormalization, Layer, LayerNormalization, MultiHeadAttention, Add, GlobalAveragePooling1D, Bidirectional, LSTM
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import layers
from copy import deepcopy
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, r2_score
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import ReduceLROnPlateau
from tensorflow.keras.optimizers.schedules import ExponentialDecay
import tensorflow as tf
from tensorflow.keras.utils import plot_model
# Helper to convert a 'YYYY-MM-DD' date string into a datetime object.
def str_to_datetime(s):
    """Parse a date string in 'YYYY-MM-DD' form into a ``datetime.datetime``.

    Uses ``datetime.strptime`` instead of manual split/int conversion, so a
    malformed string (wrong separator, missing field) raises a clear
    ``ValueError`` instead of an ``IndexError`` or silent misparse.
    """
    return datetime.datetime.strptime(s, '%Y-%m-%d')
# Load the dataset from CSV (alternative tickers kept commented for quick switching).
df = pd.read_csv('AAPL.csv')
#df = pd.read_csv('AMZN.csv')
#df = pd.read_csv('GOOG.csv')
#df = pd.read_csv('MSFT.csv')
# Keep only the columns of interest (Date, Close, Open, High, Low, Volume).
df = df[['Date', 'Close', 'Open', 'High', 'Low', 'Volume']]
# Parse the date strings into datetime objects and use them as the index.
df['Date'] = df['Date'].apply(str_to_datetime)
df.index = df.pop('Date')
# Drop rows with missing/null values.
df.dropna(inplace=True)
# Min-Max normalize all numeric columns to the [0, 1] range.
scaler = MinMaxScaler()
df[['Close', 'Open', 'High', 'Low', 'Volume']] = scaler.fit_transform(df[['Close', 'Open', 'High', 'Low', 'Volume']])
# Inputs (X) and targets (y): the model predicts all five features at once.
X = df[['Close', 'Open', 'High', 'Low', 'Volume']].values
y = df[['Close', 'Open', 'High', 'Low', 'Volume']].values # Multi-output target
# Chronological split sizes: 80% train overall, of which 20% becomes validation.
train_size = int(len(X) * 0.8)
val_size = int(train_size * 0.2) # 20% of the training set
# Split into training, validation and test sets (no shuffling: this is a time series).
X_train, y_train = X[:train_size - val_size], y[:train_size - val_size]
X_val, y_val = X[train_size - val_size:train_size], y[train_size - val_size:train_size]
X_test, y_test = X[train_size:], y[train_size:]
# Add a dummy time dimension so the arrays have shape (samples, 1, 5).
X_train_3d = X_train[:, np.newaxis, :]
X_val_3d = X_val[:, np.newaxis, :]
X_test_3d = X_test[:, np.newaxis, :]
# Same dummy time dimension for the targets.
y_train_3d = y_train[:, np.newaxis, :]
y_val_3d = y_val[:, np.newaxis, :]
y_test_3d = y_test[:, np.newaxis, :]
# Custom multi-head self-attention layer (scaled dot-product attention).
class SelfAttention(Layer):
    """Multi-head self-attention over the time axis.

    Projects the input into queries, keys and values, computes scaled
    dot-product attention independently per head, and concatenates the
    heads back into a single (batch, time, num_heads * head_dim) output.
    """

    def __init__(self, num_heads, head_dim, **kwargs):
        super(SelfAttention, self).__init__(**kwargs)
        self.num_heads = num_heads
        self.head_dim = head_dim

    def build(self, input_shape):
        # One projection matrix each for queries, keys and values.
        self.W_q = self.add_weight("W_q", shape=(input_shape[-1], self.num_heads * self.head_dim))
        self.W_k = self.add_weight("W_k", shape=(input_shape[-1], self.num_heads * self.head_dim))
        self.W_v = self.add_weight("W_v", shape=(input_shape[-1], self.num_heads * self.head_dim))

    def _split_heads(self, x):
        # (batch, time, heads * head_dim) -> (batch, heads, time, head_dim)
        x = tf.reshape(x, (-1, tf.shape(x)[1], self.num_heads, self.head_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, inputs):
        # Linear projections: (batch, time, num_heads * head_dim).
        q = tf.matmul(inputs, self.W_q)
        k = tf.matmul(inputs, self.W_k)
        v = tf.matmul(inputs, self.W_v)
        # BUG FIX: the original reshaped to (batch, time, heads, head_dim) and
        # matmul'd directly, which attends across the HEADS axis instead of the
        # time axis. Transpose heads in front so attention runs over time.
        q = self._split_heads(q)
        k = self._split_heads(k)
        v = self._split_heads(v)
        # BUG FIX: scale scores by sqrt(head_dim) (standard scaled dot-product
        # attention) to keep softmax logits in a reasonable range.
        scale = tf.math.sqrt(tf.cast(self.head_dim, q.dtype))
        attention_scores = tf.matmul(q, k, transpose_b=True) / scale
        attention_scores = tf.nn.softmax(attention_scores, axis=-1)
        output = tf.matmul(attention_scores, v)
        # Merge heads back: (batch, heads, time, head_dim) -> (batch, time, heads * head_dim).
        output = tf.transpose(output, perm=[0, 2, 1, 3])
        output = tf.reshape(output, (-1, tf.shape(output)[1], self.num_heads * self.head_dim))
        return output
# Data dimensions and attention hyper-parameters.
num_features = 5 # Number of features in the dataset
num_heads = 8 # Number of heads for multi-head attention
head_dim = 64 # Dimension of each attention head
from tensorflow.keras.layers import Multiply, Add
# --- Bi-LSTM branch: stacked bidirectional LSTMs followed by self-attention ---
# Same architecture as before, expressed as a single layer list.
bi_lstm_attention_model = Sequential(
    [
        Input(shape=(None, 5)),
        Activation('relu'),
        Bidirectional(LSTM(512, return_sequences=True)),
        Dropout(0.2),
        Activation('relu'),
        Bidirectional(LSTM(256, return_sequences=True)),
        Dropout(0.2),
        SelfAttention(num_heads=num_heads, head_dim=head_dim),
        Dense(512, activation='relu'),
        Dense(256, activation='relu'),
        Dense(128, activation='relu'),
        Dense(64, activation='relu'),
        Dense(32, activation='relu', kernel_regularizer=l2(0.005)),
        Dense(5),
    ],
    name="bilstm_model",
)
# Compile with MSE loss, MAE metric and a small Adam learning rate.
bi_lstm_attention_model.compile(loss='mse', optimizer=Adam(learning_rate=0.000015), metrics=['mean_absolute_error'])
# --- Transformer branch (functional API) ---
# BUG FIX: the original first created a throwaway Sequential model named
# "transformer_model" and then immediately rebound the variable to the
# functional Model below; that dead Sequential has been removed.
input_layer = Input(shape=(None, num_features))  # variable-length sequences of num_features
# First encoder: self-attention followed by two dense layers.
encoder_1 = SelfAttention(num_heads=num_heads, head_dim=head_dim)(input_layer)
encoder_1 = Dense(128, activation='relu')(encoder_1)
encoder_1 = Dense(128, activation='relu')(encoder_1)
# Second encoder stacked on top of the first.
encoder_2 = SelfAttention(num_heads=num_heads, head_dim=head_dim)(encoder_1)
encoder_2 = Dense(64, activation='relu')(encoder_2)
encoder_2 = Dense(64, activation='relu')(encoder_2)
# Concatenate the outputs of both encoders.
concatenated_output = Concatenate()([encoder_1, encoder_2])
# Project back down to the number of input features.
output_layer = Dense(num_features)(concatenated_output)
transformer_model = tf.keras.Model(inputs=input_layer, outputs=output_layer, name='Transformer')
# Compile with MSE loss and MAE metric.
transformer_model.compile(loss='mse',
                          optimizer=Adam(learning_rate=0.00001),
                          metrics=['mean_absolute_error'])
# Shared input feeding both branches of the hybrid model.
input_layer = Input(shape=(None, 5))
# Run the same input through the Bi-LSTM and Transformer branches.
bi_lstm_output = bi_lstm_attention_model(input_layer)
transformer_output = transformer_model(input_layer)
# Gating weights in (0, 1), computed from the Bi-LSTM branch output.
attention_weights = Dense(1, activation='sigmoid')(bi_lstm_output)
# Weight each branch: alpha * bilstm + (1 - alpha) * transformer.
weighted_bi_lstm_output = Multiply()([bi_lstm_output, attention_weights])
weighted_transformer_output = Multiply()([transformer_output, 1 - attention_weights])
# Combine the weighted branch outputs.
combined_outputs = Add()([weighted_bi_lstm_output, weighted_transformer_output])
# Dense head to process the combined representation down to 5 outputs.
combined_outputs = Dense(256, activation='relu')(combined_outputs)
combined_outputs = Dense(128, activation='relu')(combined_outputs)
combined_outputs = Dense(64, activation='relu')(combined_outputs)
combined_outputs = Dense(5)(combined_outputs)
# Build the hybrid model.
hybrid_model = Model(inputs=input_layer, outputs=combined_outputs)
# Print a summary of the model.
hybrid_model.summary()
# Compile the hybrid model.
hybrid_model.compile(loss='mse', optimizer=Adam(learning_rate=0.00001), metrics=['mean_absolute_error'])
# Callback: reduce the learning rate when validation loss plateaus.
lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.3, patience=10, verbose=1, min_lr=1e-6)
callbacks = [lr_scheduler]
# Train the hybrid model on the training set, validating on the validation set.
history = hybrid_model.fit(X_train_3d, y_train_3d, validation_data=(X_val_3d, y_val_3d),
epochs=150,
batch_size=64,
callbacks=callbacks,
verbose=1)
predictions_val = hybrid_model.predict(X_val_3d)
predictions_test = hybrid_model.predict(X_test_3d)
# Evaluate the model on the test set.
test_loss, test_mae = hybrid_model.evaluate(X_test_3d, y_test_3d, verbose=0)
# Flatten y_test_3d to 2-D (samples, 5) for the sklearn metrics.
y_test_2d = y_test_3d.reshape(-1, 5)
# Drop the dummy time axis from the predictions.
predictions_test_squeezed = np.squeeze(predictions_test)
# Root mean squared error over all five outputs.
rmse = np.sqrt(mean_squared_error(y_test_2d, predictions_test_squeezed))
# Coefficient of determination (R²).
r2 = r2_score(y_test_2d, predictions_test_squeezed)
print(f'Test Loss: {test_loss}')
print(f'Test Mean Absolute Error (MAE): {test_mae}')
print(f'Test Root Mean Squared Error (RMSE): {rmse}')
print(f'Test R-squared (R²): {r2}')
# --- Learning curves: loss and MAE for training vs. validation ---
hist = history.history
fig = plt.figure(figsize=(12, 4))
# Left panel: loss curves.
ax_loss = fig.add_subplot(1, 2, 1)
ax_loss.plot(hist['loss'], label='Training Loss')
ax_loss.plot(hist['val_loss'], label='Validation Loss')
ax_loss.set_xlabel('Epoch')
ax_loss.set_ylabel('Loss')
ax_loss.legend()
ax_loss.set_title('Training and Validation Loss')
# Right panel: MAE curves.
ax_mae = fig.add_subplot(1, 2, 2)
ax_mae.plot(hist['mean_absolute_error'], label='Training MAE')
ax_mae.plot(hist['val_mean_absolute_error'], label='Validation MAE')
ax_mae.set_xlabel('Epoch')
ax_mae.set_ylabel('Mean Absolute Error')
ax_mae.legend()
ax_mae.set_title('Training and Validation Mean Absolute Error')
plt.tight_layout()
plt.show()
# Locate the epochs with the lowest validation and training loss.
best_epoch_val = int(np.argmin(hist['val_loss']))
best_epoch = int(np.argmin(hist['loss']))
best_val_loss = hist['val_loss'][best_epoch_val]
best_loss = hist['loss'][best_epoch]
# Report the best values (epochs printed 1-based).
print(f'Best Validation Loss: {best_val_loss:.4f} at epoch {best_epoch_val + 1}')
print(f'Best Training Loss: {best_loss:.4f} at epoch {best_epoch + 1}')
# --- Per-feature comparison of real data vs. predictions ---
feature_names = ['Close', 'Open', 'High', 'Low']  # 'Volume' excluded from the plots
# BUG FIX: the original opened an extra plt.figure(figsize=(12, 24)) here that
# was never drawn on (each feature opens its own figure inside the loop),
# leaving a spurious blank figure; it has been removed.
total_train_samples = len(y_train)
total_val_samples = len(y_val)
total_test_samples = len(y_test)
# Date ranges matching the chronological train/validation/test split.
dates_train = df.index[:total_train_samples]
dates_val = df.index[total_train_samples:total_train_samples + total_val_samples]
dates_test = df.index[total_train_samples + total_val_samples:]
# One figure per feature, overlaying real values and predictions.
for feature_index, feature_name in enumerate(feature_names):
    plt.figure(figsize=(12, 4))
    # Real values and predictions for this feature (dummy time axis at index 0).
    y_train_feature = y_train_3d[:, 0, feature_index]
    y_val_feature = y_val_3d[:, 0, feature_index]
    predictions_val_feature = predictions_val[:, 0, feature_index]
    y_test_feature = y_test_3d[:, 0, feature_index]
    predictions_test_feature = predictions_test[:, 0, feature_index]
    plt.plot(dates_train, y_train_feature, color='blue', label=f'Dati Reali di Training {feature_name}', linewidth=2.5)
    plt.plot(dates_val, y_val_feature, color='orange', label=f'Dati Reali di Validazione {feature_name}', linewidth=2.5)
    plt.plot(dates_val, predictions_val_feature, color='red', label=f'Previsioni di Validazione {feature_name}', linewidth=2.5)
    plt.plot(dates_test, y_test_feature, color='yellow', label=f'Dati Reali di Test {feature_name}', linewidth=2.5)
    plt.plot(dates_test, predictions_test_feature, color='green', label=f'Previsioni di Test {feature_name}', linewidth=2.5)
    plt.xlabel('Data')
    plt.ylabel('Valore')
    plt.title(f'Confronto tra Previsioni e Dati Reali di {feature_name}')
    plt.legend()
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()
# --- Tabular views of actual vs. predicted values ---
actual_cols = ['Actual Close', 'Actual Open', 'Actual High', 'Actual Low', 'Actual Volume']
pred_cols = ['Prediction Close', 'Prediction Open', 'Prediction High', 'Prediction Low', 'Prediction Volume']
# Validation set: actual values, then the model's predictions.
y_val_df = pd.DataFrame(y_val_3d[:, 0, :], columns=actual_cols)
print(y_val_df)
predictions_val_df = pd.DataFrame(predictions_val[:, 0, :], columns=pred_cols)
print(predictions_val_df)
# Test set: actual values, then the model's predictions.
y_test_df = pd.DataFrame(y_test_3d[:, 0, :], columns=actual_cols)
print(y_test_df)
predictions_test_df = pd.DataFrame(predictions_test[:, 0, :], columns=pred_cols)
print(predictions_test_df)
# Side-by-side validation table (actuals next to predictions).
combined_df = pd.concat([y_val_df, predictions_val_df], axis=1)
#print(combined_df)
#-------------------- render a diagram of the model architecture -------------------------------------------------
plot_model(hybrid_model, to_file='model_plot.png', show_shapes=True, show_layer_names=True)