# Spaces: Runtime error  (status banner captured together with the source listing)
| import numpy as np | |
| import pandas as pd | |
| from pytorch_forecasting import TimeSeriesDataSet | |
| from pytorch_forecasting.data import GroupNormalizer | |
class Energy_DataLoader:
    """
    Load and prepare energy consumption data for forecasting models.

    The raw file is read as a ';'-separated CSV with ',' as decimal mark,
    the timestamp in the first column and one column per consumer.

    Parameters:
        path (str): The path to the data file.
        test_dataset_size (int): Number of trailing distinct timestamps that
            form the test dataset. Defaults to 24.
        max_prediction_length (int): The maximum prediction length. Defaults to 24.
        max_encoder_length (int): The maximum encoder length. Defaults to 168.

    Methods:
        load_data(): Loads the energy consumption data from a CSV file.
        data_transformation(data): Resamples to hourly means and drops one year.
        lead(df, lead): Computes the lead of the power usage series per consumer.
        lag(df, lag): Computes the lag of the power usage series per consumer.
        select_chunk(data): Selects the first consumer columns of the data.
        time_features(df): Builds the long table with time-based features.
        data_split(df): Splits the data into training and test datasets.
        tft_data(): Prepares the data for the Temporal Fusion Transformer (TFT).
        fb_data(): Prepares the data for the Facebook Prophet model.
    """

    def __init__(self, path: str, test_dataset_size: int = 24,
                 max_prediction_length: int = 24,
                 max_encoder_length: int = 168):
        """
        Initialize the Energy_DataLoader class.

        Parameters:
            path (str): The path to the data file.
            test_dataset_size (int): The size of the test dataset. Defaults to 24.
            max_prediction_length (int): The maximum prediction length. Defaults to 24.
            max_encoder_length (int): The maximum encoder length. Defaults to 168.
        """
        self.path = path
        self.test_dataset_size = test_dataset_size
        self.max_prediction_length = max_prediction_length
        self.max_encoder_length = max_encoder_length

    def load_data(self):
        """
        Load the energy consumption data from a CSV file.

        Returns:
            data (pandas.DataFrame | None): The loaded data, or None when the
                file could not be read (the failure is reported on stdout).
        """
        try:
            data = pd.read_csv(self.path, index_col=0, sep=';', decimal=',')
        except Exception as exc:
            # Best effort is kept (None is returned), but the cause is no
            # longer hidden and KeyboardInterrupt/SystemExit are not swallowed.
            print("Load the Data Again")
            print(f"Reason: {exc!r}")
            return None
        print('Load the data sucessfully.')
        return data

    def data_transformation(self, data: pd.DataFrame, exclude_year: int = 2011):
        """
        Resample the readings to hourly means and drop one calendar year.

        The input frame is left untouched. Hourly means that are exactly 0
        are replaced by NaN.

        Parameters:
            data (pandas.DataFrame): The input data, indexed by timestamps
                (or by strings parseable as timestamps).
            exclude_year (int): Calendar year whose rows are removed.
                Defaults to 2011.

        Returns:
            data (pandas.DataFrame): The transformed data; its index is named
                'index'.
        """
        hourly = (
            data.set_axis(pd.to_datetime(data.index), axis=0)
            .sort_index()
            .resample('1h')
            .mean()
            .replace(0., np.nan)
        )
        # Filter on the index itself so the result does not depend on the
        # name the index happens to carry after loading.
        kept = hourly.loc[hourly.index.year != exclude_year]
        return kept.rename_axis('index')

    def lead(self, df: pd.DataFrame, lead: int = -1):
        """
        Compute the lead of the power usage time series for each consumer.

        Rows are shifted in their existing order within each 'consumer_id'
        group.

        Parameters:
            df (pandas.DataFrame): Frame with 'consumer_id' and 'power_usage'.
            lead (int): Shift passed to groupby().shift(); negative values
                look ahead. Defaults to -1.

        Returns:
            d_lead (pandas.Series): The lead time series.
        """
        return df.groupby('consumer_id')['power_usage'].shift(lead)

    def lag(self, df: pd.DataFrame, lag: int = 1):
        """
        Compute the lag of the power usage time series for each consumer.

        Rows are shifted in their existing order within each 'consumer_id'
        group.

        Parameters:
            df (pandas.DataFrame): Frame with 'consumer_id' and 'power_usage'.
            lag (int): The lag time period. Defaults to 1.

        Returns:
            d_lag (pandas.Series): The lag time series.
        """
        return df.groupby('consumer_id')['power_usage'].shift(lag)

    def select_chunk(self, data: pd.DataFrame, n_consumers: int = 10):
        """
        Select the first consumer columns of the data.

        Parameters:
            data (pandas.DataFrame): The input data, one column per consumer.
            n_consumers (int): Number of leading columns to keep. Defaults to 10.

        Returns:
            df (pandas.DataFrame): The selected chunk of data.
        """
        return data[data.columns[:n_consumers]]

    def time_features(self, df: pd.DataFrame):
        """
        Stack the consumer columns into one long table with time features.

        For every consumer the series is trimmed to the span between its
        first and last non-missing reading, gaps inside that span are filled
        with 0, and calendar features plus 'Lead_1', 'lag_1' and 'lag_5' are
        added. Rows with any missing value are dropped at the end. Consumers
        without a single reading contribute no rows.

        Parameters:
            df (pandas.DataFrame): Wide frame indexed by timestamp, one
                column per consumer.

        Returns:
            time_df (pandas.DataFrame): The dataframe with time-based features.
            earliest_time (pandas.Timestamp): The earliest timestamp in the data.
        """
        earliest_time = df.index.min()
        print(earliest_time)
        frames = []
        for label in df:
            ts = df[label]
            first_seen = ts.first_valid_index()
            last_seen = ts.last_valid_index()
            if first_seen is None:
                # Column holds no readings at all: nothing to stack.
                continue
            in_span = (ts.index >= first_seen) & (ts.index <= last_seen)
            active = ts[in_span].fillna(0.)
            stamps = active.index
            elapsed = stamps - earliest_time
            tmp = pd.DataFrame({'power_usage': active})
            tmp['hours_from_start'] = elapsed.days * 24 + elapsed.seconds // 3600
            tmp['hours_from_start'] = tmp['hours_from_start'].astype('int')
            tmp['days_from_start'] = elapsed.days
            tmp['date'] = stamps
            tmp['consumer_id'] = label
            tmp['hour'] = stamps.hour
            tmp['day'] = stamps.day
            tmp['day_of_week'] = stamps.dayofweek
            tmp['month'] = stamps.month
            frames.append(tmp)
        # Stack all time series vertically.
        time_df = pd.concat(frames).reset_index(drop=True)
        time_df['Lead_1'] = self.lead(time_df)
        time_df['lag_1'] = self.lag(time_df, lag=1)
        time_df['lag_5'] = self.lag(time_df, lag=5)
        return time_df.dropna(), earliest_time

    def _split_by_date(self, df: pd.DataFrame, date_column: str):
        """
        Split df so the last `test_dataset_size` distinct dates form the test set.

        The distinct dates are sorted chronologically before the cut-off is
        taken, so the split does not depend on the row order of df.

        Returns:
            (train, test): rows strictly before / on-or-after the cut-off.
        """
        ordered_dates = df[date_column].drop_duplicates().sort_values()
        test_start = ordered_dates.iloc[-self.test_dataset_size:].iloc[0]
        train = df.loc[df[date_column] < test_start]
        test = df.loc[df[date_column] >= test_start]
        return train, test

    def _prepare_long_frame(self):
        """Run load -> transform -> column selection -> time features."""
        df = self.load_data()
        df = self.data_transformation(df)
        df = self.select_chunk(df)
        return self.time_features(df)

    def data_split(self, df: pd.DataFrame):
        """
        Split the data into training and test datasets.

        Parameters:
            df (pandas.DataFrame): Long frame as produced by time_features().

        Returns:
            train_dataset (pandas.DataFrame): The training dataset.
            test_dataset (pandas.DataFrame): The test dataset.
            training (TimeSeriesDataSet): The training dataset for modeling.
            validation (TimeSeriesDataSet): The validation dataset for modeling.
        """
        # Train dataset (train + validation) and held-out test dataset.
        train_dataset, test_dataset = self._split_by_date(df, 'date')
        # The last max_prediction_length hours of the train part are kept
        # out of `training` and used by `validation` below.
        training_cutoff = train_dataset["hours_from_start"].max() - self.max_prediction_length
        print('training cutoff ::', training_cutoff)
        training = TimeSeriesDataSet(
            train_dataset[lambda x: x.hours_from_start <= training_cutoff],
            time_idx="hours_from_start",
            target="Lead_1",
            group_ids=["consumer_id"],
            min_encoder_length=self.max_encoder_length // 2,
            max_encoder_length=self.max_encoder_length,
            min_prediction_length=1,
            max_prediction_length=self.max_prediction_length,
            static_categoricals=["consumer_id"],
            time_varying_known_reals=['power_usage', "hours_from_start", "day", "day_of_week",
                                      "month", 'hour', 'lag_1', 'lag_5'],
            time_varying_unknown_reals=['Lead_1'],
            # Normalize the target per consumer with a softplus transformation.
            target_normalizer=GroupNormalizer(
                groups=["consumer_id"], transformation="softplus"
            ),
            add_relative_time_idx=True,  # add a relative time index as feature
            add_target_scales=True,  # add center/scale of the raw target as static reals
            add_encoder_length=True,  # add the encoder length as a static real
        )
        validation = TimeSeriesDataSet.from_dataset(
            training, train_dataset, predict=True, stop_randomization=True
        )
        return train_dataset, test_dataset, training, validation

    def tft_data(self):
        """
        Prepare the data for training with the Temporal Fusion Transformer (TFT) model.

        Returns:
            train_dataset (pandas.DataFrame): The training dataset.
            test_dataset (pandas.DataFrame): The test dataset.
            training (TimeSeriesDataSet): The training dataset for modeling.
            validation (TimeSeriesDataSet): The validation dataset for modeling.
            earliest_time (pandas.Timestamp): The earliest timestamp in the data.
        """
        df, earliest_time = self._prepare_long_frame()
        train_dataset, test_dataset, training, validation = self.data_split(df)
        return train_dataset, test_dataset, training, validation, earliest_time

    def fb_data(self):
        """
        Prepare the data for training with the Facebook Prophet model.

        The consumer id is one-hot encoded, 'date' is renamed to 'ds' and
        'Lead_1' to 'y'.

        Returns:
            train_data (pandas.DataFrame): The training dataset.
            test_data (pandas.DataFrame): The test dataset.
            consumer_dummay (pandas.Index): The consumer ID columns.
        """
        df, _ = self._prepare_long_frame()
        consumer_dummay = pd.get_dummies(df['consumer_id'])
        # Add the encoded columns to the main frame.
        df[consumer_dummay.columns] = consumer_dummay
        updated_df = df.drop(['consumer_id', 'hours_from_start', 'days_from_start'], axis=1)
        updated_df = updated_df.rename({'date': 'ds', "Lead_1": 'y'}, axis=1)
        train_data, test_data = self._split_by_date(updated_df, 'ds')
        return train_data, test_data, consumer_dummay.columns
