1use std::error::Error;
2
3use arrow::array::{Array, MutablePlString, StaticArray};
4use arrow::compute::utils::combine_validities_and;
5use polars_error::PolarsResult;
6use polars_utils::pl_str::PlSmallStr;
7
8use crate::chunked_array::flags::StatisticsFlags;
9use crate::datatypes::{ArrayCollectIterExt, ArrayFromIter};
10use crate::prelude::{ChunkedArray, CompatLevel, PolarsDataType, Series, StringChunked};
11use crate::utils::{align_chunks_binary, align_chunks_binary_owned, align_chunks_ternary};
12
/// A one-argument `FnMut` whose return type can be named through the
/// associated type [`UnaryFnMut::Ret`].
///
/// This lets a `where` clause refer to the closure's output type without
/// introducing a separate generic parameter for it.
pub trait UnaryFnMut<A1>: FnMut(A1) -> Self::Ret {
    /// The type produced when the closure is called.
    type Ret;
}

/// Every `FnMut(A1) -> R` is a `UnaryFnMut<A1>` with `Ret = R`.
impl<A1, R, T> UnaryFnMut<A1> for T
where
    T: FnMut(A1) -> R,
{
    type Ret = R;
}
22
/// A three-argument `FnMut` whose return type can be named through the
/// associated type [`TernaryFnMut::Ret`].
///
/// This lets a `where` clause refer to the closure's output type without
/// introducing a separate generic parameter for it.
pub trait TernaryFnMut<A1, A2, A3>: FnMut(A1, A2, A3) -> Self::Ret {
    /// The type produced when the closure is called.
    type Ret;
}

/// Every `FnMut(A1, A2, A3) -> R` is a `TernaryFnMut<A1, A2, A3>` with `Ret = R`.
impl<A1, A2, A3, R, T> TernaryFnMut<A1, A2, A3> for T
where
    T: FnMut(A1, A2, A3) -> R,
{
    type Ret = R;
}
32
/// A two-argument `FnMut` whose return type can be named through the
/// associated type [`BinaryFnMut::Ret`].
///
/// This lets a `where` clause refer to the closure's output type without
/// introducing a separate generic parameter for it.
pub trait BinaryFnMut<A1, A2>: FnMut(A1, A2) -> Self::Ret {
    /// The type produced when the closure is called.
    type Ret;
}

