Skip to content

fenic.api.session.session

Main session class for interacting with the DataFrame API.

Classes:

  • Session

    The entry point to programming with the DataFrame API. Similar to PySpark's SparkSession.

Session

The entry point to programming with the DataFrame API. Similar to PySpark's SparkSession.

Create a session with default configuration
session = Session.get_or_create(SessionConfig(app_name="my_app"))
Create a session with cloud configuration
config = SessionConfig(
    app_name="my_app",
    cloud=True,
    api_key="your_api_key"
)
session = Session.get_or_create(config)

Methods:

  • create_dataframe

    Create a DataFrame from a variety of Python-native data formats.

  • get_or_create

    Gets an existing Session or creates a new one with the configured settings.

  • sql

    Execute a read-only SQL query against one or more DataFrames using named placeholders.

  • stop

    Stops the session and closes all connections.

  • table

    Returns the specified table as a DataFrame.

  • view

    Returns the specified view as a DataFrame.

Attributes:

  • catalog (Catalog) –

    Interface for catalog operations on the Session.

  • read (DataFrameReader) –

    Returns a DataFrameReader that can be used to read data in as a DataFrame.

catalog property

catalog: Catalog

Interface for catalog operations on the Session.

read property

read: DataFrameReader

Returns a DataFrameReader that can be used to read data in as a DataFrame.

Returns:

  • DataFrameReader ( DataFrameReader ) –

    A reader interface to read data into DataFrame

Raises:

  • RuntimeError

    If the session has been stopped

create_dataframe

create_dataframe(data: DataLike) -> DataFrame

Create a DataFrame from a variety of Python-native data formats.

Parameters:

  • data (DataLike) –

    Input data. Must be one of: - Polars DataFrame - Pandas DataFrame - dict of column_name -> list of values - list of dicts (each dict representing a row) - pyarrow Table

Returns:

Raises:

  • ValueError

    If the input format is unsupported or inconsistent with provided column names.

Create from Polars DataFrame
import polars as pl
df = pl.DataFrame({"col1": [1, 2], "col2": ["a", "b"]})
session.create_dataframe(df)
Create from Pandas DataFrame
import pandas as pd
df = pd.DataFrame({"col1": [1, 2], "col2": ["a", "b"]})
session.create_dataframe(df)
Create from dictionary
session.create_dataframe({"col1": [1, 2], "col2": ["a", "b"]})
Create from list of dictionaries
session.create_dataframe([
    {"col1": 1, "col2": "a"},
    {"col1": 2, "col2": "b"}
])
Create from pyarrow Table
import pyarrow as pa
table = pa.Table.from_pydict({"col1": [1, 2], "col2": ["a", "b"]})
session.create_dataframe(table)
Source code in src/fenic/api/session/session.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def create_dataframe(
    self,
    data: DataLike,
) -> DataFrame:
    """Create a DataFrame from a variety of Python-native data formats.

    Args:
        data: Input data. Must be one of:
            - Polars DataFrame
            - Pandas DataFrame
            - dict of column_name -> list of values
            - list of dicts (each dict representing a row)
            - pyarrow Table

    Returns:
        A new DataFrame instance

    Raises:
        ValueError: If the input format is unsupported or inconsistent with provided column names.

    Example: Create from Polars DataFrame
        ```python
        import polars as pl
        df = pl.DataFrame({"col1": [1, 2], "col2": ["a", "b"]})
        session.create_dataframe(df)
        ```

    Example: Create from Pandas DataFrame
        ```python
        import pandas as pd
        df = pd.DataFrame({"col1": [1, 2], "col2": ["a", "b"]})
        session.create_dataframe(df)
        ```

    Example: Create from dictionary
        ```python
        session.create_dataframe({"col1": [1, 2], "col2": ["a", "b"]})
        ```

    Example: Create from list of dictionaries
        ```python
        session.create_dataframe([
            {"col1": 1, "col2": "a"},
            {"col1": 2, "col2": "b"}
        ])
        ```

    Example: Create from pyarrow Table
        ```python
        import pyarrow as pa
        table = pa.Table.from_pydict({"col1": [1, 2], "col2": ["a", "b"]})
        session.create_dataframe(table)
        ```
    """
    try:
        if isinstance(data, pl.DataFrame):
            pl_df = data
        elif isinstance(data, pd.DataFrame):
            pl_df = pl.from_pandas(data)
        elif isinstance(data, dict):
            pl_df = pl.DataFrame(data)
        elif isinstance(data, list):
            if not data:
                raise ValidationError(
                    "Cannot create DataFrame from empty list. Provide a non-empty list of dictionaries, lists, or other supported data types."
                )

            if not isinstance(data[0], dict):
                raise ValidationError(
                    "Cannot create DataFrame from list of non-dict values. Provide a list of dictionaries."
                )
            pl_df = pl.DataFrame(data)
        elif isinstance(data, pa.Table):
            pl_df = pl.from_arrow(data)

        else:
            raise ValidationError(
                f"Unsupported data type: {type(data)}. Supported types are: Polars DataFrame, Pandas DataFrame, dict, or list."
            )

    except ValidationError:
        raise
    except Exception as e:
        raise PlanError(f"Failed to create DataFrame from {data}") from e

    return DataFrame._from_logical_plan(
        InMemorySource.from_session_state(pl_df, self._session_state),
        self._session_state,
    )

