polars_arrow/io/ipc/append/
mod.rs

1//! A struct adapter of Read+Seek+Write to append to IPC files
2// read header and convert to writer information
3// seek to first byte of header - 1
4// write new batch
5// write new footer
6use std::io::{Read, Seek, SeekFrom, Write};
7
8use polars_error::{polars_bail, polars_err, PolarsResult};
9
10use super::endianness::is_native_little_endian;
11use super::read::{self, FileMetadata};
12use super::write::common::DictionaryTracker;
13use super::write::writer::*;
14use super::write::*;
15
16impl<R: Read + Seek + Write> FileWriter<R> {
17    /// Creates a new [`FileWriter`] from an existing file, seeking to the last message
18    /// and appending new messages afterwards. Users call `finish` to write the footer (with both)
19    /// the existing and appended messages on it.
20    /// # Error
21    /// This function errors iff:
22    /// * the file's endianness is not the native endianness (not yet supported)
23    /// * the file is not a valid Arrow IPC file
24    pub fn try_from_file(
25        mut writer: R,
26        metadata: FileMetadata,
27        options: WriteOptions,
28    ) -> PolarsResult<FileWriter<R>> {
29        if metadata.ipc_schema.is_little_endian != is_native_little_endian() {
30            polars_bail!(ComputeError: "appending to a file of a non-native endianness is not supported")
31        }
32
33        let dictionaries =
34            read::read_file_dictionaries(&mut writer, &metadata, &mut Default::default())?;
35
36        let last_block = metadata.blocks.last().ok_or_else(|| {
37            polars_err!(oos = "an Arrow IPC file must have at least 1 message (the schema message)")
38        })?;
39        let offset: u64 = last_block
40            .offset
41            .try_into()
42            .map_err(|_| polars_err!(oos = "the block's offset must be a positive number"))?;
43        let meta_data_length: u64 = last_block
44            .meta_data_length
45            .try_into()
46            .map_err(|_| polars_err!(oos = "the block's offset must be a positive number"))?;
47        let body_length: u64 = last_block
48            .body_length
49            .try_into()
50            .map_err(|_| polars_err!(oos = "the block's body length must be a positive number"))?;
51        let offset: u64 = offset + meta_data_length + body_length;
52
53        writer.seek(SeekFrom::Start(offset))?;
54
55        Ok(FileWriter {
56            writer,
57            options,
58            schema: metadata.schema,
59            ipc_fields: metadata.ipc_schema.fields,
60            block_offsets: offset as usize,
61            dictionary_blocks: metadata.dictionaries.unwrap_or_default(),
62            record_blocks: metadata.blocks,
63            state: State::Started, // file already exists, so we are ready
64            dictionary_tracker: DictionaryTracker {
65                dictionaries,
66                cannot_replace: true,
67            },
68            encoded_message: Default::default(),
69            custom_schema_metadata: None,
70        })
71    }
72}