Kmeans with Polars

Sun 18 August 2024
rusty python logo

The goal of this article is to use the rust polars directly to perform a common machine learning task I usually do using python.

The task

Let's choose a simple task: use of K-means. To do so, we should:

  1. retrieve a dataset
  2. make some preprocessing
  3. perform kmeans
  4. draw a confusion matrix

The code

The polar rust API documentation is way less documented than the python API.

Data retrieval

This part is quite simple. Yet, to practice, I decided to retrieve data, write it into a temporary CSV file and read it back from the temporary file.

Let's import what we want:

use std::io::Write;
use polars::prelude::*;
use tempfile::tempfile;
use tempfile::NamedTempFile;

and write some code. This code is straight forward. Given the chosen dataset (usual wine dataset) does not have headers, I had to add the column names manually.

// get dataset
let data_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/wine/wine.data";
let body = reqwest::blocking::get(data_url)?.text()?;

// write dataset to file
let mut data_file = tempfile()?;
write!(data_file, "{}", &body)?;

let temp_file = NamedTempFile::new()?;
write!(&temp_file, "{}", &body)?;
// dbg!(temp_file.into_temp_path());

// read CSV file
// let lf = CsvReader::new(data_file);
let lf = CsvReadOptions::default()
    .with_has_header(false)
    .into_reader_with_file_handle(data_file);
let mut df: DataFrame = lf.finish()?;
let columns = vec![
    "class",
    "Alcohol",
    "Malic acid",
    "Ash",
    "Alcalinity of ash",
    "Magnesium",
    "Total phenols",
    "Flavanoids",
    "Nonflavanoid phenols",
    "Proanthocyanins",
    "Color intensity",
    "Hue",
    "OD280/OD315 of diluted wines",
    "Proline",
];
df.set_column_names(&columns)?;

Preprocessing

Description

There is no describe() method in the rust API. I decided to build my own version, partially to practice rust, partially because I didn't want to spend time navigating through the python code and the not complete rust documentation (for instance, when searching for standard deviation computation in a Series, the parameter ddof was not documented in the rust API but it was in the python API).

I created a wrapping structure around the DataFrame and implemented my methods. I compute several statistical indicator for each columns, collect them in Series, concatenate them in a DataFrame.

I don't put all the (repetitive) code (full code available here):

struct DataFramePreproc(DataFrame);

impl DataFramePreproc {
    fn describe(&self) -> Result<DataFrame> {
        let df = &self.0;
        let df_min = DataFrame::new(
            df.iter()
                .map(|s| {
                    s.min_reduce()
                        .expect("data frame should not be empty")
                        .into_series(s.name())
                        .cast(&DataType::Float64) // Cast to avoid mixing i64 and f64
                        .expect("cast from int to float should go smoothly")
                })
                .collect::<Vec<_>>(),
        )?;

...
        let df_std = DataFrame::new(
            // ddof argument of std documented at
            // https://github.com/pola-rs/polars/blob/daf2e4983b6d94b06f2eaa3a77c2e02c112f5675/py-polars/polars/expr/list.py#L300
            df.iter()
                .map(|s| {
                    s.std_reduce(1)
                        .expect("data frame should not be empty")
                        .into_series(s.name())
                        .cast(&DataType::Float64)
                        .expect("cast from int to float should go smoothly")
                })
                .collect::<Vec<_>>(),
        )?;

        let df_mean = DataFrame::new(
            df.iter()
                .map(|s| {
                    s.mean_reduce()
                        .into_series(s.name())
                        .cast(&DataType::Float64)
                        .expect("cast from int to float should go smoothly")
                })
                .collect::<Vec<_>>(),
        )?;

        let result = concat(
            [
                df_min.lazy(),
                df_q1.lazy(),
                df_median.lazy(),
                df_q3.lazy(),
                df_max.lazy(),
                df_mean.lazy(),
                df_std.lazy(),
            ],
            UnionArgs::default(),
        )?
        .collect()?;
        let labels = df!(""=>["min", "q1", "med", "q3", "max", "avg", "std dev"])?;
        Ok(polars::functions::concat_df_horizontal(&[labels, result])?)
    }

}

