polars_core/utils/flatten.rs

use arrow::bitmap::MutableBitmap;
use polars_utils::sync::SyncPtr;

use super::*;
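/// Splits a `DataFrame` into one `DataFrame` per chunk of its columns.
///
/// Each yielded frame wraps a single chunk per column and keeps the source
/// column's sorted flag; chunks that produce an empty frame are skipped.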
pub fn flatten_df_iter(df: &DataFrame) -> impl Iterator<Item = DataFrame> + '_ {
    df.iter_chunks_physical().flat_map(|chunk| {
        let columns = df
            .iter()
            .zip(chunk.into_arrays())
            .map(|(s, arr)| {
                // SAFETY: the array comes from the matching column, so its dtype lines up.
                let mut out = unsafe {
                    Series::from_chunks_and_dtype_unchecked(s.name().clone(), vec![arr], s.dtype())
                };
                // A single chunk of a sorted column is itself sorted.
                out.set_sorted_flag(s.is_sorted_flag());
                Column::from(out)
            })
            .collect::<Vec<_>>();

        let height = DataFrame::infer_height(&columns);
        // SAFETY: all columns stem from the same chunk, so they share one height.
        let df = unsafe { DataFrame::new_no_checks(height, columns) };
        if df.is_empty() {
            None
        } else {
            Some(df)
        }
    })
}
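/// Splits a `Series` into one single-chunk `Series` per underlying chunk,
/// preserving its name and dtype.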
pub fn flatten_series(s: &Series) -> Vec<Series> {
    let name = s.name();
    let dtype = s.dtype();
    unsafe {
        s.chunks()
            .iter()
            .map(|arr| {
                Series::from_chunks_and_dtype_unchecked(name.clone(), vec![arr.clone()], dtype)
            })
            .collect()
    }
}
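/// Returns the total number of elements in the nested vectors together with
/// the starting offset of every inner vector in the flattened result,
/// e.g. `cap_and_offsets(&[vec![1, 2], vec![3]])` yields `(3, vec![0, 2])`.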
pub fn cap_and_offsets<I>(v: &[Vec<I>]) -> (usize, Vec<usize>) {
    let cap = v.iter().map(|v| v.len()).sum::<usize>();
    let offsets = v
        .iter()
        .scan(0_usize, |acc, v| {
            let out = *acc;
            *acc += v.len();
            Some(out)
        })
        .collect::<Vec<_>>();
    (cap, offsets)
}
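/// Flattens a collection of buffers into a single `Vec`, copying the buffers
/// in parallel on the shared `POOL` thread pool,
/// e.g. `flatten_par(&[vec![1, 2], vec![3]])` returns `vec![1, 2, 3]`.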
pub fn flatten_par<T: Send + Sync + Copy, S: AsRef<[T]>>(bufs: &[S]) -> Vec<T> {
    let mut len = 0;
    let mut offsets = Vec::with_capacity(bufs.len());
    let bufs = bufs
        .iter()
        .map(|s| {
            offsets.push(len);
            let slice = s.as_ref();
            len += slice.len();
            slice
        })
        .collect::<Vec<_>>();
    flatten_par_impl(&bufs, len, offsets)
}
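/// Copies every source slice into its precomputed offset of a single output
/// buffer, one slice per parallel task.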
fn flatten_par_impl<T: Send + Sync + Copy>(
    bufs: &[&[T]],
    len: usize,
    offsets: Vec<usize>,
) -> Vec<T> {
    let mut out = Vec::with_capacity(len);
    let out_ptr = unsafe { SyncPtr::new(out.as_mut_ptr()) };

    POOL.install(|| {
        offsets.into_par_iter().enumerate().for_each(|(i, offset)| {
            let buf = bufs[i];
            let ptr: *mut T = out_ptr.get();
            // SAFETY: each task writes to a disjoint region of `out` starting at its
            // precomputed offset, so the concurrent copies never overlap.
            unsafe {
                let dst = ptr.add(offset);
                let src = buf.as_ptr();
                std::ptr::copy_nonoverlapping(src, dst, buf.len())
            }
        })
    });
    // SAFETY: every element in `0..len` was initialized by exactly one task above.
    unsafe {
        out.set_len(len);
    }
    out
}
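/// Flattens buffers of `NullableIdxSize` into a `PrimitiveArray<IdxSize>`,
/// marking every index for which `is_null_idx` returns true as null in the
/// validity bitmap. The value copy and the validity construction run as two
/// parallel tasks.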
pub fn flatten_nullable<S: AsRef<[NullableIdxSize]> + Send + Sync>(
    bufs: &[S],
) -> PrimitiveArray<IdxSize> {
    // Task `a` flattens the raw indices, task `b` builds the validity bitmap.
    let a = || flatten_par(bufs);
    let b = || {
        let cap = bufs.iter().map(|s| s.as_ref().len()).sum::<usize>();
        let mut validity = MutableBitmap::with_capacity(cap);
        validity.extend_constant(cap, true);

        let mut count = 0usize;
        for s in bufs {
            let s = s.as_ref();

            for id in s {
                if id.is_null_idx() {
                    unsafe { validity.set_unchecked(count, false) };
                }

                count += 1;
            }
        }
        validity.freeze()
    };

    let (a, b) = POOL.join(a, b);
    PrimitiveArray::from_vec(bytemuck::cast_vec::<_, IdxSize>(a)).with_validity(Some(b))
}