Kmeans with Polars

Sun 18 August 2024
rusty python logo

The goal of this article is to use the rust polars directly to perform a common machine learning task I usually do using python.

The task

Let's choose a simple task: use of K-means. To do so, we should:

  1. retrieve a dataset
  2. make some preprocessing
  3. perform kmeans
  4. draw a confusion matrix

The code

The polar rust API documentation is way less documented than the python API.

Data retrieval

This part is quite simple. Yet, to practice, I decided to retrieve data, write it into a temporary CSV file and read it back from the temporary file.

Let's import what we want:

use std::io::Write;
use polars::prelude::*;
use tempfile::tempfile;
use tempfile::NamedTempFile;

and write some code. This code is straight forward. Given the chosen dataset (usual wine dataset) does not have headers, I had to add the column names manually.

// get dataset
let data_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/wine/wine.data";
let body = reqwest::blocking::get(data_url)?.text()?;

// write dataset to file
let mut data_file = tempfile()?;
write!(data_file, "{}", &body)?;

let temp_file = NamedTempFile::new()?;
write!(&temp_file, "{}", &body)?;
// dbg!(temp_file.into_temp_path());

// read CSV file
// let lf = CsvReader::new(data_file);
let lf = CsvReadOptions::default()
    .with_has_header(false)
    .into_reader_with_file_handle(data_file);
let mut df: DataFrame = lf.finish()?;
let columns = vec![
    "class",
    "Alcohol",
    "Malic acid",
    "Ash",
    "Alcalinity of ash",
    "Magnesium",
    "Total phenols",
    "Flavanoids",
    "Nonflavanoid phenols",
    "Proanthocyanins",
    "Color intensity",
    "Hue",
    "OD280/OD315 of diluted wines",
    "Proline",
];
df.set_column_names(&columns)?;

Preprocessing

Description

There is no describe() method in the rust API. I decided to build my own version, partially to practice rust, partially because I didn't want to spend time navigating through the python code and the not complete rust documentation (for instance, when searching for standard deviation computation in a Series, the parameter ddof was not documented in the rust API but it was in the python API).

I created a wrapping structure around the DataFrame and implemented my methods. I compute several statistical indicator for each columns, collect them in Series, concatenate them in a DataFrame.

I don't put all the (repetitive) code (full code available here):

struct DataFramePreproc(DataFrame);

impl DataFramePreproc {
    fn describe(&self) -> Result<DataFrame> {
        let df = &self.0;
        let df_min = DataFrame::new(
            df.iter()
                .map(|s| {
                    s.min_reduce()
                        .expect("data frame should not be empty")
                        .into_series(s.name())
                        .cast(&DataType::Float64) // Cast to avoid mixing i64 and f64
                        .expect("cast from int to float should go smoothly")
                })
                .collect::<Vec<_>>(),
        )?;

...
        let df_std = DataFrame::new(
            // ddof argument of std documented at
            // https://github.com/pola-rs/polars/blob/daf2e4983b6d94b06f2eaa3a77c2e02c112f5675/py-polars/polars/expr/list.py#L300
            df.iter()
                .map(|s| {
                    s.std_reduce(1)
                        .expect("data frame should not be empty")
                        .into_series(s.name())
                        .cast(&DataType::Float64)
                        .expect("cast from int to float should go smoothly")
                })
                .collect::<Vec<_>>(),
        )?;

        let df_mean = DataFrame::new(
            df.iter()
                .map(|s| {
                    s.mean_reduce()
                        .into_series(s.name())
                        .cast(&DataType::Float64)
                        .expect("cast from int to float should go smoothly")
                })
                .collect::<Vec<_>>(),
        )?;

        let result = concat(
            [
                df_min.lazy(),
                df_q1.lazy(),
                df_median.lazy(),
                df_q3.lazy(),
                df_max.lazy(),
                df_mean.lazy(),
                df_std.lazy(),
            ],
            UnionArgs::default(),
        )?
        .collect()?;
        let labels = df!(""=>["min", "q1", "med", "q3", "max", "avg", "std dev"])?;
        Ok(polars::functions::concat_df_horizontal(&[labels, result])?)
    }

}

Normalization

The preprocessing parts for the K-means mainly consists in normalizing the data.

First the Series:

Let's define several way to normalize

#[derive(Debug, Copy, Clone)]
enum Normalization {
    /// Bring standard deviation to 1 and average to zero
    Standard,
    /// Bring values in [0, 1]
    MinMax,
    /// Bring q1 to -1 and q3 to 1
    Quartiles,
    /// Bring average to zero
    Center,
}

Let's implement those normalization. I wrapped the Series into a new structure. I tried to cast in float as all real numerical values should be castable in float and the needed transformation result in float whatever the initial value (integer, float,...)

