1use std::mem::ManuallyDrop;
2use std::ops::{Deref, DerefMut};
3
4use arrow::offset::OffsetsBuffer;
5use polars_utils::idx_vec::IdxVec;
6use rayon::iter::plumbing::UnindexedConsumer;
7use rayon::prelude::*;
8
9use crate::prelude::*;
10use crate::utils::{flatten, slice_slice, NoNull};
11use crate::POOL;
12
/// Groups stored as two parallel vectors: a representative "first" row index
/// per group and the full list of row indices per group.
///
/// When built from slice groups (`GroupsType::into_idx`), `first[i]` equals
/// the start of group `i`'s range. `sorted` records whether the groups are
/// ordered by their `first` value (see `GroupsIdx::sort`).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct GroupsIdx {
    // Whether the groups are known to be ordered by their `first` value.
    pub(crate) sorted: bool,
    // First row index of every group; parallel to `all`.
    first: Vec<IdxSize>,
    // All row indices of every group; parallel to `first`.
    all: Vec<IdxVec>,
}
21
/// An owned group: its first row index together with all of its row indices.
pub type IdxItem = (IdxSize, IdxVec);
/// A borrowed group: its first row index together with a reference to all of its row indices.
pub type BorrowIdxItem<'a> = (IdxSize, &'a IdxVec);
24
25impl Drop for GroupsIdx {
26 fn drop(&mut self) {
27 let v = std::mem::take(&mut self.all);
28 #[cfg(not(target_family = "wasm"))]
31 if v.len() > 1 << 16 {
32 std::thread::spawn(move || drop(v));
33 } else {
34 drop(v);
35 }
36
37 #[cfg(target_family = "wasm")]
38 drop(v);
39 }
40}
41
42impl From<Vec<IdxItem>> for GroupsIdx {
43 fn from(v: Vec<IdxItem>) -> Self {
44 v.into_iter().collect()
45 }
46}
47
48impl From<Vec<Vec<IdxItem>>> for GroupsIdx {
49 fn from(v: Vec<Vec<IdxItem>>) -> Self {
50 let (cap, offsets) = flatten::cap_and_offsets(&v);
53 let mut first = Vec::with_capacity(cap);
54 let first_ptr = first.as_ptr() as usize;
55 let mut all = Vec::with_capacity(cap);
56 let all_ptr = all.as_ptr() as usize;
57
58 POOL.install(|| {
59 v.into_par_iter()
60 .zip(offsets)
61 .for_each(|(mut inner, offset)| {
62 unsafe {
63 let first = (first_ptr as *const IdxSize as *mut IdxSize).add(offset);
64 let all = (all_ptr as *const IdxVec as *mut IdxVec).add(offset);
65
66 let inner_ptr = inner.as_mut_ptr();
67 for i in 0..inner.len() {
68 let (first_val, vals) = std::ptr::read(inner_ptr.add(i));
69 std::ptr::write(first.add(i), first_val);
70 std::ptr::write(all.add(i), vals);
71 }
72 inner.set_len(0);
75 }
76 });
77 });
78 unsafe {
79 all.set_len(cap);
80 first.set_len(cap);
81 }
82 GroupsIdx {
83 sorted: false,
84 first,
85 all,
86 }
87 }
88}
89
90impl GroupsIdx {
91 pub fn new(first: Vec<IdxSize>, all: Vec<IdxVec>, sorted: bool) -> Self {
92 Self { sorted, first, all }
93 }
94
95 pub fn sort(&mut self) {
96 if self.sorted {
97 return;
98 }
99 let mut idx = 0;
100 let first = std::mem::take(&mut self.first);
101 let mut idx_vals = first
103 .into_iter()
104 .map(|v| {
105 let out = [idx, v];
106 idx += 1;
107 out
108 })
109 .collect_trusted::<Vec<_>>();
110 idx_vals.sort_unstable_by_key(|v| v[1]);
111
112 let take_first = || idx_vals.iter().map(|v| v[1]).collect_trusted::<Vec<_>>();
113 let take_all = || {
114 idx_vals
115 .iter()
116 .map(|v| unsafe {
117 let idx = v[0] as usize;
118 std::mem::take(self.all.get_unchecked_mut(idx))
119 })
120 .collect_trusted::<Vec<_>>()
121 };
122 let (first, all) = POOL.install(|| rayon::join(take_first, take_all));
123 self.first = first;
124 self.all = all;
125 self.sorted = true
126 }
127 pub fn is_sorted_flag(&self) -> bool {
128 self.sorted
129 }
130
131 pub fn iter(
132 &self,
133 ) -> std::iter::Zip<std::iter::Copied<std::slice::Iter<IdxSize>>, std::slice::Iter<IdxVec>>
134 {
135 self.into_iter()
136 }
137
138 pub fn all(&self) -> &[IdxVec] {
139 &self.all
140 }
141
142 pub fn first(&self) -> &[IdxSize] {
143 &self.first
144 }
145
146 pub fn first_mut(&mut self) -> &mut Vec<IdxSize> {
147 &mut self.first
148 }
149
150 pub(crate) fn len(&self) -> usize {
151 self.first.len()
152 }
153
154 pub(crate) unsafe fn get_unchecked(&self, index: usize) -> BorrowIdxItem {
155 let first = *self.first.get_unchecked(index);
156 let all = self.all.get_unchecked(index);
157 (first, all)
158 }
159}
160
161impl FromIterator<IdxItem> for GroupsIdx {
162 fn from_iter<T: IntoIterator<Item = IdxItem>>(iter: T) -> Self {
163 let (first, all) = iter.into_iter().unzip();
164 GroupsIdx {
165 sorted: false,
166 first,
167 all,
168 }
169 }
170}
171
172impl<'a> IntoIterator for &'a GroupsIdx {
173 type Item = BorrowIdxItem<'a>;
174 type IntoIter = std::iter::Zip<
175 std::iter::Copied<std::slice::Iter<'a, IdxSize>>,
176 std::slice::Iter<'a, IdxVec>,
177 >;
178
179 fn into_iter(self) -> Self::IntoIter {
180 self.first.iter().copied().zip(self.all.iter())
181 }
182}
183
184impl IntoIterator for GroupsIdx {
185 type Item = IdxItem;
186 type IntoIter = std::iter::Zip<std::vec::IntoIter<IdxSize>, std::vec::IntoIter<IdxVec>>;
187
188 fn into_iter(mut self) -> Self::IntoIter {
189 let first = std::mem::take(&mut self.first);
190 let all = std::mem::take(&mut self.all);
191 first.into_iter().zip(all)
192 }
193}
194
195impl FromParallelIterator<IdxItem> for GroupsIdx {
196 fn from_par_iter<I>(par_iter: I) -> Self
197 where
198 I: IntoParallelIterator<Item = IdxItem>,
199 {
200 let (first, all) = par_iter.into_par_iter().unzip();
201 GroupsIdx {
202 sorted: false,
203 first,
204 all,
205 }
206 }
207}
208
209impl<'a> IntoParallelIterator for &'a GroupsIdx {
210 type Iter = rayon::iter::Zip<
211 rayon::iter::Copied<rayon::slice::Iter<'a, IdxSize>>,
212 rayon::slice::Iter<'a, IdxVec>,
213 >;
214 type Item = BorrowIdxItem<'a>;
215
216 fn into_par_iter(self) -> Self::Iter {
217 self.first.par_iter().copied().zip(self.all.par_iter())
218 }
219}
220
221impl IntoParallelIterator for GroupsIdx {
222 type Iter = rayon::iter::Zip<rayon::vec::IntoIter<IdxSize>, rayon::vec::IntoIter<IdxVec>>;
223 type Item = IdxItem;
224
225 fn into_par_iter(mut self) -> Self::Iter {
226 let first = std::mem::take(&mut self.first);
227 let all = std::mem::take(&mut self.all);
228 first.into_par_iter().zip(all.into_par_iter())
229 }
230}
231
/// Groups as contiguous row ranges: each entry is `[first, len]`, covering rows
/// `first..first + len`.
pub type GroupsSlice = Vec<[IdxSize; 2]>;
240
/// Group positions, stored either as explicit row-index lists or as row ranges.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GroupsType {
    /// Each group has a first row index and the full list of its row indices.
    Idx(GroupsIdx),
    /// Each group is a contiguous `[first, len]` row range.
    Slice {
        // The `[first, len]` ranges, one per group.
        groups: GroupsSlice,
        // Marks ranges that `GroupPositions::unroll` rewrites into back-to-back
        // ranges; presumably set for rolling windows whose ranges may overlap
        // (NOTE(review): confirm against the producers of this flag).
        rolling: bool,
    },
}
252
253impl Default for GroupsType {
254 fn default() -> Self {
255 GroupsType::Idx(GroupsIdx::default())
256 }
257}
258
impl GroupsType {
    /// Converts these groups into index form.
    ///
    /// `Idx` groups are returned unchanged. `Slice` groups are materialised: each
    /// `[first, len]` range becomes `(first, first..first + len)`. This allocates
    /// and emits a `polars_warn!` about the missed optimization.
    pub fn into_idx(self) -> GroupsIdx {
        match self {
            GroupsType::Idx(groups) => groups,
            GroupsType::Slice { groups, .. } => {
                polars_warn!("Had to reallocate groups, missed an optimization opportunity. Please open an issue.");
                groups
                    .iter()
                    .map(|&[first, len]| (first, (first..first + len).collect::<IdxVec>()))
                    .collect()
            },
        }
    }

