polars_arrow/mmap/
mod.rs

//! Memory maps regions defined by the Arrow IPC format into [`Array`]s.
2use std::collections::VecDeque;
3use std::sync::Arc;
4
5mod array;
6
7use arrow_format::ipc::planus::ReadAsRoot;
8use arrow_format::ipc::{Block, DictionaryBatchRef, MessageRef, RecordBatchRef};
9use polars_error::{polars_bail, polars_err, to_compute_err, PolarsResult};
10use polars_utils::pl_str::PlSmallStr;
11
12use crate::array::Array;
13use crate::datatypes::{ArrowDataType, ArrowSchema, Field};
14use crate::io::ipc::read::file::{get_dictionary_batch, get_record_batch};
15use crate::io::ipc::read::{
16    first_dict_field, Dictionaries, FileMetadata, IpcBuffer, Node, OutOfSpecKind,
17};
18use crate::io::ipc::{IpcField, CONTINUATION_MARKER};
19use crate::record_batch::RecordBatchT;
20
21fn read_message(
22    mut bytes: &[u8],
23    block: arrow_format::ipc::Block,
24) -> PolarsResult<(MessageRef, usize)> {
25    let offset: usize = block.offset.try_into().map_err(
26        |_err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::NegativeFooterLength),
27    )?;
28
29    let block_length: usize = block.meta_data_length.try_into().map_err(
30        |_err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::NegativeFooterLength),
31    )?;
32
33    bytes = &bytes[offset..];
34    let mut message_length = bytes[..4].try_into().unwrap();
35    bytes = &bytes[4..];
36
37    if message_length == CONTINUATION_MARKER {
38        // continuation marker encountered, read message next
39        message_length = bytes[..4].try_into().unwrap();
40        bytes = &bytes[4..];
41    };
42
43    let message_length: usize = i32::from_le_bytes(message_length).try_into().map_err(
44        |_err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::NegativeFooterLength),
45    )?;
46
47    let message = arrow_format::ipc::MessageRef::read_as_root(&bytes[..message_length])
48        .map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferMessage(err)))?;
49
50    Ok((message, offset + block_length))
51}
52
53fn get_buffers_nodes(batch: RecordBatchRef) -> PolarsResult<(VecDeque<IpcBuffer>, VecDeque<Node>)> {
54    let compression = batch.compression().map_err(to_compute_err)?;
55    if compression.is_some() {
56        polars_bail!(ComputeError: "memory_map can only be done on uncompressed IPC files")
57    }
58
59    let buffers = batch
60        .buffers()
61        .map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferBuffers(err)))?
62        .ok_or_else(|| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::MissingMessageBuffers))?;
63    let buffers = buffers.iter().collect::<VecDeque<_>>();
64
65    let field_nodes = batch
66        .nodes()
67        .map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferNodes(err)))?
68        .ok_or_else(|| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::MissingMessageNodes))?;
69    let field_nodes = field_nodes.iter().collect::<VecDeque<_>>();
70
71    Ok((buffers, field_nodes))
72}
73
74pub(crate) unsafe fn mmap_record<T: AsRef<[u8]>>(
75    fields: &ArrowSchema,
76    ipc_fields: &[IpcField],
77    data: Arc<T>,
78    batch: RecordBatchRef,
79    offset: usize,
80    dictionaries: &Dictionaries,
81) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {
82    let (mut buffers, mut field_nodes) = get_buffers_nodes(batch)?;
83    let mut variadic_buffer_counts = batch
84        .variadic_buffer_counts()
85        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?
86        .map(|v| v.iter().map(|v| v as usize).collect::<VecDeque<usize>>())
87        .unwrap_or_else(VecDeque::new);
88
89    let length = batch
90        .length()
91        .map_err(|_| polars_err!(oos = OutOfSpecKind::MissingData))
92        .unwrap()
93        .try_into()
94        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
95
96    fields
97        .iter_values()
98        .map(|f| &f.dtype)
99        .cloned()
100        .zip(ipc_fields)
101        .map(|(dtype, ipc_field)| {
102            array::mmap(
103                data.clone(),
104                offset,
105                dtype,
106                ipc_field,
107                dictionaries,
108                &mut field_nodes,
109                &mut variadic_buffer_counts,
110                &mut buffers,
111            )
112        })
113        .collect::<PolarsResult<_>>()
114        .and_then(|arr| {
115            RecordBatchT::try_new(
116                length,
117                Arc::new(fields.iter_values().cloned().collect()),
118                arr,
119            )
120        })
121}
122
123/// Memory maps an record batch from an IPC file into a [`RecordBatchT`].
124/// # Errors
125/// This function errors when:
126/// * The IPC file is not valid
127/// * the buffers on the file are un-aligned with their corresponding data. This can happen when:
128///     * the file was written with 8-bit alignment
129///     * the file contains type decimal 128 or 256
130/// # Safety
131/// The caller must ensure that `data` contains a valid buffers, for example:
132/// * Offsets in variable-sized containers must be in-bounds and increasing
133/// * Utf8 data is valid
134pub unsafe fn mmap_unchecked<T: AsRef<[u8]>>(
135    metadata: &FileMetadata,
136    dictionaries: &Dictionaries,
137    data: Arc<T>,
138    chunk: usize,
139) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {
140    let block = metadata.blocks[chunk];
141
142    let (message, offset) = read_message(data.as_ref().as_ref(), block)?;
143    let batch = get_record_batch(message)?;
144    mmap_record(
145        &metadata.schema,
146        &metadata.ipc_schema.fields,
147        data.clone(),
148        batch,
149        offset,
150        dictionaries,
151    )
152}
153
154unsafe fn mmap_dictionary<T: AsRef<[u8]>>(
155    schema: &ArrowSchema,
156    ipc_fields: &[IpcField],
157    data: Arc<T>,
158    block: Block,
159    dictionaries: &mut Dictionaries,
160) -> PolarsResult<()> {
161    let (message, offset) = read_message(data.as_ref().as_ref(), block)?;
162    let batch = get_dictionary_batch(&message)?;
163    mmap_dictionary_from_batch(schema, ipc_fields, &data, batch, dictionaries, offset)
164}
165
166pub(crate) unsafe fn mmap_dictionary_from_batch<T: AsRef<[u8]>>(
167    schema: &ArrowSchema,
168    ipc_fields: &[IpcField],
169    data: &Arc<T>,
170    batch: DictionaryBatchRef,
171    dictionaries: &mut Dictionaries,
172    offset: usize,
173) -> PolarsResult<()> {
174    let id = batch
175        .id()
176        .map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferId(err)))?;
177    let (first_field, first_ipc_field) = first_dict_field(id, schema, ipc_fields)?;
178
179    let batch = batch
180        .data()
181        .map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferData(err)))?
182        .ok_or_else(|| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::MissingData))?;
183
184    let value_type = if let ArrowDataType::Dictionary(_, value_type, _) =
185        first_field.dtype.to_logical_type()
186    {
187        value_type.as_ref()
188    } else {
189        polars_bail!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidIdDataType {requested_id: id} )
190    };
191
192    // Make a fake schema for the dictionary batch.
193    let field = Field::new(PlSmallStr::EMPTY, value_type.clone(), false);
194
195    let chunk = mmap_record(
196        &std::iter::once((field.name.clone(), field)).collect(),
197        &[first_ipc_field.clone()],
198        data.clone(),
199        batch,
200        offset,
201        dictionaries,
202    )?;
203
204    dictionaries.insert(id, chunk.into_arrays().pop().unwrap());
205
206    Ok(())
207}
208
209/// Memory maps dictionaries from an IPC file into
210/// # Safety
211/// The caller must ensure that `data` contains a valid buffers, for example:
212/// * Offsets in variable-sized containers must be in-bounds and increasing
213/// * Utf8 data is valid
214pub unsafe fn mmap_dictionaries_unchecked<T: AsRef<[u8]>>(
215    metadata: &FileMetadata,
216    data: Arc<T>,
217) -> PolarsResult<Dictionaries> {
218    mmap_dictionaries_unchecked2(
219        metadata.schema.as_ref(),
220        &metadata.ipc_schema.fields,
221        metadata.dictionaries.as_ref(),
222        data,
223    )
224}
225
226pub(crate) unsafe fn mmap_dictionaries_unchecked2<T: AsRef<[u8]>>(
227    schema: &ArrowSchema,
228    ipc_fields: &[IpcField],
229    dictionaries: Option<&Vec<arrow_format::ipc::Block>>,
230    data: Arc<T>,
231) -> PolarsResult<Dictionaries> {
232    let blocks = if let Some(blocks) = &dictionaries {
233        blocks
234    } else {
235        return Ok(Default::default());
236    };
237
238    let mut dictionaries = Default::default();
239
240    blocks.iter().cloned().try_for_each(|block| {
241        mmap_dictionary(schema, ipc_fields, data.clone(), block, &mut dictionaries)
242    })?;
243    Ok(dictionaries)
244}