polars_arrow/legacy/kernels/rolling/no_nulls/
mean.rs1use polars_error::polars_ensure;
2
3use super::*;
4
5pub struct MeanWindow<'a, T> {
6 sum: SumWindow<'a, T>,
7}
8
9impl<
10 'a,
11 T: NativeType + IsFloat + std::iter::Sum + AddAssign + SubAssign + Div<Output = T> + NumCast,
12 > RollingAggWindowNoNulls<'a, T> for MeanWindow<'a, T>
13{
14 fn new(slice: &'a [T], start: usize, end: usize, params: Option<RollingFnParams>) -> Self {
15 Self {
16 sum: SumWindow::new(slice, start, end, params),
17 }
18 }
19
20 unsafe fn update(&mut self, start: usize, end: usize) -> Option<T> {
21 let sum = self.sum.update(start, end).unwrap_unchecked();
22 Some(sum / NumCast::from(end - start).unwrap())
23 }
24}
25
26pub fn rolling_mean<T>(
27 values: &[T],
28 window_size: usize,
29 min_periods: usize,
30 center: bool,
31 weights: Option<&[f64]>,
32 _params: Option<RollingFnParams>,
33) -> PolarsResult<ArrayRef>
34where
35 T: NativeType + Float + std::iter::Sum<T> + SubAssign + AddAssign + IsFloat,
36{
37 let offset_fn = match center {
38 true => det_offsets_center,
39 false => det_offsets,
40 };
41 match weights {
42 None => rolling_apply_agg_window::<MeanWindow<_>, _, _>(
43 values,
44 window_size,
45 min_periods,
46 offset_fn,
47 None,
48 ),
49 Some(weights) => {
50 let mut wts = no_nulls::coerce_weights(weights);
52 let wsum = wts.iter().fold(T::zero(), |acc, x| acc + *x);
53 polars_ensure!(
54 wsum != T::zero(),
55 ComputeError: "Weighted mean is undefined if weights sum to 0"
56 );
57 wts.iter_mut().for_each(|w| *w = *w / wsum);
58 no_nulls::rolling_apply_weights(
59 values,
60 window_size,
61 min_periods,
62 offset_fn,
63 no_nulls::compute_sum_weights,
64 &wts,
65 )
66 },
67 }
68}