fenic.api.session

Session module for managing query execution context and state.

Classes:

AnthropicModelConfig

Bases: BaseModel

Configuration for Anthropic models.

This class defines the configuration settings for Anthropic language models, including model selection and separate rate limiting parameters for input and output tokens.

Attributes:

  • model_name (ANTHROPIC_AVAILABLE_LANGUAGE_MODELS) –

    The name of the Anthropic model to use.

  • rpm (int) –

    Requests per minute limit; must be greater than 0.

  • input_tpm (int) –

    Input tokens per minute limit; must be greater than 0.

  • output_tpm (int) –

    Output tokens per minute limit; must be greater than 0.

Examples:

Configuring an Anthropic model with separate input/output rate limits:

config = AnthropicModelConfig(
    model_name="claude-3-5-haiku-latest",
    rpm=100,
    input_tpm=100,
    output_tpm=100
)
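
The "must be greater than 0" constraints on the three limits can be pictured with a small stand-in. This is only a sketch and not fenic's implementation (the real class is a Pydantic model); `RateLimits` is a hypothetical dataclass that mirrors the documented `rpm`, `input_tpm`, and `output_tpm` fields.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RateLimits:
    """Hypothetical stand-in mirroring rpm / input_tpm / output_tpm."""

    rpm: int
    input_tpm: int
    output_tpm: int

    def __post_init__(self) -> None:
        # Each limit must be a strictly positive integer, as documented above.
        for name in ("rpm", "input_tpm", "output_tpm"):
            value = getattr(self, name)
            if not isinstance(value, int) or value <= 0:
                raise ValueError(f"{name} must be greater than 0, got {value!r}")


limits = RateLimits(rpm=100, input_tpm=100, output_tpm=100)
```

Passing `rpm=0` to this stand-in raises `ValueError`; the real configuration class is expected to reject such values through its own validation.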

CloudConfig

Bases: BaseModel

Configuration for cloud-based execution.

This class defines settings for running operations in a cloud environment, allowing for scalable and distributed processing of language model operations.

Attributes:

  • size (Optional[CloudExecutorSize]) –

    Size of the cloud executor instance. If None, the default size will be used.

CloudExecutorSize

Bases: str, Enum

Enum defining available cloud executor sizes.

This enum represents the different size options available for cloud-based execution environments.

Attributes:

  • SMALL

    Small instance size.

  • MEDIUM

    Medium instance size.

  • LARGE

    Large instance size.

  • XLARGE

    Extra large instance size.
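
Because the enum derives from both `str` and `Enum`, its members behave like plain strings. The sketch below uses a hypothetical `ExecutorSize` enum to show what that buys you; the member values (`"small"`, `"medium"`, and so on) are assumptions, since this page does not list the actual values.

```python
import json
from enum import Enum


class ExecutorSize(str, Enum):
    """Hypothetical mirror of CloudExecutorSize; the string values are assumed."""

    SMALL = "small"
    MEDIUM = "medium"
    LARGE = "large"
    XLARGE = "xlarge"


# Members compare equal to their string values.
matches_plain_string = ExecutorSize.SMALL == "small"

# A member can be looked up from its string value.
parsed = ExecutorSize("large")

# Members serialize as ordinary JSON strings.
payload = json.dumps({"size": ExecutorSize.MEDIUM})
```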

GoogleGLAModelConfig

Bases: BaseModel

Configuration for Google Generative Language (GLA) models.

This class defines the configuration settings for models available in Google Developer AI Studio, including model selection and rate limiting parameters. These models are accessible using a GEMINI_API_KEY environment variable.

GoogleVertexModelConfig

Bases: BaseModel

Configuration for Google Vertex models.

This class defines the configuration settings for models available in Google Vertex AI, including model selection and rate limiting parameters. To use these models, you must either have a Google Cloud service account or authenticate your local environment with the gcloud CLI.

OpenAIModelConfig

Bases: BaseModel

Configuration for OpenAI models.

This class defines the configuration settings for OpenAI language and embedding models, including model selection and rate limiting parameters.

Attributes:

  • model_name (Union[OPENAI_AVAILABLE_LANGUAGE_MODELS, OPENAI_AVAILABLE_EMBEDDING_MODELS]) –

    The name of the OpenAI model to use.

  • rpm (int) –

    Requests per minute limit; must be greater than 0.

  • tpm (int) –

    Tokens per minute limit; must be greater than 0.

Examples:

Configuring an OpenAI language model with rate limits:

config = OpenAIModelConfig(model_name="gpt-4.1-nano", rpm=100, tpm=100)

Configuring an OpenAI embedding model with rate limits:

config = OpenAIModelConfig(model_name="text-embedding-3-small", rpm=100, tpm=100)

SemanticConfig

Bases: BaseModel

Configuration for semantic language and embedding models.

This class defines the configuration for both language models and optional embedding models used in semantic operations. It ensures that all configured models are valid and supported by their respective providers.

Attributes:

  • language_models (dict[str, ModelConfig]) –

    Mapping of model aliases to language model configurations.

  • default_language_model (Optional[str]) –

    The alias of the default language model to use for semantic operations. Not required if only one language model is configured.

  • embedding_models (Optional[dict[str, ModelConfig]]) –

    Optional mapping of model aliases to embedding model configurations.

  • default_embedding_model (Optional[str]) –

    The alias of the default embedding model to use for semantic operations.

Note

The embedding model is optional and only required for operations that need semantic search or embedding capabilities.

Methods:

model_post_init

model_post_init(__context) -> None

Post initialization hook to set defaults.

This hook runs after the model is initialized and validated. It sets the default language and embedding models if they are not set and there is only one model available.

Source code in src/fenic/api/session/config.py
def model_post_init(self, __context) -> None:
    """Post initialization hook to set defaults.

    This hook runs after the model is initialized and validated.
    It sets the default language and embedding models if they are not set
    and there is only one model available.
    """
    # Set default language model if not set and only one model exists
    if self.default_language_model is None and len(self.language_models) == 1:
        self.default_language_model = list(self.language_models.keys())[0]
    # Set default embedding model if not set and only one model exists
    if self.embedding_models is not None and self.default_embedding_model is None and len(self.embedding_models) == 1:
        self.default_embedding_model = list(self.embedding_models.keys())[0]
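
The defaulting rule in `model_post_init` can be exercised in isolation. The following is a minimal sketch rather than fenic code: `resolve_default` is a hypothetical helper that applies the same rule to a plain mapping of aliases.

```python
from typing import Mapping, Optional


def resolve_default(
    models: Optional[Mapping[str, object]], default: Optional[str]
) -> Optional[str]:
    """Return the explicit default, or the only alias when exactly one model exists."""
    if default is None and models is not None and len(models) == 1:
        return next(iter(models))
    return default


# One configured model: its alias becomes the default.
single = resolve_default({"nano": object()}, None)

# Several models and no explicit default: nothing is filled in.
several = resolve_default({"nano": object(), "mini": object()}, None)

# No embedding models configured at all: the default stays unset.
no_embeddings = resolve_default(None, None)
```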

validate_models

validate_models() -> SemanticConfig

Validates that the selected models are supported by the system.

This validator checks that both the language model and embedding model (if provided) are valid and supported by their respective providers.

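Returns:

  • SemanticConfig

    The validated SemanticConfig instance.

Raises:

  • ConfigurationError

    If any of the models are not supported.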

Source code in src/fenic/api/session/config.py
@model_validator(mode="after")
def validate_models(self) -> SemanticConfig:
    """Validates that the selected models are supported by the system.

    This validator checks that both the language model and embedding model (if provided)
    are valid and supported by their respective providers.

    Returns:
        The validated SemanticConfig instance.

    Raises:
        ConfigurationError: If any of the models are not supported.
    """
    if len(self.language_models) == 0:
        raise ConfigurationError("You must specify at least one language model configuration.")
    available_language_model_aliases = list(self.language_models.keys())
    if self.default_language_model is None and len(self.language_models) > 1:
        raise ConfigurationError(f"default_language_model is not set, and multiple language models are configured. Please specify one of: {available_language_model_aliases} as a default_language_model.")

    if self.default_language_model is not None and self.default_language_model not in self.language_models:
        raise ConfigurationError(f"default_language_model {self.default_language_model} is not in configured map of language models. Available models: {available_language_model_aliases} .")

    for model_alias, language_model in self.language_models.items():
        if isinstance(language_model, OpenAIModelConfig):
            language_model_provider = ModelProvider.OPENAI
            language_model_name = language_model.model_name
        elif isinstance(language_model, AnthropicModelConfig):
            language_model_provider = ModelProvider.ANTHROPIC
            language_model_name = language_model.model_name
        elif isinstance(language_model, GoogleGLAModelConfig):
            language_model_provider = ModelProvider.GOOGLE_GLA
            language_model_name = language_model.model_name
        elif isinstance(language_model, GoogleVertexModelConfig):
            language_model_provider = ModelProvider.GOOGLE_VERTEX
            language_model_name = language_model.model_name
        else:
            raise ConfigurationError(
                f"Invalid language model: {model_alias}: {language_model} unsupported model type.")

        completion_model = model_catalog.get_completion_model_parameters(language_model_provider,
                                                                         language_model_name)
        if completion_model is None:
            raise ConfigurationError(
                model_catalog.generate_unsupported_completion_model_error_message(
                    language_model_provider,
                    language_model_name
                )
            )
    if self.embedding_models is not None:
        if self.default_embedding_model is None and len(self.embedding_models) > 1:
            raise ConfigurationError("embedding_models is set but default_embedding_model is missing (ambiguous).")

        if self.default_embedding_model is not None and self.default_embedding_model not in self.embedding_models:
            raise ConfigurationError(
                f"default_embedding_model {self.default_embedding_model} is not in embedding_models")
        for model_alias, embedding_model in self.embedding_models.items():
            if isinstance(embedding_model, OpenAIModelConfig):
                embedding_model_provider = ModelProvider.OPENAI
                embedding_model_name = embedding_model.model_name
            else:
                raise ConfigurationError(
                    f"Invalid embedding model: {model_alias}: {embedding_model} unsupported model type")
            embedding_model_parameters = model_catalog.get_embedding_model_parameters(embedding_model_provider,
                                                                                 embedding_model_name)
            if embedding_model_parameters is None:
                raise ConfigurationError(model_catalog.generate_unsupported_embedding_model_error_message(
                    embedding_model_provider,
                    embedding_model_name
                ))

    return self
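
The two default-related failure modes in `validate_models` (an ambiguous default and an unknown alias) can be reproduced with a short stand-in. This is a sketch under stated assumptions: `ConfigError` and `check_default` are hypothetical names, and the provider and model catalog checks are left out.

```python
from typing import Mapping, Optional


class ConfigError(ValueError):
    """Hypothetical stand-in for fenic's ConfigurationError."""


def check_default(kind: str, models: Mapping[str, object], default: Optional[str]) -> None:
    """Raise ConfigError for the ambiguous-default and unknown-alias cases."""
    aliases = sorted(models)
    if default is None and len(models) > 1:
        raise ConfigError(f"default_{kind}_model is not set; choose one of {aliases}")
    if default is not None and default not in models:
        raise ConfigError(f"default_{kind}_model {default!r} is not one of {aliases}")


def try_check(models: Mapping[str, object], default: Optional[str]) -> str:
    """Run the check and report the outcome as a short string."""
    try:
        check_default("language", models, default)
        return "ok"
    except ConfigError as exc:
        return f"error: {exc}"


models = {"fast": object(), "smart": object()}
results = [
    try_check(models, "fast"),     # valid explicit default
    try_check(models, None),       # ambiguous: two models, no default
    try_check(models, "missing"),  # default names an unknown alias
]
```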

Session

The entry point to programming with the DataFrame API. Similar to PySpark's SparkSession.

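Create a local session with one language model:

semantic = SemanticConfig(
    language_models={"nano": OpenAIModelConfig(model_name="gpt-4.1-nano", rpm=100, tpm=100)}
)
session = Session.get_or_create(SessionConfig(app_name="my_app", semantic=semantic))

Create a session with cloud configuration:

config = SessionConfig(
    app_name="my_app",
    semantic=semantic,
    cloud=CloudConfig(size=CloudExecutorSize.SMALL),
)
session = Session.get_or_create(config)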

Methods:

  • create_dataframe

    Create a DataFrame from a variety of Python-native data formats.

  • get_or_create

    Gets an existing Session or creates a new one with the configured settings.

  • sql

    Execute a read-only SQL query against one or more DataFrames using named placeholders.

  • stop

    Stops the session and closes all connections.

  • table

    Returns the specified table as a DataFrame.

Attributes:

  • catalog (Catalog) –

    Interface for catalog operations on the Session.

  • read (DataFrameReader) –

    Returns a DataFrameReader that can be used to read data in as a DataFrame.

catalog property

catalog: Catalog

Interface for catalog operations on the Session.

read property

read: DataFrameReader

Returns a DataFrameReader that can be used to read data in as a DataFrame.

Returns:

  • DataFrameReader –

    A reader interface for reading data into a DataFrame

Raises:

  • RuntimeError

    If the session has been stopped

create_dataframe

create_dataframe(data: DataLike) -> DataFrame

Create a DataFrame from a variety of Python-native data formats.

Parameters:

  • data (DataLike) –

    Input data. Must be one of:

      • Polars DataFrame
      • Pandas DataFrame
      • dict of column_name -> list of values
      • list of dicts (each dict representing a row)
      • pyarrow Table

Returns:

  • DataFrame

    A new DataFrame instance

Raises:

  • ValueError

    If the input format is unsupported or inconsistent with provided column names.

Create from Polars DataFrame:

import polars as pl
df = pl.DataFrame({"col1": [1, 2], "col2": ["a", "b"]})
session.create_dataframe(df)

Create from Pandas DataFrame:

import pandas as pd
df = pd.DataFrame({"col1": [1, 2], "col2": ["a", "b"]})
session.create_dataframe(df)

Create from dictionary:

session.create_dataframe({"col1": [1, 2], "col2": ["a", "b"]})

Create from list of dictionaries:

session.create_dataframe([
    {"col1": 1, "col2": "a"},
    {"col1": 2, "col2": "b"}
])

Create from pyarrow Table:

import pyarrow as pa
table = pa.Table.from_pydict({"col1": [1, 2], "col2": ["a", "b"]})
session.create_dataframe(table)

Source code in src/fenic/api/session/session.py
def create_dataframe(
    self,
    data: DataLike,
) -> DataFrame:
    """Create a DataFrame from a variety of Python-native data formats.

    Args:
        data: Input data. Must be one of:
            - Polars DataFrame
            - Pandas DataFrame
            - dict of column_name -> list of values
            - list of dicts (each dict representing a row)
            - pyarrow Table

    Returns:
        A new DataFrame instance

    Raises:
        ValueError: If the input format is unsupported or inconsistent with provided column names.

    Example: Create from Polars DataFrame
        ```python
        import polars as pl
        df = pl.DataFrame({"col1": [1, 2], "col2": ["a", "b"]})
        session.create_dataframe(df)
        ```

    Example: Create from Pandas DataFrame
        ```python
        import pandas as pd
        df = pd.DataFrame({"col1": [1, 2], "col2": ["a", "b"]})
        session.create_dataframe(df)
        ```

    Example: Create from dictionary
        ```python
        session.create_dataframe({"col1": [1, 2], "col2": ["a", "b"]})
        ```

    Example: Create from list of dictionaries
        ```python
        session.create_dataframe([
            {"col1": 1, "col2": "a"},
            {"col1": 2, "col2": "b"}
        ])
        ```

    Example: Create from pyarrow Table
        ```python
        import pyarrow as pa
        table = pa.Table.from_pydict({"col1": [1, 2], "col2": ["a", "b"]})
        session.create_dataframe(table)
        ```
    """
    try:
        if isinstance(data, pl.DataFrame):
            pl_df = data
        elif isinstance(data, pd.DataFrame):
            pl_df = pl.from_pandas(data)
        elif isinstance(data, dict):
            pl_df = pl.DataFrame(data)
        elif isinstance(data, list):
            if not data:
                raise ValidationError(
                    "Cannot create DataFrame from empty list. Provide a non-empty list of dictionaries, lists, or other supported data types."
                )

            if not isinstance(data[0], dict):
                raise ValidationError(
                    "Cannot create DataFrame from list of non-dict values. Provide a list of dictionaries."
                )
            pl_df = pl.DataFrame(data)
        elif isinstance(data, pa.Table):
            pl_df = pl.from_arrow(data)

        else:
            raise ValidationError(
                f"Unsupported data type: {type(data)}. Supported types are: Polars DataFrame, Pandas DataFrame, dict, or list."
            )

    except ValidationError:
        raise
    except Exception as e:
        raise PlanError(f"Failed to create DataFrame from {data}") from e

    return DataFrame._from_logical_plan(
        InMemorySource(pl_df, self._session_state)
    )
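
Two of the accepted inputs, a dict of columns and a list of row dicts, describe the same table in different orientations. The sketch below only illustrates that relationship and is not how `create_dataframe` works internally (the source above hands both shapes to Polars); `rows_to_columns` is a hypothetical helper, and rejecting rows with mismatched keys is a simplification of this sketch.

```python
from typing import Any


def rows_to_columns(rows: list[dict[str, Any]]) -> dict[str, list[Any]]:
    """Convert row-oriented records into a column-oriented mapping."""
    if not rows:
        raise ValueError("need at least one row")
    # Column names are taken from the first row.
    columns: dict[str, list[Any]] = {name: [] for name in rows[0]}
    for row in rows:
        if row.keys() != columns.keys():
            raise ValueError(f"row {row!r} does not match columns {list(columns)}")
        for name, value in row.items():
            columns[name].append(value)
    return columns


rows = [{"col1": 1, "col2": "a"}, {"col1": 2, "col2": "b"}]
columns = rows_to_columns(rows)
```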

get_or_create classmethod

get_or_create(config: SessionConfig) -> Session

Gets an existing Session or creates a new one with the configured settings.

Returns:

  • Session

    A Session instance configured with the provided settings

Source code in src/fenic/api/session/session.py
@classmethod
def get_or_create(
    cls,
    config: SessionConfig,
) -> Session:
    """Gets an existing Session or creates a new one with the configured settings.

    Returns:
        A Session instance configured with the provided settings
    """
    if config.cloud:
        from fenic._backends.cloud.manager import CloudSessionManager

        cloud_session_manager = CloudSessionManager()
        if not cloud_session_manager.initialized:
            session_manager_dependencies = (
                CloudSessionManager.create_global_session_dependencies()
            )
            cloud_session_manager.configure(session_manager_dependencies)
        future = asyncio.run_coroutine_threadsafe(
            cloud_session_manager.get_or_create_session_state(config),
            cloud_session_manager._asyncio_loop,
        )
        cloud_session_state = future.result()
        return Session._create_cloud_session(cloud_session_state)

    local_session_state: LocalSessionState = LocalSessionManager().get_or_create_session_state(config._to_resolved_config())
    return Session._create_local_session(local_session_state)
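
The get-or-create pattern used above can be sketched with a tiny registry. Everything here is hypothetical: `DemoConfig` and `DemoSessionManager` are stand-ins, and keying the cache by `app_name` is an assumption, since this page does not show how the real session managers identify an existing session.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DemoConfig:
    """Hypothetical stand-in for SessionConfig; only app_name is modelled."""

    app_name: str


class DemoSessionManager:
    """Hypothetical manager that caches one state object per app name."""

    def __init__(self) -> None:
        self._states: dict[str, dict] = {}

    def get_or_create_session_state(self, config: DemoConfig) -> dict:
        # Reuse the cached state if present; otherwise build and remember it.
        if config.app_name not in self._states:
            self._states[config.app_name] = {"app_name": config.app_name}
        return self._states[config.app_name]


manager = DemoSessionManager()
first = manager.get_or_create_session_state(DemoConfig("my_app"))
second = manager.get_or_create_session_state(DemoConfig("my_app"))
other = manager.get_or_create_session_state(DemoConfig("other_app"))
```

In this sketch, `first` and `second` are the same object, while `other` is a separate one.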

sql

sql(query: str, /, **tables: DataFrame) -> DataFrame

Execute a read-only SQL query against one or more DataFrames using named placeholders.

This allows you to execute ad hoc SQL queries using familiar syntax when it's more convenient than the DataFrame API. Placeholders in the SQL string (e.g. {df}) should correspond to keyword arguments (e.g. df=my_dataframe).

For supported SQL syntax and functions, refer to the DuckDB SQL documentation: https://duckdb.org/docs/sql/introduction.

Parameters:

  • query (str) –

    A SQL query string with placeholders like {df}

  • **tables (DataFrame, default: {}) –

    Keyword arguments mapping placeholder names to DataFrames

Returns:

  • DataFrame

    A lazy DataFrame representing the result of the SQL query

Raises:

  • ValidationError

    If a placeholder is used in the query but not passed as a keyword argument

Simple join between two DataFrames:

df1 = session.create_dataframe({"id": [1, 2]})
df2 = session.create_dataframe({"id": [2, 3]})
result = session.sql(
    "SELECT * FROM {df1} JOIN {df2} USING (id)",
    df1=df1,
    df2=df2
)

Complex query with multiple DataFrames:

users = session.create_dataframe({"user_id": [1, 2], "name": ["Alice", "Bob"]})
orders = session.create_dataframe({"order_id": [1, 2], "user_id": [1, 2], "product_id": [1, 2]})
products = session.create_dataframe({"product_id": [1, 2], "name": ["Widget", "Gadget"]})

result = session.sql("""
    SELECT u.name, p.name as product
    FROM {users} u
    JOIN {orders} o ON u.user_id = o.user_id
    JOIN {products} p ON o.product_id = p.product_id
""", users=users, orders=orders, products=products)

Source code in src/fenic/api/session/session.py
def sql(self, query: str, /, **tables: DataFrame) -> DataFrame:
    """Execute a read-only SQL query against one or more DataFrames using named placeholders.

    This allows you to execute ad hoc SQL queries using familiar syntax when it's more convenient than the DataFrame API.
    Placeholders in the SQL string (e.g. `{df}`) should correspond to keyword arguments (e.g. `df=my_dataframe`).

    For supported SQL syntax and functions, refer to the DuckDB SQL documentation:
    https://duckdb.org/docs/sql/introduction.

    Args:
        query: A SQL query string with placeholders like `{df}`
        **tables: Keyword arguments mapping placeholder names to DataFrames

    Returns:
        A lazy DataFrame representing the result of the SQL query

    Raises:
        ValidationError: If a placeholder is used in the query but not passed
            as a keyword argument

    Example: Simple join between two DataFrames
        ```python
        df1 = session.create_dataframe({"id": [1, 2]})
        df2 = session.create_dataframe({"id": [2, 3]})
        result = session.sql(
            "SELECT * FROM {df1} JOIN {df2} USING (id)",
            df1=df1,
            df2=df2
        )
        ```

    Example: Complex query with multiple DataFrames
        ```python
        users = session.create_dataframe({"user_id": [1, 2], "name": ["Alice", "Bob"]})
        orders = session.create_dataframe({"order_id": [1, 2], "user_id": [1, 2], "product_id": [1, 2]})
        products = session.create_dataframe({"product_id": [1, 2], "name": ["Widget", "Gadget"]})

        result = session.sql(\"\"\"
            SELECT u.name, p.name as product
            FROM {users} u
            JOIN {orders} o ON u.user_id = o.user_id
            JOIN {products} p ON o.product_id = p.product_id
        \"\"\", users=users, orders=orders, products=products)
        ```
    """
    query = query.strip()
    if not query:
        raise ValidationError("SQL query must not be empty.")

    placeholders = set(SQL_PLACEHOLDER_RE.findall(query))
    missing = placeholders - tables.keys()
    if missing:
        raise ValidationError(
            f"Missing DataFrames for placeholders in SQL query: {', '.join(sorted(missing))}. "
            f"Make sure to pass them as keyword arguments, e.g., sql(..., {next(iter(missing))}=df)."
        )

    logical_plans = []
    template_names = []
    for name, table in tables.items():
        if name in placeholders:
            template_names.append(name)
            logical_plans.append(table._logical_plan)

    return DataFrame._from_logical_plan(
        SQL(logical_plans, template_names, query, self._session_state),
    )
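
The placeholder check in `sql` can be tried on its own. This is a minimal sketch: the real `SQL_PLACEHOLDER_RE` is not shown on this page, so the pattern below (a word in curly braces) is an assumption, and `missing_placeholders` is a hypothetical helper.

```python
import re

# Assumed pattern: the real SQL_PLACEHOLDER_RE is not shown on this page.
PLACEHOLDER_RE = re.compile(r"\{(\w+)\}")


def missing_placeholders(query: str, **tables: object) -> list[str]:
    """Return placeholder names used in the query that have no keyword argument."""
    placeholders = set(PLACEHOLDER_RE.findall(query))
    return sorted(placeholders - set(tables))


query = "SELECT * FROM {df1} JOIN {df2} USING (id)"

# Both placeholders are bound, so nothing is missing.
all_bound = missing_placeholders(query, df1="frame 1", df2="frame 2")

# df2 is used in the query but not passed as a keyword argument.
one_missing = missing_placeholders(query, df1="frame 1")
```

A non-empty result corresponds to the case in which `sql` raises `ValidationError`.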

stop

stop()

Stops the session and closes all connections.

Source code in src/fenic/api/session/session.py
def stop(self):
    """Stops the session and closes all connections."""
    self._session_state.stop()
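
If you want `stop()` to run even when the work in between fails, one option is a small context manager. This is a sketch, not part of the fenic API: `stopping` is a hypothetical helper, and `DemoSession` is a stand-in that only models the `stop()` method.

```python
from contextlib import contextmanager
from typing import Iterator


class DemoSession:
    """Hypothetical stand-in exposing only a stop() method."""

    def __init__(self) -> None:
        self.stopped = False

    def stop(self) -> None:
        self.stopped = True


@contextmanager
def stopping(session: DemoSession) -> Iterator[DemoSession]:
    """Yield the session and call stop() on exit, even if the body raises."""
    try:
        yield session
    finally:
        session.stop()


demo = DemoSession()
with stopping(demo) as active:
    was_running = not active.stopped
```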

table

table(table_name: str) -> DataFrame

Returns the specified table as a DataFrame.

Parameters:

  • table_name (str) –

    Name of the table

Returns:

  • DataFrame

    Table as a DataFrame

Raises:

  • ValueError

    If the table does not exist

Load an existing table:

df = session.table("my_table")

Source code in src/fenic/api/session/session.py
def table(self, table_name: str) -> DataFrame:
    """Returns the specified table as a DataFrame.

    Args:
        table_name: Name of the table

    Returns:
        Table as a DataFrame

    Raises:
        ValueError: If the table does not exist

    Example: Load an existing table
        ```python
        df = session.table("my_table")
        ```
    """
    if not self._session_state.catalog.does_table_exist(table_name):
        raise ValueError(f"Table {table_name} does not exist")
    return DataFrame._from_logical_plan(
        TableSource(table_name, self._session_state),
    )

SessionConfig

Bases: BaseModel

Configuration for a user session.

This class defines the complete configuration for a user session, including application settings, model configurations, and optional cloud settings. It serves as the central configuration object for all language model operations.

Attributes:

  • app_name (str) –

    Name of the application using this session. Defaults to "default_app".

  • db_path (Optional[Path]) –

    Optional path to a local database file for persistent storage.

  • semantic (SemanticConfig) –

    Configuration for semantic models (required).

  • cloud (Optional[CloudConfig]) –

    Optional configuration for cloud execution.

Note

The semantic configuration is required as it defines the language models that will be used for processing. The cloud configuration is optional and only needed for distributed processing.