我有一个包含date和price列的时间序列。我使用的是LSTM,当我对数据进行回测时,它可以很好地拟合图形。
现在我想预测从今天开始的未来X天(滚动预测),也就是还没有数据的那段时间。我找不到这样做的方法。我看到的每一个例子都只展示了对历史数据的测试。
我是否仍然需要将数据拆分为训练集和测试集?我不确定应该向model.predict(?)传入什么,才能得到对未来的预测。
import pandas as pd
from datetime import date
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
import numpy as np
from keras.models import Sequential
from keras.layers import LSTM,Dropout,Dense
import datetime
import math
from datetime import datetime, timedelta
from sklearn import model_selection
from sklearn.metrics import mean_squared_error
# Select matplotlib's 'fivethirtyeight' style sheet for the plots drawn below.
plt.style.use('fivethirtyeight')
def create_dataset(dataset, look_back=1):
    """Slice a series into supervised (window, next value) training pairs.

    Args:
        dataset: 2-D array; only column 0 is read.
        look_back: number of consecutive values in each input window.

    Returns:
        A tuple ``(X, y)`` where ``X[i]`` is ``dataset[i:i + look_back, 0]``
        and ``y[i]`` is ``dataset[i + look_back, 0]``. ``X`` has shape
        ``(n_samples, look_back)`` and ``y`` has shape ``(n_samples,)``.
    """
    # The trailing "- 1" leaves the last possible window unused. It is kept
    # deliberately: the plotting offsets in back_test() are computed from
    # exactly this sample count.
    n_samples = len(dataset) - look_back - 1
    if n_samples <= 0:
        # Keep X two-dimensional so callers can still read X.shape[1].
        return np.empty((0, look_back)), np.empty((0,))
    dataX = [dataset[i:i + look_back, 0] for i in range(n_samples)]
    dataY = [dataset[i + look_back, 0] for i in range(n_samples)]
    return np.array(dataX), np.array(dataY)
def createForecastDF(num_days, COLUMN_NAME):
    """Build a placeholder frame covering the next ``num_days`` calendar days.

    Args:
        num_days: number of consecutive daily rows to create, starting today.
        COLUMN_NAME: name of the single value column.

    Returns:
        A DataFrame indexed by 'Date' (strings formatted ``dd/mm/YYYY``) whose
        only column, ``COLUMN_NAME``, is filled with the placeholder value 1.
    """
    future_days = pd.date_range(datetime.today(), periods=num_days)
    # One [date string, placeholder] row per future day.
    rows = [[day.strftime('%d/%m/%Y'), 1] for day in future_days]
    forecast_df = pd.DataFrame(rows, columns=['Date', COLUMN_NAME])
    return forecast_df.set_index('Date')
def back_test(data, look_back):
    """Train an LSTM on the leading part of ``data`` and plot its fit.

    The series is scaled to [0, 1], split chronologically (first 67% for
    training, last 33% for testing, no shuffling), turned into
    ``look_back``-sized windows, and used to fit a small LSTM. Train and test
    RMSE (in the original units) are printed, and the actual series is
    plotted together with both sets of predictions.

    Args:
        data: 2-D, single-column table or array of observations.
        look_back: number of past values fed to the model for each prediction.

    Returns:
        None. Results are reported through ``print`` and a matplotlib figure.
    """
    # Apply scaler
    # NOTE(review): the scaler is fitted on the whole series before the split,
    # so the test portion's min/max influence the training data. Consider
    # fitting on the training portion only.
    scaler = MinMaxScaler(feature_range=(0, 1))
    dataset = scaler.fit_transform(data)

    # Split Data into train and test
    training_set, testing_set = model_selection.train_test_split(
        dataset, test_size=0.33, shuffle=False)

    # reshape into X=t and Y=t+1
    trainX, trainY = create_dataset(training_set, look_back)
    testX, testY = create_dataset(testing_set, look_back)

    # reshape input to be [samples, time steps, features]
    trainX = np.reshape(trainX, (trainX.shape[0], 1, trainX.shape[1]))
    testX = np.reshape(testX, (testX.shape[0], 1, testX.shape[1]))

    model = Sequential()
    model.add(LSTM(4, input_shape=(1, look_back)))
    model.add(Dense(1))
    model.compile(loss='mean_squared_error', optimizer='adam')
    model.fit(trainX, trainY, epochs=100, batch_size=1, verbose=2)

    trainPredict = model.predict(trainX)
    testPredict = model.predict(testX)

    # invert predictions (and targets) back to the original units
    trainPredict = scaler.inverse_transform(trainPredict)
    trainY = scaler.inverse_transform([trainY])
    testPredict = scaler.inverse_transform(testPredict)
    testY = scaler.inverse_transform([testY])

    # calculate root mean squared error
    trainScore = math.sqrt(mean_squared_error(trainY[0], trainPredict[:, 0]))
    print('Train Score: %.2f RMSE' % (trainScore))
    testScore = math.sqrt(mean_squared_error(testY[0], testPredict[:, 0]))
    print('Test Score: %.2f RMSE' % (testScore))

    # shift train predictions for plotting: the first prediction corresponds
    # to the value that follows the first look_back-sized window
    trainPredictPlot = np.empty_like(dataset)
    trainPredictPlot[:, :] = np.nan
    trainPredictPlot[look_back:len(trainPredict) + look_back, :] = trainPredict

    # shift test predictions for plotting
    testPredictPlot = np.empty_like(dataset)
    testPredictPlot[:, :] = np.nan
    testPredictPlot[len(trainPredict) + (look_back * 2) + 1:len(dataset) - 1, :] = testPredict

    # plot baseline and predictions
    plt.plot(scaler.inverse_transform(dataset))
    plt.plot(trainPredictPlot)
    plt.plot(testPredictPlot)
    plt.title("Training Dataset")
    plt.show()