get_or_create classmethod

get_or_create(config: SessionConfig) -> Session

Gets an existing Session or creates a new one with the configured settings.

Returns:

  • Session

    A Session instance configured with the provided settings

Source code in src/fenic/api/session/session.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@classmethod
def get_or_create(
    cls,
    config: SessionConfig,
) -> Session:
    """Gets an existing Session or creates a new one with the configured settings.

    Returns:
        A Session instance configured with the provided settings
    """
    if config.cloud:
        from fenic._backends.cloud.manager import CloudSessionManager

        cloud_session_manager = CloudSessionManager()
        if not cloud_session_manager.initialized:
            session_manager_dependencies = (
                CloudSessionManager.create_global_session_dependencies()
            )
            cloud_session_manager.configure(session_manager_dependencies)
        future = asyncio.run_coroutine_threadsafe(
            cloud_session_manager.get_or_create_session_state(config),
            cloud_session_manager._asyncio_loop,
        )
        cloud_session_state = future.result()
        return Session._create_cloud_session(cloud_session_state)

    local_session_state: LocalSessionState = LocalSessionManager().get_or_create_session_state(config._to_resolved_config())
    return Session._create_local_session(local_session_state)

sql

sql(query: str, /, **tables: DataFrame) -> DataFrame

Execute a read-only SQL query against one or more DataFrames using named placeholders.

This allows you to execute ad hoc SQL queries using familiar syntax when it's more convenient than the DataFrame API. Placeholders in the SQL string (e.g. {df}) should correspond to keyword arguments (e.g. df=my_dataframe).

For supported SQL syntax and functions, refer to the DuckDB SQL documentation: https://duckdb.org/docs/sql/introduction.

Parameters:

  • query (str) –

    A SQL query string with placeholders like {df}

  • **tables (DataFrame, default: {} ) –

    Keyword arguments mapping placeholder names to DataFrames

Returns:

  • DataFrame

    A lazy DataFrame representing the result of the SQL query

Raises:

  • ValidationError

    If a placeholder is used in the query but not passed as a keyword argument

Simple join between two DataFrames
df1 = session.create_dataframe({"id": [1, 2]})
df2 = session.create_dataframe({"id": [2, 3]})
result = session.sql(
    "SELECT * FROM {df1} JOIN {df2} USING (id)",
    df1=df1,
    df2=df2
)
Complex query with multiple DataFrames
users = session.create_dataframe({"user_id": [1, 2], "name": ["Alice", "Bob"]})
orders = session.create_dataframe({"order_id": [1, 2], "user_id": [1, 2]})
products = session.create_dataframe({"product_id": [1, 2], "name": ["Widget", "Gadget"]})

