polars_arrow/io/ipc/append/
mod.rs1use std::io::{Read, Seek, SeekFrom, Write};
7
8use polars_error::{polars_bail, polars_err, PolarsResult};
9
10use super::endianness::is_native_little_endian;
11use super::read::{self, FileMetadata};
12use super::write::common::DictionaryTracker;
13use super::write::writer::*;
14use super::write::*;
15
16impl<R: Read + Seek + Write> FileWriter<R> {
17 pub fn try_from_file(
25 mut writer: R,
26 metadata: FileMetadata,
27 options: WriteOptions,
28 ) -> PolarsResult<FileWriter<R>> {
29 if metadata.ipc_schema.is_little_endian != is_native_little_endian() {
30 polars_bail!(ComputeError: "appending to a file of a non-native endianness is not supported")
31 }
32
33 let dictionaries =
34 read::read_file_dictionaries(&mut writer, &metadata, &mut Default::default())?;
35
36 let last_block = metadata.blocks.last().ok_or_else(|| {
37 polars_err!(oos = "an Arrow IPC file must have at least 1 message (the schema message)")
38 })?;
39 let offset: u64 = last_block
40 .offset
41 .try_into()
42 .map_err(|_| polars_err!(oos = "the block's offset must be a positive number"))?;
43 let meta_data_length: u64 = last_block
44 .meta_data_length
45 .try_into()
46 .map_err(|_| polars_err!(oos = "the block's offset must be a positive number"))?;
47 let body_length: u64 = last_block
48 .body_length
49 .try_into()
50 .map_err(|_| polars_err!(oos = "the block's body length must be a positive number"))?;
51 let offset: u64 = offset + meta_data_length + body_length;
52
53 writer.seek(SeekFrom::Start(offset))?;
54
55 Ok(FileWriter {
56 writer,
57 options,
58 schema: metadata.schema,
59 ipc_fields: metadata.ipc_schema.fields,
60 block_offsets: offset as usize,
61 dictionary_blocks: metadata.dictionaries.unwrap_or_default(),
62 record_blocks: metadata.blocks,
63 state: State::Started, dictionary_tracker: DictionaryTracker {
65 dictionaries,
66 cannot_replace: true,
67 },
68 encoded_message: Default::default(),
69 custom_schema_metadata: None,
70 })
71 }
72}