API Reference¶
This page provides detailed documentation of the PSR Lakehouse Client API.
Client Class¶
The main interface for interacting with the PSR Lakehouse API. The client uses a singleton pattern to ensure only one instance exists throughout your application.
from psr.lakehouse import client
Initialization¶
The client is automatically initialized as a singleton. You can configure it using:
from psr.lakehouse import initialize
initialize(
base_url="https://api.example.com",
)
Or set environment variables before importing:
LAKEHOUSE_API_URL="https://api.example.com"
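Equivalently, the variable can be set from Python, as long as it happens before the first import of the package (a minimal sketch; the URL is a placeholder):

```python
import os

# Must run before the first `import psr.lakehouse`, which reads the variable.
os.environ["LAKEHOUSE_API_URL"] = "https://api.example.com"
```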
Data Fetching Methods¶
fetch_dataframe()¶
Fetch data from the API and return as a pandas DataFrame using a simplified interface.
Signature:
def fetch_dataframe(
table_name: str,
data_columns: list[str] | None = None,
filters: dict | None = None,
start_reference_date: str | None = None,
end_reference_date: str | None = None,
group_by: list[str] | None = None,
aggregation_method: str | None = None,
datetime_granularity: str | None = None,
order_by: list[dict] | None = None,
output_timezone: str = "America/Sao_Paulo",
) -> pd.DataFrame
Parameters:
table_name (str) - Name of the table to query in snake_case format (e.g., "ccee_spot_price")
data_columns (list[str], optional) - Columns to fetch. If not provided, all columns will be fetched.
filters (dict, optional) - Dictionary of column-value pairs for equality filtering (e.g., {"subsystem": "SOUTHEAST"})
start_reference_date (str, optional) - Start date filter in ISO format (inclusive), e.g., "2023-05-01"
end_reference_date (str, optional) - End date filter in ISO format (inclusive), e.g., "2023-05-02"
group_by (list[str], optional) - List of columns to group by for aggregation
aggregation_method (str, optional) - Aggregation method when using group_by. Options: "sum", "avg", "min", "max"
datetime_granularity (str, optional) - Temporal aggregation level. Options: "hour", "day", "week", "month"
order_by (list[dict], optional) - Sort order as a list of dictionaries with column and direction ("asc" or "desc")
output_timezone (str, optional) - Output timezone for datetime fields. Default: "America/Sao_Paulo"
Returns:
pd.DataFrame - Query results as a pandas DataFrame
Raises:
LakehouseError - If group_by and aggregation_method are not provided together, or if an unsupported aggregation method is specified
Example:
# Basic fetch
df = client.fetch_dataframe(
table_name="ccee_spot_price",
data_columns=["reference_date", "subsystem", "spot_price"],
start_reference_date="2023-05-01",
end_reference_date="2023-05-02",
)
# With filtering
df = client.fetch_dataframe(
table_name="ons_stored_energy_subsystem",
data_columns=["reference_date", "verified_stored_energy_percentage"],
start_reference_date="2023-05-01",
end_reference_date="2023-05-02",
filters={"subsystem": "SOUTHEAST"},
)
# With aggregation
df = client.fetch_dataframe(
table_name="ccee_spot_price",
data_columns=["spot_price"],
start_reference_date="2023-01-01",
end_reference_date="2023-02-01",
group_by=["subsystem"],
aggregation_method="avg",
)
# With temporal aggregation
df = client.fetch_dataframe(
table_name="ons_power_plant_hourly_generation",
data_columns=["plant_type", "generation"],
start_reference_date="2025-01-01",
end_reference_date="2025-01-31",
group_by=["reference_date", "plant_type"],
aggregation_method="sum",
datetime_granularity="day",
)
# With ordering
df = client.fetch_dataframe(
table_name="ccee_spot_price",
data_columns=["spot_price"],
start_reference_date="2023-01-01",
end_reference_date="2023-02-01",
order_by=[
{"column": "reference_date", "direction": "desc"},
{"column": "subsystem", "direction": "asc"},
],
)
fetch_dataframe_from_query()¶
Fetch data using a custom JSON query body. This method provides access to advanced features like joins, custom operators, and complex ordering.
Signature:
def fetch_dataframe_from_query(
json_body: dict,
page_size: int = 1000
) -> pd.DataFrame
Parameters:
json_body (dict) - JSON request body for the query. See Query Body Structure below.
page_size (int, optional) - Number of results per page. Default: 1000
Returns:
pd.DataFrame - Query results as a pandas DataFrame
Query Body Structure:
The json_body parameter accepts a dictionary with the following structure:
{
"query_data": [
"ModelName.column1",
"ModelName.column2"
],
"query_filters": [
{
"column": "ModelName.column_name",
"operator": ">=", # Operators: =, !=, >, <, >=, <=
"value": "value"
}
],
"group_by": {
"group_by_clause": ["ModelName.column1"],
"default_aggregation_method": "sum", # Options: sum, avg, min, max
"datetime_granularity": "day" # Options: hour, day, week, month
},
"order_by": [
{
"column": "ModelName.column_name",
"direction": "asc" # or "desc"
}
],
"joins": [
{
"join_model": "OtherModelName",
"join_filters": [
{
"column": "ModelName.key_column",
"value": "OtherModelName.key_column",
"operator": "="
}
],
"is_outer_join": False # or True
}
],
"output_timezone": "America/Sao_Paulo"
}
Notes:
Model names use PascalCase (e.g., ONSPowerPlantHourlyGeneration)
All fields except query_data are optional
The method automatically handles pagination and fetches all pages
Example:
df = client.fetch_dataframe_from_query({
"query_data": [
"ONSPowerPlantHourlyGeneration.reference_date",
"ONSPowerPlantHourlyGeneration.plant_type",
"ONSPowerPlantHourlyGeneration.generation"
],
"group_by": {
"group_by_clause": [
"ONSPowerPlantHourlyGeneration.reference_date",
"ONSPowerPlantHourlyGeneration.plant_type"
],
"default_aggregation_method": "sum",
"datetime_granularity": "day"
},
"query_filters": [
{
"column": "ONSPowerPlantHourlyGeneration.reference_date",
"operator": ">=",
"value": "2025-01-01"
}
]
})
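Joins follow the same body shape. The pairing below is purely illustrative: the join keys between these two models are an assumption, not confirmed API fields, so verify the actual key columns with get_schema() before relying on such a query:

```python
# Hypothetical join body: pair hourly generation rows with spot prices
# on reference_date (the join key here is an assumption for illustration).
join_body = {
    "query_data": [
        "ONSPowerPlantHourlyGeneration.reference_date",
        "ONSPowerPlantHourlyGeneration.generation",
        "CCEESpotPrice.spot_price",
    ],
    "joins": [
        {
            "join_model": "CCEESpotPrice",
            "join_filters": [
                {
                    "column": "ONSPowerPlantHourlyGeneration.reference_date",
                    "value": "CCEESpotPrice.reference_date",
                    "operator": "=",
                }
            ],
            "is_outer_join": False,
        }
    ],
}

# df = client.fetch_dataframe_from_query(join_body)
```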
Schema Discovery Methods¶
list_tables()¶
List all available table names in the API.
Signature:
def list_tables() -> list[str]
Returns:
list[str] - Sorted list of table names in PascalCase (e.g., ["CCEESpotPrice", "ONSEnergyLoadDaily", ...])
Example:
tables = client.list_tables()
print(f"Available tables: {len(tables)}")
for table in tables[:10]:
print(f" - {table}")
get_schema()¶
Get detailed schema information for a specific table.
Signature:
def get_schema(table_name: str) -> dict
Parameters:
table_name (str) - Table name in snake_case or PascalCase (e.g., "ccee_spot_price" or "CCEESpotPrice")
Returns:
dict - Dictionary mapping field names to their metadata. Each field contains:
type (str) - Field type: string, integer, number, enum, boolean, etc.
nullable (bool) - Whether the field can be null
title (str, optional) - Human-readable field name
description (str, optional) - Field description
format (str, optional) - Format specification (e.g., date-time)
enum_values (list[str], optional) - Allowed values for enum fields
Example:
schema = client.get_schema("ccee_spot_price")
for field_name, field_info in schema.items():
print(f"\n{field_name}:")
print(f" Type: {field_info['type']}")
print(f" Nullable: {field_info['nullable']}")
if 'description' in field_info:
print(f" Description: {field_info['description']}")
if 'enum_values' in field_info:
print(f" Allowed values: {field_info['enum_values']}")
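Because get_schema() returns a plain mapping, a small helper (a sketch of this guide, not part of the client) can catch column-name typos before a query is sent:

```python
def missing_columns(schema: dict, requested: list[str]) -> list[str]:
    """Return the requested column names absent from a get_schema() mapping."""
    return [name for name in requested if name not in schema]

# Example with a hand-written schema fragment:
schema = {"reference_date": {"type": "string"}, "spot_price": {"type": "number"}}
missing_columns(schema, ["spot_price", "spot_prize"])  # -> ["spot_prize"]
```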
Example Output:
{
'id': {
'type': 'integer',
'nullable': True,
'title': 'Id'
},
'reference_date': {
'type': 'string',
'nullable': False,
'format': 'date-time',
'title': 'Reference Date',
'description': 'Timestamp of the spot price'
},
'subsystem': {
'type': 'enum',
'nullable': True,
'description': 'Subsystem identifier',
'enum_values': [
'NORTE',
'NORDESTE',
'SUDESTE',
'SUL',
'SISTEMA INTERLIGADO NACIONAL'
]
},
'spot_price': {
'type': 'number',
'nullable': False,
'title': 'Spot Price',
'description': 'Spot price in R$/MWh'
}
}
get_table_columns()¶
Get a list of column names for a given table.
Signature:
def get_table_columns(table_name: str) -> list[str]
Parameters:
table_name(str) - Table name in snake_case or PascalCase
Returns:
list[str] - List of column names
Example:
columns = client.get_table_columns("ccee_spot_price")
print(f"Columns: {columns}")
# Output: ['id', 'reference_date', 'subsystem', 'spot_price', 'updated_at', ...]
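Since query methods accept snake_case while list_tables() returns PascalCase, a conversion helper can bridge the two. This is a sketch, not a client utility; it assumes acronym runs (like CCEE or ONS) should stay together:

```python
import re

def pascal_to_snake(name: str) -> str:
    """Convert a PascalCase model name to snake_case, keeping acronym runs intact."""
    # Insert "_" before an uppercase letter that starts a new word:
    # either Upper+lower (end of an acronym run) or lower followed by Upper.
    return re.sub(r"(?<!^)(?=[A-Z][a-z])|(?<=[a-z])(?=[A-Z])", "_", name).lower()

pascal_to_snake("CCEESpotPrice")       # -> "ccee_spot_price"
pascal_to_snake("ONSEnergyLoadDaily")  # -> "ons_energy_load_daily"
```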
Exceptions¶
LakehouseError¶
Base exception class for all PSR Lakehouse errors.
Common Scenarios:
Invalid aggregation method specified
Missing required parameters (e.g., group_by without aggregation_method)
API connection errors
Authentication failures
Example:
from psr.lakehouse.exceptions import LakehouseError
try:
df = client.fetch_dataframe(
table_name="ccee_spot_price",
group_by=["subsystem"],
# Missing aggregation_method - will raise LakehouseError
)
except LakehouseError as e:
print(f"Error: {e}")
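For transient connection errors, a generic retry wrapper keeps call sites clean. This is a sketch, not part of the client; in practice you would pass LakehouseError as the exception type:

```python
import time

def with_retries(fn, retries=3, backoff_s=1.0, exc_types=(Exception,)):
    """Call fn(); on exc_types, wait with linear backoff and retry up to `retries` times."""
    for attempt in range(retries):
        try:
            return fn()
        except exc_types:
            if attempt == retries - 1:
                raise  # attempts exhausted: re-raise the last error
            time.sleep(backoff_s * (attempt + 1))

# Usage sketch:
# df = with_retries(
#     lambda: client.fetch_dataframe(table_name="ccee_spot_price"),
#     exc_types=(LakehouseError,),
# )
```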
Best Practices¶
Use Appropriate Method
Use fetch_dataframe() for simple queries
Use fetch_dataframe_from_query() for complex queries with joins or advanced filtering
Schema Discovery
Explore unfamiliar tables with get_schema() before querying them
Use list_tables() to discover available data sources
Date Filtering
Always specify date ranges for large tables to avoid fetching excessive data
Use ISO format for dates, e.g., "2023-05-01"
Pagination
The client automatically handles pagination
For very large datasets, consider breaking queries into smaller date ranges
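Splitting a long range into month-sized windows can be sketched as follows (a helper of this guide, not a client method):

```python
from datetime import date, timedelta

def month_ranges(start: date, end: date):
    """Yield (window_start, window_end) date pairs covering [start, end] month by month."""
    cur = start
    while cur <= end:
        # First day of the next month (True adds 1 to the year in December).
        nxt = date(cur.year + (cur.month == 12), cur.month % 12 + 1, 1)
        yield cur, min(end, nxt - timedelta(days=1))
        cur = nxt

# Usage sketch: fetch one month at a time instead of six at once.
# for lo, hi in month_ranges(date(2023, 1, 1), date(2023, 6, 30)):
#     chunk = client.fetch_dataframe(
#         table_name="ccee_spot_price",
#         start_reference_date=lo.isoformat(),
#         end_reference_date=hi.isoformat(),
#     )
```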
Error Handling
Always wrap API calls in try-except blocks
Catch LakehouseError to handle library-specific failures
Performance
Use aggregation at the API level rather than fetching raw data
Specify only the columns you need in data_columns
Use filters to reduce data transfer
Type Conversion¶
The client automatically handles type conversions:
Datetime columns: Columns named reference_date are automatically converted to pd.Timestamp
Index setting: If reference_date exists, it is automatically set as the DataFrame index
Numeric types: Numeric fields are preserved as appropriate pandas dtypes
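The effect is roughly equivalent to the following pandas post-processing (a sketch; the records and prices below are made up for illustration):

```python
import pandas as pd

# Hypothetical raw records as they might arrive from the API.
records = [
    {"reference_date": "2023-05-01T00:00:00", "subsystem": "SUDESTE", "spot_price": 100.0},
    {"reference_date": "2023-05-01T01:00:00", "subsystem": "SUDESTE", "spot_price": 100.0},
]
df = pd.DataFrame(records)
df["reference_date"] = pd.to_datetime(df["reference_date"])  # strings -> pd.Timestamp
df = df.set_index("reference_date")                          # reference_date becomes the index
```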
Connection Management¶
The HTTP connector validates connectivity during initialization by performing a health check against the API. The singleton pattern ensures connection resources are reused throughout your application lifecycle.