In [ ]:
%stop_session
In [ ]:
%glue_ray
%additional_python_modules xgboost==1.7.2,xgboost_ray
%min_workers 25
%number_of_workers 25
%object_memory_worker 10
In [ ]:
import sys

print(sys.version)
In [ ]:
import ray

dataset = ray.data.read_parquet([
#    's3://dsoaws/nyc-taxi-orig-cleaned-split-parquet-all-years/', # fails with "disk full" error
    's3://dsoaws/nyc-taxi-orig-cleaned-parquet-all-years/', 
    ], 
#    columns=columns,
#    filter=(pa.field("total_amount") >= 0.0) # this worked briefly, then stopped working for some reason; see the read-time filter sketch in the next cell
)

dataset.count()
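In [ ]:
# Hedged sketch (not used by the rest of the notebook): select only the modeling
# columns and filter rows at read time with a PyArrow dataset expression. The column
# list is taken from the commented-out experiment below; whether the `filter=`
# argument is honored here depends on the Ray Data and PyArrow versions in the
# session (the original run notes it "worked briefly, then stopped working").
import pyarrow.dataset as pa_ds

columns = [
    "passenger_count", "trip_distance", "payment_type", "fare_amount",
    "extra", "mta_tax", "tip_amount", "tolls_amount", "total_amount",
]

filtered_ds = ray.data.read_parquet(
    's3://dsoaws/nyc-taxi-orig-cleaned-parquet-all-years/',
    columns=columns,
    filter=(pa_ds.field("total_amount") >= 0.0),
)
filtered_ds.count()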
In [ ]:
dataset.schema()
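In [ ]:
# Optional sanity check (not in the original notebook): peek at a few rows to
# confirm the columns and dtypes look reasonable before training.
dataset.show(3)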
In [ ]:
# import glob
# from xgboost_ray import RayDMatrix, RayFileType
# import ray
# from ray.train.xgboost import XGBoostTrainer
# from ray.air.config import ScalingConfig
# from ray.air.config import RunConfig

# # We can also pass a list of files
# #path = list(sorted(glob.glob("/root/aws-samples-for-ray/glue/data.parquet")))

# # This argument will be passed to `pd.read_parquet()`
# columns = [
#     "passenger_count",
#     "trip_distance", 
#     # "pickup_longitude", # these are part of some years but not others
#     # "pickup_latitude",
#     # "dropoff_longitude", 
#     # "dropoff_latitude",
#     "payment_type",
#     "fare_amount", 
#     "extra", 
#     "mta_tax", 
#     "tip_amount",
#     "tolls_amount", 
#     "total_amount"
# ]
# # .drop_columns(cols=["vendor_id", "pickup_at", "dropoff_at", "rate_code_id", "store_and_fwd_flag", "payment_type"]) \

# # Filter out any rows missing the total_amount column.
# # (The other option is to filter at read time with a PyArrow expression:)
# # import pyarrow as pa
# # filter_expr = (
# #     (pa.dataset.field("total_amount") >= 0.0)
# # )

# # ray.init(
# #     _system_config={
# #         "max_io_workers": 4,  # More IO workers for remote storage.
# #         "min_spilling_size": 100 * 1024 * 1024,  # Spill at least 100MB at a time.
# #         "object_spilling_config": json.dumps(
# #             {
# #               "type": "smart_open",
# #               "params": {
# #                 "uri": "s3://dsoaws/ray-glue-xgboost-nyc-taxi-spill"
# #               },
# #               "buffer_size": 100 * 1024 * 1024,  # Use a 100MB buffer for writes
# #             },
# #         )
# #     },
# # )

# # [SUCCEEDED] df = spark.read.option("recursiveFileLookup", "true").parquet("s3://dsoaws-databricks/nyc-taxi/2019/")
# # [SUCCEEDED] df = spark.read.option("recursiveFileLookup", "true").parquet("s3://dsoaws-databricks/nyc-taxi/2018/")
# # [SUCCEEDED] df = spark.read.option("recursiveFileLookup", "true").parquet("s3://dsoaws-databricks/nyc-taxi/2017/")
# # [SUCCEEDED] df = spark.read.option("recursiveFileLookup", "true").parquet("s3://dsoaws-databricks/nyc-taxi/2016/")
# # [SUCCEEDED] df = spark.read.option("recursiveFileLookup", "true").parquet("s3://dsoaws-databricks/nyc-taxi/2015/")
# # [SUCCEEDED] df = spark.read.option("recursiveFileLookup", "true").parquet("s3://dsoaws-databricks/nyc-taxi/2014/")
# # [SUCCEEDED] df = spark.read.option("recursiveFileLookup", "true").parquet("s3://dsoaws-databricks/nyc-taxi/2013/")
# # [SUCCEEDED] df = spark.read.option("recursiveFileLookup", "true").parquet("s3://dsoaws-databricks/nyc-taxi/2012/")

# # Note:  I removed [FAILED] directories from s3://dsoaws-databricks/...

# # [FAILED] df = spark.read.option("recursiveFileLookup", "true").parquet("s3://dsoaws-databricks/nyc-taxi/2011/")
# # [FAILED] df = spark.read.option("recursiveFileLookup", "true").parquet("s3://dsoaws-databricks/nyc-taxi/2010/")
# # [FAILED] df = spark.read.option("recursiveFileLookup", "true").parquet("s3://dsoaws-databricks/nyc-taxi/2009/")

# # df = spark.read.option("recursiveFileLookup", "true").parquet("s3://dsoaws-databricks/nyc-taxi/")

# # dataset = ray.data.read_parquet([
# #     's3://dsoaws/nyc-taxi/ride-metadata/year=2012/',
# #     ], 
# #     columns=columns,
# # #    filter=(pa.field("total_amount") >= 0.0) # this worked briefly, then stopped working for some reason
# # )

# dataset.count()
In [ ]:
# import pandas as pd

# def map_fn(batch: pd.DataFrame) -> pd.DataFrame:
#     batch = batch[batch.total_amount.notnull()]
#     return batch

# dataset = dataset.map_batches(map_fn)

# dataset.count()
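In [ ]:
# Hedged, runnable version of the commented-out filter above (assumption: the
# cleaned dataset may still contain rows with a null total_amount label). The result
# is assigned to a separate variable so the rest of the notebook is unchanged;
# reassign it to `dataset` if the filter is actually needed before training.
import pandas as pd

def drop_null_labels(batch: pd.DataFrame) -> pd.DataFrame:
    # Keep only rows with a non-null label column.
    return batch[batch["total_amount"].notnull()]

cleaned_ds = dataset.map_batches(drop_null_labels, batch_format="pandas")
cleaned_ds.count()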
In [ ]:
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)

print(f'Train dataset size is {train_dataset.count()} and validation dataset size is {valid_dataset.count()}.')
In [ ]:
num_workers = 25
# Note: repartition() returns a new Dataset, so reassign the result if it is used:
# train_dataset = train_dataset.repartition(num_workers)
# valid_dataset = valid_dataset.repartition(num_workers)
In [ ]:
from ray.train.xgboost import XGBoostTrainer
from ray.air.config import ScalingConfig
from ray.air.config import RunConfig

trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(
        # Number of workers to use for data parallelism.
        num_workers=num_workers,  # int(num_workers * 8 * 0.8)
#        trainer_resources={"CPU": 0},
        resources_per_worker={"CPU": 6},
        # Whether to use GPU acceleration.
        use_gpu=False,
        # Leave enough CPU headroom for I/O; 80% is a reasonable choice.
        # (Ray warns if too little CPU is left for I/O.)
        _max_cpu_fraction_per_node=0.8,
    ),
    run_config=RunConfig(local_dir="/tmp/ray_results"),
    label_column='total_amount',
    num_boost_round=5,
    params={
        # XGBoost-specific params
        "objective": "reg:squarederror",
        "eval_metric": ["rmse", "mae"]
    },
    datasets={"train": train_dataset, "valid": valid_dataset},
)

result = trainer.fit()
In [ ]:
print(result.metrics)
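In [ ]:
# Hedged sketch (assumption: the Ray AIR version in this session exposes
# XGBoostCheckpoint.get_model()): recover the trained xgboost.Booster from the
# result checkpoint, e.g. to inspect feature importances or save the model.
from ray.train.xgboost import XGBoostCheckpoint

if result.checkpoint is not None:
    booster = XGBoostCheckpoint.from_checkpoint(result.checkpoint).get_model()
    print(booster.get_score(importance_type="gain"))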
In [ ]:
# Inspect the error log from a failed trial. This path is specific to one earlier
# run; adjust the timestamped trial directory to match your own results.
with open('/tmp/ray_results/XGBoostTrainer_2023-01-09_21-11-06/XGBoostTrainer_21b7c_00000_0_2023-01-09_21-11-06/error.txt', 'r') as f:
    print(f.read())
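In [ ]:
# Hedged alternative (not in the original notebook): locate the most recent
# error.txt under the Ray results directory instead of hard-coding the
# timestamped trial path.
import glob
import os

error_files = glob.glob("/tmp/ray_results/XGBoostTrainer_*/*/error.txt")
if error_files:
    latest = max(error_files, key=os.path.getmtime)
    print(f"Reading {latest}")
    with open(latest, "r") as f:
        print(f.read())
else:
    print("No error.txt files found under /tmp/ray_results.")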
In [ ]: