polars_core/chunked_array/ops/explode_and_offsets.rs

use arrow::offset::OffsetsBuffer;
use polars_compute::gather::take_unchecked;

use super::*;

impl ListChunked {
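    /// Explode the flat values array directly by offsets; used when the inner
    /// physical dtype is primitive numeric, boolean, or null.
    ///
    /// Illustrative example (added for clarity, not part of the original source):
    /// for the list column `[[1, 2], [], [3]]` the values are `[1, 2, 3]` with
    /// offsets `[0, 2, 2, 3]`; `explode_by_offsets` maps the empty sublist to a
    /// null, producing `[1, 2, null, 3]`.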
    fn specialized(
        &self,
        values: ArrayRef,
        offsets: &[i64],
        offsets_buf: OffsetsBuffer<i64>,
    ) -> (Series, OffsetsBuffer<i64>) {
        // SAFETY: inner_dtype should be correct
        let values = unsafe {
            Series::from_chunks_and_dtype_unchecked(
                self.name().clone(),
                vec![values],
                &self.inner_dtype().to_physical(),
            )
        };

        use crate::chunked_array::ops::explode::ExplodeByOffsets;

        let mut values = match values.dtype() {
            DataType::Boolean => {
                let t = values.bool().unwrap();
                ExplodeByOffsets::explode_by_offsets(t, offsets).into_series()
            },
            DataType::Null => {
                let t = values.null().unwrap();
                ExplodeByOffsets::explode_by_offsets(t, offsets).into_series()
            },
            dtype => {
                with_match_physical_numeric_polars_type!(dtype, |$T| {
                    let t: &ChunkedArray<$T> = values.as_ref().as_ref();
                    ExplodeByOffsets::explode_by_offsets(t, offsets).into_series()
                })
            },
        };

        // restore the logical type
        values = unsafe { values.from_physical_unchecked(self.inner_dtype()) }.unwrap();

        (values, offsets_buf)
    }
}

impl ChunkExplode for ListChunked {
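    /// Return the offsets buffer of the list array, rechunked to a single chunk.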
    fn offsets(&self) -> PolarsResult<OffsetsBuffer<i64>> {
        let ca = self.rechunk();
        let listarr: &LargeListArray = ca.downcast_iter().next().unwrap();
        let offsets = listarr.offsets().clone();

        Ok(offsets)
    }

    fn explode_and_offsets(&self) -> PolarsResult<(Series, OffsetsBuffer<i64>)> {
        // A list array's memory layout is effectively already 'exploded': the values of all
        // sublists are stored contiguously in a single flat buffer. We can therefore take the
        // values array of the list as-is, and also return a slice of the offsets. That slice
        // can be used to recover the old list layout, or as indexes to expand a DataFrame in
        // the same manner as the `explode` operation.
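        // Illustrative example (added for clarity, not part of the original source): for
        // [[1, 2], [3, 4, 5]] the values buffer is already the exploded data [1, 2, 3, 4, 5];
        // the offsets [0, 2, 5] record where each original row started.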
        let ca = self.rechunk();
        let listarr: &LargeListArray = ca.downcast_iter().next().unwrap();
        let offsets_buf = listarr.offsets().clone();
        let offsets = listarr.offsets().as_slice();
        let mut values = listarr.values().clone();

        let (mut s, offsets) = if ca._can_fast_explode() {
            // Ensure that the values array is sliced, since slicing a list
            // array only slices its offsets, not the underlying values.

            // We only do this in the fast-explode branch; in the other
            // branch the offsets must coincide with the (unsliced) values.
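            // Illustrative example (added for clarity, not part of the original source): if the
            // list array was sliced so its offsets are [2, 4, 7], the values buffer still holds
            // the elements before offset 2, so we slice it to start = 2, len = 7 - 2 = 5.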
            if !offsets.is_empty() {
                let start = offsets[0] as usize;
                let len = offsets[offsets.len() - 1] as usize - start;
                // SAFETY:
                // we are in bounds
                values = unsafe { values.sliced_unchecked(start, len) };
            }
            // SAFETY: inner_dtype should be correct
            (
                unsafe {
                    Series::from_chunks_and_dtype_unchecked(
                        self.name().clone(),
                        vec![values],
                        &self.inner_dtype().to_physical(),
                    )
                },
                offsets_buf,
            )
        } else {
            // During tests, assert that this branch is not hit with list arrays
            // that could have been fast-exploded (no empty sublists and offsets
            // starting at 0).
            #[cfg(test)]
            {
                let mut last = offsets[0];
                let mut has_empty = false;
                for &o in &offsets[1..] {
                    if o == last {
                        has_empty = true;
                    }
                    last = o;
                }
                if !has_empty && offsets[0] == 0 {
                    panic!("could have fast exploded")
                }
            }
            let (indices, new_offsets) = if listarr.null_count() == 0 {
                // SPECIALIZED path.
                let inner_phys = self.inner_dtype().to_physical();
                if inner_phys.is_primitive_numeric() || inner_phys.is_null() || inner_phys.is_bool()
                {
                    return Ok(self.specialized(values, offsets, offsets_buf));
                }
                // Use gather
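                // Illustrative example (added for clarity, not part of the original source):
                // for offsets [0, 2, 2, 3] we build indices [0, 1, null, 2] (the null marks the
                // empty sublist) and new_offsets [0, 2, 2, 3]; take_unchecked then gathers the
                // exploded values in a single pass.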
                let mut indices =
                    MutablePrimitiveArray::<IdxSize>::with_capacity(*offsets_buf.last() as usize);
                let mut new_offsets = Vec::with_capacity(listarr.len() + 1);
                let mut current_offset = 0i64;
                let mut iter = offsets.iter();
                if let Some(mut previous) = iter.next().copied() {
                    new_offsets.push(current_offset);
                    iter.for_each(|&offset| {
                        let len = offset - previous;
                        let start = previous as IdxSize;
                        let end = offset as IdxSize;

                        if len == 0 {
                            indices.push_null();
                        } else {
                            indices.extend_trusted_len_values(start..end);
                        }
                        current_offset += len;
                        previous = offset;
                        new_offsets.push(current_offset);
                    })
                }
                (indices, new_offsets)
            } else {
                // We have already ensured that the validity is not None.
                let validity = listarr.validity().unwrap();
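                // Illustrative example (added for clarity, not part of the original source):
                // for [[1, 2], null, []] with offsets [0, 2, 2, 2], both the null row and the
                // empty sublist become a null index, giving indices [0, 1, null, null] and
                // new_offsets [0, 2, 2, 2].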

                let mut indices =
                    MutablePrimitiveArray::<IdxSize>::with_capacity(*offsets_buf.last() as usize);
                let mut new_offsets = Vec::with_capacity(listarr.len() + 1);
                let mut current_offset = 0i64;
                let mut iter = offsets.iter();
                if let Some(mut previous) = iter.next().copied() {
                    new_offsets.push(current_offset);
                    iter.enumerate().for_each(|(i, &offset)| {
                        let len = offset - previous;
                        let start = previous as IdxSize;
                        let end = offset as IdxSize;
                        // SAFETY: we are within bounds
                        if unsafe { validity.get_bit_unchecked(i) } {
                            // Explode expects a null value if the sublist is empty.
                            if len == 0 {
                                indices.push_null();
                            } else {
                                indices.extend_trusted_len_values(start..end);
                            }
                            current_offset += len;
                        } else {
                            indices.push_null();
                        }
                        previous = offset;
                        new_offsets.push(current_offset);
                    })
                }
                (indices, new_offsets)
            };

            // SAFETY: the indices we generate are in bounds
            let chunk = unsafe { take_unchecked(values.as_ref(), &indices.into()) };
            // SAFETY: inner_dtype should be correct
            let s = unsafe {
                Series::from_chunks_and_dtype_unchecked(
                    self.name().clone(),
                    vec![chunk],
                    &self.inner_dtype().to_physical(),
                )
            };
            // SAFETY: monotonically increasing
            let new_offsets = unsafe { OffsetsBuffer::new_unchecked(new_offsets.into()) };
            (s, new_offsets)
        };
        debug_assert_eq!(s.name(), self.name());
        // restore the logical type
        s = unsafe { s.from_physical_unchecked(self.inner_dtype()) }.unwrap();

        Ok((s, offsets))
    }
}

#[cfg(feature = "dtype-array")]
impl ChunkExplode for ArrayChunked {
    fn offsets(&self) -> PolarsResult<OffsetsBuffer<i64>> {
        // fast-path for non-null array.
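        // Illustrative example (added for clarity, not part of the original source): a
        // fixed-size array column with width = 2 and len = 3 always has offsets
        // [0, 2, 4, 6], i.e. i * width for i in 0..=len.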
        if self.null_count() == 0 {
            let width = self.width() as i64;
            let offsets = (0..self.len() + 1)
                .map(|i| {
                    let i = i as i64;
                    i * width
                })
                .collect::<Vec<_>>();
            // SAFETY: monotonically increasing
            let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets.into()) };

            return Ok(offsets);
        }

        let ca = self.rechunk();
        let arr = ca.downcast_iter().next().unwrap();
        // We have already ensured that the validity is not None.
        let validity = arr.validity().unwrap();
        let width = arr.size();

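        // Illustrative example (added for clarity, not part of the original source): with
        // width = 2 and validity [true, false, true], the offsets become [0, 2, 2, 4]; null
        // rows contribute no elements but still get an offset entry.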
        let mut current_offset = 0i64;
        let offsets = (0..=arr.len())
            .map(|i| {
                if i == 0 {
                    return current_offset;
                }
                // SAFETY: we are within bounds
                if unsafe { validity.get_bit_unchecked(i - 1) } {
                    current_offset += width as i64
                }
                current_offset
            })
            .collect::<Vec<_>>();
        // SAFETY: monotonically increasing
        let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets.into()) };
        Ok(offsets)
    }

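    /// Illustrative example (added for clarity, not part of the original source):
    /// exploding the width-2 array column `[[1, 2], null]` yields the Series
    /// `[1, 2, null]` with offsets `[0, 2, 2]`.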
    fn explode_and_offsets(&self) -> PolarsResult<(Series, OffsetsBuffer<i64>)> {
        let ca = self.rechunk();
        let arr = ca.downcast_iter().next().unwrap();
        // fast-path for non-null array.
        if arr.null_count() == 0 {
            let s = Series::try_from((self.name().clone(), arr.values().clone()))
                .unwrap()
                .cast(ca.inner_dtype())?;
            let width = self.width() as i64;
            let offsets = (0..self.len() + 1)
                .map(|i| {
                    let i = i as i64;
                    i * width
                })
                .collect::<Vec<_>>();
            // SAFETY: monotonically increasing
            let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets.into()) };
            return Ok((s, offsets));
        }

        // We have already ensured that the validity is not None.
        let validity = arr.validity().unwrap();
        let values = arr.values();
        let width = arr.size();

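        // Capacity note (added for clarity, not part of the original source): every valid row
        // emits `width` indices and every null row emits a single null index, so the total is
        // len * width - null_count * (width - 1) = values.len() - null_count * (width - 1).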
        let mut indices = MutablePrimitiveArray::<IdxSize>::with_capacity(
            values.len() - arr.null_count() * (width - 1),
        );
        let mut offsets = Vec::with_capacity(arr.len() + 1);
        let mut current_offset = 0i64;
        offsets.push(current_offset);
        (0..arr.len()).for_each(|i| {
            // SAFETY: we are within bounds
            if unsafe { validity.get_bit_unchecked(i) } {
                let start = (i * width) as IdxSize;
                let end = start + width as IdxSize;
                indices.extend_trusted_len_values(start..end);
                current_offset += width as i64;
            } else {
                indices.push_null();
            }
            offsets.push(current_offset);
        });

        // SAFETY: the indices we generate are in bounds
        let chunk = unsafe { take_unchecked(&**values, &indices.into()) };
        // SAFETY: monotonically increasing
        let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets.into()) };

        Ok((
            // SAFETY: inner_dtype should be correct
            unsafe {
                Series::from_chunks_and_dtype_unchecked(
                    ca.name().clone(),
                    vec![chunk],
                    ca.inner_dtype(),
                )
            },
            offsets,
        ))
    }
}