    /// Prepares the pieces needed to aggregate every group into a list.
    ///
    /// Returns:
    /// - the row indices of all groups concatenated in group order;
    /// - list offsets, where group `i` spans `offsets[i]..offsets[i + 1]` of those indices;
    /// - `can_fast_explode`: `true` iff every group is non-empty.
    ///
    /// `total_len` is only used as the capacity hint for the index buffer.
    pub(crate) fn prepare_list_agg(
        &self,
        total_len: usize,
    ) -> (Option<IdxCa>, OffsetsBuffer<i64>, bool) {
        let mut can_fast_explode = true;
        match self {
            GroupsType::Idx(groups) => {
                let mut list_offset = Vec::with_capacity(self.len() + 1);
                let mut gather_offsets = Vec::with_capacity(total_len);

                let mut len_so_far = 0i64;
                list_offset.push(len_so_far);

                for idx in groups {
                    let idx = idx.1;
                    gather_offsets.extend_from_slice(idx);
                    len_so_far += idx.len() as i64;
                    list_offset.push(len_so_far);
                    can_fast_explode &= !idx.is_empty();
                }
                // SAFETY: the offsets start at 0 and only grow by (non-negative)
                // group lengths, so they are non-decreasing -- presumably the
                // invariant `OffsetsBuffer::new_unchecked` skips checking.
                unsafe {
                    (
                        Some(IdxCa::from_vec(PlSmallStr::EMPTY, gather_offsets)),
                        OffsetsBuffer::new_unchecked(list_offset.into()),
                        can_fast_explode,
                    )
                }
            },
            GroupsType::Slice { groups, .. } => {
                let mut list_offset = Vec::with_capacity(self.len() + 1);
                let mut gather_offsets = Vec::with_capacity(total_len);
                let mut len_so_far = 0i64;
                list_offset.push(len_so_far);

                for g in groups {
                    let len = g[1];
                    let offset = g[0];
                    // Expand the `[offset, len]` range into explicit row indices.
                    gather_offsets.extend(offset..offset + len);

                    len_so_far += len as i64;
                    list_offset.push(len_so_far);
                    can_fast_explode &= len > 0;
                }

                // SAFETY: as in the `Idx` branch, the offsets are a running sum of
                // group lengths starting at 0 and are therefore non-decreasing.
                unsafe {
                    (
                        Some(IdxCa::from_vec(PlSmallStr::EMPTY, gather_offsets)),
                        OffsetsBuffer::new_unchecked(list_offset.into()),
                        can_fast_explode,
                    )
                }
            },
        }
    }

