Skip to content
Permalink
main
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
# get data
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.preprocessing import StandardScaler,MinMaxScaler
# Load the pickled inputs from ./data: raw_feature.pkl plus the train/test
# feature (X) and target (y) splits.
# NOTE: unpickling can execute arbitrary code -- only load trusted files.
raw_data = pd.read_pickle("./data/raw_feature.pkl")
X_train, y_train = (
    pd.read_pickle("./data/X_train.pkl"),
    pd.read_pickle("./data/y_train.pkl"),
)
X_test, y_test = (
    pd.read_pickle("./data/X_test.pkl"),
    pd.read_pickle("./data/y_test.pkl"),
)
def apply_sin(X, axis, range):
    """Sin-encode one slice of a 3-D array along its second dimension.

    Returns a copy of ``X`` in which ``X[:, axis, :]`` is replaced by
    ``sin(X[:, axis, :] * 2 * pi / range)``.  All other entries are left
    as they are and the caller's array is never modified.

    Args:
        X: 3-D NumPy array.
        axis: Index along the second dimension selecting the slice to
            encode (despite the name it is an index, not an array axis).
        range: Period of the encoded quantity, e.g. 24 for a value that
            cycles every 24 units.  The name shadows the builtin but is
            kept so keyword callers keep working.

    Returns:
        A new array with the same shape as ``X``.  Integer and boolean
        inputs are promoted to ``float64``; other dtypes are preserved.
    """
    if X.dtype.kind in "iub":
        # Writing sin values back into an integer array would truncate
        # them to 0 / +-1, so work on a float copy instead.
        X = X.astype(np.float64)
    else:
        X = X.copy()
    X[:, axis, :] = np.sin(X[:, axis, :] * 2 * np.pi / range)
    return X
def apply_cos(X, axis, range):
    """Cos-encode one slice of a 3-D array along its second dimension.

    Returns a copy of ``X`` in which ``X[:, axis, :]`` is replaced by
    ``cos(X[:, axis, :] * 2 * pi / range)``.  All other entries are left
    as they are and the caller's array is never modified.

    Args:
        X: 3-D NumPy array.
        axis: Index along the second dimension selecting the slice to
            encode (despite the name it is an index, not an array axis).
        range: Period of the encoded quantity, e.g. 24 for a value that
            cycles every 24 units.  The name shadows the builtin but is
            kept so keyword callers keep working.

    Returns:
        A new array with the same shape as ``X``.  Integer and boolean
        inputs are promoted to ``float64``; other dtypes are preserved.
    """
    if X.dtype.kind in "iub":
        # Writing cos values back into an integer array would truncate
        # them to 0 / +-1, so work on a float copy instead.
        X = X.astype(np.float64)
    else:
        X = X.copy()
    X[:, axis, :] = np.cos(X[:, axis, :] * 2 * np.pi / range)
    return X
def create_rolling_window(matrix, t):
    """Build the sliding-window input dataset used as LSTM input.

    ``matrix`` is sliced along its first dimension into overlapping windows
    of ``t`` consecutive rows (stride 1, incomplete trailing windows are
    dropped).  Only the first ``matrix.shape[0] - t`` windows are kept, and
    each window is batched into a single element holding its ``t`` rows.
    """
    n_windows = matrix.shape[0] - t
    rows = tf.data.Dataset.from_tensor_slices(matrix)
    return (
        rows.window(t, shift=1, drop_remainder=True)
        .take(n_windows)
        .flat_map(lambda win: win.batch(t))
    )
def create_result_ds(matrix, delay):
    """Build the target dataset: rows of ``matrix`` minus the first ``delay``.

    ``matrix`` is sliced along its first dimension into one element per row,
    then the leading ``delay`` elements are skipped.
    """
    return tf.data.Dataset.from_tensor_slices(matrix).skip(delay)
def combine_ds(X_train, ds1, ds2, batch_size=32):
    """Zip three datasets into batched ``((X_train, ds1), ds2)`` elements.

    Args:
        X_train: Dataset forming the first half of the input pair.
        ds1: Dataset forming the second half of the input pair.
        ds2: Dataset of targets paired with each input pair.
        batch_size: Number of consecutive elements per batch.  Defaults to
            32, the value that used to be hard-coded.

    Returns:
        A batched ``tf.data.Dataset`` whose elements have the structure
        ``((X_train_batch, ds1_batch), ds2_batch)``.
    """
    combined_ds = tf.data.Dataset.zip(((X_train, ds1), ds2))
    return combined_ds.batch(batch_size=batch_size)