Normalization

The preprocessing parts for the K-means mainly consists in normalizing the data.

First the Series:

Let's define several way to normalize

#[derive(Debug, Copy, Clone)]
enum Normalization {
    /// Bring standard deviation to 1 and average to zero
    Standard,
    /// Bring values in [0, 1]
    MinMax,
    /// Bring q1 to -1 and q3 to 1
    Quartiles,
    /// Bring average to zero
    Center,
}

Let's implement those normalization. I wrapped the Series into a new structure. I tried to cast in float as all real numerical values should be castable in float and the needed transformation result in float whatever the initial value (integer, float,...)

struct SeriePreproc(Series);

impl SeriePreproc {
    fn normalize(&self, kind: Normalization) -> Result<Series> {
        match kind {
            Normalization::Standard => Ok(self.standard()?.with_name(self.0.name())),
            Normalization::MinMax => Ok(self.minmax()?.with_name(self.0.name())),
            Normalization::Quartiles => Ok(self.quartiles()?.with_name(self.0.name())),
            Normalization::Center => Ok(self.center()?.with_name(self.0.name())),
        }
    }

    fn standard(&self) -> Result<Series> {
        let s = &self.0.cast(&DataType::Float64)?;
        let mean = s.mean_reduce().as_any_value().extract::<f64>().unwrap();
        let std = s.std(1).expect("Serie should not be empty");
        Ok(s.iter()
            .map(|elt| (elt.extract::<f64>().unwrap() - mean) / std)
            .collect())
    }

    fn minmax(&self) -> Result<Series> {
        let s = &self.0.cast(&DataType::Float64)?;
        let min = s.min_reduce()?.as_any_value().extract::<f64>().unwrap();
        let max = s.max_reduce()?.as_any_value().extract::<f64>().unwrap();
        Ok(s.iter()
            .map(|elt| (elt.extract::<f64>().unwrap() - min) / (max - min))
            .collect())
    }
    fn quartiles(&self) -> Result<Series> {
        let s = &self.0.cast(&DataType::Float64)?;
        let q1 = s
            .quantile_reduce(0.25, QuantileInterpolOptions::Linear)?
            .as_any_value()
            .extract::<f64>()
            .unwrap();
        let q3 = s
            .quantile_reduce(0.75, QuantileInterpolOptions::Linear)?
            .as_any_value()
            .extract::<f64>()
            .unwrap();
        let center = (q1 + q3) / 2_f64;
        let inter_q = q3 - q1;
        Ok(s.iter()
            .map(|elt| (elt.extract::<f64>().unwrap() - center) / inter_q)
            .collect())
    }
    fn center(&self) -> Result<Series> {
        let s = &self.0.cast(&DataType::Float64)?;
        let mean = s.mean_reduce().as_any_value().extract::<f64>().unwrap();
        Ok(s.iter()
            .map(|elt| elt.extract::<f64>().unwrap() - mean)
            .collect())
    }
}

Then the DataFrame

impl DataFramePreproc {
    fn normalize(&self, kind: Normalization) -> Result<DataFrame> {
        let df = DataFrame::new(
            self.0
                .iter()
                .map(|s| SeriePreproc(s.clone()).normalize(kind).unwrap())
                .collect::<Vec<_>>(),
        )?;
        Ok(df)
    }
}

KMeans

For type simplifications, and because I didn't want to use mix generics and AnyValue, I choose to represent class labels with i64.

I write a Kmeans function. This function assumes the dataset contains a "class" column, and perform Kmeans on the dataset without this column.

use linfa::prelude::Predict; // to use Kmeans.predict()
use linfa::traits::Fit;
use linfa::DatasetBase;
use linfa_clustering::KMeans;
use linfa_nn::distance::L2Dist;