    /// Sequential iterator over the groups, yielding `GroupsIndicator`s.
    pub fn iter(&self) -> GroupsTypeIter {
        GroupsTypeIter::new(self)
    }

    /// Sorts index groups by their first index; slice groups are left untouched.
    pub fn sort(&mut self) {
        match self {
            GroupsType::Idx(groups) => {
                if !groups.is_sorted_flag() {
                    groups.sort()
                }
            },
            GroupsType::Slice { .. } => {
                // Slice groups always report themselves as sorted (see
                // `is_sorted_flag`), so there is nothing to do.
            },
        }
    }

    /// Whether the groups are known to be sorted; always `true` for slice groups.
    pub(crate) fn is_sorted_flag(&self) -> bool {
        match self {
            GroupsType::Idx(groups) => groups.is_sorted_flag(),
            GroupsType::Slice { .. } => true,
        }
    }

    /// Consumes the groups and returns the first row index of every group.
    pub fn take_group_firsts(self) -> Vec<IdxSize> {
        match self {
            GroupsType::Idx(mut groups) => std::mem::take(&mut groups.first),
            GroupsType::Slice { groups, .. } => {
                groups.into_iter().map(|[first, _len]| first).collect()
            },
        }
    }

    /// Consumes the groups and returns the last row index of every group.
    ///
    /// # Safety
    /// Every group must contain at least one row. For an empty `Idx` group,
    /// `idx.len() - 1` underflows and (in release builds) `get_unchecked` reads
    /// out of bounds. For an empty `Slice` group, `first + len - 1` is wrong or
    /// underflows.
    pub unsafe fn take_group_lasts(self) -> Vec<IdxSize> {
        match self {
            GroupsType::Idx(groups) => groups
                .all
                .iter()
                .map(|idx| *idx.get_unchecked(idx.len() - 1))
                .collect(),
            GroupsType::Slice { groups, .. } => groups
                .into_iter()
                .map(|[first, len]| first + len - 1)
                .collect(),
        }
    }

