In [ ]:
# %pip install s3fs seaborn xgboost pyarrow s3fs boto3==1.26.37 awswrangler --upgrade
%pip install xgboost boto3 awswrangler --upgrade

Downloading the data

In [ ]:
import pandas as pd
import numpy as np
import boto3

# To read parquet datasets from S3 (and, if needed, only a certain number of rows)
import awswrangler as wr

import seaborn as sns
import matplotlib.pyplot as plt
import plotly.express as px
import matplotlib

matplotlib.rcParams['figure.figsize'] = 10,10
In [ ]:
NUMBER_OF_RECORDS_USED = 1_000_000
In [ ]:
def list_s3_files_in_folder_using_client(bucket_name, prefix):
    """
    List all files under a prefix in an S3 bucket
    :return: None
    """
    s3_client = boto3.client("s3")
    response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
    files = response.get("Contents", [])
    for file in files:
        print(f"file_name: {file['Key']}, size: {(file['Size'] / 1_000_000):,.1f} MB")
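
Note that list_objects_v2 returns at most 1,000 keys per call, so the listing above is truncated for larger prefixes. A minimal sketch of a paginated variant (hypothetical helper name, same boto3 client) is:

In [ ]:
# Sketch: walk every page of results instead of only the first response
def list_all_s3_files_using_paginator(bucket_name, prefix):
    s3_client = boto3.client("s3")
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        for file in page.get("Contents", []):
            print(f"file_name: {file['Key']}, size: {(file['Size'] / 1_000_000):,.1f} MB")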
        
In [ ]:
list_s3_files_in_folder_using_client('dsoaws', 'nyc-taxi-orig-cleaned-split-parquet-per-year')
In [ ]:
# %%time

# # Full year (2018) data is about 100 million rows

# # Using panda
# nyc_df_fare = pd.read_parquet('s3a://dsoaws/nyc-taxi-orig-cleaned-split-parquet-per-year/ride-fare/year=2018')
# nyc_df_fare

# # Instance: ml.m5.16xlarge
# # CPU times: user 7.12 s, sys: 9.33 s, total: 16.5 s
# # Wall time: 1min
In [ ]:
%%time

# Using aws wrangler
nyc_df_fare = wr.s3.read_parquet(path="s3://dsoaws/nyc-taxi-orig-cleaned-split-parquet-per-year/ride-fare/year=2019", dataset=True)
nyc_df_fare

# Instance: ml.m5.16xlarge
# CPU times: user 12.8 s, sys: 10.4 s, total: 23.3 s
# Wall time: 19.8 s
In [ ]:
# import pyarrow.parquet as pq
# import pyarrow as pa 
# import s3fs
In [ ]:
# %%time

# # Using pyarrow
# fs = s3fs.S3FileSystem()

# bucket_uri = 's3://dsoaws/nyc-taxi-orig-cleaned-split-parquet-per-year/ride-fare/year=2019'
# dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
# # dataset = pa.dataset.dataset(bucket_uri, filesystem=fs)
# table = dataset.read(use_threads=True)
# nyc_df_fare = table.to_pandas(use_threads=True) 

# nyc_df_fare

# # Instance: ml.m5.16xlarge
# # CPU times: user 7.61 s, sys: 9.69 s, total: 17.3 s
# # Wall time: 1min 2s
In [ ]:
# DO NOT use this: the rows in the different tables are not in the same order, so you will NOT be able to join the partial reads later on
def get_nrows_parquet(s3_path: str, nrows: int = 1_000_000) -> pd.DataFrame:
    # Reading in a small number of rows using awswrangler
    dfs = wr.s3.read_parquet(path=s3_path, dataset=True,
                            chunked=nrows)
    for df in dfs:
        return df
In [ ]:
%%time

# Using aws wrangler
nyc_df_info = wr.s3.read_parquet(path="s3://dsoaws/nyc-taxi-orig-cleaned-split-parquet-per-year/ride-info/year=2019", dataset=True)
nyc_df_info
In [ ]:
# %%time

# nyc_df_metadata['ride_id'] = nyc_df_metadata['ride_id'].astype(str)
# nyc_df_fare['ride_id'] = nyc_df_fare['ride_id'].astype(str)
# nyc_df_pickup['ride_id'] = nyc_df_pickup['ride_id'].astype(str)
# nyc_df_dropoff['ride_id'] = nyc_df_dropoff['ride_id'].astype(str)
In [ ]:
# nyc_df_metadata.info()
# nyc_df_fare.info()
# nyc_df_pickup.info()
# nyc_df_dropoff.info()
In [ ]:
%%time

# Join different tables
nyc_df = nyc_df_fare.merge(nyc_df_info, on='ride_id')
In [ ]:
%%time

nyc_df.head(10)
nyc_df.info()
In [ ]:
# Get memory usage
memory_bytes = nyc_df.memory_usage(index=True, deep=True).sum()
print(f'Full df memory usage: {(memory_bytes / 1_000_000_000):,.2f} GB')

Data Preprocessing

We can see that the trip_duration field contains values in the range of 1 to 86,390, so let's exclude data that lies outside 2 standard deviations from the mean.
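
The filter is not actually applied in the cells below; a minimal sketch of such a 2-standard-deviation filter, shown here on the total_amount column (an assumption, since the columns present depend on which tables are loaded), looks like this:

In [ ]:
# Sketch (not applied below): keep rows within 2 standard deviations of the mean
col = 'total_amount'
col_mean, col_std = nyc_df[col].mean(), nyc_df[col].std()
nyc_df_filtered = nyc_df[(nyc_df[col] - col_mean).abs() <= 2 * col_std]
print(f"kept {len(nyc_df_filtered):,} of {len(nyc_df):,} rows")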

In [ ]:
# Check for any null entries
nyc_df.isnull().sum()

Looking at the coordinates, NYC longitudes lie between (-74.53, -72.71) and latitudes lie between (40.44, 41.09), but some pickup_latitude, pickup_longitude, dropoff_latitude, and dropoff_longitude values fall outside this range, so let's clean them.
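
A minimal sketch of that cleanup, assuming the coordinate columns are present in the merged frame (in this notebook they would come from the ride-pickup and ride-dropoff tables, which are not loaded), is:

In [ ]:
# Sketch: keep only rides whose coordinates fall inside the NYC bounding box above
lon_min, lon_max = -74.53, -72.71
lat_min, lat_max = 40.44, 41.09
in_nyc = (
    nyc_df['pickup_longitude'].between(lon_min, lon_max)
    & nyc_df['pickup_latitude'].between(lat_min, lat_max)
    & nyc_df['dropoff_longitude'].between(lon_min, lon_max)
    & nyc_df['dropoff_latitude'].between(lat_min, lat_max)
)
nyc_df_in_bounds = nyc_df[in_nyc]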

In [ ]:
%%time

nyc_df.describe()

Now let's convert the pickup_at and dropoff_at fields to datetime, as they may be useful later.

In [ ]:
nyc_df['pickup_at'] = pd.to_datetime(nyc_df['pickup_at'])
nyc_df['pickup_date'] = nyc_df['pickup_at'].dt.date
nyc_df['pickup_date']
In [ ]:
nyc_df['dropoff_at'] = pd.to_datetime(nyc_df['dropoff_at'])
nyc_df['dropoff_date'] = nyc_df['dropoff_at'].dt.date
nyc_df['dropoff_date']
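
One way these timestamps can be useful later is as a derived duration feature. A minimal sketch (the new column is illustrative and is not used as a model input below):

In [ ]:
# Sketch: ride duration in seconds derived from the two timestamps
nyc_df['duration_seconds'] = (nyc_df['dropoff_at'] - nyc_df['pickup_at']).dt.total_seconds()
nyc_df['duration_seconds'].describe()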
In [ ]:
nyc_df['Month'] = nyc_df['pickup_at'].dt.month
nyc_df['Month']
In [ ]:
nyc_df['Hour'] = nyc_df['pickup_at'].dt.hour
nyc_df['Hour']
In [ ]:
nyc_df['Year'] = nyc_df['pickup_at'].dt.year
nyc_df['Year']

EDA

In [ ]:
nyc_df.columns
In [ ]:
# plt.title("Distribution of total_amount")
# sns.histplot(nyc_df['total_amount'], bins=100)
In [ ]:
# nyc_df['log_total_amount']  = np.log(nyc_df['total_amount'].values+1)
# plt.title('Distribution of total amount')
# sns.histplot(nyc_df['log_total_amount'].values,bins=100)
In [ ]:
# nyc_df.groupby('pickup_date').count() #['ride_id']
# plt.plot(nyc_df.groupby('pickup_date').count(), 'o-', label='train')
In [ ]:
# df = nyc_df.groupby('passenger_count')['total_amount'].mean()
# plt.title('Mean total_amount by passenger_count')
# sns.barplot(df.index,df.values)

We can see that the number of passengers has little effect on the average total_amount.

In [ ]:
# df = nyc_df.groupby('vendor_id')['trip_duration'].mean()
# print(df)
# plt.subplots(1,1,figsize=(15,10))
# plt.ylim(ymin=800)
# plt.ylim(ymax=842)
# sns.barplot(df.index,df.values)
# plt.title('Time per Vendor')
# plt.legend(loc=0)
# plt.ylabel('Time in Seconds')

It doesn't seem like the trip_duration differs much between the vendors.

In [ ]:
# df = nyc_df.groupby('store_and_fwd_flag')['trip_duration'].mean()
# plt.subplots(1,1,figsize=(15,10))
# plt.title('Time per store_and_fwd_flag')
# plt.legend(loc=0)
# plt.ylabel('Time in Seconds')
# sns.barplot(df.index,df.values)

So it would seem that the store_and_fwd_flag discriminates well between travel times; there is a slight skew in the data where some vendor employees did not record their travel times accurately.

Splitting the data

Since we need data to train, validate, and test the model, let's split it using train_test_split from the sklearn module: 20% is held out for testing, then 25% of the remainder becomes the validation set, giving a 60/20/20 split.

Reference: https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.train_test_split.html

In [ ]:
from sklearn.model_selection import train_test_split

train_val_df , test_df = train_test_split(nyc_df, test_size=0.2,random_state=42)
train_df, val_df = train_test_split(train_val_df, test_size=0.25,random_state=42)
In [ ]:
train_df
In [ ]:
%%time

train_df.info()

Identifying input and output columns

In [ ]:
%%time

train_df.select_dtypes(include=np.number).corr()
In [ ]:
input_col = ['vendor_id','pickup_at','dropoff_at', 'passenger_count','store_and_fwd_flag']
target_col = 'total_amount'
In [ ]:
train_inputs = train_df[input_col].copy()
train_targets = train_df[target_col].copy()
In [ ]:
val_inputs = val_df[input_col].copy()
val_targets = val_df[target_col].copy()
In [ ]:
test_inputs = test_df[input_col].copy()
test_targets = test_df[target_col].copy()
In [ ]:
numeric_col = train_inputs.select_dtypes(include=np.number).columns.tolist()
cate_col = train_inputs.select_dtypes('object').columns.tolist()
In [ ]:
train_inputs[numeric_col]
In [ ]:
train_inputs[cate_col]

Imputing missing values

Imputing is a technique for filling in missing numeric values, typically with a per-column statistic such as the mean. If needed, we'll use SimpleImputer from the sklearn module.

Reference: https://scikit-learn.org/stable/modules/generated/sklearn.impute.SimpleImputer.html

In [ ]:
%%time

train_inputs[numeric_col].isna().sum()
In [ ]:
%%time

val_inputs[numeric_col].isna().sum()
In [ ]:
%%time

test_inputs[numeric_col].isna().sum()

It seems there are no missing values in the train, validation, and test datasets.
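
Had there been missing values, a minimal sketch would be to fit a SimpleImputer on the training inputs only and then transform every split (a no-op here, shown only for completeness):

In [ ]:
# Sketch: mean-impute numeric columns, fitting on the training split only
from sklearn.impute import SimpleImputer

num_imputer = SimpleImputer(strategy='mean')
num_imputer.fit(train_inputs[numeric_col])
train_inputs[numeric_col] = num_imputer.transform(train_inputs[numeric_col])
val_inputs[numeric_col] = num_imputer.transform(val_inputs[numeric_col])
test_inputs[numeric_col] = num_imputer.transform(test_inputs[numeric_col])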

Scaling numeric values

Now let's scale the numeric values into the range (0, 1).

Reference: https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.MinMaxScaler.html

In [ ]:
train_inputs[numeric_col].describe()
In [ ]:
from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler()
In [ ]:
%%time

scaler.fit(train_inputs[numeric_col])
In [ ]:
%%time

train_inputs[numeric_col] = scaler.transform(train_inputs[numeric_col])
val_inputs[numeric_col]  = scaler.transform(val_inputs[numeric_col])
test_inputs[numeric_col] = scaler.transform(test_inputs[numeric_col])
In [ ]:
train_inputs[numeric_col].describe()

Encoding categorical columns

Now we are going to encode the categorical columns into a one-hot numeric array using OneHotEncoder.

Reference: https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.OneHotEncoder.html

In [ ]:
from sklearn.preprocessing import OneHotEncoder
encoder  = OneHotEncoder(sparse=False,handle_unknown='ignore')
In [ ]:
%%time

encoder.fit(train_inputs[cate_col])
In [ ]:
enc_col = encoder.get_feature_names(cate_col).tolist()
enc_col
In [ ]:
%%time

train_inputs[enc_col] = encoder.transform(train_inputs[cate_col])
val_inputs[enc_col] = encoder.transform(val_inputs[cate_col])
test_inputs[enc_col] = encoder.transform(test_inputs[cate_col])
In [ ]:
train_inputs[enc_col]
In [ ]:
val_inputs
In [ ]:
%%time

x_train = train_inputs[numeric_col + enc_col]
x_val = val_inputs[numeric_col+enc_col]
x_test = test_inputs[numeric_col + enc_col]

Model

Linear model

Linear Regression is a supervised machine learning algorithm for regression tasks: it models a target value as a function of the independent variables and is mostly used for finding relationships between variables and for forecasting.

Reference: https://machinelearningmastery.com/linear-regression-for-machine-learning/

In [ ]:
from sklearn.linear_model import LinearRegression
In [ ]:
model = LinearRegression()
In [ ]:
%%time

model.fit(x_train,train_targets)
In [ ]:
%%time

model.predict(x_train)
In [ ]:
train_targets
In [ ]:
from sklearn.metrics import mean_squared_error
mean_squared_error(model.predict(x_train),train_targets,squared=False)
In [ ]:
mean_squared_error(model.predict(x_val),val_targets,squared=False)

XGBoost

XGBoost is a powerful approach to regression problems. Since we are dealing with a regression problem, we'll now use an XGBoost model.

Reference: https://xgboost.readthedocs.io/en/latest/

In [ ]:
import xgboost

print(xgboost.__version__)
In [ ]:
from xgboost import XGBRegressor
model =  XGBRegressor(n_jobs=-1,random_state=42)
In [ ]:
%%time

model.fit(x_train,train_targets)
In [ ]:
%%time

model.predict(x_train)
In [ ]:
mean_squared_error(model.predict(x_train),train_targets,squared=False)
In [ ]:
mean_squared_error(model.predict(x_val),val_targets,squared=False)

Now let's visualize the model

In [ ]:
# %pip install graphviz
In [ ]:
# from xgboost import plot_tree
# matplotlib.rcParams['figure.figsize']  = 20,20
# plot_tree(model,rankdir='LR')
In [ ]:
# matplotlib.rcParams['figure.figsize']  = 20,20
# plot_tree(model,rankdir='LR',num_trees=19)
In [ ]:
trees = model.get_booster().get_dump()
print(trees[0])

Feature importance

Let's create a new data frame that contains the features and the importance the model assigns to each.

In [ ]:
importance_df = pd.DataFrame({
    'Feature' : numeric_col+enc_col,
    'Importance':model.feature_importances_
}).sort_values('Importance',ascending=False)
In [ ]:
matplotlib.rcParams['figure.figsize'] = 10,10
sns.barplot(x=importance_df['Importance'], y=importance_df['Feature'])

The plot shows which feature the model gives the most importance to.

Hyperparameter tuning

Now that we have created a model, let's tune its hyperparameters to reduce the loss.

Reference: https://towardsdatascience.com/xgboost-fine-tune-and-optimize-your-model-23d996fab663
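
The cells below step through max_depth, n_estimators, learning_rate, and booster by hand. As an alternative (not used in this notebook), the same search could be automated, for example with sklearn's RandomizedSearchCV; a minimal sketch over assumed value ranges:

In [ ]:
# Sketch: randomized search over the same hyperparameters, scored by RMSE
from sklearn.model_selection import RandomizedSearchCV

param_distributions = {
    'max_depth': [2, 5, 10, 30, 60],
    'n_estimators': [10, 50, 100, 500],
    'learning_rate': [0.01, 0.1, 0.3, 0.9, 1.0],
}
search = RandomizedSearchCV(
    XGBRegressor(n_jobs=-1, random_state=42),
    param_distributions,
    n_iter=10,
    scoring='neg_root_mean_squared_error',
    cv=3,
    random_state=42,
)
search.fit(x_train, train_targets)
print(search.best_params_, -search.best_score_)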

max_depth

In [ ]:
def test_param(**params):
    model = XGBRegressor(n_jobs=-1, random_state=42, **params)
    model.fit(x_train, train_targets)
    train_rmse = mean_squared_error(model.predict(x_train), train_targets, squared=False)
    val_rmse = mean_squared_error(model.predict(x_val), val_targets, squared=False)
    print(train_rmse, " ", val_rmse)
    return train_rmse, val_rmse
In [ ]:
def test_param_and_plot(param_name, param_values):
    train_errors, val_errors = [], [] 
    for value in param_values:
        params = {param_name: value}
        train_rmse, val_rmse = test_param(**params)
        train_errors.append(train_rmse)
        val_errors.append(val_rmse)
    plt.figure(figsize=(10,6))
    plt.title('Overfitting curve: ' + param_name)
    plt.plot(param_values, train_errors, 'b-o')
    plt.plot(param_values, val_errors, 'r-o')
    plt.xlabel(param_name)
    plt.ylabel('RMSE')
    plt.legend(['Training', 'Validation'])
In [ ]:
test_param(max_depth=2)
In [ ]:
test_param(max_depth=5)
In [ ]:
test_param(max_depth=10)
In [ ]:
#test_param_and_plot('max_depth',[x for x in range(0,60,10)])

n_estimators

In [ ]:
test_param(n_estimators=10)
In [ ]:
test_param(n_estimators=20)
In [ ]:
test_param(n_estimators=30)
In [ ]:
test_param(n_estimators=40)
In [ ]:
test_param(n_estimators=50)
In [ ]:
test_param(n_estimators=50,max_depth=30)
In [ ]:
test_param(n_estimators=60,max_depth=30)
In [ ]:
test_param_and_plot('n_estimators',[x for x in range(100,1100,100)])

learning_rate

In [ ]:
test_param(learning_rate=0.01)
In [ ]:
test_param(learning_rate=0.1)
In [ ]:
test_param(learning_rate=0.3)
In [ ]:
test_param(learning_rate=0.5)
In [ ]:
test_param(learning_rate=0.9)
In [ ]:
test_param(learning_rate=1)
In [ ]:
test_param(learning_rate=1,max_depth=60,n_estimators=60)
In [ ]:
test_param(learning_rate=1,max_depth=60,n_estimators=100)

booster

In [ ]:
test_param(booster='gblinear')
In [ ]:
test_param(booster='gblinear', max_depth=60, n_estimators=100, learning_rate=1)
In [ ]:
test_param(booster='gbtree',max_depth=60,n_estimators=100,learning_rate=1)

Putting it together

In [ ]:
model = XGBRegressor(n_jobs=-1, random_state=42, max_depth=60, n_estimators=100, learning_rate=0.9, booster='gbtree')
In [ ]:
model.fit(x_train,train_targets)
In [ ]:
mean_squared_error(model.predict(x_train),train_targets,squared=False)
In [ ]:
mean_squared_error(model.predict(x_val),val_targets,squared=False)
In [ ]:
mean_squared_error(model.predict(x_test),test_targets,squared=False)

Making predictions on new inputs

Let's define a helper function that takes a model and a single input and returns the predicted total_amount.

In [ ]:
from sklearn.impute import SimpleImputer
imputer = SimpleImputer(missing_values=np.nan, strategy='mean')

def predict_input(model, single_input):
    input_df = pd.DataFrame([single_input])
    imputer.fit(input_df[numeric_col])
    input_df[numeric_col] = imputer.transform(input_df[numeric_col])
    input_df[numeric_col] = scaler.transform(input_df[numeric_col])
    input_df[enc_col] = encoder.transform(input_df[cate_col])
    X_input = input_df[numeric_col + enc_col]
    pred = model.predict(X_input)[0]
    return pred
In [ ]:
inp = {
    'vendor_id':1,
    'pickup_at':'2016-04-01 13:39:13.964110080',
    'dropoff_at':'2016-04-01 14:00:13.964110080',    
    'passenger_count':6,
    # 'pickup_latitude':40.7,
    # 'dropoff_longitude':-73,
    # 'dropoff_latitude':40.7,
    'store_and_fwd_flag':'N',
    'store_and_fwd_flag_N':1,
    'store_and_fwd_flag_Y':0
}
predict_input(model,inp)

So for the given input, the predicted total_amount is 318.1897.