Skip to content

fenic.api.io.writer

Writer interface for saving DataFrames to external storage systems.

Classes:

  • DataFrameWriter

    Interface used to write a DataFrame to external storage systems.

DataFrameWriter

DataFrameWriter(dataframe: DataFrame)

Interface used to write a DataFrame to external storage systems.

Similar to PySpark's DataFrameWriter.

Initialize a DataFrameWriter.

Parameters:

  • dataframe (DataFrame) –

    The DataFrame to write.

Methods:

  • csv

    Saves the content of the DataFrame as a single CSV file with comma as the delimiter and headers in the first row.

  • parquet

    Saves the content of the DataFrame as a single Parquet file.

  • save_as_table

    Saves the content of the DataFrame as the specified table.

Source code in src/fenic/api/io/writer.py (lines 27–33)
def __init__(self, dataframe: DataFrame):
    """Create a writer bound to ``dataframe``.

    Only a reference to the DataFrame is stored; nothing is executed here.
    The write methods later read the DataFrame's logical plan when called.

    Args:
        dataframe: The DataFrame whose contents this writer will save.
    """
    # Keep the source DataFrame for later plan construction by the writers.
    self._dataframe = dataframe

csv

csv(file_path: Union[str, Path], mode: Literal['error', 'overwrite', 'ignore'] = 'overwrite') -> QueryMetrics

Saves the content of the DataFrame as a single CSV file with comma as the delimiter and headers in the first row.

Parameters:

  • file_path (Union[str, Path]) –

    Path to save the CSV file to

  • mode (Literal['error', 'overwrite', 'ignore'], default: 'overwrite' ) –

    Write mode. Default is "overwrite".
      - error: Raises an error if the file exists
      - overwrite: Overwrites the file if it exists
      - ignore: Silently ignores the operation if the file exists

Returns:

  • QueryMetrics – The query metrics

Examples:

Save with overwrite mode (default):

df.write.csv("output.csv")  # Overwrites if exists

Save with error mode:

df.write.csv("output.csv", mode="error")  # Raises error if exists

Save with ignore mode:

df.write.csv("output.csv", mode="ignore")  # Skips if exists
Source code in src/fenic/api/io/writer.py (lines 78–128)
def csv(
    self,
    file_path: Union[str, Path],
    mode: Literal["error", "overwrite", "ignore"] = "overwrite",
) -> QueryMetrics:
    """Saves the content of the DataFrame as a single CSV file with comma as the delimiter and headers in the first row.

    Args:
        file_path: Path to save the CSV file to. Must end with ``.csv``.
        mode: Write mode. Default is "overwrite".
             - error: Raises an error if file exists
             - overwrite: Overwrites the file if it exists
             - ignore: Silently ignores operation if file exists

    Returns:
        QueryMetrics: The query metrics (their summary is also logged at INFO level)

    Raises:
        ValidationError: If ``file_path`` does not end with ``.csv``, or if
            ``mode`` is not one of "error", "overwrite" or "ignore".

    Example: Save with overwrite mode (default)
        ```python
        df.write.csv("output.csv")  # Overwrites if exists
        ```

    Example: Save with error mode
        ```python
        df.write.csv("output.csv", mode="error")  # Raises error if exists
        ```

    Example: Save with ignore mode
        ```python
        df.write.csv("output.csv", mode="ignore")  # Skips if exists
        ```
    """
    file_path = str(file_path)
    if not file_path.endswith(".csv"):
        raise ValidationError(
            f"CSV writer requires a '.csv' file extension. "
            f"Your path '{file_path}' is missing the extension."
        )

    # The Literal annotation is not enforced at runtime; reject unsupported
    # modes up front instead of handing them to the sink/execution layer.
    allowed_modes = ("error", "overwrite", "ignore")
    if mode not in allowed_modes:
        raise ValidationError(
            f"Invalid mode '{mode}' for CSV writer. "
            f"Expected one of: {', '.join(repr(m) for m in allowed_modes)}."
        )

    sink_plan = FileSink(
        child=self._dataframe._logical_plan,
        sink_type="csv",
        path=file_path,
        mode=mode,
    )

    metrics = self._dataframe._logical_plan.session_state.execution.save_to_file(
        sink_plan, file_path=file_path, mode=mode
    )
    logger.info(metrics.get_summary())
    return metrics

parquet

parquet(file_path: Union[str, Path], mode: Literal['error', 'overwrite', 'ignore'] = 'overwrite') -> QueryMetrics

Saves the content of the DataFrame as a single Parquet file.

Parameters:

  • file_path (Union[str, Path]) –

    Path to save the Parquet file to

  • mode (Literal['error', 'overwrite', 'ignore'], default: 'overwrite' ) –

    Write mode. Default is "overwrite".
      - error: Raises an error if the file exists
      - overwrite: Overwrites the file if it exists
      - ignore: Silently ignores the operation if the file exists

Returns:

  • QueryMetrics – The query metrics

Examples:

Save with overwrite mode (default):

df.write.parquet("output.parquet")  # Overwrites if exists

Save with error mode:

