polars_core/chunked_array/ops/
row_encode.rs1use arrow::compute::utils::combine_validities_and_many;
2use polars_row::{
3 convert_columns, RowEncodingCategoricalContext, RowEncodingContext, RowEncodingOptions,
4 RowsEncoded,
5};
6use polars_utils::itertools::Itertools;
7use rayon::prelude::*;
8
9use crate::prelude::*;
10use crate::utils::_split_offsets;
11use crate::POOL;
12
13pub fn encode_rows_vertical_par_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
14 let n_threads = POOL.current_num_threads();
15 let len = by[0].len();
16 let splits = _split_offsets(len, n_threads);
17
18 let chunks = splits.into_par_iter().map(|(offset, len)| {
19 let sliced = by
20 .iter()
21 .map(|s| s.slice(offset as i64, len))
22 .collect::<Vec<_>>();
23 let rows = _get_rows_encoded_unordered(&sliced)?;
24 Ok(rows.into_array())
25 });
26 let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());
27
28 Ok(BinaryOffsetChunked::from_chunk_iter(
29 PlSmallStr::EMPTY,
30 chunks?,
31 ))
32}
33
34pub fn encode_rows_vertical_par_unordered_broadcast_nulls(
36 by: &[Column],
37) -> PolarsResult<BinaryOffsetChunked> {
38 let n_threads = POOL.current_num_threads();
39 let len = by[0].len();
40 let splits = _split_offsets(len, n_threads);
41
42 let chunks = splits.into_par_iter().map(|(offset, len)| {
43 let sliced = by
44 .iter()
45 .map(|s| s.slice(offset as i64, len))
46 .collect::<Vec<_>>();
47 let rows = _get_rows_encoded_unordered(&sliced)?;
48
49 let validities = sliced
50 .iter()
51 .flat_map(|s| {
52 let s = s.rechunk();
53 #[allow(clippy::unnecessary_to_owned)]
54 s.as_materialized_series()
55 .chunks()
56 .to_vec()
57 .into_iter()
58 .map(|arr| arr.validity().cloned())
59 })
60 .collect::<Vec<_>>();
61
62 let validity = combine_validities_and_many(&validities);
63 Ok(rows.into_array().with_validity_typed(validity))
64 });
65 let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());
66
67 Ok(BinaryOffsetChunked::from_chunk_iter(
68 PlSmallStr::EMPTY,
69 chunks?,
70 ))
71}
72
/// Build the optional [`RowEncodingContext`] required to row-encode values of
/// `dtype`.
///
/// Returns `None` for dtypes whose encoding is fully determined by the dtype
/// itself (primitives, strings, binary, temporal types). Nested dtypes
/// delegate to their inner dtype; categoricals/enums and decimals carry extra
/// state that the row encoder needs.
///
/// # Panics
/// Panics for `Unknown` and `Object` dtypes, which cannot be row-encoded,
/// and (via `unwrap`) for categoricals whose rev-map has not been set.
pub fn get_row_encoding_context(dtype: &DataType) -> Option<RowEncodingContext> {
    match dtype {
        // Plain physical types: no extra context needed.
        DataType::Boolean
        | DataType::UInt8
        | DataType::UInt16
        | DataType::UInt32
        | DataType::UInt64
        | DataType::Int8
        | DataType::Int16
        | DataType::Int32
        | DataType::Int64
        | DataType::Int128
        | DataType::Float32
        | DataType::Float64
        | DataType::String
        | DataType::Binary
        | DataType::BinaryOffset
        | DataType::Null
        | DataType::Time
        | DataType::Date
        | DataType::Datetime(_, _)
        | DataType::Duration(_) => None,

        DataType::Unknown(_) => panic!("Unsupported in row encoding"),

        #[cfg(feature = "object")]
        DataType::Object(_, _) => panic!("Unsupported in row encoding"),

        // Decimals encode with their precision; falls back to 38 when the
        // precision is not specified.
        #[cfg(feature = "dtype-decimal")]
        DataType::Decimal(precision, _) => {
            Some(RowEncodingContext::Decimal(precision.unwrap_or(38)))
        },

        // Nested types: the context (if any) comes from the inner dtype.
        #[cfg(feature = "dtype-array")]
        DataType::Array(dtype, _) => get_row_encoding_context(dtype),
        DataType::List(dtype) => get_row_encoding_context(dtype),
        #[cfg(feature = "dtype-categorical")]
        DataType::Categorical(revmap, ordering) | DataType::Enum(revmap, ordering) => {
            let revmap = revmap.as_ref().unwrap();

            let (num_known_categories, lexical_sort_idxs) = match revmap.as_ref() {
                RevMapping::Global(map, _, _) => {
                    // Global mapping: the largest physical key bounds the
                    // number of known categories (keys are 0-based, hence +1).
                    let num_known_categories = map.keys().max().copied().map_or(0, |m| m + 1);

                    // For lexical ordering, precompute for every physical
                    // category index its rank in the string-sorted order,
                    // using the process-wide string cache payloads.
                    let lexical_sort_idxs =
                        matches!(ordering, CategoricalOrdering::Lexical).then(|| {
                            let read_map = crate::STRING_CACHE.read_map();
                            let payloads = read_map.get_current_payloads();
                            assert!(payloads.len() >= num_known_categories as usize);

                            // Sort the category indices by their string
                            // payload, then invert that permutation so
                            // `sort_idxs[cat]` gives the lexical rank of `cat`.
                            let mut idxs = (0..num_known_categories).collect::<Vec<u32>>();
                            idxs.sort_by_key(|&k| payloads[k as usize].as_str());
                            let mut sort_idxs = vec![0; num_known_categories as usize];
                            for (i, idx) in idxs.into_iter().enumerate_u32() {
                                sort_idxs[idx as usize] = i;
                            }
                            sort_idxs
                        });

                    (num_known_categories, lexical_sort_idxs)
                },
                RevMapping::Local(values, _) => {
                    // Local mapping: categories live in `values`; same
                    // rank-inversion scheme as the global branch, but sourced
                    // from the local array instead of the string cache.
                    let lexical_sort_idxs =
                        matches!(ordering, CategoricalOrdering::Lexical).then(|| {
                            assert_eq!(values.null_count(), 0);
                            let values: Vec<&str> = values.values_iter().collect();

                            let mut idxs = (0..values.len() as u32).collect::<Vec<u32>>();
                            idxs.sort_by_key(|&k| values[k as usize]);
                            let mut sort_idxs = vec![0; values.len()];
                            for (i, idx) in idxs.into_iter().enumerate_u32() {
                                sort_idxs[idx as usize] = i;
                            }
                            sort_idxs
                        });

                    (values.len() as u32, lexical_sort_idxs)
                },
            };

            let ctx = RowEncodingCategoricalContext {
                num_known_categories,
                is_enum: matches!(dtype, DataType::Enum(_, _)),
                lexical_sort_idxs,
            };
            Some(RowEncodingContext::Categorical(ctx))
        },
        #[cfg(feature = "dtype-struct")]
        DataType::Struct(fs) => {
            // Lazily build the per-field context vector: only allocate once
            // the first field that actually needs a context is found.
            let mut ctxts = Vec::new();

            for (i, f) in fs.iter().enumerate() {
                if let Some(ctxt) = get_row_encoding_context(f.dtype()) {
                    ctxts.reserve(fs.len());
                    // Fields before `i` were all context-free.
                    ctxts.extend(std::iter::repeat_n(None, i));
                    ctxts.push(Some(ctxt));
                    break;
                }
            }

            // No field needed a context, so the struct doesn't either.
            if ctxts.is_empty() {
                return None;
            }

            // Fill in the contexts for the remaining fields.
            ctxts.extend(
                fs[ctxts.len()..]
                    .iter()
                    .map(|f| get_row_encoding_context(f.dtype())),
            );

            Some(RowEncodingContext::Struct(ctxts))
        },
    }
}
193
194pub fn encode_rows_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
195 let rows = _get_rows_encoded_unordered(by)?;
196 Ok(BinaryOffsetChunked::with_chunk(
197 PlSmallStr::EMPTY,
198 rows.into_array(),
199 ))
200}
201
202pub fn _get_rows_encoded_unordered(by: &[Column]) -> PolarsResult<RowsEncoded> {
203 let mut cols = Vec::with_capacity(by.len());
204 let mut opts = Vec::with_capacity(by.len());
205 let mut ctxts = Vec::with_capacity(by.len());
206
207 let num_rows = by.first().map_or(0, |c| c.len());
210
211 for by in by {
212 debug_assert_eq!(by.len(), num_rows);
213
214 let by = by.as_materialized_series();
215 let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
216 let opt = RowEncodingOptions::new_unsorted();
217 let ctxt = get_row_encoding_context(by.dtype());
218
219 cols.push(arr);
220 opts.push(opt);
221 ctxts.push(ctxt);
222 }
223 Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
224}
225
226pub fn _get_rows_encoded(
227 by: &[Column],
228 descending: &[bool],
229 nulls_last: &[bool],
230) -> PolarsResult<RowsEncoded> {
231 debug_assert_eq!(by.len(), descending.len());
232 debug_assert_eq!(by.len(), nulls_last.len());
233
234 let mut cols = Vec::with_capacity(by.len());
235 let mut opts = Vec::with_capacity(by.len());
236 let mut ctxts = Vec::with_capacity(by.len());
237
238 let num_rows = by.first().map_or(0, |c| c.len());
241
242 for ((by, desc), null_last) in by.iter().zip(descending).zip(nulls_last) {
243 debug_assert_eq!(by.len(), num_rows);
244
245 let by = by.as_materialized_series();
246 let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
247 let opt = RowEncodingOptions::new_sorted(*desc, *null_last);
248 let ctxt = get_row_encoding_context(by.dtype());
249
250 cols.push(arr);
251 opts.push(opt);
252 ctxts.push(ctxt);
253 }
254 Ok(convert_columns(num_rows, &cols, &opts, &ctxts))
255}
256
257pub fn _get_rows_encoded_ca(
258 name: PlSmallStr,
259 by: &[Column],
260 descending: &[bool],
261 nulls_last: &[bool],
262) -> PolarsResult<BinaryOffsetChunked> {
263 _get_rows_encoded(by, descending, nulls_last)
264 .map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
265}
266
267pub fn _get_rows_encoded_arr(
268 by: &[Column],
269 descending: &[bool],
270 nulls_last: &[bool],
271) -> PolarsResult<BinaryArray<i64>> {
272 _get_rows_encoded(by, descending, nulls_last).map(|rows| rows.into_array())
273}
274
275pub fn _get_rows_encoded_ca_unordered(
276 name: PlSmallStr,
277 by: &[Column],
278) -> PolarsResult<BinaryOffsetChunked> {
279 _get_rows_encoded_unordered(by)
280 .map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
281}