You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

generate_data.py 12 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. import numpy as np
  2. import pandas as pd
  3. from math import ceil
  4. from tqdm import tqdm
  5. from copy import deepcopy as dco
  6. import os, sys, gc, time, warnings, pickle, psutil, random
  7. from sklearn.preprocessing import LabelEncoder
  8. from sklearn.preprocessing import MinMaxScaler
  9. from .utils import *
  10. from .config import raw_data_dir, processed_data_dir, TARGET
  11. warnings.filterwarnings("ignore")
  12. # ==================== preprocessing ====================
  13. def melt_raw_data(train_df):
  14. if os.path.exists(os.path.join(processed_data_dir, "melt_raw_data.pkl")):
  15. return pd.read_pickle(os.path.join(processed_data_dir, "melt_raw_data.pkl"))
  16. index_columns = ["id", "item_id", "dept_id", "cat_id", "store_id", "state_id"]
  17. grid_df = pd.melt(train_df, id_vars=index_columns, var_name="d", value_name=TARGET)
  18. for col in index_columns:
  19. grid_df[col] = grid_df[col].astype("category")
  20. grid_df.to_pickle(os.path.join(processed_data_dir, "melt_raw_data.pkl"))
  21. return grid_df
  22. def add_release_week(grid_df, prices_df, calendar_df):
  23. if os.path.exists(os.path.join(processed_data_dir, "add_release_week.pkl")):
  24. return pd.read_pickle(os.path.join(processed_data_dir, "add_release_week.pkl"))
  25. release_df = prices_df.groupby(["store_id", "item_id"])["wm_yr_wk"].agg(["min"]).reset_index()
  26. release_df.columns = ["store_id", "item_id", "release"]
  27. grid_df = merge_by_concat(grid_df, release_df, ["store_id", "item_id"])
  28. grid_df = merge_by_concat(grid_df, calendar_df[["wm_yr_wk", "d"]], ["d"])
  29. # cutoff meaningless rows
  30. grid_df = grid_df[grid_df["wm_yr_wk"] >= grid_df["release"]]
  31. grid_df = grid_df.reset_index(drop=True)
  32. # scale the release
  33. grid_df["release"] = grid_df["release"] - grid_df["release"].min()
  34. grid_df["release"] = grid_df["release"].astype(np.int16)
  35. grid_df.to_pickle(os.path.join(processed_data_dir, "add_release_week.pkl"))
  36. return grid_df
  37. def add_prices(grid_df, prices_df, calendar_df):
  38. if os.path.exists(os.path.join(processed_data_dir, "add_prices.pkl")):
  39. return pd.read_pickle(os.path.join(processed_data_dir, "add_prices.pkl"))
  40. prices_df["price_max"] = prices_df.groupby(["store_id", "item_id"])["sell_price"].transform("max")
  41. prices_df["price_min"] = prices_df.groupby(["store_id", "item_id"])["sell_price"].transform("min")
  42. prices_df["price_std"] = prices_df.groupby(["store_id", "item_id"])["sell_price"].transform("std")
  43. prices_df["price_mean"] = prices_df.groupby(["store_id", "item_id"])["sell_price"].transform("mean")
  44. prices_df["price_norm"] = prices_df["sell_price"] / prices_df["price_max"]
  45. prices_df["price_nunique"] = prices_df.groupby(["store_id", "item_id"])["sell_price"].transform("nunique")
  46. prices_df["item_nunique"] = prices_df.groupby(["store_id", "sell_price"])["item_id"].transform("nunique")
  47. calendar_prices = calendar_df[["wm_yr_wk", "month", "year"]]
  48. calendar_prices = calendar_prices.drop_duplicates(subset=["wm_yr_wk"])
  49. prices_df = prices_df.merge(calendar_prices[["wm_yr_wk", "month", "year"]], on=["wm_yr_wk"], how="left")
  50. prices_df["price_momentum"] = prices_df["sell_price"] / prices_df.groupby(["store_id", "item_id"])[
  51. "sell_price"
  52. ].transform(lambda x: x.shift(1))
  53. prices_df["price_momentum_m"] = prices_df["sell_price"] / prices_df.groupby(["store_id", "item_id", "month"])[
  54. "sell_price"
  55. ].transform("mean")
  56. prices_df["price_momentum_y"] = prices_df["sell_price"] / prices_df.groupby(["store_id", "item_id", "year"])[
  57. "sell_price"
  58. ].transform("mean")
  59. grid_df = reduce_mem_usage(grid_df)
  60. prices_df = reduce_mem_usage(prices_df)
  61. original_columns = list(grid_df)
  62. grid_df = grid_df.merge(prices_df, on=["store_id", "item_id", "wm_yr_wk"], how="left")
  63. grid_df = reduce_mem_usage(grid_df)
  64. grid_df.to_pickle(os.path.join(processed_data_dir, "add_prices.pkl"))
  65. return grid_df
  66. def add_date(grid_df, calendar_df):
  67. if os.path.exists(os.path.join(processed_data_dir, "add_date.pkl")):
  68. return pd.read_pickle(os.path.join(processed_data_dir, "add_date.pkl"))
  69. # merge calendar partly
  70. icols = [
  71. "date",
  72. "d",
  73. "event_name_1",
  74. "event_type_1",
  75. "event_name_2",
  76. "event_type_2",
  77. "snap_CA",
  78. "snap_TX",
  79. "snap_WI",
  80. ]
  81. grid_df = grid_df.merge(calendar_df[icols], on=["d"], how="left")
  82. # convert to category
  83. icols = [
  84. "event_name_1",
  85. "event_type_1",
  86. "event_name_2",
  87. "event_type_2",
  88. "snap_CA",
  89. "snap_TX",
  90. "snap_WI",
  91. ]
  92. for col in icols:
  93. grid_df[col] = grid_df[col].astype("category")
  94. # make some features from date
  95. grid_df["date"] = pd.to_datetime(grid_df["date"])
  96. grid_df["tm_d"] = grid_df["date"].dt.day.astype(np.int8)
  97. grid_df["tm_w"] = grid_df["date"].dt.week.astype(np.int8)
  98. grid_df["tm_m"] = grid_df["date"].dt.month.astype(np.int8)
  99. grid_df["tm_y"] = grid_df["date"].dt.year
  100. grid_df["tm_y"] = (grid_df["tm_y"] - grid_df["tm_y"].min()).astype(np.int8)
  101. grid_df["tm_wm"] = grid_df["tm_d"].apply(lambda x: ceil(x / 7)).astype(np.int8)
  102. grid_df["tm_dw"] = grid_df["date"].dt.dayofweek.astype(np.int8)
  103. grid_df["tm_w_end"] = (grid_df["tm_dw"] >= 5).astype(np.int8)
  104. # clear columns
  105. grid_df["d"] = grid_df["d"].apply(lambda x: x[2:]).astype(np.int16)
  106. grid_df = grid_df.drop("wm_yr_wk", 1)
  107. grid_df.to_pickle(os.path.join(processed_data_dir, "add_date.pkl"))
  108. return grid_df
  109. def add_lags_rollings(grid_df):
  110. if os.path.exists(os.path.join(processed_data_dir, "add_lags_rollings.pkl")):
  111. return pd.read_pickle(os.path.join(processed_data_dir, "add_lags_rollings.pkl"))
  112. # add lags
  113. SHIFT_DAY = 28
  114. LAG_DAYS = [col for col in range(SHIFT_DAY, SHIFT_DAY + 15)]
  115. grid_df = grid_df.assign(
  116. **{
  117. "{}_lag_{}".format(col, l): grid_df.groupby(["id"])[col].transform(lambda x: x.shift(l))
  118. for l in LAG_DAYS
  119. for col in [TARGET]
  120. }
  121. )
  122. for col in list(grid_df):
  123. if "lag" in col:
  124. grid_df[col] = grid_df[col].astype(np.float16)
  125. # add rollings
  126. for i in [7, 14, 30, 60, 180]:
  127. grid_df["rolling_mean_" + str(i)] = (
  128. grid_df.groupby(["id"])[TARGET].transform(lambda x: x.shift(SHIFT_DAY).rolling(i).mean()).astype(np.float16)
  129. )
  130. grid_df["rolling_std_" + str(i)] = (
  131. grid_df.groupby(["id"])[TARGET].transform(lambda x: x.shift(SHIFT_DAY).rolling(i).std()).astype(np.float16)
  132. )
  133. # sliding window
  134. for d_shift in [1, 7, 14]:
  135. for d_window in [7, 14, 30, 60]:
  136. col_name = "rolling_mean_tmp_" + str(d_shift) + "_" + str(d_window)
  137. grid_df[col_name] = (
  138. grid_df.groupby(["id"])[TARGET]
  139. .transform(lambda x: x.shift(SHIFT_DAY + d_shift).rolling(d_window).mean())
  140. .astype(np.float16)
  141. )
  142. grid_df.to_pickle(os.path.join(processed_data_dir, "add_lags_rollings.pkl"))
  143. return grid_df
  144. def add_mean_enc(grid_df):
  145. if os.path.exists(os.path.join(processed_data_dir, "add_mean_enc.pkl")):
  146. return pd.read_pickle(os.path.join(processed_data_dir, "add_mean_enc.pkl"))
  147. sales_df = dco(grid_df["sales"])
  148. grid_df["sales"][grid_df["d"] > (1941 - 28)] = np.nan
  149. icols = [
  150. ["state_id"],
  151. ["store_id"],
  152. ["cat_id"],
  153. ["dept_id"],
  154. ["state_id", "cat_id"],
  155. ["state_id", "dept_id"],
  156. ["store_id", "cat_id"],
  157. ["store_id", "dept_id"],
  158. ["item_id"],
  159. ["item_id", "state_id"],
  160. ["item_id", "store_id"],
  161. ]
  162. for col in icols:
  163. col_name = "_" + "_".join(col) + "_"
  164. grid_df["enc" + col_name + "mean"] = grid_df.groupby(col)["sales"].transform("mean").astype(np.float16)
  165. grid_df["enc" + col_name + "std"] = grid_df.groupby(col)["sales"].transform("std").astype(np.float16)
  166. grid_df["sales"] = sales_df
  167. grid_df.to_pickle(os.path.join(processed_data_dir, "add_mean_enc.pkl"))
  168. return grid_df
  169. def add_snap(grid_df):
  170. if os.path.exists(os.path.join(processed_data_dir, "all_data_df.pkl")):
  171. return pd.read_pickle(os.path.join(processed_data_dir, "all_data_df.pkl"))
  172. mask_CA = grid_df["state_id"] == "CA"
  173. mask_WI = grid_df["state_id"] == "WI"
  174. mask_TX = grid_df["state_id"] == "TX"
  175. grid_df["snap"] = grid_df["snap_CA"]
  176. grid_df.loc[mask_WI, "snap"] = grid_df["snap_WI"]
  177. grid_df.loc[mask_TX, "snap"] = grid_df["snap_TX"]
  178. grid_df.to_pickle(os.path.join(processed_data_dir, "all_data_df.pkl"))
  179. return grid_df
  180. def preprocessing_m5():
  181. train_df = pd.read_csv(os.path.join(raw_data_dir, "sales_train_evaluation.csv"))
  182. prices_df = pd.read_csv(os.path.join(raw_data_dir, "sell_prices.csv"))
  183. calendar_df = pd.read_csv(os.path.join(raw_data_dir, "calendar.csv"))
  184. grid_df = melt_raw_data(train_df)
  185. print(f"df: ({grid_df.shape[0]}, {grid_df.shape[1]}) Melting raw data down!")
  186. grid_df = add_release_week(grid_df, prices_df, calendar_df)
  187. print(f"df: ({grid_df.shape[0]}, {grid_df.shape[1]}) Adding release week down!")
  188. grid_df = add_prices(grid_df, prices_df, calendar_df)
  189. print(f"df: ({grid_df.shape[0]}, {grid_df.shape[1]}) Adding prices down!")
  190. grid_df = add_date(grid_df, calendar_df)
  191. print(f"df: ({grid_df.shape[0]}, {grid_df.shape[1]}) Adding date down!")
  192. grid_df = add_lags_rollings(grid_df)
  193. print(f"df: ({grid_df.shape[0]}, {grid_df.shape[1]}) Adding lags and rollings down!")
  194. grid_df = add_mean_enc(grid_df)
  195. print(f"df: ({grid_df.shape[0]}, {grid_df.shape[1]}) Adding mean encoding down!")
  196. grid_df = pd.read_pickle(os.path.join(processed_data_dir, "add_mean_enc.pkl"))
  197. grid_df = add_snap(grid_df)
  198. print("Save the data down!")
  199. # ==================== split dataset ====================
  200. def label_encode(df, columns):
  201. le = LabelEncoder()
  202. data_list = []
  203. for column in columns:
  204. data_list += df[column].drop_duplicates().values.tolist()
  205. le.fit(data_list)
  206. for column in columns:
  207. df[column] = le.transform(df[column].values.tolist())
  208. return df
  209. def reorganize_data(grid_df):
  210. grid_df["snap"] = grid_df["snap"].astype("int8")
  211. columns_list = [
  212. ["item_id"],
  213. ["dept_id"],
  214. ["cat_id"],
  215. ["event_name_1", "event_name_2"],
  216. ["event_type_1", "event_type_2"],
  217. ]
  218. for columns in columns_list:
  219. grid_df[columns] = label_encode(grid_df[columns], columns)
  220. return reduce_mem_usage(grid_df)
  221. def split_data(df, store, fill_flag=False):
  222. for cat in category_list:
  223. df[cat] = df[cat].astype("category")
  224. if fill_flag:
  225. df = reduce_mem_usage(df, float16_flag=False)
  226. cols = df.isnull().any()
  227. idx = list(cols[cols.values].index)
  228. df[idx] = df.groupby("item_id", sort=False)[idx].apply(lambda x: x.ffill().bfill())
  229. df[idx] = df[idx].fillna(df[idx].mean())
  230. mms = MinMaxScaler()
  231. df[features_columns] = mms.fit_transform(df[features_columns])
  232. df = reduce_mem_usage(df)
  233. train_df = df[df["d"] <= END_TRAIN]
  234. val_df = df[df["d"] > END_TRAIN]
  235. train_df = train_df[features_columns + label_column]
  236. val_df = val_df[features_columns + label_column]
  237. print(train_df.shape, val_df.shape)
  238. suffix = f"_fill" if fill_flag else ""
  239. train_df.to_pickle(os.path.join(processed_data_dir, f"train_{store}{suffix}.pkl"))
  240. val_df.to_pickle(os.path.join(processed_data_dir, f"val_{store}{suffix}.pkl"))
  241. def split_m5():
  242. grid_df = pd.read_pickle(os.path.join(processed_data_dir, "all_data_df.pkl"))
  243. if os.path.exists(os.path.join(processed_data_dir, "label_encode.pkl")):
  244. grid_df = pd.read_pickle(os.path.join(processed_data_dir, "label_encode.pkl"))
  245. else:
  246. grid_df = reorganize_data(grid_df)
  247. grid_df.to_pickle(os.path.join(processed_data_dir, "label_encode.pkl"))
  248. for store in store_list:
  249. # split_data(grid_df[grid_df["store_id"] == store], store)
  250. split_data(grid_df[grid_df["store_id"] == store], store, True)
  251. def regenerate_data():
  252. preprocessing_m5()
  253. split_m5()

基于学件范式，全流程地支持学件上传、检测、组织、查搜、部署和复用等功能。同时，该仓库作为北冥坞系统的引擎，支撑北冥坞系统的核心功能。