    /// Parallel (rayon) iterator over the groups, yielding `GroupsIndicator`s.
    pub fn par_iter(&self) -> GroupsTypeParIter {
        GroupsTypeParIter::new(self)
    }

    /// Borrows the index groups.
    ///
    /// # Panics
    /// Panics if the groups are `Slice` groups.
    pub fn unwrap_idx(&self) -> &GroupsIdx {
        match self {
            GroupsType::Idx(groups) => groups,
            GroupsType::Slice { .. } => panic!("groups are slices not index"),
        }
    }

    /// Borrows the slice groups.
    ///
    /// # Panics
    /// Panics if the groups are `Idx` groups.
    pub fn unwrap_slice(&self) -> &GroupsSlice {
        match self {
            GroupsType::Slice { groups, .. } => groups,
            GroupsType::Idx(_) => panic!("groups are index not slices"),
        }
    }

    /// Returns group `index`.
    ///
    /// # Panics
    /// Panics if `index` is out of bounds.
    pub fn get(&self, index: usize) -> GroupsIndicator {
        match self {
            GroupsType::Idx(groups) => {
                let first = groups.first[index];
                let all = &groups.all[index];
                GroupsIndicator::Idx((first, all))
            },
            GroupsType::Slice { groups, .. } => GroupsIndicator::Slice(groups[index]),
        }
    }

    /// Mutably borrows the index groups.
    ///
    /// # Panics
    /// Panics if the groups are `Slice` groups.
    pub fn idx_mut(&mut self) -> &mut GroupsIdx {
        match self {
            GroupsType::Idx(groups) => groups,
            GroupsType::Slice { .. } => panic!("groups are slices not index"),
        }
    }

    /// Number of groups.
    pub fn len(&self) -> usize {
        match self {
            GroupsType::Idx(groups) => groups.len(),
            GroupsType::Slice { groups, .. } => groups.len(),
        }
    }

    /// Whether there are no groups.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Number of rows in each group.
    pub fn group_count(&self) -> IdxCa {
        match self {
            GroupsType::Idx(groups) => {
                let ca: NoNull<IdxCa> = groups
                    .iter()
                    .map(|(_first, idx)| idx.len() as IdxSize)
                    .collect_trusted();
                ca.into_inner()
            },
            GroupsType::Slice { groups, .. } => {
                let ca: NoNull<IdxCa> = groups.iter().map(|[_first, len]| *len).collect_trusted();
                ca.into_inner()
            },
        }
    }
    /// Row indices of every group, one list element per group.
    pub fn as_list_chunked(&self) -> ListChunked {
        match self {
            GroupsType::Idx(groups) => groups
                .iter()
                .map(|(_first, idx)| {
                    let ca: NoNull<IdxCa> = idx.iter().map(|&v| v as IdxSize).collect();
                    ca.into_inner().into_series()
                })
                .collect_trusted(),
            GroupsType::Slice { groups, .. } => groups
                .iter()
                .map(|&[first, len]| {
                    let ca: NoNull<IdxCa> = (first..first + len).collect_trusted();
                    ca.into_inner().into_series()
                })
                .collect_trusted(),
        }
    }