/// kmeans
fn k_means(data: &DataFrame, n_cluster: usize) -> Result<KMeans<f64, L2Dist>> {
    let cols = data.get_column_names();
    let classes = data
        .get(cols.iter().position(|s| s == &"class").unwrap())
        .expect("classes should be provided");
    let data = data.drop("class")?;
    let data = DatasetBase::new(
        data.to_ndarray::<Float64Type>(IndexOrder::default())?,
        classes,
    );
    let model = KMeans::params(n_cluster).fit(&data).expect("data fitted");
    Ok(model)
}

then, I use this function to get classes:

let model = k_means(&df, 4)?;
let pred = model.predict(DatasetBase::from(
    df.drop("class")?
        .to_ndarray::<Float64Type>(IndexOrder::default())?,
));

let results = DataFrame::new(vec![
    df.column("class")?.clone().rename("truth").clone(),
    pred.targets()
        .iter()
        .map(|&s| s as i64)
        .collect::<Series>()
        .rename("prediction")
        .clone(),
])?;

Now the result DataFrame contains 2 columns: one with the truth and another with the prediction.

Confusion matrix

I choose to create a wrapping structure over the result DataFrame. The result dataframe consists in 2 columns: one with the actual value and the other one with the prediction.

/// Classification results
/// 2 columns data frame.
///
/// | truth | prediction |
/// |-------|------------|
/// | ...   | ...        |
///
#[derive(Debug)]
struct ClassificationResult(DataFrame);

Reordering

Given the random nature of the initialization, the classes should be reordered (no control is exercised over the name or order of classes).

To do so, I compute true positive for the result class:

impl ClassificationResult {
    fn tp(&self) -> usize {
        let truth = self.0.column("truth").unwrap();
        let prediction = self.0.column("prediction").unwrap();
        zip(truth.iter(), prediction.iter())
            .filter(|(ref t, ref p)| t == p)
            .count()
    }
}

and tried to swap result class names to obtain the maximum true positive.

impl ClassificationResult {

    /// Reassign classes to match truth
    ///
    /// clustering may assign classes that are not the original one.
    /// this function tries to match result classes to oringinal classes
    /// by swapping result classes
    fn reorder_classes(&self) -> Self {
        let classes = self
            .classes()
            .iter()
            .map(|c| c.try_extract::<i64>().unwrap())
            .collect();
        let predicted = self.0.column("prediction").unwrap();

        get_classes_swaps(classes)
            .into_iter()
            .map(|swap| {
                Self(
                    df!("truth" => self.0.column("truth").unwrap().clone(),
                    "prediction" => swap_series(predicted.clone(), swap))
                    .unwrap(),
                )
            })
            .max_by_key(|result| result.tp())
            .unwrap()
    }
}

Confusion matrix is then computed by counting each tuple (truth, prediction).

I don't know the rustcean way to iterate over rows thus I iterate over row indexes.

For the row and column names, I used the class name prefixed with either truth or prediction. For the total row and columns, an sum aggregation is performed. The row/columns are then inserted. (a Dataframe is buildable from a Vec<Series>).

impl ClassificationResult {


