Machine Learning With Big Data and CRM Framework
The synthetic dataset, while small (10 records), provides a representative sample of real-world data challenges that require careful handling before model training can proceed. The project used the Python programming language and its standard data manipulation libraries, such as Pandas, to execute the cleaning and feature-engineering steps, drawing on techniques from Raschka's Python Machine Learning for practical application, as illustrated in the code sketches below. This process is designed to be a repeatable and automated pipeline stage.
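For illustration, a minimal Pandas sketch of the cleaning steps is given first. The input file name customers.csv, the median-imputation rule, and the one-hot encoded column names are assumptions made for this example rather than the project's exact pipeline; the column names mirror the synthetic dataset used later in the Spark script.
# Illustrative Pandas preprocessing sketch (file name and cleaning rules are assumptions)
import pandas as pd
df = pd.read_csv("customers.csv")  # hypothetical export of the CRM payment data
# Coerce TotalCharges to numeric; malformed strings become NaN
df["TotalCharges"] = pd.to_numeric(df["TotalCharges"], errors="coerce")
# Impute missing or zero TotalCharges with the median of the valid values
median_total = df.loc[df["TotalCharges"] > 0, "TotalCharges"].median()
df["TotalCharges"] = df["TotalCharges"].where(df["TotalCharges"] > 0, median_total)
# Remove duplicate customers and one-hot encode the contract type
df = df.drop_duplicates(subset="CustomerID")
df = pd.get_dummies(df, columns=["Contract"], drop_first=True)
# Encode the churn label as a binary target
df["Churn"] = (df["Churn"] == "Yes").astype(int)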
# Conceptual Python code snippet used in implementation
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import joblib  # For saving/loading the trained model
# Assume a feature matrix X and churn labels y are prepared, where
# X = [MonthlyCharges, TotalCharges, Contract_Oneyear, Contract_Twoyear]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
model = LogisticRegression(random_state=42)
model.fit(X_train, y_train)
predictions = model.predict(X_test)
print(f"Test accuracy: {accuracy_score(y_test, predictions):.2f}")
# Save the model and preprocessing pipeline for deployment
joblib.dump(model, 'churn_model.pkl')
# Model coefficients provide insight into churn drivers
# (for example, longer contracts correlate with lower churn risk)
This project successfully established a foundational, end-to-end operational methodology for integrating machine learning (ML) capabilities into an organizational CRM strategy. The objective was to use ML to analyze customer payment data and predict churn. We used a synthetic dataset with deliberately introduced data quality issues to validate the robustness of the data preprocessing pipeline and automated workflow. The implementation phase leveraged practical guidance from key ML texts to clean the data, engineer relevant features, train a baseline Logistic Regression model, deploy it as a microservice, and establish monitoring hooks that generate actionable churn probability scores.

While the model demonstrated perfect illustrative performance on the minimal test set, the primary achievement was the creation of an operational framework that integrates cleanly with CRM operational thinking, transforming raw data into a strategic asset for customer retention teams.

Future work involves scaling this framework to real-world datasets and exploring advanced models to maximize predictive accuracy and quantify business value through controlled experiments, such as A/B testing and continuous improvement cycles within MLOps. Ultimately, this project validates a data-driven approach to customer relationship management, moving the organization toward proactive customer engagement based on sound data science principles.
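To make the deployment and monitoring steps concrete, the following is a minimal sketch of how the saved model could be exposed as a scoring microservice. It assumes Flask, a /predict route, a JSON payload with the four engineered features, and a simple logging call as the monitoring hook; none of these are the project's actual service code.
# Minimal churn-scoring microservice sketch (Flask, route, payload, and logging are assumptions)
import logging
import joblib
from flask import Flask, request, jsonify
app = Flask(__name__)
model = joblib.load("churn_model.pkl")  # model trained and saved earlier

@app.route("/predict", methods=["POST"])
def predict():
    payload = request.get_json()
    # Feature order must match the training pipeline
    features = [[payload["MonthlyCharges"], payload["TotalCharges"],
                 payload["Contract_Oneyear"], payload["Contract_Twoyear"]]]
    churn_probability = float(model.predict_proba(features)[0][1])
    # Monitoring hook: log every scored request for later drift analysis
    logging.info("customer=%s churn_score=%.3f",
                 payload.get("CustomerID", "unknown"), churn_probability)
    return jsonify({"churn_probability": churn_probability})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)
In this setup, the CRM system posts a customer's feature values and receives a churn probability that retention workflows can act on directly.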
1. Abu-Mostafa, Yaser S.; Magdon-Ismail, Malik; Lin, Hsuan-Tien. Learning From Data. AMLBook, 2012. ISBN: 9781600490064.
2. James, Gareth; Witten, Daniela; Hastie, Trevor; Tibshirani, Robert. An Introduction to Statistical Learning: with Applications in R. New York, NY: Springer, 2013. ISBN: 978-1-4614-7138-7.
3. Provost, Foster; Fawcett, Tom. Data Science for Business: What You Need to Know about Data Mining and Data-Analytic Thinking. O'Reilly Media, 2013.
4. Raschka, Sebastian. Python Machine Learning: Unlock Deeper Insights into Machine Learning with This Vital Guide to Cutting-Edge Predictive Analysis. Birmingham: Packt Publishing, 2015. ISBN: 1-78355-513-0.
Original file is located at
https://colab.research.google.com/drive/1FliIzMMTye6ps-CNIzKMjWOZQFtpQQ60
"""
import os
import sys
# -----------------------------
# Step 0: Java and Spark setup are handled by previous cells.
# -----------------------------
# Step 1: Set Python for PySpark
# -----------------------------
PYSPARK_PYTHON = sys.executable
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
print(f"✅ Using Python executable: {PYSPARK_PYTHON}")
# -----------------------------
# Explicitly set Spark-related environment variables for robust initialization
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1" # Often needed in containerized environments like Colab
# Ensure SPARK_HOME is set before constructing PYSPARK_SUBMIT_ARGS
SPARK_HOME = os.environ.get("SPARK_HOME")
if SPARK_HOME is None:
    print("❌ SPARK_HOME environment variable not set. Cannot configure PYSPARK_SUBMIT_ARGS.")
    sys.exit(1)
# Constructing PYSPARK_SUBMIT_ARGS with explicit classpath
spark_jars_path = os.path.join(SPARK_HOME, "jars", "*")
pyspark_submit_args_value = f"--master local[*] --driver-class-path {spark_jars_path} --jars {spark_jars_path} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args_value
# Also ensure Spark bin is in PATH for findspark and spark-submit
os.environ["PATH"] = os.path.join(SPARK_HOME, "bin") + ":" + os.environ["PATH"]
# -----------------------------
# Re-initialize findspark to ensure environment variables are correctly picked up
import findspark
findspark.init() # SPARK_HOME will be picked from os.environ
# Import PySpark modules AFTER environment variables are configured
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, when
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# -----------------------------
# Step 2: Start Spark
# -----------------------------
# The jars classpath was already injected above via PYSPARK_SUBMIT_ARGS, so no
# additional extraClassPath configuration is needed here.
# Build the SparkSession (chained builder methods indented for readability)
spark = SparkSession.builder \
    .appName("CustomerPaymentReliability") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()
print(f"✅ Spark version: {spark.version}")
# -----------------------------
# Step 3: Create Dataset
# -----------------------------
data = [
("C1001", 110.50, "Month-to-month", "2500.00", "No", "The total charges seem low for the monthly charges over time."),
("C1002", 105.00, "One year", "4500.00", "Yes", "No obvious issues."),
("C1003", 115.20, "Two year", "0.00", "No", "Incomplete/Missing: TotalCharges is 0.00."),
("C1004", 100.00, "Month-to-month", "1200", "Yes", "No obvious issues."),
("C1005", 108.75, "Month-to-month", "50000.00", "Yes", "Outlier: Total Charges unrealistically high."),
("C1006", 110.00, "One year", "6000.00", "No", "Duplicate in full dataset."),
("C1007", 102.40, "Two year", "0.00", "No", "Incomplete/Missing: Total Charges is 0.00."),
("C1008", 112.00, "Month-to-month", None, "Yes", "Missing: NaN in Total Charges."),
("C1009", 106.80, "Month-to-month", "4500.00", "No", "No obvious issues."),
("C1010", 104.50, "Two year", "100.5", "Yes", "Inaccurate: TotalCharges less than MonthlyCharges."),
]
columns = ["CustomerID", "MonthlyCharges", "Contract", "TotalCharges", "Churn", "Notes"]
df = spark.createDataFrame(data, columns)
df.show(truncate=False)
# -----------------------------
# Step 4: Data Cleaning
# -----------------------------
df_clean = df.withColumn("TotalCharges", regexp_replace(col("TotalCharges"), ",", ""))
df_clean = df_clean.withColumn("TotalCharges", col("TotalCharges").cast("double"))
median_value = df_clean.approxQuantile("TotalCharges", [0.5], 0.0)[0]
df_clean = df_clean.withColumn(
    "TotalCharges",
    when((col("TotalCharges").isNull()) | (col("TotalCharges") == 0), median_value)
    .otherwise(col("TotalCharges"))
)
df_clean = df_clean.withColumn("label", when(col("Churn") == "Yes", 1).otherwise(0))
df_clean.show(truncate=False)
# -----------------------------
# Step 5: MapReduce Example
# -----------------------------
rdd = df_clean.rdd
mapped = rdd.map(lambda row: (row["Contract"], 1))
contract_counts = mapped.reduceByKey(lambda a, b: a + b)
print("✅ Contract counts:", contract_counts.collect())
# -----------------------------
# Step 6: Random Forest ML
# -----------------------------
contract_indexer = StringIndexer(inputCol="Contract", outputCol="ContractIndex")
df_encoded = contract_indexer.fit(df_clean).transform(df_clean)
assembler = VectorAssembler(inputCols=["MonthlyCharges", "TotalCharges", "ContractIndex"], outputCol="features")
data_features = assembler.transform(df_encoded)
train, test = data_features.randomSplit([0.8, 0.2], seed=42)
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)
model = rf.fit(train)
pred = model.transform(test)
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
auc = evaluator.evaluate(pred)
print(f"✅ AUC = {auc}")
pred.select("CustomerID", "features", "prediction", "probability").show(truncate=False)
# spark.stop()  # Left commented out so the cells below can still query the DataFrame
print("✅ Script completed successfully. The Spark session is left running for the cells below.")
"""### Steg 1: Installera Java 17 och konfigurera `JAVA_HOME`
Detta installerar OpenJDK 17 och ställer in de nödvändiga miljövariablerna för PySpark att hitta Java.
"""
# Install OpenJDK 17
!apt-get update -qq > /dev/null
!apt-get install -y openjdk-17-jdk-headless -qq > /dev/null
# Set the JAVA_HOME environment variable
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
# Update PATH to include the Java bin directory
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
print("✅ Java 17 installed and JAVA_HOME configured.")
"""### Steg 2: Installera Apache Spark och konfigurera `SPARK_HOME`
Detta laddar ner och extraherar Spark och ställer in `SPARK_HOME` samt lägger till Spark bin-katalogen i `PATH`.
"""
# Download and extract Spark
SPARK_VERSION = "3.5.1"
HADOOP_VERSION = "3"
# Construct the full filename and folder name in Python
TAR_FILENAME = f"spark-{SPARK_VERSION}-bin-hadoop{HADOOP_VERSION}.tgz"
SPARK_FOLDER_NAME = f"spark-{SPARK_VERSION}-bin-hadoop{HADOOP_VERSION}"
SPARK_DOWNLOAD_URL = f"https://archive.apache.org/dist/spark/spark-{SPARK_VERSION}/{TAR_FILENAME}"
# Use f-strings to pass the constructed filenames directly to shell commands
!wget -q {SPARK_DOWNLOAD_URL}
!tar xf {TAR_FILENAME}
# Install findspark to easily initialize PySpark
!pip install findspark -qq
# Set the SPARK_HOME environment variable
os.environ["SPARK_HOME"] = f"/content/{SPARK_FOLDER_NAME}"
# Update PATH to include the Spark bin directory
os.environ["PATH"] = os.environ["SPARK_HOME"] + "/bin:" + os.environ["PATH"]
import findspark
findspark.init()
print(f"✅ Spark {SPARK_VERSION} installerat och SPARK_HOME konfigurerat.")
"""Här är schemat för DataFrame `df`, som visar kolumnnamn och deras datatyper, följt av de första raderna i dataramen."""
df.printSchema()
df.show(truncate=False)