    /// Wraps these groups in a `GroupPositions` view covering all of them.
    pub fn into_sliceable(self) -> GroupPositions {
        let len = self.len();
        slice_groups(Arc::new(self), 0, len)
    }
}
479
480impl From<GroupsIdx> for GroupsType {
481 fn from(groups: GroupsIdx) -> Self {
482 GroupsType::Idx(groups)
483 }
484}
485
/// A single group borrowed from either representation of `GroupsType`.
pub enum GroupsIndicator<'a> {
    /// An index group: `(first, &all_row_indices)`.
    Idx(BorrowIdxItem<'a>),
    /// A range group: `[first, len]`.
    Slice([IdxSize; 2]),
}
490
491impl GroupsIndicator<'_> {
492 pub fn len(&self) -> usize {
493 match self {
494 GroupsIndicator::Idx(g) => g.1.len(),
495 GroupsIndicator::Slice([_, len]) => *len as usize,
496 }
497 }
498 pub fn first(&self) -> IdxSize {
499 match self {
500 GroupsIndicator::Idx(g) => g.0,
501 GroupsIndicator::Slice([first, _]) => *first,
502 }
503 }
504 pub fn is_empty(&self) -> bool {
505 self.len() == 0
506 }
507}
508
/// Sequential iterator over the groups of a `GroupsType`, yielding `GroupsIndicator`s.
pub struct GroupsTypeIter<'a> {
    // The groups being iterated.
    vals: &'a GroupsType,
    // Number of groups, cached at construction.
    len: usize,
    // Position of the next group to yield.
    idx: usize,
}
514
515impl<'a> GroupsTypeIter<'a> {
516 fn new(vals: &'a GroupsType) -> Self {
517 let len = vals.len();
518 let idx = 0;
519 GroupsTypeIter { vals, len, idx }
520 }
521}
522
523impl<'a> Iterator for GroupsTypeIter<'a> {
524 type Item = GroupsIndicator<'a>;
525
526 fn nth(&mut self, n: usize) -> Option<Self::Item> {
527 self.idx = self.idx.saturating_add(n);
528 self.next()
529 }
530
531 fn next(&mut self) -> Option<Self::Item> {
532 if self.idx >= self.len {
533 return None;
534 }
535
536 let out = unsafe {
537 match self.vals {
538 GroupsType::Idx(groups) => {
539 let item = groups.get_unchecked(self.idx);
540 Some(GroupsIndicator::Idx(item))
541 },
542 GroupsType::Slice { groups, .. } => {
543 Some(GroupsIndicator::Slice(*groups.get_unchecked(self.idx)))
544 },
545 }
546 };
547 self.idx += 1;
548 out
549 }
550}
551
/// Parallel (rayon) iterator over the groups of a `GroupsType`.
pub struct GroupsTypeParIter<'a> {
    // The groups being iterated.
    vals: &'a GroupsType,
    // Number of groups, cached at construction.
    len: usize,
}
556
557impl<'a> GroupsTypeParIter<'a> {
558 fn new(vals: &'a GroupsType) -> Self {
559 let len = vals.len();
560 GroupsTypeParIter { vals, len }
561 }
562}
563
564impl<'a> ParallelIterator for GroupsTypeParIter<'a> {
565 type Item = GroupsIndicator<'a>;
566
567 fn drive_unindexed<C>(self, consumer: C) -> C::Result
568 where
569 C: UnindexedConsumer<Self::Item>,
570 {
571 (0..self.len)
572 .into_par_iter()
573 .map(|i| unsafe {
574 match self.vals {
575 GroupsType::Idx(groups) => GroupsIndicator::Idx(groups.get_unchecked(i)),
576 GroupsType::Slice { groups, .. } => {
577 GroupsIndicator::Slice(*groups.get_unchecked(i))
578 },
579 }
580 })
581 .drive_unindexed(consumer)
582 }
583}
584
/// A (possibly sliced) view of a shared `GroupsType`.
///
/// `original` owns the group data. `sliced` is a `GroupsType` whose vectors
/// are fabricated by `slice_groups_inner` with `Vec::from_raw_parts` over the
/// `offset`/`len` window of `original`. It does not own that memory, so it is
/// wrapped in `ManuallyDrop` and must never be dropped or resized.
#[derive(Debug)]
pub struct GroupPositions {
    // Non-owning view into `original`; never dropped.
    sliced: ManuallyDrop<GroupsType>,
    // The owning, reference-counted group data that `sliced` points into.
    original: Arc<GroupsType>,
    // Window offset as passed to `slice_slice`; may be negative (presumably
    // counted from the end -- confirm against `slice_slice`).
    offset: i64,
    // Requested window length. This may exceed the visible group count if
    // `slice_slice` clamped the window (TODO confirm clamping behaviour).
    len: usize,
}
593
594impl Clone for GroupPositions {
595 fn clone(&self) -> Self {
596 let sliced = slice_groups_inner(&self.original, self.offset, self.len);
597
598 Self {
599 sliced,
600 original: self.original.clone(),
601 offset: self.offset,
602 len: self.len,
603 }
604 }
605}
606
607impl PartialEq for GroupPositions {
608 fn eq(&self, other: &Self) -> bool {
609 self.offset == other.offset && self.len == other.len && self.sliced == other.sliced
610 }
611}
612
613impl AsRef<GroupsType> for GroupPositions {
614 fn as_ref(&self) -> &GroupsType {
615 self.sliced.deref()
616 }
617}
618
619impl Deref for GroupPositions {
620 type Target = GroupsType;
621
622 fn deref(&self) -> &Self::Target {
623 self.sliced.deref()
624 }
625}
626
627impl Default for GroupPositions {
628 fn default() -> Self {
629 GroupsType::default().into_sliceable()
630 }
631}
632
633impl GroupPositions {
634 pub fn slice(&self, offset: i64, len: usize) -> Self {
635 let offset = self.offset + offset;
636 slice_groups(
637 self.original.clone(),
638 offset,
639 if len > self.len { self.len } else { len },
641 )
642 }
643
644 pub fn sort(&mut self) {
645 if !self.as_ref().is_sorted_flag() {
646 let original = Arc::make_mut(&mut self.original);
647 original.sort();
648
649 self.sliced = slice_groups_inner(original, self.offset, self.len);
650 }
651 }
652
653 pub fn unroll(mut self) -> GroupPositions {
654 match self.sliced.deref_mut() {
655 GroupsType::Idx(_) => self,
656 GroupsType::Slice { rolling: false, .. } => self,
657 GroupsType::Slice {
658 groups, rolling, ..
659 } => {
660 let mut offset = 0 as IdxSize;
661 for g in groups.iter_mut() {
662 g[0] = offset;
663 offset += g[1];
664 }
665 *rolling = false;
666 self
667 },
668 }
669 }
670}
671
/// Builds a non-owning view of `len` groups of `g`, starting at `offset`.
///
/// The window is resolved by `slice_slice`. Presumably it clamps out-of-range
/// requests and counts negative offsets from the end (TODO confirm). The
/// returned vectors are fabricated with `Vec::from_raw_parts` over memory owned
/// by `g`, with capacity set to the slice length. They must never be dropped,
/// grown, or shrunk, and must not outlive `g`. Because the pointers come from
/// shared borrows of `g`, they must not be written through either.
fn slice_groups_inner(g: &GroupsType, offset: i64, len: usize) -> ManuallyDrop<GroupsType> {
    // SAFETY (applies to every `from_raw_parts` below): the pointer and length
    // come from a live sub-slice of `g`, and the result is wrapped in
    // `ManuallyDrop`, so the borrowed allocation is never freed through it.
    // NOTE(review): if the second `slice_slice` call in the `Idx` arm could
    // panic, the already-built `first` vector would be dropped and free memory it
    // does not own; presumably `slice_slice` never panics here -- confirm.
    match g {
        GroupsType::Idx(groups) => {
            let first = unsafe {
                let first = slice_slice(groups.first(), offset, len);
                let ptr = first.as_ptr() as *mut _;
                Vec::from_raw_parts(ptr, first.len(), first.len())
            };

            let all = unsafe {
                let all = slice_slice(groups.all(), offset, len);
                let ptr = all.as_ptr() as *mut _;
                Vec::from_raw_parts(ptr, all.len(), all.len())
            };
            // `ManuallyDrop` also suppresses `GroupsIdx`'s `Drop` impl, which would
            // otherwise free the borrowed `all` buffer. A contiguous window of a
            // sorted group list is itself sorted, so the flag carries over.
            ManuallyDrop::new(GroupsType::Idx(GroupsIdx::new(
                first,
                all,
                groups.is_sorted_flag(),
            )))
        },
        GroupsType::Slice { groups, rolling } => {
            let groups = unsafe {
                let groups = slice_slice(groups, offset, len);
                let ptr = groups.as_ptr() as *mut _;
                Vec::from_raw_parts(ptr, groups.len(), groups.len())
            };

            ManuallyDrop::new(GroupsType::Slice {
                groups,
                rolling: *rolling,
            })
        },
    }
}
711
712fn slice_groups(g: Arc<GroupsType>, offset: i64, len: usize) -> GroupPositions {
713 let sliced = slice_groups_inner(g.as_ref(), offset, len);
714
715 GroupPositions {
716 sliced,
717 original: g,
718 offset,
719 len,
720 }
721}