    /// Compute confusion matrix
    ///
    /// |                    |                        prediction                   |
    /// |                    | class A | class B | class C | ... | class N | total |
    /// |         | class A  |         |         |         | ... |         |       |
    /// |         | class B  |         |         |         | ... |         |       |
    /// | truth   | class C  |         |         |         | ... |         |       |
    /// |         | ...      | ...     | ...     | ...     | ... | ...     |       |
    /// |         | class N  |         |         |         | ... |         |       |
    /// |         | total    |         |         |         |     |         |       |
    fn confusion_matrix(&self) -> DataFrame {
        let classes = self.classes();

        let res = self.reorder_classes();

        let mut counts = HashMap::new(); // {(truth, prediction): count}
        for idx in 0..res.0.shape().0 {
            let row = res.0.get_row(idx).expect("idx is a valid index");
            *counts
                .entry((row.0[0].clone(), row.0[1].clone()))
                .or_insert(0) += 1;
        }
        let result = DataFrame::new(
            classes
                .clone()
                .into_iter()
                .map(|c| {
                    Series::new(
                        &format!("prediction\n{c}"),
                        classes
                            .clone()
                            .into_iter()
                            .map(|d| counts.get(&(d, c.clone())).unwrap_or(&0))
                            .copied()
                            .collect::<Vec<_>>(),
                    )
                })
                .collect(),
        )
        .unwrap();

        let mut idx_label = classes
            .iter()
            .map(|c| format!("truth\n{c}"))
            .collect::<Vec<_>>();
        idx_label.push("total".into());

        // row total
        let mut total =
            df!("total" => result.iter().map(|s| {s.iter().map(|c|c.try_extract::<i64>().unwrap()).sum::<i64>()})
                .collect::<Vec<_>>()
            )
            .unwrap()
            .transpose(None, None)
            .unwrap();
        let b = total.clone();
        let names =
            zip(b.iter().map(|c| c.name()), result.iter().map(|c| c.name())).collect::<Vec<_>>();
        for (old, new) in names {
            total = total.rename(old, new).unwrap().clone();
        }
        let mut result = result
            .lazy()
            .cast_all(DataType::Int64, true)
            .collect()
            .unwrap()
            .vstack(&total)
            .unwrap();

        // column total
        let mut row = result.get_row(0).unwrap();
        let mut total: Vec<i64> = Vec::with_capacity(result.shape().0);
        for idx in 0..result.shape().0 {
            let _ = result.get_row_amortized(idx, &mut row);
            total.push(row.0.iter().map(|c| c.try_extract::<i64>().unwrap()).sum())
        }
        result
            .insert_column(result.shape().1, Series::new("total", total))
            .unwrap();

        // labels
        result
            .insert_column(0, idx_label.into_iter().collect::<Series>())
            .unwrap();
        result
    }
}

Final thoughts

Here is the final code. There is also a version on my github.

Rust is not made for data exploration. Given the strictness of types, extending dataframes, iterating on dataframes, hesitating between lazyframes and dataframes increase the feedback loop.

In short: perform your exploration using python and then, when your data pipeline is designed and stabilized, consider rewriting it in rust.

Category: how to Tagged: rust maths how to machine learning k-means polars


Cache implementation using weakref

Fri 30 April 2021
Bird's cache (Photo credit: Wikipedia)

This article presents a quick example of how to use weakref to implement a home-made cache mechanism.

Let's use the following use case:

  • Let's consider items:
    • items can be stored on a storage
    • items can be retrieved from storage
    • items are identify by an ID …

Category: how to Tagged: python cache weakref

Read More

Tkinter and Asyncio

Thu 18 February 2021
Asynchronous process results waiting (Photo credit: Wikipedia)

Graphical interfaces are typically the kind of object that can take advantage of asynchrounous programming as a GUI spend lot of time waiting for user input.

Tkinter <https://docs.python.org/3/library/tkinter.html#module-tkinter>_ is a kind of standard for …

Category: how to Tagged: python asyncio

Read More

Travis setup

Tue 12 May 2020
One job in continuous integration pipeline (Photo credit: Wikipedia)

The goal is to setup a CI pipeline based on Travis with external dependencies integrated to a Github repository

Travis basics

To enable Travis integration in Github, one must edit ./.travis.yml file.

I won't go into detail. The setup is …

Category: how to Tagged: travis ci how to

Read More

Wikidata crawling

Sun 26 April 2020
Graph database representation (Photo credit: Wikipedia)

I wish to have reliable data about vehicles. I decided to rely on one large source, namely Wikipedia. I chose it because it is reviewable and most of the time reviewed, and regularly updated and completed.

Wikipedia - Wikidata relationship

Wikidata items are made to …

Category: how to Tagged: python wikipedia wikidata html

Read More

awesome global shortcut

Mon 04 January 2016
Multimedia keyboard

Multimedia keyboard (Photo credit: Wikipedia)

The awesome window manager does not provide GUI configuration tool.

Here is a litte how to to provide a feature using global shortcut, illustrated with wolume control.

Defining and identifying the feature and the shortcut

The wanted feature is usually accessible via the CLI . For …

Category: how to Tagged: alsa ArchLinux awesome Configuration file FAQs Help and Tutorials Unix window manager tools unix-like

Read More
Page 1 of 3

Next »