df.write.parquet("output.parquet", mode="error")  # Raises error if exists

Save with ignore mode:

df.write.parquet("output.parquet", mode="ignore")  # Skips if exists
Source code in src/fenic/api/io/writer.py (lines 130–180)
def parquet(
    self,
    file_path: Union[str, Path],
    mode: Literal["error", "overwrite", "ignore"] = "overwrite",
) -> QueryMetrics:
    """Saves the content of the DataFrame as a single Parquet file.

    Args:
        file_path: Path to save the Parquet file to. Must end with ``.parquet``.
        mode: Write mode. Default is "overwrite".
             - error: Raises an error if file exists
             - overwrite: Overwrites the file if it exists
             - ignore: Silently ignores operation if file exists

    Returns:
        QueryMetrics: The query metrics (their summary is also logged at INFO level)

    Raises:
        ValidationError: If ``file_path`` does not end with ``.parquet``, or if
            ``mode`` is not one of "error", "overwrite" or "ignore".

    Example: Save with overwrite mode (default)
        ```python
        df.write.parquet("output.parquet")  # Overwrites if exists
        ```

    Example: Save with error mode
        ```python
        df.write.parquet("output.parquet", mode="error")  # Raises error if exists
        ```

    Example: Save with ignore mode
        ```python
        df.write.parquet("output.parquet", mode="ignore")  # Skips if exists
        ```
    """
    file_path = str(file_path)
    if not file_path.endswith(".parquet"):
        raise ValidationError(
            f"Parquet writer requires a '.parquet' file extension. "
            f"Your path '{file_path}' is missing the extension."
        )

    # The Literal annotation is not enforced at runtime; reject unsupported
    # modes up front instead of handing them to the sink/execution layer.
    allowed_modes = ("error", "overwrite", "ignore")
    if mode not in allowed_modes:
        raise ValidationError(
            f"Invalid mode '{mode}' for Parquet writer. "
            f"Expected one of: {', '.join(repr(m) for m in allowed_modes)}."
        )

    sink_plan = FileSink(
        child=self._dataframe._logical_plan,
        sink_type="parquet",
        path=file_path,
        mode=mode,
    )

    metrics = self._dataframe._logical_plan.session_state.execution.save_to_file(
        sink_plan, file_path=file_path, mode=mode
    )
    logger.info(metrics.get_summary())
    return metrics

save_as_table

save_as_table(table_name: str, mode: Literal['error', 'append', 'overwrite', 'ignore'] = 'error') -> QueryMetrics

Saves the content of the DataFrame as the specified table.

Parameters:

  • table_name (str) –

    Name of the table to save to

  • mode (Literal['error', 'append', 'overwrite', 'ignore'], default: 'error' ) –

    Write mode. Default is "error".
      - error: Raises an error if the table exists
      - append: Appends data to the table if it exists
      - overwrite: Overwrites the existing table
      - ignore: Silently ignores the operation if the table exists

Returns:

  • QueryMetrics – The query metrics

Examples:

Save with error mode (default):

df.write.save_as_table("my_table")  # Raises error if table exists

Save with append mode:

df.write.save_as_table("my_table", mode="append")  # Adds to existing table

Save with overwrite mode:

df.write.save_as_table("my_table", mode="overwrite")  # Replaces existing table
Source code in src/fenic/api/io/writer.py (lines 35–76)
def save_as_table(
    self,
    table_name: str,
    mode: Literal["error", "append", "overwrite", "ignore"] = "error",
) -> QueryMetrics:
    """Saves the content of the DataFrame as the specified table.

    Args:
        table_name: Name of the table to save to
        mode: Write mode. Default is "error".
             - error: Raises an error if table exists
             - append: Appends data to table if it exists
             - overwrite: Overwrites existing table
             - ignore: Silently ignores operation if table exists

    Returns:
        QueryMetrics: The query metrics (their summary is also logged at INFO level)

    Raises:
        ValidationError: If ``mode`` is not one of "error", "append",
            "overwrite" or "ignore".

    Example: Save with error mode (default)
        ```python
        df.write.save_as_table("my_table")  # Raises error if table exists
        ```

    Example: Save with append mode
        ```python
        df.write.save_as_table("my_table", mode="append")  # Adds to existing table
        ```

    Example: Save with overwrite mode
        ```python
        df.write.save_as_table("my_table", mode="overwrite")  # Replaces existing table
        ```
    """
    # The Literal annotation is not enforced at runtime; reject unsupported
    # modes up front instead of handing them to the sink/execution layer.
    allowed_modes = ("error", "append", "overwrite", "ignore")
    if mode not in allowed_modes:
        raise ValidationError(
            f"Invalid mode '{mode}' for save_as_table. "
            f"Expected one of: {', '.join(repr(m) for m in allowed_modes)}."
        )

    sink_plan = TableSink(
        child=self._dataframe._logical_plan, table_name=table_name, mode=mode
    )

    metrics = self._dataframe._logical_plan.session_state.execution.save_as_table(
        sink_plan, table_name=table_name, mode=mode
    )
    logger.info(metrics.get_summary())
    return metrics