def forecast(data, look_back, num_historial_data, df_future):
    """Fit an LSTM on the historical rows and roll it forward over ``df_future``.

    The model is trained on every available historical window (no train/test
    split). Future values are then produced one step at a time: the last
    ``look_back`` scaled observations form the first input window, and each
    prediction is appended to the window to predict the following step.

    Args:
        data: 2-D, single-column table or array; only the first
            ``num_historial_data`` rows are used.
        look_back: number of past values fed to the model for each prediction.
        num_historial_data: count of leading rows in ``data`` that hold real
            observations.
        df_future: frame whose index labels the periods to predict; its
            length sets the forecast horizon.

    Returns:
        A DataFrame indexed like ``df_future`` with a single column (named
        after ``df_future``'s first column) holding the predicted values in
        the original units.
    """
    final_data = data[:num_historial_data]

    # Apply scaler
    scaler = MinMaxScaler(feature_range=(0, 1))
    dataset = scaler.fit_transform(final_data)

    # reshape into X=t and Y=t+1
    trainX, trainY = create_dataset(dataset, look_back)

    # reshape input to be [samples, time steps, features]
    trainX = np.reshape(trainX, (trainX.shape[0], 1, trainX.shape[1]))

    model = Sequential()
    model.add(LSTM(4, input_shape=(1, look_back)))
    model.add(Dense(1))
    model.compile(loss='mean_squared_error', optimizer='adam')
    model.fit(trainX, trainY, epochs=100, batch_size=1, verbose=2)

    # Recursive multi-step forecast: seed with the most recent look_back
    # scaled observations and slide the window forward over its own output.
    window = dataset[-look_back:, 0]
    scaled_forecast = []
    for _ in range(len(df_future)):
        next_value = model.predict(
            window.reshape(1, 1, look_back), verbose=0)[0, 0]
        scaled_forecast.append(next_value)
        window = np.append(window[1:], next_value)

    if not scaled_forecast:
        # Nothing to predict; avoid inverse-transforming an empty array.
        return pd.DataFrame(index=df_future.index,
                            columns=df_future.columns[:1])

    # invert predictions back to the original units
    predicted = scaler.inverse_transform(
        np.array(scaled_forecast).reshape(-1, 1))
    return pd.DataFrame(predicted, index=df_future.index,
                        columns=df_future.columns[:1])
COLUMN_NAME = "Close"

# Make sure your data frame has only one column beside index [Date, col]
historical_df = pd.read_csv('dataset.csv', index_col='Date')
historical_df = historical_df[[COLUMN_NAME]]

# Variables
num_historial_data = len(historical_df)
# Passed to back_test() below as the LSTM look-back window length.
forecast_length = 26
today = date.today()

# 1. Creating future dates df
df_future = createForecastDF(100, COLUMN_NAME)

# 2. Merge dataframes together
# DataFrame.append was removed in pandas 2.0; pd.concat stacks the rows the
# same way.
dataframe = pd.concat([historical_df, df_future])

# 3.Back Test Data
back_test(dataframe[:num_historial_data], forecast_length)

# 4. Forecast Data
#forecast(dataframe, forecast_length,num_historial_data,df_future)

发布于 2021-01-13 22:00:58
# Train with early stopping (patience of 5 epochs), then score the model on
# the validation set.
history = model.fit(X_train, y_train,
                    validation_data=(X_valid, y_valid),
                    epochs=100,
                    callbacks=[tf.keras.callbacks.EarlyStopping(patience=5)])
model.evaluate(X_valid, y_valid)

# n_steps: length of the known part of each time series (e.g. number of days)
# NOTE(review): the indexing below assumes `series` is a 3-D array shaped
# (batch, time steps, features) -- confirm against how it is built.
X_new, Y_new = series[:, :n_steps], series[:, n_steps:]
X = X_new
for step_ahead in range(10):
    # Predict one step from the latest n_steps values, then append the
    # prediction along the time axis so it feeds the next iteration.
    y_pred_one = model.predict(X[:, step_ahead:])[:, np.newaxis, :]
    X = np.concatenate([X, y_pred_one], axis=1)
# The next 10 values predicted, each using the previously predicted values
# as input.
Y_pred = X[:, n_steps:]
https://stackoverflow.com/questions/65702977
复制相似问题