struct SeriePreproc(Series);

impl SeriePreproc {
    fn normalize(&self, kind: Normalization) -> Result<Series> {
        match kind {
            Normalization::Standard => Ok(self.standard()?.with_name(self.0.name())),
            Normalization::MinMax => Ok(self.minmax()?.with_name(self.0.name())),
            Normalization::Quartiles => Ok(self.quartiles()?.with_name(self.0.name())),
            Normalization::Center => Ok(self.center()?.with_name(self.0.name())),
        }
    }

    fn standard(&self) -> Result<Series> {
        let s = &self.0.cast(&DataType::Float64)?;
        let mean = s.mean_reduce().as_any_value().extract::<f64>().unwrap();
        let std = s.std(1).expect("Serie should not be empty");
        Ok(s.iter()
            .map(|elt| (elt.extract::<f64>().unwrap() - mean) / std)
            .collect())
    }

    fn minmax(&self) -> Result<Series> {
        let s = &self.0.cast(&DataType::Float64)?;
        let min = s.min_reduce()?.as_any_value().extract::<f64>().unwrap();
        let max = s.max_reduce()?.as_any_value().extract::<f64>().unwrap();
        Ok(s.iter()
            .map(|elt| (elt.extract::<f64>().unwrap() - min) / (max - min))
            .collect())
    }
    fn quartiles(&self) -> Result<Series> {
        let s = &self.0.cast(&DataType::Float64)?;
        let q1 = s
            .quantile_reduce(0.25, QuantileInterpolOptions::Linear)?
            .as_any_value()
            .extract::<f64>()
            .unwrap();
        let q3 = s
            .quantile_reduce(0.75, QuantileInterpolOptions::Linear)?
            .as_any_value()
            .extract::<f64>()
            .unwrap();
        let center = (q1 + q3) / 2_f64;
        let inter_q = q3 - q1;
        Ok(s.iter()
            .map(|elt| (elt.extract::<f64>().unwrap() - center) / inter_q)
            .collect())
    }
    fn center(&self) -> Result<Series> {
        let s = &self.0.cast(&DataType::Float64)?;
        let mean = s.mean_reduce().as_any_value().extract::<f64>().unwrap();
        Ok(s.iter()
            .map(|elt| elt.extract::<f64>().unwrap() - mean)
            .collect())
    }
}

Then the DataFrame

impl DataFramePreproc {
    fn normalize(&self, kind: Normalization) -> Result<DataFrame> {
        let df = DataFrame::new(
            self.0
                .iter()
                .map(|s| SeriePreproc(s.clone()).normalize(kind).unwrap())
                .collect::<Vec<_>>(),
        )?;
        Ok(df)
    }
}

KMeans

For type simplifications, and because I didn't want to use mix generics and AnyValue, I choose to represent class labels with i64.

I write a Kmeans function. This function assumes the dataset contains a "class" column, and perform Kmeans on the dataset without this column.

use linfa::prelude::Predict; // to use Kmeans.predict()
use linfa::traits::Fit;
use linfa::DatasetBase;
use linfa_clustering::KMeans;
use linfa_nn::distance::L2Dist;

/// kmeans
fn k_means(data: &DataFrame, n_cluster: usize) -> Result<KMeans<f64, L2Dist>> {
    let cols = data.get_column_names();
    let classes = data
        .get(cols.iter().position(|s| s == &"class").unwrap())
        .expect("classes should be provided");
    let data = data.drop("class")?;
    let data = DatasetBase::new(
        data.to_ndarray::<Float64Type>(IndexOrder::default())?,
        classes,
    );
    let model = KMeans::params(n_cluster).fit(&data).expect("data fitted");
    Ok(model)
}

then, I use this function to get classes:

let model = k_means(&df, 4)?;
let pred = model.predict(DatasetBase::from(
    df.drop("class")?
        .to_ndarray::<Float64Type>(IndexOrder::default())?,
));

let results = DataFrame::new(vec![
    df.column("class")?.clone().rename("truth").clone(),
    pred.targets()
        .iter()
        .map(|&s| s as i64)
        .collect::<Series>()
        .rename("prediction")
        .clone(),
])?;

Now the result DataFrame contains 2 columns: one with the truth and another with the prediction.

Confusion matrix

I choose to create a wrapping structure over the result DataFrame. The result dataframe consists in 2 columns: one with the actual value and the other one with the prediction.

/// Classification results
/// 2 columns data frame.
///
/// | truth | prediction |
/// |-------|------------|
/// | ...   | ...        |
///
#[derive(Debug)]
struct ClassificationResult(DataFrame);

Reordering

Given the random nature of the initialization, the classes should be reordered (no control is exercised over the name or order of classes).