result = session.sql("""
    SELECT u.name, p.name as product
    FROM {users} u
    JOIN {orders} o ON u.user_id = o.user_id
    JOIN {products} p ON o.product_id = p.product_id
""", users=users, orders=orders, products=products)
Source code in src/fenic/api/session/session.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
def sql(self, query: str, /, **tables: DataFrame) -> DataFrame:
    """Execute a read-only SQL query against one or more DataFrames using named placeholders.

    This allows you to execute ad hoc SQL queries using familiar syntax when it's more convenient than the DataFrame API.
    Placeholders in the SQL string (e.g. `{df}`) should correspond to keyword arguments (e.g. `df=my_dataframe`).

    For supported SQL syntax and functions, refer to the DuckDB SQL documentation:
    https://duckdb.org/docs/sql/introduction.

    Args:
        query: A SQL query string with placeholders like `{df}`
        **tables: Keyword arguments mapping placeholder names to DataFrames

    Returns:
        A lazy DataFrame representing the result of the SQL query

    Raises:
        ValidationError: If a placeholder is used in the query but not passed
            as a keyword argument

    Example: Simple join between two DataFrames
        ```python
        df1 = session.create_dataframe({"id": [1, 2]})
        df2 = session.create_dataframe({"id": [2, 3]})
        result = session.sql(
            "SELECT * FROM {df1} JOIN {df2} USING (id)",
            df1=df1,
            df2=df2
        )
        ```

    Example: Complex query with multiple DataFrames
        ```python
        users = session.create_dataframe({"user_id": [1, 2], "name": ["Alice", "Bob"]})
        orders = session.create_dataframe({"order_id": [1, 2], "user_id": [1, 2]})
        products = session.create_dataframe({"product_id": [1, 2], "name": ["Widget", "Gadget"]})

        result = session.sql(\"\"\"
            SELECT u.name, p.name as product
            FROM {users} u
            JOIN {orders} o ON u.user_id = o.user_id
            JOIN {products} p ON o.product_id = p.product_id
        \"\"\", users=users, orders=orders, products=products)
        ```
    """
    query = query.strip()
    if not query:
        raise ValidationError("SQL query must not be empty.")

    placeholders = set(SQL_PLACEHOLDER_RE.findall(query))
    missing = placeholders - tables.keys()
    if missing:
        raise ValidationError(
            f"Missing DataFrames for placeholders in SQL query: {', '.join(sorted(missing))}. "
            f"Make sure to pass them as keyword arguments, e.g., sql(..., {next(iter(missing))}=df)."
        )

    logical_plans = []
    template_names = []
    input_session_states = []
    for name, table in tables.items():
        if name in placeholders:
            template_names.append(name)
            logical_plans.append(table._logical_plan)
            input_session_states.append(table._session_state)

    DataFrame._ensure_same_session(self._session_state, input_session_states)
    return DataFrame._from_logical_plan(
        SQL.from_session_state(logical_plans, template_names, query, self._session_state),
        self._session_state,
    )

stop

stop()

Stops the session and closes all connections.

Source code in src/fenic/api/session/session.py
337
338
339
def stop(self):
    """Stops the session and closes all connections."""
    self._session_state.stop()

table

table(table_name: str) -> DataFrame

Returns the specified table as a DataFrame.

Parameters:

  • table_name (str) –

    Name of the table

Returns:

Raises:

  • ValueError

    If the table does not exist

Load an existing table
df = session.table("my_table")
Source code in src/fenic/api/session/session.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
def table(self, table_name: str) -> DataFrame:
    """Returns the specified table as a DataFrame.

    Args:
        table_name: Name of the table

    Returns:
        Table as a DataFrame

    Raises:
        ValueError: If the table does not exist

    Example: Load an existing table
        ```python
        df = session.table("my_table")
        ```
    """
    if not self._session_state.catalog.does_table_exist(table_name):
        raise ValueError(f"Table {table_name} does not exist")
    return DataFrame._from_logical_plan(
        TableSource.from_session_state(table_name, self._session_state),
        self._session_state,
    )

view

view(view_name: str) -> DataFrame

Returns the specified view as a DataFrame.

Parameters:

  • view_name (str) –

    Name of the view

Returns: DataFrame: Dataframe with the given view

Source code in src/fenic/api/session/session.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
def view(self, view_name: str) -> DataFrame:
    """Returns the specified view as a DataFrame.

    Args:
        view_name: Name of the view
    Returns:
        DataFrame: Dataframe with the given view
    """
    if not self._session_state.catalog.does_view_exist(view_name):
        raise CatalogError(f"View {view_name} does not exist")

    view_plan = self._session_state.catalog.describe_view(view_name)
    validate_view(view_name, view_plan, self._session_state)

    return DataFrame._from_logical_plan(
        view_plan,
        self._session_state,
    )