%stop_session
%additional_python_modules #xgboost==1.7.1
#%extra_jars spark-deep-learning-1.5.0-spark2.4-s_2.11.jar
%number_of_workers 10
import sys
print(sys.version)
# %help
# Load the ride data into `df`, the name every later cell uses.
df = spark.read.option("recursiveFileLookup", "true").parquet("s3://dsoaws/nyc-taxi/ride-metadata/")
# The following command caches the DataFrame in memory. This improves performance since subsequent calls to the DataFrame can read from memory instead of re-reading the data from disk.
#df.cache()
# Optional: write the data out as a single file. Overwriting is set with mode("overwrite"), not with a write option.
# df.repartition(1).write.mode("overwrite").option("header", "true").csv("s3://dsoaws/nyc-taxi-csv/")
# df.repartition(1).write.mode("overwrite").parquet("s3://dsoaws/nyc-taxi-single-parquet-all/")
df.show(10)
print("The dataset has %d rows." % df.count())
The goal is to predict the total_amount (typically called the fare) of each ride. To simplify the pre-processing, we may want to drop certain features like pickup_at and dropoff_at, since taxi fares typically do not depend on the time of day (unlike ride-share fares such as Uber and Lyft).
We may also want to drop unused fields like store_and_fwd_flag, which marks an edge case where the taxi meter was disconnected during the trip. This should not impact the fare.
TODO: Describe why we should drop the rate_code_id, or otherwise explain how it could be used.
Lastly, the payment_type is not useful for this predictive model, as the fare should not depend on how the rider pays.
# Data type string of column vendor_id is not supported.
# Data type timestamp of column pickup_at is not supported.
# Data type timestamp of column dropoff_at is not supported.
# Data type string of column rate_code_id is not supported.
# Data type string of column store_and_fwd_flag is not supported.
# Data type string of column payment_type is not supported.
# Drop every column whose data type is not supported (see the messages above).
unsupported_cols = ["vendor_id", "pickup_at", "dropoff_at", "rate_code_id", "store_and_fwd_flag", "payment_type"]
df = df.drop(*unsupported_cols)
df.show()
df.printSchema()
Randomly split data into training and test sets. By doing this, you can train and tune the model using only the training subset, and then evaluate the model's performance on the test set to get a sense of how the model will perform on new data.
# Split the dataset randomly into roughly 70% for training and 30% for testing. Pass a seed so the split is reproducible.
train, test = df.randomSplit([0.7, 0.3], seed = 0)
print("There are %d training examples and %d test examples." % (train.count(), test.count()))
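As a rough illustration of what a seeded random split does, the following pure-Python sketch assigns each row to the training or test set with one independent random draw per row. It is an analogy under stated assumptions, not Spark's implementation, and the helper name random_split is hypothetical. It shows why the resulting sizes are only approximately 70/30 and why fixing the seed makes the split repeatable.

```python
import random

def random_split(rows, train_fraction=0.7, seed=0):
    """Hypothetical helper: send each row to train or test with one random draw per row."""
    rng = random.Random(seed)
    train_part, test_part = [], []
    for row in rows:
        # Each row lands in the training set with probability train_fraction.
        (train_part if rng.random() < train_fraction else test_part).append(row)
    return train_part, test_part

rows = list(range(1000))  # stand-in for DataFrame rows
train_rows, test_rows = random_split(rows, seed=0)
again_train, again_test = random_split(rows, seed=0)

print(len(train_rows), len(test_rows))   # close to 700 / 300, but not exact
print(train_rows == again_train)         # same seed, same split
```

Because every row is assigned independently, the printed counts in the cell above will be near, but usually not exactly, 70% and 30% of the dataset.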
You can explore the data before training. For example, the (commented-out) command in the following cell selects passenger_count and total_amount from the training set so you can see how the fare relates to a single feature.
In Databricks, you can call display() on a DataFrame and click the plot icon below the table to turn such a table into a plot. In other environments, use show() to print the rows.
# train.select("passenger_count", "total_amount").show()
Now that you have reviewed the data and prepared it as a DataFrame with numeric values, you're ready to train a model to predict taxi fares.
Most MLlib algorithms require a single input column containing a vector of features and a single target column. The DataFrame currently has one column for each feature. MLlib provides functions to help you prepare the dataset in the required format.
MLlib pipelines combine multiple steps into a single workflow, making it easier to iterate as you develop the model.
In this example, you create a pipeline using the following functions:
VectorAssembler: Assembles the feature columns into a feature vector.
VectorIndexer: Identifies columns that should be treated as categorical. This is done heuristically, identifying any column with a small number of distinct values as categorical.
SparkXGBRegressor: Uses the SparkXGBRegressor estimator to learn how to predict the fare from the feature vectors.
CrossValidator: The XGBoost regression algorithm has several hyperparameters. This notebook illustrates how to use hyperparameter tuning in Spark. This capability automatically tests a grid of hyperparameters and chooses the best resulting model.
For more information, see the MLlib documentation for VectorAssembler and VectorIndexer.
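To make the first two stages more concrete, here is a minimal pure-Python sketch of the ideas behind them. It does not use MLlib: assemble and categorical_indices are hypothetical stand-ins, the rows are made up, and the trip_distance column name is an assumption for illustration. The second function mirrors the heuristic described above, treating a feature as categorical when it has at most max_categories distinct values.

```python
def assemble(row, feature_cols):
    """Hypothetical stand-in for VectorAssembler: one list of feature values per row."""
    return [row[col] for col in feature_cols]

def categorical_indices(vectors, max_categories):
    """Hypothetical stand-in for the VectorIndexer heuristic: a feature counts as
    categorical when it takes at most max_categories distinct values."""
    n_features = len(vectors[0])
    result = []
    for i in range(n_features):
        distinct_values = {vector[i] for vector in vectors}
        if len(distinct_values) <= max_categories:
            result.append(i)
    return result

# Made-up rows for illustration only; not taken from the taxi dataset.
rows = [
    {"passenger_count": 1, "trip_distance": 0.8},
    {"passenger_count": 2, "trip_distance": 3.1},
    {"passenger_count": 1, "trip_distance": 7.4},
    {"passenger_count": 2, "trip_distance": 1.9},
]
feature_cols = ["passenger_count", "trip_distance"]

vectors = [assemble(row, feature_cols) for row in rows]
print(vectors)
print(categorical_indices(vectors, max_categories=2))
```

With a threshold of 2, only the first feature (two distinct values) is flagged as categorical; the second has four distinct values and stays continuous. A larger threshold, such as the maxCategories=100 used in this notebook, flags more columns.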
The first step is to create the VectorAssembler and VectorIndexer steps.
from pyspark.ml.feature import VectorAssembler, VectorIndexer
# Remove the target column from the input feature set.
featuresCols = df.columns
featuresCols.remove('total_amount')
# vectorAssembler combines all feature columns into a single feature vector column, "rawFeatures".
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures", handleInvalid="skip")
# vectorIndexer identifies categorical features and indexes them, and creates a new column "features".
vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=100, handleInvalid="skip")
Next, define the model. To use distributed training, set num_workers to the number of Spark tasks you want to run concurrently while training the XGBoost model.
# The xgboost.spark module used below is available in XGBoost 1.7.0 and later.
!pip install xgboost==1.7.1
import xgboost
print(xgboost.__version__)
from xgboost.spark import SparkXGBRegressor
# The next step is to define the model training stage of the pipeline.
# The following command defines a SparkXGBRegressor model that takes an input column "features" by default and learns to predict the labels in the "total_amount" column.
# Set `num_workers` to the number of Spark tasks you want to run concurrently while training the XGBoost model. It is not set here, so the estimator's default is used.
xgb_regressor = SparkXGBRegressor(label_col="total_amount", missing=0.0)
The third step is to wrap the model you just defined in a CrossValidator stage. CrossValidator calls the SparkXGBRegressor estimator with different hyperparameter settings. It trains multiple models and selects the best one, based on minimizing a specified metric. In this example, the metric is root mean squared error (RMSE).
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
# Define a grid of hyperparameters to test:
# - max_depth: maximum depth of each decision tree
# - n_estimators: number of boosting rounds, or the total number of trees
paramGrid = ParamGridBuilder()\
.addGrid(xgb_regressor.max_depth, [2, 5])\
.addGrid(xgb_regressor.n_estimators, [10, 100])\
.build()
# Define an evaluation metric. The CrossValidator compares the true labels with predicted values for each combination of parameters, and calculates this value to determine the best model.
evaluator = RegressionEvaluator(metricName="rmse",
labelCol=xgb_regressor.getLabelCol(),
predictionCol=xgb_regressor.getPredictionCol())
# Declare the CrossValidator, which performs the model tuning.
cv = CrossValidator(estimator=xgb_regressor, evaluator=evaluator, estimatorParamMaps=paramGrid)
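To get a feel for how much work this search involves, the sketch below enumerates the same grid in plain Python and counts the model fits. It assumes CrossValidator is left at its default of 3 folds (numFolds is not set above) and that the best combination is refit once on the full training data; the variable names are illustrative only.

```python
from itertools import product

# The grid values mirror the ParamGridBuilder call in this notebook.
grid = {"max_depth": [2, 5], "n_estimators": [10, 100]}

# Every combination of one value per hyperparameter.
param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]

# Assumption: default of 3 folds, since numFolds is not set on the CrossValidator.
num_folds = 3
fits_during_search = len(param_maps) * num_folds
total_fits = fits_during_search + 1  # plus one final refit of the best combination

for param_map in param_maps:
    print(param_map)
print("combinations:", len(param_maps), "total fits:", total_fits)
```

Each value added to the grid multiplies the number of combinations, so the training time of the search grows quickly as the grid gets larger.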
Create the pipeline.
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, cv])
Train the pipeline.
Now that you have set up the workflow, you can train the pipeline with a single call.
When you call fit(), the pipeline runs feature processing, model tuning, and training, and returns a fitted pipeline with the best model it found.
This step takes several minutes.
pipelineModel = pipeline.fit(train)
The final step is to use the fitted model to make predictions on the test dataset and evaluate the model's performance. The model's performance on the test dataset provides an approximation of how it is likely to perform on new data.
Computing evaluation metrics is important for understanding the quality of predictions, as well as for comparing models and tuning parameters.
predictions = pipelineModel.transform(test)
predictions.select("total_amount", "prediction", *featuresCols).show()
The transform() method of the pipeline model applies the full pipeline to the input dataset. The pipeline applies the feature processing steps to the dataset and then uses the fitted XGBoost regressor model to make predictions. The pipeline returns a DataFrame with a new column, prediction.
A common way to evaluate the performance of a regression model is to calculate the root mean squared error (RMSE). The value is not very informative on its own, but you can use it to compare different models. CrossValidator determines the best model by selecting the one that minimizes RMSE.
rmse = evaluator.evaluate(predictions)
print("RMSE on our test set: %g" % rmse)
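For reference, RMSE is the square root of the mean of the squared differences between labels and predictions. The following sketch computes it by hand in plain Python on made-up numbers; the rmse helper and the sample values are illustrative and are not output from the model above.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error of two equally long sequences of numbers."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Made-up fares and predictions, for illustration only.
fares = [10.0, 20.0, 30.0]
predicted = [12.0, 20.0, 26.0]

value = rmse(fares, predicted)
print("RMSE: %g" % value)
```

RMSE is expressed in the same unit as the label, so for this dataset it can be read as a typical error in the predicted total_amount, with large errors weighted more heavily than small ones.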
You can also inspect the predictions alongside individual features, as you did with the original dataset.
predictions.select("passenger_count", "prediction").show()
#%%sh
#rm -rf /dbfs/tmp/xgboost/pipeline_001
#rm -rf /dbfs/tmp/xgboost/pipelineModel_001
# Save the pipeline that created the model
pipeline.save('/tmp/xgboost/pipeline_001')
# Save the model itself
pipelineModel.save('/tmp/xgboost/pipelineModel_001')
# Load the pipeline
loaded_pipeline = Pipeline.load('/tmp/xgboost/pipeline_001')
# Load and use the model
from pyspark.ml import PipelineModel
loaded_pipelineModel = PipelineModel.load('/tmp/xgboost/pipelineModel_001')
# To represent new data, use the first 3 rows of the test dataset
new_data = test.limit(3)
# Make predictions with the loaded model
new_preds = loaded_pipelineModel.transform(new_data)
# show() works outside Databricks, where display() for DataFrames is not available.
new_preds.select("total_amount", "prediction", *featuresCols).show()