To do so, I compute true positive for the result class:

impl ClassificationResult {
    fn tp(&self) -> usize {
        let truth = self.0.column("truth").unwrap();
        let prediction = self.0.column("prediction").unwrap();
        zip(truth.iter(), prediction.iter())
            .filter(|(ref t, ref p)| t == p)
            .count()
    }
}

and tried to swap result class names to obtain the maximum true positive.

impl ClassificationResult {

    /// Reassign classes to match truth
    ///
    /// clustering may assign classes that are not the original one.
    /// this function tries to match result classes to oringinal classes
    /// by swapping result classes
    fn reorder_classes(&self) -> Self {
        let classes = self
            .classes()
            .iter()
            .map(|c| c.try_extract::<i64>().unwrap())
            .collect();
        let predicted = self.0.column("prediction").unwrap();

        get_classes_swaps(classes)
            .into_iter()
            .map(|swap| {
                Self(
                    df!("truth" => self.0.column("truth").unwrap().clone(),
                    "prediction" => swap_series(predicted.clone(), swap))
                    .unwrap(),
                )
            })
            .max_by_key(|result| result.tp())
            .unwrap()
    }
}

Confusion matrix is then computed by counting each tuple (truth, prediction).

I don't know the rustcean way to iterate over rows thus I iterate over row indexes.

For the row and column names, I used the class name prefixed with either truth or prediction. For the total row and columns, an sum aggregation is performed. The row/columns are then inserted. (a Dataframe is buildable from a Vec<Series>).

impl ClassificationResult {


    /// Compute confusion matrix
    ///
    /// |                    |                        prediction                   |
    /// |                    | class A | class B | class C | ... | class N | total |
    /// |         | class A  |         |         |         | ... |         |       |
    /// |         | class B  |         |         |         | ... |         |       |
    /// | truth   | class C  |         |         |         | ... |         |       |
    /// |         | ...      | ...     | ...     | ...     | ... | ...     |       |
    /// |         | class N  |         |         |         | ... |         |       |
    /// |         | total    |         |         |         |     |         |       |
    fn confusion_matrix(&self) -> DataFrame {
        let classes = self.classes();

        let res = self.reorder_classes();

        let mut counts = HashMap::new(); // {(truth, prediction): count}
        for idx in 0..res.0.shape().0 {
            let row = res.0.get_row(idx).expect("idx is a valid index");
            *counts
                .entry((row.0[0].clone(), row.0[1].clone()))
                .or_insert(0) += 1;
        }
        let result = DataFrame::new(
            classes
                .clone()
                .into_iter()
                .map(|c| {
                    Series::new(
                        &format!("prediction\n{c}"),
                        classes
                            .clone()
                            .into_iter()
                            .map(|d| counts.get(&(d, c.clone())).unwrap_or(&0))
                            .copied()
                            .collect::<Vec<_>>(),
                    )
                })
                .collect(),
        )
        .unwrap();

        let mut idx_label = classes
            .iter()
            .map(|c| format!("truth\n{c}"))
            .collect::<Vec<_>>();
        idx_label.push("total".into());

        // row total
        let mut total =
            df!("total" => result.iter().map(|s| {s.iter().map(|c|c.try_extract::<i64>().unwrap()).sum::<i64>()})
                .collect::<Vec<_>>()
            )
            .unwrap()
            .transpose(None, None)
            .unwrap();
        let b = total.clone();
        let names =
            zip(b.iter().map(|c| c.name()), result.iter().map(|c| c.name())).collect::<Vec<_>>();
        for (old, new) in names {
            total = total.rename(old, new).unwrap().clone();
        }
        let mut result = result
            .lazy()
            .cast_all(DataType::Int64, true)
            .collect()
            .unwrap()
            .vstack(&total)
            .unwrap();

        // column total
        let mut row = result.get_row(0).unwrap();
        let mut total: Vec<i64> = Vec::with_capacity(result.shape().0);
        for idx in 0..result.shape().0 {
            let _ = result.get_row_amortized(idx, &mut row);
            total.push(row.0.iter().map(|c| c.try_extract::<i64>().unwrap()).sum())
        }
        result
            .insert_column(result.shape().1, Series::new("total", total))
            .unwrap();

        // labels
        result
            .insert_column(0, idx_label.into_iter().collect::<Series>())
            .unwrap();
        result
    }
}

Final thoughts

Here is the final code. There is also a version on my github.

Rust is not made for data exploration. Given the strictness of types, extending dataframes, iterating on dataframes, hesitating between lazyframes and dataframes increase the feedback loop.

In short: perform your exploration using python and then, when your data pipeline is designed and stabilized, consider rewriting it in rust.

Related articles (or not):

Category: how to Tagged: rust maths how to machine learning k-means polars