In this example I prepare a dataset with Python, then use PySpark (Apache Spark) to train and compare machine learning models that predict wine quality from a selection of physicochemical measurements.
# import libraries and open the csv
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler

df = pd.read_csv('WineQuality.csv')
df.head()
| | Unnamed: 0 | fixed acidity | volatile acidity | citric acid | residual sugar | chlorides | free sulfur dioxide | total sulfur dioxide | density | pH | sulphates | alcohol | quality | Type |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2732 | 7.4 | 0.170 | 0.29 | 1.4 | 0.047 | 23.0 | 107.0 | 0.99390 | 3.52 | 0.65 | 10.4 | 6 | White Wine |
| 1 | 2607 | 5.3 | 0.310 | 0.38 | 10.5 | 0.031 | 53.0 | 140.0 | 0.99321 | 3.34 | 0.46 | 11.7 | 6 | White Wine |
| 2 | 1653 | 4.7 | 0.145 | 0.29 | 1.0 | 0.042 | 35.0 | 90.0 | 0.99080 | 3.76 | 0.49 | 11.3 | 6 | White Wine |
| 3 | 3264 | 6.9 | 0.260 | 0.29 | 4.2 | 0.043 | 33.0 | 114.0 | 0.99020 | 3.16 | 0.31 | 12.5 | 6 | White Wine |
| 4 | 4931 | 6.4 | 0.450 | 0.07 | 1.1 | 0.030 | 10.0 | 131.0 | 0.99050 | 2.97 | 0.28 | 10.8 | 5 | White Wine |
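Before any preprocessing, a quick sanity check (my addition, not part of the original run) confirms the column dtypes and that no values are missing:
# optional sanity check on the raw frame
df.info()          # column dtypes and non-null counts
df.isna().sum()    # missing values per column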
# drop the unnecessary index column
df = df.drop(['Unnamed: 0'], axis=1)
# one-hot encode the Type category (red/white wine)
df = pd.get_dummies(df, columns=["Type"])
# list of numeric columns that need scaling
numeric = ['fixed acidity', 'volatile acidity', 'citric acid', 'residual sugar', 'chlorides', 'free sulfur dioxide',
           'total sulfur dioxide', 'density', 'pH', 'sulphates', 'alcohol']
# scale the numeric columns to zero mean and unit variance
scaler = StandardScaler()
scaler.fit(df[numeric])
df_scaled = df.copy()  # copy so the original frame is left untouched
df_scaled[numeric] = scaler.transform(df[numeric])
df_scaled.sample()
| | fixed acidity | volatile acidity | citric acid | residual sugar | chlorides | free sulfur dioxide | total sulfur dioxide | density | pH | sulphates | alcohol | quality | Type_Red Wine | Type_White Wine |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 4527 | 1.517559 | -0.667772 | -0.126245 | -0.904056 | -0.724843 | 0.541904 | -0.295036 | -0.603167 | -1.728764 | -1.556726 | -0.067773 | 6 | 0 | 1 |
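As a quick check (illustrative, not in the original run), the scaled columns should now show means near 0 and standard deviations near 1:
# confirm the scaling: means near 0, standard deviations near 1
df_scaled[numeric].describe().loc[['mean', 'std']]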
# copy the quality score into a 'target' column so it sits last
df_scaled['target'] = df_scaled['quality']
df_scaled = df_scaled.drop(['quality'], axis=1)
df_scaled.head()
| | fixed acidity | volatile acidity | citric acid | residual sugar | chlorides | free sulfur dioxide | total sulfur dioxide | density | pH | sulphates | alcohol | Type_Red Wine | Type_White Wine | target |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0.141618 | -1.031607 | -0.195139 | -0.841545 | -0.261102 | -0.423577 | -0.153330 | -0.271528 | 1.863943 | 0.796858 | -0.067773 | 0 | 1 | 6 |
| 1 | -1.463647 | -0.182657 | 0.424908 | 1.054626 | -0.724843 | 1.280213 | 0.431204 | -0.500359 | 0.748965 | -0.480802 | 1.024074 | 0 | 1 | 6 |
| 2 | -1.922294 | -1.183206 | -0.195139 | -0.924893 | -0.406021 | 0.257939 | -0.454454 | -1.299610 | 3.350580 | -0.279066 | 0.688121 | 0 | 1 | 6 |
| 3 | -0.240588 | -0.485854 | -0.195139 | -0.258108 | -0.377038 | 0.144353 | -0.029338 | -1.498594 | -0.366013 | -1.489481 | 1.695980 | 0 | 1 | 6 |
| 4 | -0.622794 | 0.666293 | -1.710810 | -0.904056 | -0.753827 | -1.161886 | 0.271785 | -1.399102 | -1.542935 | -1.691217 | 0.268180 | 0 | 1 | 5 |
# write the processed dataframe to csv
df_scaled.to_csv('processed_wine_quality.csv',index=False)
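As an optional round-trip check (mine, not in the original), reading the file straight back confirms the shape survived the trip to disk:
# verify the saved csv loads back with the expected shape
pd.read_csv('processed_wine_quality.csv').shape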
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Create a Spark session
spark = SparkSession.builder.appName("AdvancedMLExample").getOrCreate()
# Load the dataset
data = spark.read.csv("processed_wine_quality.csv", header=True, inferSchema=True)
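Before assembling features it is worth confirming that inferSchema parsed every column as numeric (an illustrative check, not in the original run):
# confirm the inferred schema: all columns should come back as numeric types
data.printSchema()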
# Prepare the data for training
feature_columns = data.columns[:-1]  # every column except the trailing 'target'
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)
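VectorAssembler packs the individual feature columns into the single vector column that Spark ML estimators expect; a quick peek at the result (illustrative, not in the original run):
# inspect the assembled feature vectors alongside the label
data.select("features", "target").show(5, truncate=False)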
# Split the data into training and testing sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=123)
# Create regression models
lr = LinearRegression(featuresCol="features", labelCol="target")
dt = DecisionTreeRegressor(featuresCol="features", labelCol="target")
rf = RandomForestRegressor(featuresCol="features", labelCol="target")
# Define a parameter grid for tuning the decision tree
# (only dt parameters belong here: the cross-validator below wraps dt, so
# adding rf.numTrees would only multiply the grid with settings dt ignores)
param_grid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [5, 10, 15]) \
    .build()
# Create an evaluator
evaluator = RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName="rmse")
# Create a cross-validator for hyperparameter tuning
cv = CrossValidator(estimator=dt, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)
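Note that only the decision tree is cross-validated here; the random forest is fit once with defaults. If the forest were to be tuned as well, it would need its own grid and a second cross-validator. A minimal sketch (the rf_param_grid and rf_cv names are mine, and rf_cv.fit would stand in for the plain rf.fit below):
# separate grid and cross-validator for the random forest (illustrative)
rf_param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 30]) \
    .build()
rf_cv = CrossValidator(estimator=rf, estimatorParamMaps=rf_param_grid,
                       evaluator=evaluator, numFolds=3)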
# Train the models and perform hyperparameter tuning
lr_model = lr.fit(train_data)
cv_model = cv.fit(train_data)
rf_model = rf.fit(train_data)
23/08/12 12:52:20 WARN Instrumentation: [d9df0392] regParam is zero, which might cause numerical instability and overfitting.
23/08/12 12:52:20 WARN Instrumentation: [d9df0392] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
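These warnings make sense: the linear model is fit without regularization (regParam defaults to 0), and the two one-hot Type columns always sum to 1, so they are perfectly collinear with the intercept, which makes the covariance matrix singular. A hedged fix is to refit with a small ridge penalty (the 0.1 value is an assumption, not tuned, and lr_reg / lr_reg_model are my names); dropping one dummy upstream via get_dummies(..., drop_first=True) would also work:
# illustrative refit with a small L2 (ridge) penalty to stabilize the solve
lr_reg = LinearRegression(featuresCol="features", labelCol="target",
                          regParam=0.1, elasticNetParam=0.0)
lr_reg_model = lr_reg.fit(train_data)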
# Make predictions
lr_predictions = lr_model.transform(test_data)
cv_predictions = cv_model.transform(test_data)
rf_predictions = rf_model.transform(test_data)
# Evaluate the models
lr_rmse = evaluator.evaluate(lr_predictions)
cv_rmse = evaluator.evaluate(cv_predictions)
rf_rmse = evaluator.evaluate(rf_predictions)
print(f"Linear Regression RMSE: {lr_rmse}")
print(f"Decision Tree RMSE: {cv_rmse}")
print(f"Random Forest RMSE: {rf_rmse}")
Linear Regression RMSE: 0.7208100551468412
Decision Tree RMSE: 0.30245773677258897
Random Forest RMSE: 0.6821307647041012
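As an optional extra (not part of the original run; r2_evaluator is my name), the same evaluator API also reports R², which can be easier to interpret than a raw RMSE on the quality scale:
# optional: R^2 for the cross-validated decision tree
r2_evaluator = RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName="r2")
print(f"Decision Tree R2: {r2_evaluator.evaluate(cv_predictions)}")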
# Print the best parameters from cross-validation
best_dt_model = cv_model.bestModel
print("Best Decision Tree Max Depth:", best_dt_model.getMaxDepth())
Best Decision Tree Max Depth: 15
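The best depth sits at the top edge of the grid, so a wider grid might score even better. The fitted cross-validator also keeps the average RMSE for every combination it tried, which can be inspected like this (an illustrative check, not in the original run):
# average cross-validation RMSE per parameter combination
for params, rmse in zip(cv_model.getEstimatorParamMaps(), cv_model.avgMetrics):
    print({p.name: v for p, v in params.items()}, rmse)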
# Stop the Spark session
spark.stop()
I was able to train a decision tree model with a notably low RMSE. Unsurprisingly, the other two models were somewhat more error-prone, but I believe this is a good result overall, helped along by the data preprocessing.
I would count an RMSE of 0.3 a success for this project.