def gen_train_ds(X_train, y_train, step_len):
    """Assemble the batched training dataset.

    Every element pairs the feature row at position ``i`` and the
    ``step_len`` target rows preceding position ``i`` with the target row
    at position ``i``, for ``i`` starting at ``step_len``.
    """
    history_ds = create_rolling_window(y_train, step_len)
    feature_ds = create_result_ds(X_train, step_len)
    target_ds = create_result_ds(y_train, step_len)
    return combine_ds(feature_ds, history_ds, target_ds)
def gen_test_ds(X_test, y_test, step_len):
    """Assemble the batched test dataset.

    Every element pairs the feature row at position ``i`` and the
    ``step_len`` target rows preceding position ``i`` with the target row
    at position ``i``, for ``i`` starting at ``step_len``.
    """
    history_ds = create_rolling_window(y_test, step_len)
    target_ds = create_result_ds(y_test, step_len)
    feature_ds = create_result_ds(X_test, step_len)
    return combine_ds(feature_ds, history_ds, target_ds)
# preprocess data
# Standardize the targets.  StandardScaler needs 2-D input, so the last two
# dimensions are flattened into one for scaling and the original shape is
# restored afterwards.  The scaler is fitted on the training targets only
# and then reused, unchanged, for the test targets.
std = StandardScaler()
train_shape = y_train.shape
test_shape = y_test.shape
y_train = std.fit_transform(np.reshape(y_train, (-1, train_shape[-2] * train_shape[-1])))
y_train = y_train.reshape(train_shape)
y_test = std.transform(np.reshape(y_test, (-1, test_shape[-2] * test_shape[-1])))
y_test = y_test.reshape(test_shape)

# Drop the first entry along the second dimension and cast to float.
X_train = X_train[:, 1:, :].astype(np.float64)
X_test = X_test[:, 1:, :].astype(np.float64)
# Keep a copy of the test features before sin encoding and scaling.
X_test_raw = X_test.copy()
# Sin-encode index 2 of the second dimension with period 24
# (presumably an hour-of-day feature -- TODO confirm).
X_train = apply_sin(X_train, 2, 24)
X_test = apply_sin(X_test, 2, 24)
# Scale both splits by the *training* maximum.  Dividing the test split by
# its own maximum would put train and test on different scales (and use a
# test-set statistic); this mirrors how the target scaler above is fitted
# on train and only applied to test.
X_train_max = X_train.max()
X_train = X_train / X_train_max
X_test = X_test / X_train_max

# gen dataset
time_step = 48
train_ds = gen_train_ds(X_train, y_train, step_len=time_step)
test_ds = gen_test_ds(X_test, y_test, step_len=time_step)
class GenDs(object):
    """Namespace of factory functions for the prepared datasets.

    The methods use no instance state; they read the module-level
    preprocessed arrays (``X_train``, ``y_train``, ``X_test``, ``y_test``,
    ``X_test_raw``) and the fitted target scaler ``std``.  They are declared
    as static methods so that they work both on the class
    (``GenDs.gen_train()``) and on an instance; without the decorator an
    instance call would pass the instance itself as ``time_step``.
    """

    @staticmethod
    def gen_train(time_step=48):
        """Return ``(train_ds, std)`` for windows of ``time_step`` steps.

        ``std`` is the module-level scaler fitted on the training targets,
        returned alongside the dataset.
        """
        train_ds = gen_train_ds(X_train, y_train, step_len=time_step)
        return train_ds, std

    @staticmethod
    def gen_test(time_step=48):
        """Return the test dataset for windows of ``time_step`` steps."""
        test_ds = gen_test_ds(X_test, y_test, step_len=time_step)
        return test_ds

    @staticmethod
    def gen_raw_test(time_step=48):
        """Return the test dataset built from ``X_test_raw``.

        ``X_test_raw`` holds the test features from before sin encoding and
        scaling; the targets are the same ``y_test`` used by ``gen_test``.
        """
        test_raw_ds = gen_test_ds(X_test_raw, y_test, step_len=time_step)
        return test_raw_ds