# %pip install s3fs seaborn xgboost pyarrow boto3==1.26.37 awswrangler --upgrade
%pip install xgboost boto3 awswrangler --upgrade
import pandas as pd
import numpy as np
import boto3
# awswrangler is used to read Parquet files from S3 (and supports reading only a certain number of rows)
import awswrangler as wr
import seaborn as sns
import matplotlib.pyplot as plt
import plotly.express as px
import matplotlib
matplotlib.rcParams['figure.figsize'] = 10,10
NUMBER_OF_RECORDS_USED = 1_000_000
def list_s3_files_in_folder_using_client(bucket_name, prefix):
    """
    List all files under a prefix ("folder") in an S3 bucket and print their sizes.
    :return: None
    """
    s3_client = boto3.client("s3")
    # bucket_name = "testbucket-frompython-2"
    response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
    files = response.get("Contents", [])
    for file in files:
        print(f"file_name: {file['Key']}, size: {(file['Size']/1_000_000):,} MB")
list_s3_files_in_folder_using_client('dsoaws', 'nyc-taxi-orig-cleaned-split-parquet-per-year')
# %%time
# # Full year (2018) data is about 100 million rows
# # Using pandas
# nyc_df_fare = pd.read_parquet('s3a://dsoaws/nyc-taxi-orig-cleaned-split-parquet-per-year/ride-fare/year=2018')
# nyc_df_fare
# # Instance: ml.m5.16xlarge
# # CPU times: user 7.12 s, sys: 9.33 s, total: 16.5 s
# # Wall time: 1min
%%time
# Using aws wrangler
nyc_df_fare = wr.s3.read_parquet(path="s3://dsoaws/nyc-taxi-orig-cleaned-split-parquet-per-year/ride-fare/year=2019", dataset=True)
nyc_df_fare
# Instance: ml.m5.16xlarge
# CPU times: user 12.8 s, sys: 10.4 s, total: 23.3 s
# Wall time: 19.8 s
# import pyarrow.parquet as pq
# import pyarrow as pa
# import s3fs
# %%time
# # Using pyarrow
# fs = s3fs.S3FileSystem()
# bucket_uri = 's3://dsoaws/nyc-taxi-orig-cleaned-split-parquet-per-year/ride-fare/year=2019'
# dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
# # dataset = pa.dataset.dataset(bucket_uri, filesystem=fs)
# table = dataset.read(use_threads=True)
# nyc_df_fare = table.to_pandas(use_threads=True)
# nyc_df_fare
# # Instance: ml.m5.16xlarge
# # CPU times: user 7.61 s, sys: 9.69 s, total: 17.3 s
# # Wall time: 1min 2s
# DO NOT use this: chunked reads do not return rows in a consistent order across tables, so you will NOT be able to join them later on
def get_nrows_parquet(s3_path: str, nrows: int = 1_000_000) -> pd.DataFrame:
    # Read in a small number of rows using awswrangler (chunked=<int> yields DataFrames of at most that many rows)
    dfs = wr.s3.read_parquet(path=s3_path, dataset=True, chunked=nrows)
    for df in dfs:
        return df
%%time
# Using aws wrangler
nyc_df_info = wr.s3.read_parquet(path="s3://dsoaws/nyc-taxi-orig-cleaned-split-parquet-per-year/ride-info/year=2019", dataset=True)
nyc_df_info
# %%time
# nyc_df_metadata['ride_id'] = nyc_df_metadata['ride_id'].astype(str)
# nyc_df_fare['ride_id'] = nyc_df_fare['ride_id'].astype(str)
# nyc_df_pickup['ride_id'] = nyc_df_pickup['ride_id'].astype(str)
# nyc_df_dropoff['ride_id'] = nyc_df_dropoff['ride_id'].astype(str)
# nyc_df_metadata.info()
# nyc_df_fare.info()
# nyc_df_pickup.info()
# nyc_df_dropoff.info()
%%time
# Join the ride-fare and ride-info tables on ride_id
nyc_df = nyc_df_fare.merge(nyc_df_info, on='ride_id')
%%time
nyc_df.head(10)
nyc_df.info()
# Get memory usage
memory_bytes = nyc_df.memory_usage(index=True, deep=True).sum()
print(f'Full df memory usage: {memory_bytes / 1_000_000_000:,.2f} GB')
We can see that the trip_duration field contains values in the range 1-86390, so let's exclude the data that lies outside 2 standard deviations from the mean.
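Below is a minimal sketch of that two-standard-deviation filter; it assumes a trip_duration column in seconds (if it is absent, it can be derived from dropoff_at and pickup_at).
# Sketch (assumes a trip_duration column exists): keep only rows within 2 standard deviations of the mean
duration_mean = nyc_df['trip_duration'].mean()
duration_std = nyc_df['trip_duration'].std()
nyc_df = nyc_df[(nyc_df['trip_duration'] >= duration_mean - 2 * duration_std) &
                (nyc_df['trip_duration'] <= duration_mean + 2 * duration_std)]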
# Check for any null entries
nyc_df.isnull().sum()
Looking at the coordinates, the longitude values should lie between (-74.53, -72.71) and the latitude values between (40.44, 41.09). But some pickup_latitude, pickup_longitude, dropoff_latitude, and dropoff_longitude values lie outside this range, so let's clean them.
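Below is a minimal sketch of that bounding-box cleanup; it assumes the four coordinate columns named above are present in nyc_df.
# Sketch (assumes pickup/dropoff coordinate columns exist): keep only rides inside the NYC bounding box
lon_min, lon_max = -74.53, -72.71
lat_min, lat_max = 40.44, 41.09
nyc_df = nyc_df[nyc_df['pickup_longitude'].between(lon_min, lon_max) &
                nyc_df['dropoff_longitude'].between(lon_min, lon_max) &
                nyc_df['pickup_latitude'].between(lat_min, lat_max) &
                nyc_df['dropoff_latitude'].between(lat_min, lat_max)]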
%%time
nyc_df.describe()
Now let's convert the pickup_at and dropoff_at fields to datetime, since date and time features may be useful later.
nyc_df['pickup_at'] = pd.to_datetime(nyc_df['pickup_at'])
nyc_df['pickup_date'] = nyc_df['pickup_at'].dt.date
nyc_df['pickup_date']
nyc_df['dropoff_at'] = pd.to_datetime(nyc_df['dropoff_at'])
nyc_df['dropoff_date'] = nyc_df['dropoff_at'].dt.date
nyc_df['dropoff_date']
nyc_df['Month'] = nyc_df['pickup_at'].dt.month
nyc_df['Month']
nyc_df['Hour'] = nyc_df['pickup_at'].dt.hour
nyc_df['Hour']
nyc_df['Year'] = nyc_df['pickup_at'].dt.year
nyc_df['Year']
nyc_df.columns
# plt.title("Distribution of total_amount")
# sns.histplot(nyc_df['total_amount'], bins=100)
# nyc_df['log_total_amount'] = np.log(nyc_df['total_amount'].values+1)
# plt.title('Distribution of total amount')
# sns.histplot(nyc_df['log_total_amount'].values,bins=100)
# nyc_df.groupby('pickup_date').count() #['ride_id']
# plt.plot(nyc_df.groupby('pickup_date').count(), 'o-', label='train')
# df = nyc_df.groupby('passenger_count')['total_amount'].mean()
# plt.title('Mean total_amount by passenger_count')
# sns.barplot(x=df.index, y=df.values)
We can see that the number of passengers has little effect on the average total_amount.
# df = nyc_df.groupby('vendor_id')['trip_duration'].mean()
# print(df)
# plt.subplots(1,1,figsize=(15,10))
# plt.ylim(ymin=800)
# plt.ylim(ymax=842)
# sns.barplot(x=df.index, y=df.values)
# plt.title('Time per Vendor')
# plt.legend(loc=0)
# plt.ylabel('Time in Seconds')
It doesn't seem like the trip_duration differs much between the vendors.
# df = nyc_df.groupby('store_and_fwd_flag')['trip_duration'].mean()
# plt.subplots(1,1,figsize=(15,10))
# plt.title('Time per store_and_fwd_flag')
# plt.legend(loc=0)
# plt.ylabel('Time in Seconds')
# sns.barplot(x=df.index, y=df.values)
So it would seem that store_and_fwd_flag discriminates between travel times; there is a slight skew in the data where some vendor employees did not record their travel times accurately.
Since we need data to train, validate, and test the model, let's split it using train_test_split from the sklearn module.
Reference: https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.train_test_split.html
from sklearn.model_selection import train_test_split
train_val_df , test_df = train_test_split(nyc_df, test_size=0.2,random_state=42)
train_df, val_df = train_test_split(train_val_df, test_size=0.25,random_state=42)
train_df
%%time
train_df.info()
%%time
train_df.corr()
input_col = ['vendor_id','pickup_at','dropoff_at', 'passenger_count','store_and_fwd_flag']
target_col = 'total_amount'
train_inputs = train_df[input_col].copy()
train_targets = train_df[target_col].copy()
val_inputs = val_df[input_col].copy()
val_targets = val_df[target_col].copy()
test_inputs = test_df[input_col].copy()
test_targets = test_df[target_col].copy()
numeric_col = train_inputs.select_dtypes(include=np.number).columns.tolist()
cate_col = train_inputs.select_dtypes('object').columns.tolist()
train_inputs[numeric_col]
train_inputs[cate_col]
Imputing is a technique for filling in missing numeric values with a statistic such as the column mean. If needed, we'll use SimpleImputer from the sklearn module.
Reference: https://scikit-learn.org/stable/modules/generated/sklearn.impute.SimpleImputer.html
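Below is a minimal sketch of how the imputer could be applied here if any missing values were found; the checks below show there are none, so this step is optional.
from sklearn.impute import SimpleImputer
# Sketch: fit the imputer on the training numeric columns only, then apply the same statistics to val/test
imputer = SimpleImputer(strategy='mean')
imputer.fit(train_inputs[numeric_col])
train_inputs[numeric_col] = imputer.transform(train_inputs[numeric_col])
val_inputs[numeric_col] = imputer.transform(val_inputs[numeric_col])
test_inputs[numeric_col] = imputer.transform(test_inputs[numeric_col])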
%%time
train_inputs[numeric_col].isna().sum()
%%time
val_inputs[numeric_col].isna().sum()
%%time
test_inputs[numeric_col].isna().sum()
It seems like there are no missing values in the train, validation, and test datasets.
Now let's scale the numeric values into the range [0, 1].
Reference: https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.MinMaxScaler.html
train_inputs[numeric_col].describe()
from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler()
%%time
scaler.fit(train_inputs[numeric_col])
%%time
train_inputs[numeric_col] = scaler.transform(train_inputs[numeric_col])
val_inputs[numeric_col] = scaler.transform(val_inputs[numeric_col])
test_inputs[numeric_col] = scaler.transform(test_inputs[numeric_col])
train_inputs[numeric_col].describe()
Now we are going to encode the categorical columns into a one-hot numeric array using OneHotEncoder. Reference: https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.OneHotEncoder.html
from sklearn.preprocessing import OneHotEncoder
encoder = OneHotEncoder(sparse=False,handle_unknown='ignore')
%%time
encoder.fit(train_inputs[cate_col])
enc_col = encoder.get_feature_names(cate_col).tolist()
enc_col
%%time
train_inputs[enc_col] = encoder.transform(train_inputs[cate_col])
val_inputs[enc_col] = encoder.transform(val_inputs[cate_col])
test_inputs[enc_col] = encoder.transform(test_inputs[cate_col])
train_inputs[enc_col]
val_inputs
%%time
x_train = train_inputs[numeric_col + enc_col]
x_val = val_inputs[numeric_col+enc_col]
x_test = test_inputs[numeric_col + enc_col]
Linear Regression is a supervised machine learning algorithm that performs a regression task: it models a target prediction value based on independent variables and is mostly used for finding relationships between variables and for forecasting. Reference: https://machinelearningmastery.com/linear-regression-for-machine-learning/
from sklearn.linear_model import LinearRegression
model = LinearRegression()
%%time
model.fit(x_train,train_targets)
%%time
model.predict(x_train)
train_targets
from sklearn.metrics import mean_squared_error
mean_squared_error(model.predict(x_train),train_targets,squared=False)
mean_squared_error(model.predict(x_val),val_targets,squared=False)
XGBoost is a powerful approach to regression problems. Since we are dealing with a regression problem, we are now going to use an XGBoost model. Reference: https://xgboost.readthedocs.io/en/latest/
import xgboost
print(xgboost.__version__)
from xgboost import XGBRegressor
model = XGBRegressor(n_jobs=-1,random_state=42)
%%time
model.fit(x_train,train_targets)
%%time
model.predict(x_train)
mean_squared_error(model.predict(x_train),train_targets,squared=False)
mean_squared_error(model.predict(x_val),val_targets,squared=False)
Now let's visualize the model
# %pip install graphviz
# from xgboost import plot_tree
# matplotlib.rcParams['figure.figsize'] = 20,20
# plot_tree(model,rankdir='LR')
# matplotlib.rcParams['figure.figsize'] = 20,20
# plot_tree(model,rankdir='LR',num_trees=19)
trees = model.get_booster().get_dump()
print(trees[0])
Let's create a new data frame that contains the features and their respective importance scores given by the model.
importance_df = pd.DataFrame({
    'Feature': numeric_col + enc_col,
    'Importance': model.feature_importances_
}).sort_values('Importance', ascending=False)
matplotlib.rcParams['figure.figsize'] = 10,10
sns.barplot(data=importance_df, x='Importance', y='Feature')
The bar plot shows which feature the model assigns the highest importance.
Now that we have created a model, we need to tune its hyperparameters to reduce the loss. Reference: https://towardsdatascience.com/xgboost-fine-tune-and-optimize-your-model-23d996fab663
def test_param(**params):
    model = XGBRegressor(n_jobs=-1, random_state=42, **params)
    model.fit(x_train, train_targets)
    train_rmse = mean_squared_error(model.predict(x_train), train_targets, squared=False)
    val_rmse = mean_squared_error(model.predict(x_val), val_targets, squared=False)
    print(train_rmse, " ", val_rmse)
    return train_rmse, val_rmse

def test_param_and_plot(param_name, param_values):
    train_errors, val_errors = [], []
    for value in param_values:
        params = {param_name: value}
        train_rmse, val_rmse = test_param(**params)
        train_errors.append(train_rmse)
        val_errors.append(val_rmse)
    plt.figure(figsize=(10,6))
    plt.title('Overfitting curve: ' + param_name)
    plt.plot(param_values, train_errors, 'b-o')
    plt.plot(param_values, val_errors, 'r-o')
    plt.xlabel(param_name)
    plt.ylabel('RMSE')
    plt.legend(['Training', 'Validation'])
test_param(max_depth=2)
test_param(max_depth=5)
test_param(max_depth=10)
#test_param_and_plot('max_depth',[x for x in range(0,60,10)])
test_param(n_estimators=10)
test_param(n_estimators=20)
test_param(n_estimators=30)
test_param(n_estimators=40)
test_param(n_estimators=50)
test_param(n_estimators=50,max_depth=30)
test_param(n_estimators=60,max_depth=30)
test_param_and_plot('n_estimators',[x for x in range(100,1100,100)])
test_param(learning_rate=0.01)
test_param(learning_rate=0.1)
test_param(learning_rate=0.3)
test_param(learning_rate=0.5)
test_param(learning_rate=0.9)
test_param(learning_rate=1)
test_param(learning_rate=1,max_depth=60,n_estimators=60)
test_param(learning_rate=1,max_depth=60,n_estimators=100)
test_param(booster='gblinear')
test_param(booster='gblinear', max_depth=60, n_estimators=100, learning_rate=1)
test_param(booster='gbtree',max_depth=60,n_estimators=100,learning_rate=1)
model = XGBRegressor(n_jobs=-1, random_state=42, max_depth=60, n_estimators=100, learning_rate=0.9, booster='gbtree')
model.fit(x_train,train_targets)
mean_squared_error(model.predict(x_train),train_targets,squared=False)
mean_squared_error(model.predict(x_val),val_targets,squared=False)
mean_squared_error(model.predict(x_test),test_targets,squared=False)
Let's define a helper function that takes a model and a single input record and predicts the target (total_amount).
# from scikit import imputer
from sklearn.impute import SimpleImputer
imputer = SimpleImputer(missing_values=np.nan, strategy='mean')
def predict_input(model, single_input):
    input_df = pd.DataFrame([single_input])
    # Impute any missing numeric values, then apply the scaler and encoder fitted on the training data
    imputer.fit(input_df[numeric_col])
    input_df[numeric_col] = imputer.transform(input_df[numeric_col])
    input_df[numeric_col] = scaler.transform(input_df[numeric_col])
    input_df[enc_col] = encoder.transform(input_df[cate_col])
    X_input = input_df[numeric_col + enc_col]
    pred = model.predict(X_input)[0]
    return pred
inp = {
    'vendor_id': 1,
    'pickup_at': '2016-04-01 13:39:13.964110080',
    'dropoff_at': '2016-04-01 14:00:13.964110080',
    'passenger_count': 6,
    # 'pickup_latitude': 40.7,
    # 'dropoff_longitude': -73,
    # 'dropoff_latitude': 40.7,
    'store_and_fwd_flag': 'N',
    'store_and_fwd_flag_N': 1,
    'store_and_fwd_flag_Y': 0
}
predict_input(model,inp)
So from the given input, the predicted value of the target (total_amount) is about 318.19.