/// Every `FnMut(A1, A2) -> R` is a `BinaryFnMut<A1, A2>` with `Ret = R`.
impl<A1, A2, R, T> BinaryFnMut<A1, A2> for T
where
    T: FnMut(A1, A2) -> R,
{
    type Ret = R;
}
42
43#[inline]
45pub fn unary_kernel<T, V, F, Arr>(ca: &ChunkedArray<T>, op: F) -> ChunkedArray<V>
46where
47 T: PolarsDataType,
48 V: PolarsDataType<Array = Arr>,
49 Arr: Array,
50 F: FnMut(&T::Array) -> Arr,
51{
52 let iter = ca.downcast_iter().map(op);
53 ChunkedArray::from_chunk_iter(ca.name().clone(), iter)
54}
55
56#[inline]
58pub fn unary_kernel_owned<T, V, F, Arr>(ca: ChunkedArray<T>, op: F) -> ChunkedArray<V>
59where
60 T: PolarsDataType,
61 V: PolarsDataType<Array = Arr>,
62 Arr: Array,
63 F: FnMut(T::Array) -> Arr,
64{
65 let name = ca.name().clone();
66 let iter = ca.downcast_into_iter().map(op);
67 ChunkedArray::from_chunk_iter(name, iter)
68}
69
70#[inline]
71pub fn unary_elementwise<'a, T, V, F>(ca: &'a ChunkedArray<T>, mut op: F) -> ChunkedArray<V>
72where
73 T: PolarsDataType,
74 V: PolarsDataType,
75 F: UnaryFnMut<Option<T::Physical<'a>>>,
76 V::Array: ArrayFromIter<<F as UnaryFnMut<Option<T::Physical<'a>>>>::Ret>,
77{
78 if ca.has_nulls() {
79 let iter = ca
80 .downcast_iter()
81 .map(|arr| arr.iter().map(&mut op).collect_arr());
82 ChunkedArray::from_chunk_iter(ca.name().clone(), iter)
83 } else {
84 let iter = ca
85 .downcast_iter()
86 .map(|arr| arr.values_iter().map(|x| op(Some(x))).collect_arr());
87 ChunkedArray::from_chunk_iter(ca.name().clone(), iter)
88 }
89}
90
91#[inline]
92pub fn try_unary_elementwise<'a, T, V, F, K, E>(
93 ca: &'a ChunkedArray<T>,
94 mut op: F,
95) -> Result<ChunkedArray<V>, E>
96where
97 T: PolarsDataType,
98 V: PolarsDataType,
99 F: FnMut(Option<T::Physical<'a>>) -> Result<Option<K>, E>,
100 V::Array: ArrayFromIter<Option<K>>,
101{
102 let iter = ca
103 .downcast_iter()
104 .map(|arr| arr.iter().map(&mut op).try_collect_arr());
105 ChunkedArray::try_from_chunk_iter(ca.name().clone(), iter)
106}
107
108#[inline]
109pub fn unary_elementwise_values<'a, T, V, F>(ca: &'a ChunkedArray<T>, mut op: F) -> ChunkedArray<V>
110where
111 T: PolarsDataType,
112 V: PolarsDataType,
113 F: UnaryFnMut<T::Physical<'a>>,
114 V::Array: ArrayFromIter<<F as UnaryFnMut<T::Physical<'a>>>::Ret>,
115{
116 if ca.null_count() == ca.len() {
117 let arr = V::Array::full_null(ca.len(), V::get_dtype().to_arrow(CompatLevel::newest()));
118 return ChunkedArray::with_chunk(ca.name().clone(), arr);
119 }
120
121 let iter = ca.downcast_iter().map(|arr| {
122 let validity = arr.validity().cloned();
123 let arr: V::Array = arr.values_iter().map(&mut op).collect_arr();
124 arr.with_validity_typed(validity)
125 });
126 ChunkedArray::from_chunk_iter(ca.name().clone(), iter)
127}
128
129#[inline]
130pub fn try_unary_elementwise_values<'a, T, V, F, K, E>(
131 ca: &'a ChunkedArray<T>,
132 mut op: F,
133) -> Result<ChunkedArray<V>, E>
134where
135 T: PolarsDataType,
136 V: PolarsDataType,
137 F: FnMut(T::Physical<'a>) -> Result<K, E>,
138 V::Array: ArrayFromIter<K>,
139{
140 if ca.null_count() == ca.len() {
141 let arr = V::Array::full_null(ca.len(), V::get_dtype().to_arrow(CompatLevel::newest()));
142 return Ok(ChunkedArray::with_chunk(ca.name().clone(), arr));
143 }
144
145 let iter = ca.downcast_iter().map(|arr| {
146 let validity = arr.validity().cloned();
147 let arr: V::Array = arr.values_iter().map(&mut op).try_collect_arr()?;
148 Ok(arr.with_validity_typed(validity))
149 });
150 ChunkedArray::try_from_chunk_iter(ca.name().clone(), iter)
151}
152
153#[inline]
158pub fn unary_mut_values<T, V, F, Arr>(ca: &ChunkedArray<T>, mut op: F) -> ChunkedArray<V>
159where
160 T: PolarsDataType,
161 V: PolarsDataType<Array = Arr>,
162 Arr: Array + StaticArray,
163 F: FnMut(&T::Array) -> Arr,
164{
165 let iter = ca
166 .downcast_iter()
167 .map(|arr| op(arr).with_validity_typed(arr.validity().cloned()));
168 ChunkedArray::from_chunk_iter(ca.name().clone(), iter)
169}
170
171#[inline]
173pub fn unary_mut_with_options<T, V, F, Arr>(ca: &ChunkedArray<T>, op: F) -> ChunkedArray<V>
174where
175 T: PolarsDataType,
176 V: PolarsDataType<Array = Arr>,
177 Arr: Array + StaticArray,
178 F: FnMut(&T::Array) -> Arr,
179{
180 ChunkedArray::from_chunk_iter(ca.name().clone(), ca.downcast_iter().map(op))
181}
182
183#[inline]
184pub fn try_unary_mut_with_options<T, V, F, Arr, E>(
185 ca: &ChunkedArray<T>,
186 op: F,
187) -> Result<ChunkedArray<V>, E>
188where
189 T: PolarsDataType,
190 V: PolarsDataType<Array = Arr>,
191 Arr: Array + StaticArray,
192 F: FnMut(&T::Array) -> Result<Arr, E>,
193 E: Error,
194{
195 ChunkedArray::try_from_chunk_iter(ca.name().clone(), ca.downcast_iter().map(op))
196}
197
198#[inline]
199pub fn binary_elementwise<T, U, V, F>(
200 lhs: &ChunkedArray<T>,
201 rhs: &ChunkedArray<U>,
202 mut op: F,
203) -> ChunkedArray<V>
204where
205 T: PolarsDataType,
206 U: PolarsDataType,
207 V: PolarsDataType,
208 F: for<'a> BinaryFnMut<Option<T::Physical<'a>>, Option<U::Physical<'a>>>,
209 V::Array: for<'a> ArrayFromIter<
210 <F as BinaryFnMut<Option<T::Physical<'a>>, Option<U::Physical<'a>>>>::Ret,
211 >,
212{
213 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
214 let iter = lhs
215 .downcast_iter()
216 .zip(rhs.downcast_iter())
217 .map(|(lhs_arr, rhs_arr)| {
218 let element_iter = lhs_arr
219 .iter()
220 .zip(rhs_arr.iter())
221 .map(|(lhs_opt_val, rhs_opt_val)| op(lhs_opt_val, rhs_opt_val));
222 element_iter.collect_arr()
223 });
224 ChunkedArray::from_chunk_iter(lhs.name().clone(), iter)
225}
226
227#[inline]
228pub fn binary_elementwise_for_each<'a, 'b, T, U, F>(
229 lhs: &'a ChunkedArray<T>,
230 rhs: &'b ChunkedArray<U>,
231 mut op: F,
232) where
233 T: PolarsDataType,
234 U: PolarsDataType,
235 F: FnMut(Option<T::Physical<'a>>, Option<U::Physical<'b>>),
236{
237 let mut lhs_arr_iter = lhs.downcast_iter();
238 let mut rhs_arr_iter = rhs.downcast_iter();
239
240 let lhs_arr = lhs_arr_iter.next().unwrap();
241 let rhs_arr = rhs_arr_iter.next().unwrap();
242
243 let mut lhs_remaining = lhs_arr.len();
244 let mut rhs_remaining = rhs_arr.len();
245 let mut lhs_iter = lhs_arr.iter();
246 let mut rhs_iter = rhs_arr.iter();
247
248 loop {
249 let range = std::cmp::min(lhs_remaining, rhs_remaining);
250
251 for _ in 0..range {
252 let lhs_opt_val = unsafe { lhs_iter.next().unwrap_unchecked() };
254 let rhs_opt_val = unsafe { rhs_iter.next().unwrap_unchecked() };
255 op(lhs_opt_val, rhs_opt_val)
256 }
257 lhs_remaining -= range;
258 rhs_remaining -= range;
259
260 if lhs_remaining == 0 {
261 let Some(new_arr) = lhs_arr_iter.next() else {
262 return;
263 };
264 lhs_remaining = new_arr.len();
265 lhs_iter = new_arr.iter();
266 }
267 if rhs_remaining == 0 {
268 let Some(new_arr) = rhs_arr_iter.next() else {
269 return;
270 };
271 rhs_remaining = new_arr.len();
272 rhs_iter = new_arr.iter();
273 }
274 }
275}
276
277#[inline]
278pub fn try_binary_elementwise<T, U, V, F, K, E>(
279 lhs: &ChunkedArray<T>,
280 rhs: &ChunkedArray<U>,
281 mut op: F,
282) -> Result<ChunkedArray<V>, E>
283where
284 T: PolarsDataType,
285 U: PolarsDataType,
286 V: PolarsDataType,
287 F: for<'a> FnMut(Option<T::Physical<'a>>, Option<U::Physical<'a>>) -> Result<Option<K>, E>,
288 V::Array: ArrayFromIter<Option<K>>,
289{
290 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
291 let iter = lhs
292 .downcast_iter()
293 .zip(rhs.downcast_iter())
294 .map(|(lhs_arr, rhs_arr)| {
295 let element_iter = lhs_arr
296 .iter()
297 .zip(rhs_arr.iter())
298 .map(|(lhs_opt_val, rhs_opt_val)| op(lhs_opt_val, rhs_opt_val));
299 element_iter.try_collect_arr()
300 });
301 ChunkedArray::try_from_chunk_iter(lhs.name().clone(), iter)
302}
303
304#[inline]
305pub fn binary_elementwise_values<T, U, V, F, K>(
306 lhs: &ChunkedArray<T>,
307 rhs: &ChunkedArray<U>,
308 mut op: F,
309) -> ChunkedArray<V>
310where
311 T: PolarsDataType,
312 U: PolarsDataType,
313 V: PolarsDataType,
314 F: for<'a> FnMut(T::Physical<'a>, U::Physical<'a>) -> K,
315 V::Array: ArrayFromIter<K>,
316{
317 if lhs.null_count() == lhs.len() || rhs.null_count() == rhs.len() {
318 let len = lhs.len().min(rhs.len());
319 let arr = V::Array::full_null(len, V::get_dtype().to_arrow(CompatLevel::newest()));
320
321 return ChunkedArray::with_chunk(lhs.name().clone(), arr);
322 }
323
324 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
325
326 let iter = lhs
327 .downcast_iter()
328 .zip(rhs.downcast_iter())
329 .map(|(lhs_arr, rhs_arr)| {
330 let validity = combine_validities_and(lhs_arr.validity(), rhs_arr.validity());
331
332 let element_iter = lhs_arr
333 .values_iter()
334 .zip(rhs_arr.values_iter())
335 .map(|(lhs_val, rhs_val)| op(lhs_val, rhs_val));
336
337 let array: V::Array = element_iter.collect_arr();
338 array.with_validity_typed(validity)
339 });
340 ChunkedArray::from_chunk_iter(lhs.name().clone(), iter)
341}
342
343#[inline]
347pub fn binary_elementwise_into_string_amortized<T, U, F>(
348 lhs: &ChunkedArray<T>,
349 rhs: &ChunkedArray<U>,
350 mut op: F,
351) -> StringChunked
352where
353 T: PolarsDataType,
354 U: PolarsDataType,
355 F: for<'a> FnMut(T::Physical<'a>, U::Physical<'a>, &mut String),
356{
357 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
358 let mut buf = String::new();
359 let iter = lhs
360 .downcast_iter()
361 .zip(rhs.downcast_iter())
362 .map(|(lhs_arr, rhs_arr)| {
363 let mut mutarr = MutablePlString::with_capacity(lhs_arr.len());
364 lhs_arr
365 .iter()
366 .zip(rhs_arr.iter())
367 .for_each(|(lhs_opt, rhs_opt)| match (lhs_opt, rhs_opt) {
368 (None, _) | (_, None) => mutarr.push_null(),
369 (Some(lhs_val), Some(rhs_val)) => {
370 buf.clear();
371 op(lhs_val, rhs_val, &mut buf);
372 mutarr.push_value(&buf)
373 },
374 });
375 mutarr.freeze()
376 });
377 ChunkedArray::from_chunk_iter(lhs.name().clone(), iter)
378}
379
380#[inline]
385pub fn binary_mut_values<T, U, V, F, Arr>(
386 lhs: &ChunkedArray<T>,
387 rhs: &ChunkedArray<U>,
388 mut op: F,
389 name: PlSmallStr,
390) -> ChunkedArray<V>
391where
392 T: PolarsDataType,
393 U: PolarsDataType,
394 V: PolarsDataType<Array = Arr>,
395 Arr: Array + StaticArray,
396 F: FnMut(&T::Array, &U::Array) -> Arr,
397{
398 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
399 let iter = lhs
400 .downcast_iter()
401 .zip(rhs.downcast_iter())
402 .map(|(lhs_arr, rhs_arr)| {
403 let ret = op(lhs_arr, rhs_arr);
404 let inp_val = combine_validities_and(lhs_arr.validity(), rhs_arr.validity());
405 let val = combine_validities_and(inp_val.as_ref(), ret.validity());
406 ret.with_validity_typed(val)
407 });
408 ChunkedArray::from_chunk_iter(name, iter)
409}
410
411#[inline]
413pub fn binary_mut_with_options<T, U, V, F, Arr>(
414 lhs: &ChunkedArray<T>,
415 rhs: &ChunkedArray<U>,
416 mut op: F,
417 name: PlSmallStr,
418) -> ChunkedArray<V>
419where
420 T: PolarsDataType,
421 U: PolarsDataType,
422 V: PolarsDataType<Array = Arr>,
423 Arr: Array,
424 F: FnMut(&T::Array, &U::Array) -> Arr,
425{
426 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
427 let iter = lhs
428 .downcast_iter()
429 .zip(rhs.downcast_iter())
430 .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr));
431 ChunkedArray::from_chunk_iter(name, iter)
432}
433
434#[inline]
435pub fn try_binary_mut_with_options<T, U, V, F, Arr, E>(
436 lhs: &ChunkedArray<T>,
437 rhs: &ChunkedArray<U>,
438 mut op: F,
439 name: PlSmallStr,
440) -> Result<ChunkedArray<V>, E>
441where
442 T: PolarsDataType,
443 U: PolarsDataType,
444 V: PolarsDataType<Array = Arr>,
445 Arr: Array,
446 F: FnMut(&T::Array, &U::Array) -> Result<Arr, E>,
447 E: Error,
448{
449 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
450 let iter = lhs
451 .downcast_iter()
452 .zip(rhs.downcast_iter())
453 .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr));
454 ChunkedArray::try_from_chunk_iter(name, iter)
455}
456
457pub fn binary<T, U, V, F, Arr>(
459 lhs: &ChunkedArray<T>,
460 rhs: &ChunkedArray<U>,
461 op: F,
462) -> ChunkedArray<V>
463where
464 T: PolarsDataType,
465 U: PolarsDataType,
466 V: PolarsDataType<Array = Arr>,
467 Arr: Array,
468 F: FnMut(&T::Array, &U::Array) -> Arr,
469{
470 binary_mut_with_options(lhs, rhs, op, lhs.name().clone())
471}
472
473pub fn binary_owned<L, R, V, F, Arr>(
475 lhs: ChunkedArray<L>,
476 rhs: ChunkedArray<R>,
477 mut op: F,
478) -> ChunkedArray<V>
479where
480 L: PolarsDataType,
481 R: PolarsDataType,
482 V: PolarsDataType<Array = Arr>,
483 Arr: Array,
484 F: FnMut(L::Array, R::Array) -> Arr,
485{
486 let name = lhs.name().clone();
487 let (lhs, rhs) = align_chunks_binary_owned(lhs, rhs);
488 let iter = lhs
489 .downcast_into_iter()
490 .zip(rhs.downcast_into_iter())
491 .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr));
492 ChunkedArray::from_chunk_iter(name, iter)
493}
494
495pub fn try_binary<T, U, V, F, Arr, E>(
497 lhs: &ChunkedArray<T>,
498 rhs: &ChunkedArray<U>,
499 mut op: F,
500) -> Result<ChunkedArray<V>, E>
501where
502 T: PolarsDataType,
503 U: PolarsDataType,
504 V: PolarsDataType<Array = Arr>,
505 Arr: Array,
506 F: FnMut(&T::Array, &U::Array) -> Result<Arr, E>,
507 E: Error,
508{
509 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
510 let iter = lhs
511 .downcast_iter()
512 .zip(rhs.downcast_iter())
513 .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr));
514 ChunkedArray::try_from_chunk_iter(lhs.name().clone(), iter)
515}
516
517#[inline]
522pub unsafe fn binary_unchecked_same_type<T, U, F>(
523 lhs: &ChunkedArray<T>,
524 rhs: &ChunkedArray<U>,
525 mut op: F,
526 keep_sorted: bool,
527 keep_fast_explode: bool,
528) -> ChunkedArray<T>
529where
530 T: PolarsDataType,
531 U: PolarsDataType,
532 F: FnMut(&T::Array, &U::Array) -> Box<dyn Array>,
533{
534 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
535 let chunks = lhs
536 .downcast_iter()
537 .zip(rhs.downcast_iter())
538 .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr))
539 .collect();
540
541 let mut ca = lhs.copy_with_chunks(chunks);
542
543 let mut retain_flags = StatisticsFlags::empty();
544 use StatisticsFlags as F;
545 retain_flags.set(F::IS_SORTED_ANY, keep_sorted);
546 retain_flags.set(F::CAN_FAST_EXPLODE_LIST, keep_fast_explode);
547 ca.retain_flags_from(lhs.as_ref(), retain_flags);
548
549 ca
550}
551
552#[inline]
553pub fn binary_to_series<T, U, F>(
554 lhs: &ChunkedArray<T>,
555 rhs: &ChunkedArray<U>,
556 mut op: F,
557) -> PolarsResult<Series>
558where
559 T: PolarsDataType,
560 U: PolarsDataType,
561 F: FnMut(&T::Array, &U::Array) -> Box<dyn Array>,
562{
563 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
564 let chunks = lhs
565 .downcast_iter()
566 .zip(rhs.downcast_iter())
567 .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr))
568 .collect::<Vec<_>>();
569 Series::try_from((lhs.name().clone(), chunks))
570}
571
572#[inline]
577pub unsafe fn try_binary_unchecked_same_type<T, U, F, E>(
578 lhs: &ChunkedArray<T>,
579 rhs: &ChunkedArray<U>,
580 mut op: F,
581 keep_sorted: bool,
582 keep_fast_explode: bool,
583) -> Result<ChunkedArray<T>, E>
584where
585 T: PolarsDataType,
586 U: PolarsDataType,
587 F: FnMut(&T::Array, &U::Array) -> Result<Box<dyn Array>, E>,
588 E: Error,
589{
590 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
591 let chunks = lhs
592 .downcast_iter()
593 .zip(rhs.downcast_iter())
594 .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr))
595 .collect::<Result<Vec<_>, E>>()?;
596 let mut ca = lhs.copy_with_chunks(chunks);
597
598 let mut retain_flags = StatisticsFlags::empty();
599 use StatisticsFlags as F;
600 retain_flags.set(F::IS_SORTED_ANY, keep_sorted);
601 retain_flags.set(F::CAN_FAST_EXPLODE_LIST, keep_fast_explode);
602 ca.retain_flags_from(lhs.as_ref(), retain_flags);
603
604 Ok(ca)
605}
606
607#[inline]
608pub fn try_ternary_elementwise<T, U, V, G, F, K, E>(
609 ca1: &ChunkedArray<T>,
610 ca2: &ChunkedArray<U>,
611 ca3: &ChunkedArray<G>,
612 mut op: F,
613) -> Result<ChunkedArray<V>, E>
614where
615 T: PolarsDataType,
616 U: PolarsDataType,
617 V: PolarsDataType,
618 G: PolarsDataType,
619 F: for<'a> FnMut(
620 Option<T::Physical<'a>>,
621 Option<U::Physical<'a>>,
622 Option<G::Physical<'a>>,
623 ) -> Result<Option<K>, E>,
624 V::Array: ArrayFromIter<Option<K>>,
625{
626 let (ca1, ca2, ca3) = align_chunks_ternary(ca1, ca2, ca3);
627 let iter = ca1
628 .downcast_iter()
629 .zip(ca2.downcast_iter())
630 .zip(ca3.downcast_iter())
631 .map(|((ca1_arr, ca2_arr), ca3_arr)| {
632 let element_iter = ca1_arr.iter().zip(ca2_arr.iter()).zip(ca3_arr.iter()).map(
633 |((ca1_opt_val, ca2_opt_val), ca3_opt_val)| {
634 op(ca1_opt_val, ca2_opt_val, ca3_opt_val)
635 },
636 );
637 element_iter.try_collect_arr()
638 });
639 ChunkedArray::try_from_chunk_iter(ca1.name().clone(), iter)
640}
641
642#[inline]
643pub fn ternary_elementwise<T, U, V, G, F>(
644 ca1: &ChunkedArray<T>,
645 ca2: &ChunkedArray<U>,
646 ca3: &ChunkedArray<G>,
647 mut op: F,
648) -> ChunkedArray<V>
649where
650 T: PolarsDataType,
651 U: PolarsDataType,
652 G: PolarsDataType,
653 V: PolarsDataType,
654 F: for<'a> TernaryFnMut<
655 Option<T::Physical<'a>>,
656 Option<U::Physical<'a>>,
657 Option<G::Physical<'a>>,
658 >,
659 V::Array: for<'a> ArrayFromIter<
660 <F as TernaryFnMut<
661 Option<T::Physical<'a>>,
662 Option<U::Physical<'a>>,
663 Option<G::Physical<'a>>,
664 >>::Ret,
665 >,
666{
667 let (ca1, ca2, ca3) = align_chunks_ternary(ca1, ca2, ca3);
668 let iter = ca1
669 .downcast_iter()
670 .zip(ca2.downcast_iter())
671 .zip(ca3.downcast_iter())
672 .map(|((ca1_arr, ca2_arr), ca3_arr)| {
673 let element_iter = ca1_arr.iter().zip(ca2_arr.iter()).zip(ca3_arr.iter()).map(
674 |((ca1_opt_val, ca2_opt_val), ca3_opt_val)| {
675 op(ca1_opt_val, ca2_opt_val, ca3_opt_val)
676 },
677 );
678 element_iter.collect_arr()
679 });
680 ChunkedArray::from_chunk_iter(ca1.name().clone(), iter)
681}
682
683pub fn broadcast_binary_elementwise<T, U, V, F>(
684 lhs: &ChunkedArray<T>,
685 rhs: &ChunkedArray<U>,
686 mut op: F,
687) -> ChunkedArray<V>
688where
689 T: PolarsDataType,
690 U: PolarsDataType,
691 V: PolarsDataType,
692 F: for<'a> BinaryFnMut<Option<T::Physical<'a>>, Option<U::Physical<'a>>>,
693 V::Array: for<'a> ArrayFromIter<
694 <F as BinaryFnMut<Option<T::Physical<'a>>, Option<U::Physical<'a>>>>::Ret,
695 >,
696{
697 match (lhs.len(), rhs.len()) {
698 (1, _) => {
699 let a = unsafe { lhs.get_unchecked(0) };
700 unary_elementwise(rhs, |b| op(a.clone(), b)).with_name(lhs.name().clone())
701 },
702 (_, 1) => {
703 let b = unsafe { rhs.get_unchecked(0) };
704 unary_elementwise(lhs, |a| op(a, b.clone()))
705 },
706 _ => binary_elementwise(lhs, rhs, op),
707 }
708}
709
710pub fn broadcast_try_binary_elementwise<T, U, V, F, K, E>(
711 lhs: &ChunkedArray<T>,
712 rhs: &ChunkedArray<U>,
713 mut op: F,
714) -> Result<ChunkedArray<V>, E>
715where
716 T: PolarsDataType,
717 U: PolarsDataType,
718 V: PolarsDataType,
719 F: for<'a> FnMut(Option<T::Physical<'a>>, Option<U::Physical<'a>>) -> Result<Option<K>, E>,
720 V::Array: ArrayFromIter<Option<K>>,
721{
722 match (lhs.len(), rhs.len()) {
723 (1, _) => {
724 let a = unsafe { lhs.get_unchecked(0) };
725 Ok(try_unary_elementwise(rhs, |b| op(a.clone(), b))?.with_name(lhs.name().clone()))
726 },
727 (_, 1) => {
728 let b = unsafe { rhs.get_unchecked(0) };
729 try_unary_elementwise(lhs, |a| op(a, b.clone()))
730 },
731 _ => try_binary_elementwise(lhs, rhs, op),
732 }
733}
734
735pub fn broadcast_binary_elementwise_values<T, U, V, F, K>(
736 lhs: &ChunkedArray<T>,
737 rhs: &ChunkedArray<U>,
738 mut op: F,
739) -> ChunkedArray<V>
740where
741 T: PolarsDataType,
742 U: PolarsDataType,
743 V: PolarsDataType,
744 F: for<'a> FnMut(T::Physical<'a>, U::Physical<'a>) -> K,
745 V::Array: ArrayFromIter<K>,
746{
747 if lhs.null_count() == lhs.len() || rhs.null_count() == rhs.len() {
748 let min = lhs.len().min(rhs.len());
749 let max = lhs.len().max(rhs.len());
750 let len = if min == 1 { max } else { min };
751 let arr = V::Array::full_null(len, V::get_dtype().to_arrow(CompatLevel::newest()));
752
753 return ChunkedArray::with_chunk(lhs.name().clone(), arr);
754 }
755
756 match (lhs.len(), rhs.len()) {
757 (1, _) => {
758 let a = unsafe { lhs.value_unchecked(0) };
759 unary_elementwise_values(rhs, |b| op(a.clone(), b)).with_name(lhs.name().clone())
760 },
761 (_, 1) => {
762 let b = unsafe { rhs.value_unchecked(0) };
763 unary_elementwise_values(lhs, |a| op(a, b.clone()))
764 },
765 _ => binary_elementwise_values(lhs, rhs, op),
766 }
767}
768
769pub fn apply_binary_kernel_broadcast<'l, 'r, L, R, O, K, LK, RK>(
770 lhs: &'l ChunkedArray<L>,
771 rhs: &'r ChunkedArray<R>,
772 kernel: K,
773 lhs_broadcast_kernel: LK,
774 rhs_broadcast_kernel: RK,
775) -> ChunkedArray<O>
776where
777 L: PolarsDataType,
778 R: PolarsDataType,
779 O: PolarsDataType,
780 K: Fn(&L::Array, &R::Array) -> O::Array,
781 LK: Fn(L::Physical<'l>, &R::Array) -> O::Array,
782 RK: Fn(&L::Array, R::Physical<'r>) -> O::Array,
783{
784 let name = lhs.name();
785 let out = match (lhs.len(), rhs.len()) {
786 (a, b) if a == b => binary(lhs, rhs, |lhs, rhs| kernel(lhs, rhs)),
787 (_, 1) => {
789 let opt_rhs = rhs.get(0);
790 match opt_rhs {
791 None => {
792 let arr = O::Array::full_null(
793 lhs.len(),
794 O::get_dtype().to_arrow(CompatLevel::newest()),
795 );
796 ChunkedArray::<O>::with_chunk(lhs.name().clone(), arr)
797 },
798 Some(rhs) => unary_kernel(lhs, |arr| rhs_broadcast_kernel(arr, rhs.clone())),
799 }
800 },
801 (1, _) => {
802 let opt_lhs = lhs.get(0);
803 match opt_lhs {
804 None => {
805 let arr = O::Array::full_null(
806 rhs.len(),
807 O::get_dtype().to_arrow(CompatLevel::newest()),
808 );
809 ChunkedArray::<O>::with_chunk(lhs.name().clone(), arr)
810 },
811 Some(lhs) => unary_kernel(rhs, |arr| lhs_broadcast_kernel(lhs.clone(), arr)),
812 }
813 },
814 _ => panic!("Cannot apply operation on arrays of different lengths"),
815 };
816 out.with_name(name.clone())
817}
818
819pub fn apply_binary_kernel_broadcast_owned<L, R, O, K, LK, RK>(
820 lhs: ChunkedArray<L>,
821 rhs: ChunkedArray<R>,
822 kernel: K,
823 lhs_broadcast_kernel: LK,
824 rhs_broadcast_kernel: RK,
825) -> ChunkedArray<O>
826where
827 L: PolarsDataType,
828 R: PolarsDataType,
829 O: PolarsDataType,
830 K: Fn(L::Array, R::Array) -> O::Array,
831 for<'a> LK: Fn(L::Physical<'a>, R::Array) -> O::Array,
832 for<'a> RK: Fn(L::Array, R::Physical<'a>) -> O::Array,
833{
834 let name = lhs.name().to_owned();
835 let out = match (lhs.len(), rhs.len()) {
836 (a, b) if a == b => binary_owned(lhs, rhs, kernel),
837 (_, 1) => {
839 let opt_rhs = rhs.get(0);
840 match opt_rhs {
841 None => {
842 let arr = O::Array::full_null(
843 lhs.len(),
844 O::get_dtype().to_arrow(CompatLevel::newest()),
845 );
846 ChunkedArray::<O>::with_chunk(lhs.name().clone(), arr)
847 },
848 Some(rhs) => unary_kernel_owned(lhs, |arr| rhs_broadcast_kernel(arr, rhs.clone())),
849 }
850 },
851 (1, _) => {
852 let opt_lhs = lhs.get(0);
853 match opt_lhs {
854 None => {
855 let arr = O::Array::full_null(
856 rhs.len(),
857 O::get_dtype().to_arrow(CompatLevel::newest()),
858 );
859 ChunkedArray::<O>::with_chunk(lhs.name().clone(), arr)
860 },
861 Some(lhs) => unary_kernel_owned(rhs, |arr| lhs_broadcast_kernel(lhs.clone(), arr)),
862 }
863 },
864 _ => panic!("Cannot apply operation on arrays of different lengths"),
865 };
866 out.with_name(name)
867}