polars_core/chunked_array/ops/
arity.rs

1use std::error::Error;
2
3use arrow::array::{Array, MutablePlString, StaticArray};
4use arrow::compute::utils::combine_validities_and;
5use polars_error::PolarsResult;
6use polars_utils::pl_str::PlSmallStr;
7
8use crate::chunked_array::flags::StatisticsFlags;
9use crate::datatypes::{ArrayCollectIterExt, ArrayFromIter};
10use crate::prelude::{ChunkedArray, CompatLevel, PolarsDataType, Series, StringChunked};
11use crate::utils::{align_chunks_binary, align_chunks_binary_owned, align_chunks_ternary};
12
/// A single-argument `FnMut` whose return type is exposed as the associated
/// type [`UnaryFnMut::Ret`].
///
/// We need this helper because `for<'a>` notation can't yet be applied
/// properly on the return type.
pub trait UnaryFnMut<Arg>: FnMut(Arg) -> Self::Ret {
    /// The type produced by calling the closure.
    type Ret;
}

// Blanket implementation: any `FnMut(Arg) -> Out` is a `UnaryFnMut<Arg>` with
// `Ret = Out`.
impl<Arg, Out, Func> UnaryFnMut<Arg> for Func
where
    Func: FnMut(Arg) -> Out,
{
    type Ret = Out;
}
22
/// A three-argument `FnMut` whose return type is exposed as the associated
/// type [`TernaryFnMut::Ret`].
///
/// We need this helper because `for<'a>` notation can't yet be applied
/// properly on the return type.
pub trait TernaryFnMut<First, Second, Third>:
    FnMut(First, Second, Third) -> Self::Ret
{
    /// The type produced by calling the closure.
    type Ret;
}

// Blanket implementation: any `FnMut(First, Second, Third) -> Out` is a
// `TernaryFnMut<First, Second, Third>` with `Ret = Out`.
impl<First, Second, Third, Out, Func> TernaryFnMut<First, Second, Third> for Func
where
    Func: FnMut(First, Second, Third) -> Out,
{
    type Ret = Out;
}
32
/// A two-argument `FnMut` whose return type is exposed as the associated
/// type [`BinaryFnMut::Ret`].
///
/// We need this helper because `for<'a>` notation can't yet be applied
/// properly on the return type.
pub trait BinaryFnMut<First, Second>: FnMut(First, Second) -> Self::Ret {
    /// The type produced by calling the closure.
    type Ret;
}