| #------------------------------------------------------------------------------------- | |
class StoreDataLoader:
    """
    Load and prepare store/item sales data for forecasting models.

    The CSV at `path` is read with the columns 'date', 'store', 'item' and
    'sales'; only store 1 and items 1..10 are kept by load_data().

    Parameters:
        path: Path (or file-like object) handed to pandas.read_csv.
    """

    def __init__(self, path):
        self.path = path

    def load_data(self):
        """
        Read the sales CSV and keep store 1 with items 1..10.

        Returns:
            data (pandas.DataFrame | None): The filtered rows with 'date'
                parsed to datetimes, or None when loading failed (the
                failure is reported on stdout).
        """
        try:
            data = pd.read_csv(self.path)
            data['date'] = pd.to_datetime(data['date'])
            items = list(range(1, 11))
            data = data.loc[(data['store'] == 1) & (data['item'].isin(items))]
        except Exception as exc:
            # Best effort is kept (None is returned), but the cause is no
            # longer hidden and KeyboardInterrupt/SystemExit are not swallowed.
            print("Load the Data Again")
            print(f"Reason: {exc!r}")
            return None
        print('Load the data sucessfully.')
        return data

    def create_week_date_featues(self, df, date_column):
        """
        Add calendar features derived from `date_column` to df.

        The columns are written into `df` itself, which is also returned.
        All 'Is_*' columns are 1 when the condition holds and 0 otherwise;
        'Semester' is 1 for quarters 1-2 and 2 for quarters 3-4.

        Parameters:
            df (pandas.DataFrame): Frame to extend.
            date_column (str): Name of the column holding the dates.

        Returns:
            df (pandas.DataFrame): The same frame with the added columns.
        """
        # Parse once instead of once per feature.
        dates = pd.to_datetime(df[date_column]).dt
        df['Month'] = dates.month
        df['Day'] = dates.day
        df['Dayofweek'] = dates.dayofweek
        df['DayOfyear'] = dates.dayofyear
        # `.dt.week` no longer exists in pandas 2; isocalendar() is its
        # replacement. Cast back to a plain numeric dtype.
        iso_week = dates.isocalendar().week
        df['Week'] = iso_week.astype('float64') if iso_week.isna().any() else iso_week.astype('int64')
        df['Quarter'] = dates.quarter
        df['Is_month_start'] = np.where(dates.is_month_start, 1, 0)
        df['Is_month_end'] = np.where(dates.is_month_end, 1, 0)
        df['Is_quarter_start'] = np.where(dates.is_quarter_start, 1, 0)
        df['Is_quarter_end'] = np.where(dates.is_quarter_end, 1, 0)
        df['Is_year_start'] = np.where(dates.is_year_start, 1, 0)
        df['Is_year_end'] = np.where(dates.is_year_end, 1, 0)
        # These three are derived from the quarter / weekday numbers, not
        # from the raw date values.
        df['Semester'] = np.where(df['Quarter'].isin([1, 2]), 1, 2)
        df['Is_weekend'] = np.where(df['Dayofweek'].isin([5, 6]), 1, 0)
        df['Is_weekday'] = np.where(df['Dayofweek'].isin([0, 1, 2, 3, 4]), 1, 0)
        df['Days_in_month'] = dates.days_in_month
        return df

    def lead(self, df, lead=-1):
        """Shift 'sales' by `lead` rows within each (store, item) group."""
        return df.groupby(['store', 'item'])['sales'].shift(lead)

    def lag(self, df, lag=1):
        """Shift 'sales' by `lag` rows within each (store, item) group."""
        return df.groupby(['store', 'item'])['sales'].shift(lag)

    def time_features(self, df):
        """
        Build the modeling table with time, demand and lead/lag features.

        Works on a copy, so the frame passed in is not modified. Rows with
        any missing value (e.g. the first five and the last row of every
        store/item series) are dropped.

        Parameters:
            df (pandas.DataFrame): Frame with 'date', 'store', 'item', 'sales'.

        Returns:
            df (pandas.DataFrame): The feature table.
            earliest_time (pandas.Timestamp): The earliest date in the data.
        """
        df = df.copy()
        earliest_time = df['date'].min()
        print(earliest_time)
        elapsed = df['date'] - earliest_time
        df['hours_from_start'] = elapsed.dt.seconds / 60 / 60 + elapsed.dt.days * 24
        df['hours_from_start'] = df['hours_from_start'].astype('int')
        df['days_from_start'] = elapsed.dt.days
        df = self.create_week_date_featues(df, 'date')
        # Store and item are identifiers, not quantities.
        df['store'] = df['store'].astype('str')
        df['item'] = df['item'].astype('str')
        df['sales'] = df['sales'].astype('float')
        df["log_sales"] = np.log(df.sales + 1e-8)  # offset avoids log(0)
        df["avg_demand_by_store"] = df.groupby(["days_from_start", "store"], observed=True).sales.transform("mean")
        df["avg_demand_by_item"] = df.groupby(["days_from_start", "item"], observed=True).sales.transform("mean")
        df['Lead_1'] = self.lead(df)
        df['lag_1'] = self.lag(df, lag=1)
        df['lag_5'] = self.lag(df, lag=5)
        return df.dropna(), earliest_time

    @staticmethod
    def _split_by_date(df, date_column, test_dataset_size):
        """
        Split df so the last `test_dataset_size` distinct dates form the test set.

        The distinct dates are sorted chronologically before the cut-off is
        taken, so the split does not depend on the row order of df.
        """
        ordered_dates = df[date_column].drop_duplicates().sort_values()
        test_start = ordered_dates.iloc[-test_dataset_size:].iloc[0]
        train = df.loc[df[date_column] < test_start]
        test = df.loc[df[date_column] >= test_start]
        return train, test

    def split_data(self, df, test_dataset_size=30, max_prediction_length=30, max_encoder_length=120):
        """
        Split the feature table and build the TFT training/validation datasets.

        Parameters:
            df (pandas.DataFrame): Table as produced by time_features().
            test_dataset_size (int): Number of trailing distinct dates in the
                test dataset. Defaults to 30.
            max_prediction_length (int): Defaults to 30.
            max_encoder_length (int): Defaults to 120.

        Returns:
            train_dataset (pandas.DataFrame): The training dataset.
            test_dataset (pandas.DataFrame): The test dataset.
            training (TimeSeriesDataSet): The training dataset for modeling.
            validation (TimeSeriesDataSet): The validation dataset for modeling.
        """
        train_dataset, test_dataset = self._split_by_date(df, 'date', test_dataset_size)
        training_cutoff = train_dataset["days_from_start"].max() - max_prediction_length
        print("Training cutoff point ::", training_cutoff)
        training = TimeSeriesDataSet(
            train_dataset[lambda x: x.days_from_start <= training_cutoff],
            time_idx="days_from_start",
            target="Lead_1",  # the target is the one-step lead of sales
            group_ids=['store', 'item'],
            min_encoder_length=max_encoder_length // 2,
            max_encoder_length=max_encoder_length,
            min_prediction_length=1,
            max_prediction_length=max_prediction_length,
            static_categoricals=["store", 'item'],
            static_reals=[],
            time_varying_known_categoricals=[],
            # NOTE(review): 'Dayofweek' and 'DayOfyear' are listed twice;
            # left as-is because removing them changes the model input
            # width - confirm before deduplicating.
            time_varying_known_reals=["days_from_start", "Day", "Month", "Dayofweek", "DayOfyear", "Days_in_month", 'Week', 'Quarter',
                                      'Is_month_start', 'Is_month_end', 'Is_quarter_start', 'Is_quarter_end',
                                      'Is_year_start', 'Is_year_end', 'Semester', 'Is_weekend', 'Is_weekday', 'Dayofweek', 'DayOfyear', 'lag_1', 'lag_5', 'sales'],
            time_varying_unknown_reals=['Lead_1', 'log_sales', 'avg_demand_by_store', 'avg_demand_by_item'],
            # Normalize the target per (store, item) group.
            target_normalizer=GroupNormalizer(
                groups=["store", "item"], transformation="softplus"
            ),
            add_relative_time_idx=True,
            add_target_scales=True,
            add_encoder_length=True,
            allow_missing_timesteps=True,
        )
        validation = TimeSeriesDataSet.from_dataset(
            training, train_dataset, predict=True, stop_randomization=True
        )
        return train_dataset, test_dataset, training, validation

    def tft_data(self):
        """
        Prepare the data for the Temporal Fusion Transformer (TFT) model.

        Returns:
            train_dataset, test_dataset, training, validation: see split_data().
            earliest_time (pandas.Timestamp): The earliest date in the data.
        """
        df = self.load_data()
        df, earliest_time = self.time_features(df)
        train_dataset, test_dataset, training, validation = self.split_data(df)
        return train_dataset, test_dataset, training, validation, earliest_time

    def fb_data(self, test_dataset_size=30, split_date='2017-11-30'):
        """
        Prepare the data for the Facebook Prophet model.

        Store and item are one-hot encoded, 'date' is renamed to 'ds' and
        'Lead_1' to 'y'. Rows up to and including `split_date` form the
        training data, later rows the test data.

        Parameters:
            test_dataset_size (int): Currently unused; kept so existing
                callers keep working.
            split_date (str): Last date of the training data. Defaults to
                '2017-11-30'.

        Returns:
            fb_train_data (pandas.DataFrame): The training dataset.
            fb_test_data (pandas.DataFrame): The test dataset.
            item_dummay (pandas.DataFrame): One-hot encoded items.
            store_dummay (pandas.DataFrame): One-hot encoded stores.
        """
        df = self.load_data()
        df, _ = self.time_features(df)
        store_dummay = pd.get_dummies(df['store'], prefix='store')
        item_dummay = pd.get_dummies(df['item'], prefix='item')
        df_encode = pd.concat([store_dummay, item_dummay], axis=1)
        # Add the encoded columns to the main frame.
        df[df_encode.columns] = df_encode
        df = df.drop(['store', 'item', 'log_sales', 'avg_demand_by_store', 'avg_demand_by_item'], axis=1)
        df = df.rename({'date': 'ds', "Lead_1": 'y'}, axis=1)
        fb_train_data = df.loc[df['ds'] <= split_date]
        fb_test_data = df.loc[df['ds'] > split_date]
        return fb_train_data, fb_test_data, item_dummay, store_dummay
if __name__ == '__main__':
    # Manual smoke run: load the raw electricity file from a local path.
    obj = Energy_DataLoader(r'D:\Ai Practices\Transformer Based Forecasting\stremlit app\LD2011_2014.txt')
    # Energy_DataLoader has no `load` method; the loader is `load_data`.
    obj.load_data()