1use std::collections::VecDeque;
3use std::sync::Arc;
4
5mod array;
6
7use arrow_format::ipc::planus::ReadAsRoot;
8use arrow_format::ipc::{Block, DictionaryBatchRef, MessageRef, RecordBatchRef};
9use polars_error::{polars_bail, polars_err, to_compute_err, PolarsResult};
10use polars_utils::pl_str::PlSmallStr;
11
12use crate::array::Array;
13use crate::datatypes::{ArrowDataType, ArrowSchema, Field};
14use crate::io::ipc::read::file::{get_dictionary_batch, get_record_batch};
15use crate::io::ipc::read::{
16 first_dict_field, Dictionaries, FileMetadata, IpcBuffer, Node, OutOfSpecKind,
17};
18use crate::io::ipc::{IpcField, CONTINUATION_MARKER};
19use crate::record_batch::RecordBatchT;
20
21fn read_message(
22 mut bytes: &[u8],
23 block: arrow_format::ipc::Block,
24) -> PolarsResult<(MessageRef, usize)> {
25 let offset: usize = block.offset.try_into().map_err(
26 |_err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::NegativeFooterLength),
27 )?;
28
29 let block_length: usize = block.meta_data_length.try_into().map_err(
30 |_err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::NegativeFooterLength),
31 )?;
32
33 bytes = &bytes[offset..];
34 let mut message_length = bytes[..4].try_into().unwrap();
35 bytes = &bytes[4..];
36
37 if message_length == CONTINUATION_MARKER {
38 message_length = bytes[..4].try_into().unwrap();
40 bytes = &bytes[4..];
41 };
42
43 let message_length: usize = i32::from_le_bytes(message_length).try_into().map_err(
44 |_err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::NegativeFooterLength),
45 )?;
46
47 let message = arrow_format::ipc::MessageRef::read_as_root(&bytes[..message_length])
48 .map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferMessage(err)))?;
49
50 Ok((message, offset + block_length))
51}
52
53fn get_buffers_nodes(batch: RecordBatchRef) -> PolarsResult<(VecDeque<IpcBuffer>, VecDeque<Node>)> {
54 let compression = batch.compression().map_err(to_compute_err)?;
55 if compression.is_some() {
56 polars_bail!(ComputeError: "memory_map can only be done on uncompressed IPC files")
57 }
58
59 let buffers = batch
60 .buffers()
61 .map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferBuffers(err)))?
62 .ok_or_else(|| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::MissingMessageBuffers))?;
63 let buffers = buffers.iter().collect::<VecDeque<_>>();
64
65 let field_nodes = batch
66 .nodes()
67 .map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferNodes(err)))?
68 .ok_or_else(|| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::MissingMessageNodes))?;
69 let field_nodes = field_nodes.iter().collect::<VecDeque<_>>();
70
71 Ok((buffers, field_nodes))
72}
73
74pub(crate) unsafe fn mmap_record<T: AsRef<[u8]>>(
75 fields: &ArrowSchema,
76 ipc_fields: &[IpcField],
77 data: Arc<T>,
78 batch: RecordBatchRef,
79 offset: usize,
80 dictionaries: &Dictionaries,
81) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {
82 let (mut buffers, mut field_nodes) = get_buffers_nodes(batch)?;
83 let mut variadic_buffer_counts = batch
84 .variadic_buffer_counts()
85 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?
86 .map(|v| v.iter().map(|v| v as usize).collect::<VecDeque<usize>>())
87 .unwrap_or_else(VecDeque::new);
88
89 let length = batch
90 .length()
91 .map_err(|_| polars_err!(oos = OutOfSpecKind::MissingData))
92 .unwrap()
93 .try_into()
94 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
95
96 fields
97 .iter_values()
98 .map(|f| &f.dtype)
99 .cloned()
100 .zip(ipc_fields)
101 .map(|(dtype, ipc_field)| {
102 array::mmap(
103 data.clone(),
104 offset,
105 dtype,
106 ipc_field,
107 dictionaries,
108 &mut field_nodes,
109 &mut variadic_buffer_counts,
110 &mut buffers,
111 )
112 })
113 .collect::<PolarsResult<_>>()
114 .and_then(|arr| {
115 RecordBatchT::try_new(
116 length,
117 Arc::new(fields.iter_values().cloned().collect()),
118 arr,
119 )
120 })
121}
122
123pub unsafe fn mmap_unchecked<T: AsRef<[u8]>>(
135 metadata: &FileMetadata,
136 dictionaries: &Dictionaries,
137 data: Arc<T>,
138 chunk: usize,
139) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {
140 let block = metadata.blocks[chunk];
141
142 let (message, offset) = read_message(data.as_ref().as_ref(), block)?;
143 let batch = get_record_batch(message)?;
144 mmap_record(
145 &metadata.schema,
146 &metadata.ipc_schema.fields,
147 data.clone(),
148 batch,
149 offset,
150 dictionaries,
151 )
152}
153
154unsafe fn mmap_dictionary<T: AsRef<[u8]>>(
155 schema: &ArrowSchema,
156 ipc_fields: &[IpcField],
157 data: Arc<T>,
158 block: Block,
159 dictionaries: &mut Dictionaries,
160) -> PolarsResult<()> {
161 let (message, offset) = read_message(data.as_ref().as_ref(), block)?;
162 let batch = get_dictionary_batch(&message)?;
163 mmap_dictionary_from_batch(schema, ipc_fields, &data, batch, dictionaries, offset)
164}
165
166pub(crate) unsafe fn mmap_dictionary_from_batch<T: AsRef<[u8]>>(
167 schema: &ArrowSchema,
168 ipc_fields: &[IpcField],
169 data: &Arc<T>,
170 batch: DictionaryBatchRef,
171 dictionaries: &mut Dictionaries,
172 offset: usize,
173) -> PolarsResult<()> {
174 let id = batch
175 .id()
176 .map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferId(err)))?;
177 let (first_field, first_ipc_field) = first_dict_field(id, schema, ipc_fields)?;
178
179 let batch = batch
180 .data()
181 .map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferData(err)))?
182 .ok_or_else(|| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::MissingData))?;
183
184 let value_type = if let ArrowDataType::Dictionary(_, value_type, _) =
185 first_field.dtype.to_logical_type()
186 {
187 value_type.as_ref()
188 } else {
189 polars_bail!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidIdDataType {requested_id: id} )
190 };
191
192 let field = Field::new(PlSmallStr::EMPTY, value_type.clone(), false);
194
195 let chunk = mmap_record(
196 &std::iter::once((field.name.clone(), field)).collect(),
197 &[first_ipc_field.clone()],
198 data.clone(),
199 batch,
200 offset,
201 dictionaries,
202 )?;
203
204 dictionaries.insert(id, chunk.into_arrays().pop().unwrap());
205
206 Ok(())
207}
208
209pub unsafe fn mmap_dictionaries_unchecked<T: AsRef<[u8]>>(
215 metadata: &FileMetadata,
216 data: Arc<T>,
217) -> PolarsResult<Dictionaries> {
218 mmap_dictionaries_unchecked2(
219 metadata.schema.as_ref(),
220 &metadata.ipc_schema.fields,
221 metadata.dictionaries.as_ref(),
222 data,
223 )
224}
225
226pub(crate) unsafe fn mmap_dictionaries_unchecked2<T: AsRef<[u8]>>(
227 schema: &ArrowSchema,
228 ipc_fields: &[IpcField],
229 dictionaries: Option<&Vec<arrow_format::ipc::Block>>,
230 data: Arc<T>,
231) -> PolarsResult<Dictionaries> {
232 let blocks = if let Some(blocks) = &dictionaries {
233 blocks
234 } else {
235 return Ok(Default::default());
236 };
237
238 let mut dictionaries = Default::default();
239
240 blocks.iter().cloned().try_for_each(|block| {
241 mmap_dictionary(schema, ipc_fields, data.clone(), block, &mut dictionaries)
242 })?;
243 Ok(dictionaries)
244}