// Blanket implementation: any `FnMut(First, Second) -> Out` is a
// `BinaryFnMut<First, Second>` with `Ret = Out`.
impl<First, Second, Out, Func> BinaryFnMut<First, Second> for Func
where
    Func: FnMut(First, Second) -> Out,
{
    type Ret = Out;
}
42
43/// Applies a kernel that produces `Array` types.
44#[inline]
45pub fn unary_kernel<T, V, F, Arr>(ca: &ChunkedArray<T>, op: F) -> ChunkedArray<V>
46where
47    T: PolarsDataType,
48    V: PolarsDataType<Array = Arr>,
49    Arr: Array,
50    F: FnMut(&T::Array) -> Arr,
51{
52    let iter = ca.downcast_iter().map(op);
53    ChunkedArray::from_chunk_iter(ca.name().clone(), iter)
54}
55
56/// Applies a kernel that produces `Array` types.
57#[inline]
58pub fn unary_kernel_owned<T, V, F, Arr>(ca: ChunkedArray<T>, op: F) -> ChunkedArray<V>
59where
60    T: PolarsDataType,
61    V: PolarsDataType<Array = Arr>,
62    Arr: Array,
63    F: FnMut(T::Array) -> Arr,
64{
65    let name = ca.name().clone();
66    let iter = ca.downcast_into_iter().map(op);
67    ChunkedArray::from_chunk_iter(name, iter)
68}
69
70#[inline]
71pub fn unary_elementwise<'a, T, V, F>(ca: &'a ChunkedArray<T>, mut op: F) -> ChunkedArray<V>
72where
73    T: PolarsDataType,
74    V: PolarsDataType,
75    F: UnaryFnMut<Option<T::Physical<'a>>>,
76    V::Array: ArrayFromIter<<F as UnaryFnMut<Option<T::Physical<'a>>>>::Ret>,
77{
78    if ca.has_nulls() {
79        let iter = ca
80            .downcast_iter()
81            .map(|arr| arr.iter().map(&mut op).collect_arr());
82        ChunkedArray::from_chunk_iter(ca.name().clone(), iter)
83    } else {
84        let iter = ca
85            .downcast_iter()
86            .map(|arr| arr.values_iter().map(|x| op(Some(x))).collect_arr());
87        ChunkedArray::from_chunk_iter(ca.name().clone(), iter)
88    }
89}
90
91#[inline]
92pub fn try_unary_elementwise<'a, T, V, F, K, E>(
93    ca: &'a ChunkedArray<T>,
94    mut op: F,
95) -> Result<ChunkedArray<V>, E>
96where
97    T: PolarsDataType,
98    V: PolarsDataType,
99    F: FnMut(Option<T::Physical<'a>>) -> Result<Option<K>, E>,
100    V::Array: ArrayFromIter<Option<K>>,
101{
102    let iter = ca
103        .downcast_iter()
104        .map(|arr| arr.iter().map(&mut op).try_collect_arr());
105    ChunkedArray::try_from_chunk_iter(ca.name().clone(), iter)
106}
107
108#[inline]
109pub fn unary_elementwise_values<'a, T, V, F>(ca: &'a ChunkedArray<T>, mut op: F) -> ChunkedArray<V>
110where
111    T: PolarsDataType,
112    V: PolarsDataType,
113    F: UnaryFnMut<T::Physical<'a>>,
114    V::Array: ArrayFromIter<<F as UnaryFnMut<T::Physical<'a>>>::Ret>,
115{
116    if ca.null_count() == ca.len() {
117        let arr = V::Array::full_null(ca.len(), V::get_dtype().to_arrow(CompatLevel::newest()));
118        return ChunkedArray::with_chunk(ca.name().clone(), arr);
119    }
120
121    let iter = ca.downcast_iter().map(|arr| {
122        let validity = arr.validity().cloned();
123        let arr: V::Array = arr.values_iter().map(&mut op).collect_arr();
124        arr.with_validity_typed(validity)
125    });
126    ChunkedArray::from_chunk_iter(ca.name().clone(), iter)
127}
128
129#[inline]
130pub fn try_unary_elementwise_values<'a, T, V, F, K, E>(
131    ca: &'a ChunkedArray<T>,
132    mut op: F,
133) -> Result<ChunkedArray<V>, E>
134where
135    T: PolarsDataType,
136    V: PolarsDataType,
137    F: FnMut(T::Physical<'a>) -> Result<K, E>,
138    V::Array: ArrayFromIter<K>,
139{
140    if ca.null_count() == ca.len() {
141        let arr = V::Array::full_null(ca.len(), V::get_dtype().to_arrow(CompatLevel::newest()));
142        return Ok(ChunkedArray::with_chunk(ca.name().clone(), arr));
143    }
144
145    let iter = ca.downcast_iter().map(|arr| {
146        let validity = arr.validity().cloned();
147        let arr: V::Array = arr.values_iter().map(&mut op).try_collect_arr()?;
148        Ok(arr.with_validity_typed(validity))
149    });
150    ChunkedArray::try_from_chunk_iter(ca.name().clone(), iter)
151}
152
153/// Applies a kernel that produces `Array` types.
154///
155/// Intended for kernels that apply on values, this function will apply the
156/// validity mask afterwards.
157#[inline]
158pub fn unary_mut_values<T, V, F, Arr>(ca: &ChunkedArray<T>, mut op: F) -> ChunkedArray<V>
159where
160    T: PolarsDataType,
161    V: PolarsDataType<Array = Arr>,
162    Arr: Array + StaticArray,
163    F: FnMut(&T::Array) -> Arr,
164{
165    let iter = ca
166        .downcast_iter()
167        .map(|arr| op(arr).with_validity_typed(arr.validity().cloned()));
168    ChunkedArray::from_chunk_iter(ca.name().clone(), iter)
169}
170
171/// Applies a kernel that produces `Array` types.
172#[inline]
173pub fn unary_mut_with_options<T, V, F, Arr>(ca: &ChunkedArray<T>, op: F) -> ChunkedArray<V>
174where
175    T: PolarsDataType,
176    V: PolarsDataType<Array = Arr>,
177    Arr: Array + StaticArray,
178    F: FnMut(&T::Array) -> Arr,
179{
180    ChunkedArray::from_chunk_iter(ca.name().clone(), ca.downcast_iter().map(op))
181}
182
183#[inline]
184pub fn try_unary_mut_with_options<T, V, F, Arr, E>(
185    ca: &ChunkedArray<T>,
186    op: F,
187) -> Result<ChunkedArray<V>, E>
188where
189    T: PolarsDataType,
190    V: PolarsDataType<Array = Arr>,
191    Arr: Array + StaticArray,
192    F: FnMut(&T::Array) -> Result<Arr, E>,
193    E: Error,
194{
195    ChunkedArray::try_from_chunk_iter(ca.name().clone(), ca.downcast_iter().map(op))
196}
197
198#[inline]
199pub fn binary_elementwise<T, U, V, F>(
200    lhs: &ChunkedArray<T>,
201    rhs: &ChunkedArray<U>,
202    mut op: F,
203) -> ChunkedArray<V>
204where
205    T: PolarsDataType,
206    U: PolarsDataType,
207    V: PolarsDataType,
208    F: for<'a> BinaryFnMut<Option<T::Physical<'a>>, Option<U::Physical<'a>>>,
209    V::Array: for<'a> ArrayFromIter<
210        <F as BinaryFnMut<Option<T::Physical<'a>>, Option<U::Physical<'a>>>>::Ret,
211    >,
212{
213    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
214    let iter = lhs
215        .downcast_iter()
216        .zip(rhs.downcast_iter())
217        .map(|(lhs_arr, rhs_arr)| {
218            let element_iter = lhs_arr
219                .iter()
220                .zip(rhs_arr.iter())
221                .map(|(lhs_opt_val, rhs_opt_val)| op(lhs_opt_val, rhs_opt_val));
222            element_iter.collect_arr()
223        });
224    ChunkedArray::from_chunk_iter(lhs.name().clone(), iter)
225}
226
/// Calls `op` on each pair of optional values taken from `lhs` and `rhs`,
/// walking both chunk lists in lockstep without aligning their chunks first.
///
/// Nothing is collected; `op` is invoked purely for its side effects.
/// Iteration ends as soon as either side has no further chunk to offer.
///
/// # Panics
/// Panics if `lhs` or `rhs` yields no chunk at all, because the first
/// `downcast_iter().next()` of each side is unwrapped.
#[inline]
pub fn binary_elementwise_for_each<'a, 'b, T, U, F>(
    lhs: &'a ChunkedArray<T>,
    rhs: &'b ChunkedArray<U>,
    mut op: F,
) where
    T: PolarsDataType,
    U: PolarsDataType,
    F: FnMut(Option<T::Physical<'a>>, Option<U::Physical<'b>>),
{
    let mut lhs_arr_iter = lhs.downcast_iter();
    let mut rhs_arr_iter = rhs.downcast_iter();

    // Start on the first chunk of each side.
    let lhs_arr = lhs_arr_iter.next().unwrap();
    let rhs_arr = rhs_arr_iter.next().unwrap();

    // Number of elements not yet consumed from the current chunk of each side.
    let mut lhs_remaining = lhs_arr.len();
    let mut rhs_remaining = rhs_arr.len();
    let mut lhs_iter = lhs_arr.iter();
    let mut rhs_iter = rhs_arr.iter();

    loop {
        // Both current chunks still hold at least `range` elements.
        let range = std::cmp::min(lhs_remaining, rhs_remaining);

        for _ in 0..range {
            // SAFETY: we loop until the smaller iter is exhausted.
            let lhs_opt_val = unsafe { lhs_iter.next().unwrap_unchecked() };
            let rhs_opt_val = unsafe { rhs_iter.next().unwrap_unchecked() };
            op(lhs_opt_val, rhs_opt_val)
        }
        lhs_remaining -= range;
        rhs_remaining -= range;

        // Advance whichever side ran dry to its next chunk; stop when there is none.
        if lhs_remaining == 0 {
            let Some(new_arr) = lhs_arr_iter.next() else {
                return;
            };
            lhs_remaining = new_arr.len();
            lhs_iter = new_arr.iter();
        }
        if rhs_remaining == 0 {
            let Some(new_arr) = rhs_arr_iter.next() else {
                return;
            };
            rhs_remaining = new_arr.len();
            rhs_iter = new_arr.iter();
        }
    }
}
276
277#[inline]
278pub fn try_binary_elementwise<T, U, V, F, K, E>(
279    lhs: &ChunkedArray<T>,
280    rhs: &ChunkedArray<U>,
281    mut op: F,
282) -> Result<ChunkedArray<V>, E>
283where
284    T: PolarsDataType,
285    U: PolarsDataType,
286    V: PolarsDataType,
287    F: for<'a> FnMut(Option<T::Physical<'a>>, Option<U::Physical<'a>>) -> Result<Option<K>, E>,
288    V::Array: ArrayFromIter<Option<K>>,
289{
290    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
291    let iter = lhs
292        .downcast_iter()
293        .zip(rhs.downcast_iter())
294        .map(|(lhs_arr, rhs_arr)| {
295            let element_iter = lhs_arr
296                .iter()
297                .zip(rhs_arr.iter())
298                .map(|(lhs_opt_val, rhs_opt_val)| op(lhs_opt_val, rhs_opt_val));
299            element_iter.try_collect_arr()
300        });
301    ChunkedArray::try_from_chunk_iter(lhs.name().clone(), iter)
302}
303
304#[inline]
305pub fn binary_elementwise_values<T, U, V, F, K>(
306    lhs: &ChunkedArray<T>,
307    rhs: &ChunkedArray<U>,
308    mut op: F,
309) -> ChunkedArray<V>
310where
311    T: PolarsDataType,
312    U: PolarsDataType,
313    V: PolarsDataType,
314    F: for<'a> FnMut(T::Physical<'a>, U::Physical<'a>) -> K,
315    V::Array: ArrayFromIter<K>,
316{
317    if lhs.null_count() == lhs.len() || rhs.null_count() == rhs.len() {
318        let len = lhs.len().min(rhs.len());
319        let arr = V::Array::full_null(len, V::get_dtype().to_arrow(CompatLevel::newest()));
320
321        return ChunkedArray::with_chunk(lhs.name().clone(), arr);
322    }
323
324    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
325
326    let iter = lhs
327        .downcast_iter()
328        .zip(rhs.downcast_iter())
329        .map(|(lhs_arr, rhs_arr)| {
330            let validity = combine_validities_and(lhs_arr.validity(), rhs_arr.validity());
331
332            let element_iter = lhs_arr
333                .values_iter()
334                .zip(rhs_arr.values_iter())
335                .map(|(lhs_val, rhs_val)| op(lhs_val, rhs_val));
336
337            let array: V::Array = element_iter.collect_arr();
338            array.with_validity_typed(validity)
339        });
340    ChunkedArray::from_chunk_iter(lhs.name().clone(), iter)
341}
342
343/// Apply elementwise binary function which produces string, amortising allocations.
344///
345/// Currently unused within Polars itself, but it's a useful utility for plugin authors.
346#[inline]
347pub fn binary_elementwise_into_string_amortized<T, U, F>(
348    lhs: &ChunkedArray<T>,
349    rhs: &ChunkedArray<U>,
350    mut op: F,
351) -> StringChunked
352where
353    T: PolarsDataType,
354    U: PolarsDataType,
355    F: for<'a> FnMut(T::Physical<'a>, U::Physical<'a>, &mut String),
356{
357    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
358    let mut buf = String::new();
359    let iter = lhs
360        .downcast_iter()
361        .zip(rhs.downcast_iter())
362        .map(|(lhs_arr, rhs_arr)| {
363            let mut mutarr = MutablePlString::with_capacity(lhs_arr.len());
364            lhs_arr
365                .iter()
366                .zip(rhs_arr.iter())
367                .for_each(|(lhs_opt, rhs_opt)| match (lhs_opt, rhs_opt) {
368                    (None, _) | (_, None) => mutarr.push_null(),
369                    (Some(lhs_val), Some(rhs_val)) => {
370                        buf.clear();
371                        op(lhs_val, rhs_val, &mut buf);
372                        mutarr.push_value(&buf)
373                    },
374                });
375            mutarr.freeze()
376        });
377    ChunkedArray::from_chunk_iter(lhs.name().clone(), iter)
378}
379
380/// Applies a kernel that produces `Array` types.
381///
382/// Intended for kernels that apply on values, this function will filter out any
383/// results which do not have two non-null inputs.
384#[inline]
385pub fn binary_mut_values<T, U, V, F, Arr>(
386    lhs: &ChunkedArray<T>,
387    rhs: &ChunkedArray<U>,
388    mut op: F,
389    name: PlSmallStr,
390) -> ChunkedArray<V>
391where
392    T: PolarsDataType,
393    U: PolarsDataType,
394    V: PolarsDataType<Array = Arr>,
395    Arr: Array + StaticArray,
396    F: FnMut(&T::Array, &U::Array) -> Arr,
397{
398    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
399    let iter = lhs
400        .downcast_iter()
401        .zip(rhs.downcast_iter())
402        .map(|(lhs_arr, rhs_arr)| {
403            let ret = op(lhs_arr, rhs_arr);
404            let inp_val = combine_validities_and(lhs_arr.validity(), rhs_arr.validity());
405            let val = combine_validities_and(inp_val.as_ref(), ret.validity());
406            ret.with_validity_typed(val)
407        });
408    ChunkedArray::from_chunk_iter(name, iter)
409}
410
411/// Applies a kernel that produces `Array` types.
412#[inline]
413pub fn binary_mut_with_options<T, U, V, F, Arr>(
414    lhs: &ChunkedArray<T>,
415    rhs: &ChunkedArray<U>,
416    mut op: F,
417    name: PlSmallStr,
418) -> ChunkedArray<V>
419where
420    T: PolarsDataType,
421    U: PolarsDataType,
422    V: PolarsDataType<Array = Arr>,
423    Arr: Array,
424    F: FnMut(&T::Array, &U::Array) -> Arr,
425{
426    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
427    let iter = lhs
428        .downcast_iter()
429        .zip(rhs.downcast_iter())
430        .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr));
431    ChunkedArray::from_chunk_iter(name, iter)
432}
433
434#[inline]
435pub fn try_binary_mut_with_options<T, U, V, F, Arr, E>(
436    lhs: &ChunkedArray<T>,
437    rhs: &ChunkedArray<U>,
438    mut op: F,
439    name: PlSmallStr,
440) -> Result<ChunkedArray<V>, E>
441where
442    T: PolarsDataType,
443    U: PolarsDataType,
444    V: PolarsDataType<Array = Arr>,
445    Arr: Array,
446    F: FnMut(&T::Array, &U::Array) -> Result<Arr, E>,
447    E: Error,
448{
449    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
450    let iter = lhs
451        .downcast_iter()
452        .zip(rhs.downcast_iter())
453        .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr));
454    ChunkedArray::try_from_chunk_iter(name, iter)
455}
456
457/// Applies a kernel that produces `Array` types.
458pub fn binary<T, U, V, F, Arr>(
459    lhs: &ChunkedArray<T>,
460    rhs: &ChunkedArray<U>,
461    op: F,
462) -> ChunkedArray<V>
463where
464    T: PolarsDataType,
465    U: PolarsDataType,
466    V: PolarsDataType<Array = Arr>,
467    Arr: Array,
468    F: FnMut(&T::Array, &U::Array) -> Arr,
469{
470    binary_mut_with_options(lhs, rhs, op, lhs.name().clone())
471}
472
473/// Applies a kernel that produces `Array` types.
474pub fn binary_owned<L, R, V, F, Arr>(
475    lhs: ChunkedArray<L>,
476    rhs: ChunkedArray<R>,
477    mut op: F,
478) -> ChunkedArray<V>
479where
480    L: PolarsDataType,
481    R: PolarsDataType,
482    V: PolarsDataType<Array = Arr>,
483    Arr: Array,
484    F: FnMut(L::Array, R::Array) -> Arr,
485{
486    let name = lhs.name().clone();
487    let (lhs, rhs) = align_chunks_binary_owned(lhs, rhs);
488    let iter = lhs
489        .downcast_into_iter()
490        .zip(rhs.downcast_into_iter())
491        .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr));
492    ChunkedArray::from_chunk_iter(name, iter)
493}
494
495/// Applies a kernel that produces `Array` types.
496pub fn try_binary<T, U, V, F, Arr, E>(
497    lhs: &ChunkedArray<T>,
498    rhs: &ChunkedArray<U>,
499    mut op: F,
500) -> Result<ChunkedArray<V>, E>
501where
502    T: PolarsDataType,
503    U: PolarsDataType,
504    V: PolarsDataType<Array = Arr>,
505    Arr: Array,
506    F: FnMut(&T::Array, &U::Array) -> Result<Arr, E>,
507    E: Error,
508{
509    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
510    let iter = lhs
511        .downcast_iter()
512        .zip(rhs.downcast_iter())
513        .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr));
514    ChunkedArray::try_from_chunk_iter(lhs.name().clone(), iter)
515}
516
517/// Applies a kernel that produces `ArrayRef` of the same type.
518///
519/// # Safety
520/// Caller must ensure that the returned `ArrayRef` belongs to `T: PolarsDataType`.
521#[inline]
522pub unsafe fn binary_unchecked_same_type<T, U, F>(
523    lhs: &ChunkedArray<T>,
524    rhs: &ChunkedArray<U>,
525    mut op: F,
526    keep_sorted: bool,
527    keep_fast_explode: bool,
528) -> ChunkedArray<T>
529where
530    T: PolarsDataType,
531    U: PolarsDataType,
532    F: FnMut(&T::Array, &U::Array) -> Box<dyn Array>,
533{
534    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
535    let chunks = lhs
536        .downcast_iter()
537        .zip(rhs.downcast_iter())
538        .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr))
539        .collect();
540
541    let mut ca = lhs.copy_with_chunks(chunks);
542
543    let mut retain_flags = StatisticsFlags::empty();
544    use StatisticsFlags as F;
545    retain_flags.set(F::IS_SORTED_ANY, keep_sorted);
546    retain_flags.set(F::CAN_FAST_EXPLODE_LIST, keep_fast_explode);
547    ca.retain_flags_from(lhs.as_ref(), retain_flags);
548
549    ca
550}
551
552#[inline]
553pub fn binary_to_series<T, U, F>(
554    lhs: &ChunkedArray<T>,
555    rhs: &ChunkedArray<U>,
556    mut op: F,
557) -> PolarsResult<Series>
558where
559    T: PolarsDataType,
560    U: PolarsDataType,
561    F: FnMut(&T::Array, &U::Array) -> Box<dyn Array>,
562{
563    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
564    let chunks = lhs
565        .downcast_iter()
566        .zip(rhs.downcast_iter())
567        .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr))
568        .collect::<Vec<_>>();
569    Series::try_from((lhs.name().clone(), chunks))
570}
571
572/// Applies a kernel that produces `ArrayRef` of the same type.
573///
574/// # Safety
575/// Caller must ensure that the returned `ArrayRef` belongs to `T: PolarsDataType`.
576#[inline]
577pub unsafe fn try_binary_unchecked_same_type<T, U, F, E>(
578    lhs: &ChunkedArray<T>,
579    rhs: &ChunkedArray<U>,
580    mut op: F,
581    keep_sorted: bool,
582    keep_fast_explode: bool,
583) -> Result<ChunkedArray<T>, E>
584where
585    T: PolarsDataType,
586    U: PolarsDataType,
587    F: FnMut(&T::Array, &U::Array) -> Result<Box<dyn Array>, E>,
588    E: Error,
589{
590    let (lhs, rhs) = align_chunks_binary(lhs, rhs);
591    let chunks = lhs
592        .downcast_iter()
593        .zip(rhs.downcast_iter())
594        .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr))
595        .collect::<Result<Vec<_>, E>>()?;
596    let mut ca = lhs.copy_with_chunks(chunks);
597
598    let mut retain_flags = StatisticsFlags::empty();
599    use StatisticsFlags as F;
600    retain_flags.set(F::IS_SORTED_ANY, keep_sorted);
601    retain_flags.set(F::CAN_FAST_EXPLODE_LIST, keep_fast_explode);
602    ca.retain_flags_from(lhs.as_ref(), retain_flags);
603
604    Ok(ca)
605}
606
607#[inline]
608pub fn try_ternary_elementwise<T, U, V, G, F, K, E>(
609    ca1: &ChunkedArray<T>,
610    ca2: &ChunkedArray<U>,
611    ca3: &ChunkedArray<G>,
612    mut op: F,
613) -> Result<ChunkedArray<V>, E>
614where
615    T: PolarsDataType,
616    U: PolarsDataType,
617    V: PolarsDataType,
618    G: PolarsDataType,
619    F: for<'a> FnMut(
620        Option<T::Physical<'a>>,
621        Option<U::Physical<'a>>,
622        Option<G::Physical<'a>>,
623    ) -> Result<Option<K>, E>,
624    V::Array: ArrayFromIter<Option<K>>,
625{
626    let (ca1, ca2, ca3) = align_chunks_ternary(ca1, ca2, ca3);
627    let iter = ca1
628        .downcast_iter()
629        .zip(ca2.downcast_iter())
630        .zip(ca3.downcast_iter())
631        .map(|((ca1_arr, ca2_arr), ca3_arr)| {
632            let element_iter = ca1_arr.iter().zip(ca2_arr.iter()).zip(ca3_arr.iter()).map(
633                |((ca1_opt_val, ca2_opt_val), ca3_opt_val)| {
634                    op(ca1_opt_val, ca2_opt_val, ca3_opt_val)
635                },
636            );
637            element_iter.try_collect_arr()
638        });
639    ChunkedArray::try_from_chunk_iter(ca1.name().clone(), iter)
640}
641
642#[inline]
643pub fn ternary_elementwise<T, U, V, G, F>(
644    ca1: &ChunkedArray<T>,
645    ca2: &ChunkedArray<U>,
646    ca3: &ChunkedArray<G>,
647    mut op: F,
648) -> ChunkedArray<V>
649where
650    T: PolarsDataType,
651    U: PolarsDataType,
652    G: PolarsDataType,
653    V: PolarsDataType,
654    F: for<'a> TernaryFnMut<
655        Option<T::Physical<'a>>,
656        Option<U::Physical<'a>>,
657        Option<G::Physical<'a>>,
658    >,
659    V::Array: for<'a> ArrayFromIter<
660        <F as TernaryFnMut<
661            Option<T::Physical<'a>>,
662            Option<U::Physical<'a>>,
663            Option<G::Physical<'a>>,
664        >>::Ret,
665    >,
666{
667    let (ca1, ca2, ca3) = align_chunks_ternary(ca1, ca2, ca3);
668    let iter = ca1
669        .downcast_iter()
670        .zip(ca2.downcast_iter())
671        .zip(ca3.downcast_iter())
672        .map(|((ca1_arr, ca2_arr), ca3_arr)| {
673            let element_iter = ca1_arr.iter().zip(ca2_arr.iter()).zip(ca3_arr.iter()).map(
674                |((ca1_opt_val, ca2_opt_val), ca3_opt_val)| {
675                    op(ca1_opt_val, ca2_opt_val, ca3_opt_val)
676                },
677            );
678            element_iter.collect_arr()
679        });
680    ChunkedArray::from_chunk_iter(ca1.name().clone(), iter)
681}
682
683pub fn broadcast_binary_elementwise<T, U, V, F>(
684    lhs: &ChunkedArray<T>,
685    rhs: &ChunkedArray<U>,
686    mut op: F,
687) -> ChunkedArray<V>
688where
689    T: PolarsDataType,
690    U: PolarsDataType,
691    V: PolarsDataType,
692    F: for<'a> BinaryFnMut<Option<T::Physical<'a>>, Option<U::Physical<'a>>>,
693    V::Array: for<'a> ArrayFromIter<
694        <F as BinaryFnMut<Option<T::Physical<'a>>, Option<U::Physical<'a>>>>::Ret,
695    >,
696{
697    match (lhs.len(), rhs.len()) {
698        (1, _) => {
699            let a = unsafe { lhs.get_unchecked(0) };
700            unary_elementwise(rhs, |b| op(a.clone(), b)).with_name(lhs.name().clone())
701        },
702        (_, 1) => {
703            let b = unsafe { rhs.get_unchecked(0) };
704            unary_elementwise(lhs, |a| op(a, b.clone()))
705        },
706        _ => binary_elementwise(lhs, rhs, op),
707    }
708}
709
710pub fn broadcast_try_binary_elementwise<T, U, V, F, K, E>(
711    lhs: &ChunkedArray<T>,
712    rhs: &ChunkedArray<U>,
713    mut op: F,
714) -> Result<ChunkedArray<V>, E>
715where
716    T: PolarsDataType,
717    U: PolarsDataType,
718    V: PolarsDataType,
719    F: for<'a> FnMut(Option<T::Physical<'a>>, Option<U::Physical<'a>>) -> Result<Option<K>, E>,
720    V::Array: ArrayFromIter<Option<K>>,
721{
722    match (lhs.len(), rhs.len()) {
723        (1, _) => {
724            let a = unsafe { lhs.get_unchecked(0) };
725            Ok(try_unary_elementwise(rhs, |b| op(a.clone(), b))?.with_name(lhs.name().clone()))
726        },
727        (_, 1) => {
728            let b = unsafe { rhs.get_unchecked(0) };
729            try_unary_elementwise(lhs, |a| op(a, b.clone()))
730        },
731        _ => try_binary_elementwise(lhs, rhs, op),
732    }
733}
734
735pub fn broadcast_binary_elementwise_values<T, U, V, F, K>(
736    lhs: &ChunkedArray<T>,
737    rhs: &ChunkedArray<U>,
738    mut op: F,
739) -> ChunkedArray<V>
740where
741    T: PolarsDataType,
742    U: PolarsDataType,
743    V: PolarsDataType,
744    F: for<'a> FnMut(T::Physical<'a>, U::Physical<'a>) -> K,
745    V::Array: ArrayFromIter<K>,
746{
747    if lhs.null_count() == lhs.len() || rhs.null_count() == rhs.len() {
748        let min = lhs.len().min(rhs.len());
749        let max = lhs.len().max(rhs.len());
750        let len = if min == 1 { max } else { min };
751        let arr = V::Array::full_null(len, V::get_dtype().to_arrow(CompatLevel::newest()));
752
753        return ChunkedArray::with_chunk(lhs.name().clone(), arr);
754    }
755
756    match (lhs.len(), rhs.len()) {
757        (1, _) => {
758            let a = unsafe { lhs.value_unchecked(0) };
759            unary_elementwise_values(rhs, |b| op(a.clone(), b)).with_name(lhs.name().clone())
760        },
761        (_, 1) => {
762            let b = unsafe { rhs.value_unchecked(0) };
763            unary_elementwise_values(lhs, |a| op(a, b.clone()))
764        },
765        _ => binary_elementwise_values(lhs, rhs, op),
766    }
767}
768
769pub fn apply_binary_kernel_broadcast<'l, 'r, L, R, O, K, LK, RK>(
770    lhs: &'l ChunkedArray<L>,
771    rhs: &'r ChunkedArray<R>,
772    kernel: K,
773    lhs_broadcast_kernel: LK,
774    rhs_broadcast_kernel: RK,
775) -> ChunkedArray<O>
776where
777    L: PolarsDataType,
778    R: PolarsDataType,
779    O: PolarsDataType,
780    K: Fn(&L::Array, &R::Array) -> O::Array,
781    LK: Fn(L::Physical<'l>, &R::Array) -> O::Array,
782    RK: Fn(&L::Array, R::Physical<'r>) -> O::Array,
783{
784    let name = lhs.name();
785    let out = match (lhs.len(), rhs.len()) {
786        (a, b) if a == b => binary(lhs, rhs, |lhs, rhs| kernel(lhs, rhs)),
787        // broadcast right path
788        (_, 1) => {
789            let opt_rhs = rhs.get(0);
790            match opt_rhs {
791                None => {
792                    let arr = O::Array::full_null(
793                        lhs.len(),
794                        O::get_dtype().to_arrow(CompatLevel::newest()),
795                    );
796                    ChunkedArray::<O>::with_chunk(lhs.name().clone(), arr)
797                },
798                Some(rhs) => unary_kernel(lhs, |arr| rhs_broadcast_kernel(arr, rhs.clone())),
799            }
800        },
801        (1, _) => {
802            let opt_lhs = lhs.get(0);
803            match opt_lhs {
804                None => {
805                    let arr = O::Array::full_null(
806                        rhs.len(),
807                        O::get_dtype().to_arrow(CompatLevel::newest()),
808                    );
809                    ChunkedArray::<O>::with_chunk(lhs.name().clone(), arr)
810                },
811                Some(lhs) => unary_kernel(rhs, |arr| lhs_broadcast_kernel(lhs.clone(), arr)),
812            }
813        },
814        _ => panic!("Cannot apply operation on arrays of different lengths"),
815    };
816    out.with_name(name.clone())
817}
818
819pub fn apply_binary_kernel_broadcast_owned<L, R, O, K, LK, RK>(
820    lhs: ChunkedArray<L>,
821    rhs: ChunkedArray<R>,
822    kernel: K,
823    lhs_broadcast_kernel: LK,
824    rhs_broadcast_kernel: RK,
825) -> ChunkedArray<O>
826where
827    L: PolarsDataType,
828    R: PolarsDataType,
829    O: PolarsDataType,
830    K: Fn(L::Array, R::Array) -> O::Array,
831    for<'a> LK: Fn(L::Physical<'a>, R::Array) -> O::Array,
832    for<'a> RK: Fn(L::Array, R::Physical<'a>) -> O::Array,
833{
834    let name = lhs.name().to_owned();
835    let out = match (lhs.len(), rhs.len()) {
836        (a, b) if a == b => binary_owned(lhs, rhs, kernel),
837        // broadcast right path
838        (_, 1) => {
839            let opt_rhs = rhs.get(0);
840            match opt_rhs {
841                None => {
842                    let arr = O::Array::full_null(
843                        lhs.len(),
844                        O::get_dtype().to_arrow(CompatLevel::newest()),
845                    );
846                    ChunkedArray::<O>::with_chunk(lhs.name().clone(), arr)
847                },
848                Some(rhs) => unary_kernel_owned(lhs, |arr| rhs_broadcast_kernel(arr, rhs.clone())),
849            }
850        },
851        (1, _) => {
852            let opt_lhs = lhs.get(0);
853            match opt_lhs {
854                None => {
855                    let arr = O::Array::full_null(
856                        rhs.len(),
857                        O::get_dtype().to_arrow(CompatLevel::newest()),
858                    );
859                    ChunkedArray::<O>::with_chunk(lhs.name().clone(), arr)
860                },
861                Some(lhs) => unary_kernel_owned(rhs, |arr| lhs_broadcast_kernel(lhs.clone(), arr)),
862            }
863        },
864        _ => panic!("Cannot apply operation on arrays of different lengths"),
865    };
866    out.with_name(name)
867}