polars_core/chunked_array/ops/
apply.rs

//! Implementations of the `ChunkApply` trait.
2use std::borrow::Cow;
3
4use crate::chunked_array::arity::{unary_elementwise, unary_elementwise_values};
5use crate::chunked_array::cast::CastOptions;
6use crate::prelude::*;
7use crate::series::IsSorted;
8
9impl<T> ChunkedArray<T>
10where
11    T: PolarsDataType,
12{
13    /// Applies a function only to the non-null elements, propagating nulls.
14    pub fn apply_nonnull_values_generic<'a, U, K, F>(
15        &'a self,
16        dtype: DataType,
17        mut op: F,
18    ) -> ChunkedArray<U>
19    where
20        U: PolarsDataType,
21        F: FnMut(T::Physical<'a>) -> K,
22        U::Array: ArrayFromIterDtype<K> + ArrayFromIterDtype<Option<K>>,
23    {
24        let iter = self.downcast_iter().map(|arr| {
25            if arr.null_count() == 0 {
26                let out: U::Array = arr
27                    .values_iter()
28                    .map(&mut op)
29                    .collect_arr_with_dtype(dtype.to_arrow(CompatLevel::newest()));
30                out.with_validity_typed(arr.validity().cloned())
31            } else {
32                let out: U::Array = arr
33                    .iter()
34                    .map(|opt| opt.map(&mut op))
35                    .collect_arr_with_dtype(dtype.to_arrow(CompatLevel::newest()));
36                out.with_validity_typed(arr.validity().cloned())
37            }
38        });
39
40        ChunkedArray::from_chunk_iter(self.name().clone(), iter)
41    }
42
43    /// Applies a function only to the non-null elements, propagating nulls.
44    pub fn try_apply_nonnull_values_generic<'a, U, K, F, E>(
45        &'a self,
46        mut op: F,
47    ) -> Result<ChunkedArray<U>, E>
48    where
49        U: PolarsDataType,
50        F: FnMut(T::Physical<'a>) -> Result<K, E>,
51        U::Array: ArrayFromIter<K> + ArrayFromIter<Option<K>>,
52    {
53        let iter = self.downcast_iter().map(|arr| {
54            let arr = if arr.null_count() == 0 {
55                let out: U::Array = arr.values_iter().map(&mut op).try_collect_arr()?;
56                out.with_validity_typed(arr.validity().cloned())
57            } else {
58                let out: U::Array = arr
59                    .iter()
60                    .map(|opt| opt.map(&mut op).transpose())
61                    .try_collect_arr()?;
62                out.with_validity_typed(arr.validity().cloned())
63            };
64            Ok(arr)
65        });
66
67        ChunkedArray::try_from_chunk_iter(self.name().clone(), iter)
68    }
69
70    pub fn apply_into_string_amortized<'a, F>(&'a self, mut f: F) -> StringChunked
71    where
72        F: FnMut(T::Physical<'a>, &mut String),
73    {
74        let mut buf = String::new();
75        let chunks = self
76            .downcast_iter()
77            .map(|arr| {
78                let mut mutarr = MutablePlString::with_capacity(arr.len());
79                arr.iter().for_each(|opt| match opt {
80                    None => mutarr.push_null(),
81                    Some(v) => {
82                        buf.clear();
83                        f(v, &mut buf);
84                        mutarr.push_value(&buf)
85                    },
86                });
87                mutarr.freeze()
88            })
89            .collect::<Vec<_>>();
90        ChunkedArray::from_chunk_iter(self.name().clone(), chunks)
91    }
92
93    pub fn try_apply_into_string_amortized<'a, F, E>(&'a self, mut f: F) -> Result<StringChunked, E>
94    where
95        F: FnMut(T::Physical<'a>, &mut String) -> Result<(), E>,
96    {
97        let mut buf = String::new();
98        let chunks = self
99            .downcast_iter()
100            .map(|arr| {
101                let mut mutarr = MutablePlString::with_capacity(arr.len());
102                for opt in arr.iter() {
103                    match opt {
104                        None => mutarr.push_null(),
105                        Some(v) => {
106                            buf.clear();
107                            f(v, &mut buf)?;
108                            mutarr.push_value(&buf)
109                        },
110                    };
111                }
112                Ok(mutarr.freeze())
113            })
114            .collect::<Vec<_>>();
115        ChunkedArray::try_from_chunk_iter(self.name().clone(), chunks)
116    }
117}
118
119fn apply_in_place_impl<S, F>(name: PlSmallStr, chunks: Vec<ArrayRef>, f: F) -> ChunkedArray<S>
120where
121    F: Fn(S::Native) -> S::Native + Copy,
122    S: PolarsNumericType,
123{
124    use arrow::Either::*;
125    let chunks = chunks.into_iter().map(|arr| {
126        let owned_arr = arr
127            .as_any()
128            .downcast_ref::<PrimitiveArray<S::Native>>()
129            .unwrap()
130            .clone();
131        // Make sure we have a single ref count coming in.
132        drop(arr);
133
134        let compute_immutable = |arr: &PrimitiveArray<S::Native>| {
135            arrow::compute::arity::unary(arr, f, S::get_dtype().to_arrow(CompatLevel::newest()))
136        };
137
138        if owned_arr.values().is_sliced() {
139            compute_immutable(&owned_arr)
140        } else {
141            match owned_arr.into_mut() {
142                Left(immutable) => compute_immutable(&immutable),
143                Right(mut mutable) => {
144                    let vals = mutable.values_mut_slice();
145                    vals.iter_mut().for_each(|v| *v = f(*v));
146                    mutable.into()
147                },
148            }
149        }
150    });
151
152    ChunkedArray::from_chunk_iter(name, chunks)
153}
154
155impl<T: PolarsNumericType> ChunkedArray<T> {
156    /// Cast a numeric array to another numeric data type and apply a function in place.
157    /// This saves an allocation.
158    pub fn cast_and_apply_in_place<F, S>(&self, f: F) -> ChunkedArray<S>
159    where
160        F: Fn(S::Native) -> S::Native + Copy,
161        S: PolarsNumericType,
162    {
163        // if we cast, we create a new arrow buffer
164        // then we clone the arrays and drop the cast arrays
165        // this will ensure we have a single ref count
166        // and we can mutate in place
167        let chunks = {
168            let s = self
169                .cast_with_options(&S::get_dtype(), CastOptions::Overflowing)
170                .unwrap();
171            s.chunks().clone()
172        };
173        apply_in_place_impl(self.name().clone(), chunks, f)
174    }
175
176    /// Cast a numeric array to another numeric data type and apply a function in place.
177    /// This saves an allocation.
178    pub fn apply_in_place<F>(mut self, f: F) -> Self
179    where
180        F: Fn(T::Native) -> T::Native + Copy,
181    {
182        let chunks = std::mem::take(&mut self.chunks);
183        apply_in_place_impl(self.name().clone(), chunks, f)
184    }
185}
186
187impl<T: PolarsNumericType> ChunkedArray<T> {
188    pub fn apply_mut<F>(&mut self, f: F)
189    where
190        F: Fn(T::Native) -> T::Native + Copy,
191    {
192        // SAFETY, we do no t change the lengths
193        unsafe {
194            self.downcast_iter_mut()
195                .for_each(|arr| arrow::compute::arity_assign::unary(arr, f))
196        };
197        // can be in any order now
198        self.compute_len();
199        self.set_sorted_flag(IsSorted::Not);
200    }
201}
202
203impl<'a, T> ChunkApply<'a, T::Native> for ChunkedArray<T>
204where
205    T: PolarsNumericType,
206{
207    type FuncRet = T::Native;
208
209    fn apply_values<F>(&'a self, f: F) -> Self
210    where
211        F: Fn(T::Native) -> T::Native + Copy,
212    {
213        let chunks = self
214            .data_views()
215            .zip(self.iter_validities())
216            .map(|(slice, validity)| {
217                let arr: T::Array = slice.iter().copied().map(f).collect_arr();
218                arr.with_validity(validity.cloned())
219            });
220        ChunkedArray::from_chunk_iter(self.name().clone(), chunks)
221    }
222
223    fn apply<F>(&'a self, f: F) -> Self
224    where
225        F: Fn(Option<T::Native>) -> Option<T::Native> + Copy,
226    {
227        let chunks = self.downcast_iter().map(|arr| {
228            let iter = arr.into_iter().map(|opt_v| f(opt_v.copied()));
229            PrimitiveArray::<T::Native>::from_trusted_len_iter(iter)
230        });
231        Self::from_chunk_iter(self.name().clone(), chunks)
232    }
233
234    fn apply_to_slice<F, V>(&'a self, f: F, slice: &mut [V])
235    where
236        F: Fn(Option<T::Native>, &V) -> V,
237    {
238        assert!(slice.len() >= self.len());
239
240        let mut idx = 0;
241        self.downcast_iter().for_each(|arr| {
242            arr.into_iter().for_each(|opt_val| {
243                // SAFETY:
244                // length asserted above
245                let item = unsafe { slice.get_unchecked_mut(idx) };
246                *item = f(opt_val.copied(), item);
247                idx += 1;
248            })
249        });
250    }
251}
252
253impl<'a> ChunkApply<'a, bool> for BooleanChunked {
254    type FuncRet = bool;
255
256    fn apply_values<F>(&self, f: F) -> Self
257    where
258        F: Fn(bool) -> bool + Copy,
259    {
260        // Can just fully deduce behavior from two invocations.
261        match (f(false), f(true)) {
262            (false, false) => self.apply_kernel(&|arr| {
263                Box::new(
264                    BooleanArray::full(arr.len(), false, ArrowDataType::Boolean)
265                        .with_validity(arr.validity().cloned()),
266                )
267            }),
268            (false, true) => self.clone(),
269            (true, false) => !self,
270            (true, true) => self.apply_kernel(&|arr| {
271                Box::new(
272                    BooleanArray::full(arr.len(), true, ArrowDataType::Boolean)
273                        .with_validity(arr.validity().cloned()),
274                )
275            }),
276        }
277    }
278
279    fn apply<F>(&'a self, f: F) -> Self
280    where
281        F: Fn(Option<bool>) -> Option<bool> + Copy,
282    {
283        unary_elementwise(self, f)
284    }
285
286    fn apply_to_slice<F, T>(&'a self, f: F, slice: &mut [T])
287    where
288        F: Fn(Option<bool>, &T) -> T,
289    {
290        assert!(slice.len() >= self.len());
291
292        let mut idx = 0;
293        self.downcast_iter().for_each(|arr| {
294            arr.into_iter().for_each(|opt_val| {
295                // SAFETY:
296                // length asserted above
297                let item = unsafe { slice.get_unchecked_mut(idx) };
298                *item = f(opt_val, item);
299                idx += 1;
300            })
301        });
302    }
303}
304
305impl StringChunked {
306    pub fn apply_mut<'a, F>(&'a self, mut f: F) -> Self
307    where
308        F: FnMut(&'a str) -> &'a str,
309    {
310        let chunks = self.downcast_iter().map(|arr| {
311            let iter = arr.values_iter().map(&mut f);
312            let new = Utf8ViewArray::arr_from_iter(iter);
313            new.with_validity(arr.validity().cloned())
314        });
315        StringChunked::from_chunk_iter(self.name().clone(), chunks)
316    }
317}
318
319impl BinaryChunked {
320    pub fn apply_mut<'a, F>(&'a self, mut f: F) -> Self
321    where
322        F: FnMut(&'a [u8]) -> &'a [u8],
323    {
324        let chunks = self.downcast_iter().map(|arr| {
325            let iter = arr.values_iter().map(&mut f);
326            let new = BinaryViewArray::arr_from_iter(iter);
327            new.with_validity(arr.validity().cloned())
328        });
329        BinaryChunked::from_chunk_iter(self.name().clone(), chunks)
330    }
331}
332
333impl<'a> ChunkApply<'a, &'a str> for StringChunked {
334    type FuncRet = Cow<'a, str>;
335
336    fn apply_values<F>(&'a self, f: F) -> Self
337    where
338        F: Fn(&'a str) -> Cow<'a, str> + Copy,
339    {
340        unary_elementwise_values(self, f)
341    }
342
343    fn apply<F>(&'a self, f: F) -> Self
344    where
345        F: Fn(Option<&'a str>) -> Option<Cow<'a, str>> + Copy,
346    {
347        unary_elementwise(self, f)
348    }
349
350    fn apply_to_slice<F, T>(&'a self, f: F, slice: &mut [T])
351    where
352        F: Fn(Option<&'a str>, &T) -> T,
353    {
354        assert!(slice.len() >= self.len());
355
356        let mut idx = 0;
357        self.downcast_iter().for_each(|arr| {
358            arr.into_iter().for_each(|opt_val| {
359                // SAFETY:
360                // length asserted above
361                let item = unsafe { slice.get_unchecked_mut(idx) };
362                *item = f(opt_val, item);
363                idx += 1;
364            })
365        });
366    }
367}
368
369impl<'a> ChunkApply<'a, &'a [u8]> for BinaryChunked {
370    type FuncRet = Cow<'a, [u8]>;
371
372    fn apply_values<F>(&'a self, f: F) -> Self
373    where
374        F: Fn(&'a [u8]) -> Cow<'a, [u8]> + Copy,
375    {
376        unary_elementwise_values(self, f)
377    }
378
379    fn apply<F>(&'a self, f: F) -> Self
380    where
381        F: Fn(Option<&'a [u8]>) -> Option<Cow<'a, [u8]>> + Copy,
382    {
383        unary_elementwise(self, f)
384    }
385
386    fn apply_to_slice<F, T>(&'a self, f: F, slice: &mut [T])
387    where
388        F: Fn(Option<&'a [u8]>, &T) -> T,
389    {
390        assert!(slice.len() >= self.len());
391
392        let mut idx = 0;
393        self.downcast_iter().for_each(|arr| {
394            arr.into_iter().for_each(|opt_val| {
395                // SAFETY:
396                // length asserted above
397                let item = unsafe { slice.get_unchecked_mut(idx) };
398                *item = f(opt_val, item);
399                idx += 1;
400            })
401        });
402    }
403}
404
405impl ChunkApplyKernel<BooleanArray> for BooleanChunked {
406    fn apply_kernel(&self, f: &dyn Fn(&BooleanArray) -> ArrayRef) -> Self {
407        let chunks = self.downcast_iter().map(f).collect();
408        unsafe { Self::from_chunks(self.name().clone(), chunks) }
409    }
410
411    fn apply_kernel_cast<S>(&self, f: &dyn Fn(&BooleanArray) -> ArrayRef) -> ChunkedArray<S>
412    where
413        S: PolarsDataType,
414    {
415        let chunks = self.downcast_iter().map(f).collect();
416        unsafe { ChunkedArray::<S>::from_chunks(self.name().clone(), chunks) }
417    }
418}
419
420impl<T> ChunkApplyKernel<PrimitiveArray<T::Native>> for ChunkedArray<T>
421where
422    T: PolarsNumericType,
423{
424    fn apply_kernel(&self, f: &dyn Fn(&PrimitiveArray<T::Native>) -> ArrayRef) -> Self {
425        self.apply_kernel_cast(&f)
426    }
427    fn apply_kernel_cast<S>(
428        &self,
429        f: &dyn Fn(&PrimitiveArray<T::Native>) -> ArrayRef,
430    ) -> ChunkedArray<S>
431    where
432        S: PolarsDataType,
433    {
434        let chunks = self.downcast_iter().map(f).collect();
435        unsafe { ChunkedArray::from_chunks(self.name().clone(), chunks) }
436    }
437}
438
439impl ChunkApplyKernel<Utf8ViewArray> for StringChunked {
440    fn apply_kernel(&self, f: &dyn Fn(&Utf8ViewArray) -> ArrayRef) -> Self {
441        self.apply_kernel_cast(&f)
442    }
443
444    fn apply_kernel_cast<S>(&self, f: &dyn Fn(&Utf8ViewArray) -> ArrayRef) -> ChunkedArray<S>
445    where
446        S: PolarsDataType,
447    {
448        let chunks = self.downcast_iter().map(f).collect();
449        unsafe { ChunkedArray::from_chunks(self.name().clone(), chunks) }
450    }
451}
452
453impl ChunkApplyKernel<BinaryViewArray> for BinaryChunked {
454    fn apply_kernel(&self, f: &dyn Fn(&BinaryViewArray) -> ArrayRef) -> Self {
455        self.apply_kernel_cast(&f)
456    }
457
458    fn apply_kernel_cast<S>(&self, f: &dyn Fn(&BinaryViewArray) -> ArrayRef) -> ChunkedArray<S>
459    where
460        S: PolarsDataType,
461    {
462        let chunks = self.downcast_iter().map(f).collect();
463        unsafe { ChunkedArray::from_chunks(self.name().clone(), chunks) }
464    }
465}
466
467impl<'a> ChunkApply<'a, Series> for ListChunked {
468    type FuncRet = Series;
469
470    /// Apply a closure `F` elementwise.
471    fn apply_values<F>(&'a self, f: F) -> Self
472    where
473        F: Fn(Series) -> Series + Copy,
474    {
475        if self.is_empty() {
476            return self.clone();
477        }
478        let mut fast_explode = true;
479        let mut function = |s: Series| {
480            let out = f(s);
481            if out.is_empty() {
482                fast_explode = false;
483            }
484            out
485        };
486        let mut ca: ListChunked = {
487            if !self.has_nulls() {
488                self.into_no_null_iter()
489                    .map(&mut function)
490                    .collect_trusted()
491            } else {
492                self.into_iter()
493                    .map(|opt_v| opt_v.map(&mut function))
494                    .collect_trusted()
495            }
496        };
497        if fast_explode {
498            ca.set_fast_explode()
499        }
500        ca
501    }
502
503    fn apply<F>(&'a self, f: F) -> Self
504    where
505        F: Fn(Option<Series>) -> Option<Series> + Copy,
506    {
507        if self.is_empty() {
508            return self.clone();
509        }
510        self.into_iter().map(f).collect_trusted()
511    }
512
513    fn apply_to_slice<F, T>(&'a self, f: F, slice: &mut [T])
514    where
515        F: Fn(Option<Series>, &T) -> T,
516    {
517        assert!(slice.len() >= self.len());
518
519        let mut idx = 0;
520        self.downcast_iter().for_each(|arr| {
521            arr.iter().for_each(|opt_val| {
522                let opt_val = opt_val
523                    .map(|arrayref| Series::try_from((PlSmallStr::EMPTY, arrayref)).unwrap());
524
525                // SAFETY:
526                // length asserted above
527                let item = unsafe { slice.get_unchecked_mut(idx) };
528                *item = f(opt_val, item);
529                idx += 1;
530            })
531        });
532    }
533}
534
#[cfg(feature = "object")]
impl<'a, T> ChunkApply<'a, &'a T> for ObjectChunked<T>
where
    T: PolarsObject,
{
    type FuncRet = T;

    /// Applies `f` to every non-null object; nulls stay null. The name is kept.
    fn apply_values<F>(&'a self, f: F) -> Self
    where
        F: Fn(&'a T) -> T + Copy,
    {
        let mapped = self.into_iter().map(|opt_v| opt_v.map(f));
        let mut result: ObjectChunked<T> = mapped.collect();
        result.rename(self.name().clone());
        result
    }

    /// Applies `f` to every element, nulls included (passed as `None`). The name is kept.
    fn apply<F>(&'a self, f: F) -> Self
    where
        F: Fn(Option<&'a T>) -> Option<T> + Copy,
    {
        let mapped = self.into_iter().map(f);
        let mut result: ObjectChunked<T> = mapped.collect();
        result.rename(self.name().clone());
        result
    }

    /// Updates `slice[i]` with `f(element_i, &slice[i])` for every element of the array.
    ///
    /// # Panics
    /// Panics if `slice` is shorter than the array.
    fn apply_to_slice<F, V>(&'a self, f: F, slice: &mut [V])
    where
        F: Fn(Option<&'a T>, &V) -> V,
    {
        assert!(slice.len() >= self.len());

        let mut idx = 0;
        for arr in self.downcast_iter() {
            for opt_val in arr {
                // SAFETY: `slice.len() >= self.len()` was asserted above and `idx`
                // advances once per element yielded by the chunks.
                let item = unsafe { slice.get_unchecked_mut(idx) };
                *item = f(opt_val, item);
                idx += 1;
            }
        }
    }
}
577
578impl StringChunked {
579    /// # Safety
580    /// Update the views. All invariants of the views apply.
581    pub unsafe fn apply_views<F: FnMut(View, &str) -> View + Copy>(&self, update_view: F) -> Self {
582        let mut out = self.clone();
583        for arr in out.downcast_iter_mut() {
584            *arr = arr.apply_views(update_view);
585        }
586        out
587    }
588}