Applications and Logging#

gantry.init(api_key: Optional[str] = None, logging_level: Optional[typing_extensions.Literal[DEBUG, INFO, WARNING, CRITICAL, ERROR]] = None, environment: Optional[str] = None, send_in_background: Optional[bool] = None)#

Initialize gantry services. This is always the first step in using the SDK.

Example:

import gantry
gantry.init(api_key="foobar")
Parameters
  • api_key (optional, str) – Your Gantry API Key. You can also set this parameter by setting the env variable GANTRY_API_KEY.

  • logging_level (optional, str) – Set logging level for Gantry system logging. You can also set this parameter by setting the env variable GANTRY_LOGGING_LEVEL. Options are: DEBUG, INFO, WARNING, CRITICAL or ERROR. If not specified, it defaults to INFO.

  • environment (optional, str) – Set the value of the environment label attached to instrumented data. You can also set this parameter by setting the env variable GANTRY_ENVIRONMENT. If not provided, it defaults to "dev". The environment is a tag attached to data. To override this value on ingestion, set the 'env' tag in the tags parameter.

  • send_in_background (optional, bool) – Set whether Gantry logging methods should send data in the background (asynchronously). You can also set this value by setting the env variable GANTRY_SEND_IN_BACKGROUND. If not provided, it defaults to True unless running in an AWS Lambda.
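
For example, the parameters above can be combined at initialization (a minimal sketch; the values shown are placeholders):

import gantry

# The API key can also be supplied via the GANTRY_API_KEY env variable.
gantry.init(
    api_key="foobar",
    logging_level="DEBUG",
    environment="prod",
    send_in_background=True,
)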

class gantry.Application(name: str, api_client: gantry.api_client.APIClient, id: Optional[uuid.UUID] = None, organization_id: Optional[uuid.UUID] = None)#

A class to represent a single application. This class should not be initialized directly; instead, obtain an instance by calling gantry.create_application("my-app") or gantry.get_application("my-app").

__init__(name: str, api_client: gantry.api_client.APIClient, id: Optional[uuid.UUID] = None, organization_id: Optional[uuid.UUID] = None)#

Note: Please use gantry.create_application() to create an application instead of defining an application directly by calling this method.

log(inputs: Optional[Union[dict, List[dict], pandas.core.frame.DataFrame, pandas.core.series.Series, numpy.ndarray]] = None, outputs: Optional[Union[Any, List[Any], dict, List[dict], pandas.core.frame.DataFrame, pandas.core.series.Series, numpy.ndarray]] = None, feedbacks: Optional[Union[dict, List[dict], pandas.core.frame.DataFrame, pandas.core.series.Series, numpy.ndarray]] = None, ignore_inputs: Optional[List[str]] = None, timestamps: Optional[Union[datetime.datetime, List[datetime.datetime], pandas.core.indexes.datetimes.DatetimeIndex, numpy.ndarray]] = None, sort_on_timestamp: bool = True, sample_rate: float = 1.0, as_batch: Optional[bool] = False, version: Optional[Union[str, int]] = None, tags: Optional[Union[Dict[str, str], List[Dict[str, str]]]] = None, row_tags: Optional[Union[Dict[str, str], List[Dict[str, str]], pandas.core.frame.DataFrame, pandas.core.series.Series, numpy.ndarray]] = None, global_tags: Optional[Dict[str, str]] = None, join_keys: Optional[Union[str, List[str], pandas.core.series.Series]] = None)#

Ingests an event or a batch of events containing predictions (inputs and outputs), feedback, or both.

Example:

app.log(
    inputs=[{'A': 100}, {'A': 101}],
    outputs=[{'B': 'foo'}, {'B': 'bar'}],
    version=1,
    feedbacks=[{'B': 'bar'}, {'B': 'foo'}],
    global_tags={"env": "environment1"},
    join_keys=["12345", "67890"]
)
Parameters
  • inputs (optional, Union[dict, List[dict], pd.DataFrame, pd.Series, np.ndarray]) – A list of prediction inputs. inputs[i] is a dict of the features for the i-th prediction to be logged.

  • outputs (optional, Union[Any, List[Any], dict, List[dict], pd.DataFrame, pd.Series, np.ndarray]) – A list of prediction outputs. outputs[i] should be the application output for the prediction with features inputs[i].

  • feedbacks (optional, Union[dict, List[dict], pd.DataFrame, pd.Series, np.ndarray]) – A list of feedbacks. feedbacks[i] is a dict of the features for the i-th prediction to be logged.

  • ignore_inputs (optional, List[str]) – A list of names of input features that should not be logged.

  • timestamps (optional, Union[List[datetime.datetime], pd.DatetimeIndex, np.array[datetime.datetime]]) – A list of prediction timestamps. If specified, timestamps[i] should be the timestamp for the i-th prediction. If timestamps = None (default), then the prediction timestamp defaults to the time when the log call is made.

  • sort_on_timestamp (bool, defaults to True) – Only applies when timestamps are provided; if True, records are sorted by the given timestamps. Defaults to True.

  • sample_rate – Used for down-sampling. The probability, as a float between 0 and 1, that each record will be sent to Gantry.

  • as_batch (bool, defaults to False) – Whether to add batch metadata and tracking in the 'batch' section of the dashboard.

  • version (optional, Union[int, str]) – Version of the function schema to use for validation by Gantry. If not provided, Gantry will use the latest version. If the version doesn't exist yet, Gantry will create it by auto-generating the schema based on data it has seen. Providing an int or its stringified value makes no difference (e.g. version=10 is the same as version='10').

  • tags (optional, Optional[Union[Dict[str, str], List[Dict[str, str]]]]) – A tag is a label that you assign to your data. For example, you can specify which environment the data belongs to by setting an "env" tag like this: tags = {"env": "environment1"}. If not assigned, the Gantry client's environment value is used as the default environment. If this parameter is a dict, the tags apply to all records. Alternatively, you can pass a list of dicts to apply tags to each record independently. IMPORTANT: this parameter is deprecated and will be removed soon. Use row_tags and global_tags instead.

  • row_tags (optional, Optional[Union[List[Dict[str, str]], pd.DataFrame, pd.Series, np.ndarray]]) –

    Specify row level tags. If provided, this parameter should be a list of tags to be applied to each of the records. row_tags[i] will contain a dictionary of strings containing the tags to attach to the i-th record. Alternatively, tags may be specified by passing in a DataFrame, Series, or Array, like inputs.

    For batch global tags, use the ‘global_tags’ parameter instead.

  • global_tags (optional, Dict[str, str]) – Specify a set of tags that will be attached to all ingested records from this batch. For record-specific tags, use the 'row_tags' parameter. Only used when log is not called within a run.

  • join_keys (optional, Union[List[str], pd.Series[str]]) – Provide keys to identify each record. These keys can be used later to provide feedback. If not provided, a random record key will be generated for each record.

Returns

The batch_id will be None if records are not logged as a batch. The list of join_keys contains the record keys.

Return type

Tuple ([batch_id, list[join_keys]])
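
For example, the return value can be unpacked to keep the record keys for later feedback (a minimal sketch; the records are placeholders):

batch_id, join_keys = app.log(
    inputs=[{'A': 100}, {'A': 101}],
    outputs=[{'B': 'foo'}, {'B': 'bar'}],
    as_batch=True,
)
# batch_id is None unless as_batch=True; join_keys identifies each record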

query(time_window: Optional[Union[gantry.query.time_window.TimeWindow, gantry.query.time_window.RelativeTimeWindow]] = None, version: Optional[Union[str, int]] = None, env: Optional[str] = None, filters: Optional[List[dict]] = None, tags: Optional[dict] = None)#

Query for a window of data for this application, with given parameters.

Example:

time_window = RelativeTimeWindow(window_length = datetime.timedelta(days=1),
    offset = datetime.timedelta(minutes=1))
query = app.query(
    time_window,
    tags = {"champion": "messi"},
    filters=[
        {
            "feature_name": "inputs.speed",
            "lower_bound": 0,
            "upper_bound": 3
        }
    ]
)
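
For a fixed window, a TimeWindow can be passed instead (a sketch assuming TimeWindow is constructed from start and end datetimes):

import datetime
from gantry.query.time_window import TimeWindow

# Assumption: TimeWindow takes fixed start and end datetimes.
fixed_window = TimeWindow(datetime.datetime(2023, 1, 1), datetime.datetime(2023, 1, 2))
fixed_query = app.query(fixed_window, tags={"champion": "messi"})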
Parameters
  • time_window (optional, Union[TimeWindow, RelativeTimeWindow]) – A time window object. If a TimeWindow is passed in, the query is saved with fixed timestamps. If a RelativeTimeWindow is passed in, the query is saved with a relative time window and offset.

  • version (optional, Union[int, str]) – The version of the application to query.

  • env (optional, str) – The environment of the application to query.

  • filters (optional, list[dict]) – A list of filters to apply to the query.

  • tags (optional, dict) – A dictionary of tags to apply to the query.

Returns

A GantryDataFrame with all the query information needed to fetch data.

save_query(name: str, query: gantry.query.core.dataframe.GantryDataFrame)#

Save a query to the Gantry database.

Example:

query = app.query(....)
app.save_query("demo_query", query)
Parameters
  • name (str) – The name of the query.

  • query (GantryDataFrame) – The query to save.

get_query(name: str) gantry.query.core.dataframe.GantryDataFrame#

Get a saved query from the Gantry database.

Example:

query = app.get_query("demo_query")
Parameters

name (str) – The name of the query.

Returns

A GantryDataFrame with all the query information needed to fetch data.

list_queries()#

List all saved queries for this application.

Example:

queries = app.list_queries()
for query in queries:
    print(query["name"])
Returns

A list of queries (as Python dicts).

start_run(name: str, tags: Optional[dict] = None)#

Start a run for this application.

Example:

with app.start_run(name="Demo_run", tags = {"champion": "messi"}) as run:
    app.log(inputs=inputs, outputs=outputs, feedbacks=feedback, join_keys=join_keys)
    app.log(inputs={...}, outputs={...}, join_keys='12345')
    app.log(....)
Parameters
  • name (str) – Name of the run.

  • tags (optional, dict) – A dictionary of tags to apply to the run.

Returns

A Run object.

get_run(name: Optional[str] = None, tags: Optional[dict] = None)#

Get a run in this application, filtered by tags or name.

Example:

job_info = app.get_run(name="Demo_run_prediction")
jobs_info = app.get_run(tags={"champion": "argentina"})
Parameters
  • name (optional, str) – Name of the run.

  • tags (optional, dict) – Tags of the run.

Returns

An ingestion job object associated with the requested run.

add_automation(automation: gantry.automations.automations.Automation)#

Add an automation to this application.

Example:

automation = Automation(...)
app.add_automation(automation)
Parameters

automation (Automation) – An Automation object.

Returns

None

get_automation(automation_name: Optional[str] = None)#

Get an automation by name

Example:

automation = app.get_automation(automation_name="demo_automation")
Parameters

automation_name (optional, str) – Name of the automation.

Returns

An Automation object.

list_automations()#

List all automations for this application.

Example:

automations = app.list_automations()
for automation in automations:
    print(automation["name"])
Returns

A list of Automation objects.

list_workspaces()#

Get all workspaces for this application.

Example:

workspaces = app.list_workspaces()
for workspace in workspaces:
    print(workspace["name"])
Returns

A list of Workspace objects.

get_schema()#

Get the schema for this application.

Example:

schema = app.get_schema()
print(schema["func_name"])  # the application name
Returns

The application schema (as a Python dict).

create_dataset(name: str)#

Create a dataset for this application.

Example:

dataset = app.create_dataset(name="demo_dataset")
Parameters

name (str) – The name of the dataset.

Returns

An object representing the created dataset.

Return type

gantry.dataset.GantryDataset

list_datasets(include_deleted: bool = False) List[Dict[str, Any]]#

List all datasets for this application.

Example:

datasets = app.list_datasets()
for dataset in datasets:
    print(dataset["name"])
Parameters

include_deleted (optional, bool) – If True, deleted datasets will be included; otherwise they are omitted.

Returns

List of dictionaries, each representing one dataset and associated metadata.

Return type

List[Dict[str, Any]]

class gantry.CompletionApplication(name: str, api_client: gantry.api_client.APIClient, id: typing.Optional[uuid.UUID] = None, organization_id: typing.Optional[uuid.UUID] = None, log_store_factory: typing.Callable[[gantry.api_client.APIClient], gantry.logger.stores.BaseLogStore] = <class 'gantry.applications.llm.CompletionAPILogStore'>)#

A class representing a completion application, a subclass of Application. This has special logging methods for logging completions.

__init__(name: str, api_client: gantry.api_client.APIClient, id: typing.Optional[uuid.UUID] = None, organization_id: typing.Optional[uuid.UUID] = None, log_store_factory: typing.Callable[[gantry.api_client.APIClient], gantry.logger.stores.BaseLogStore] = <class 'gantry.applications.llm.CompletionAPILogStore'>)#

Note: Please use gantry.create_application() to create an application instead of defining an application directly by calling this method.

create_version(prompt_template: str, description: str, model: str, model_params: Optional[dict] = None, prompt_inputs: Optional[List[str]] = None) gantry.applications.llm.VersionDetails#

Creates a new version of a prompt template associated to this application.

Example usage for a version using OpenAI Completion model:

import gantry

my_llm_app = gantry.get_application("my_llm_app")

my_llm_app.create_version(
    prompt_template="This is a prompt template. {{input1}} {{input2}}",
    description="My first version",
    model="text-davinci-001",
    model_params={"temperature": 0.5},
    prompt_inputs=["input1", "input2"],
)

Example usage for a version using OpenAI Chat model with function support:

import gantry

my_llm_app = gantry.get_application("my_llm_app")

my_llm_app.create_version(
    prompt_template=(
        "Assistant: you are a helpful assistant\n\n"
        "User: What's the weather in Boston?"
    ),
    description="My first version",
    model="gpt-4",
    model_params={
        "temperature": 1,
        "function_call": "auto",
        "functions": [
            {
                "name": "get_current_weather",
                "description": "Get the current weather in a given location",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "location": {
                            "type": "string",
                            "description": "The city and state, e.g. San Francisco, CA"
                        },
                        "unit": {
                            "type": "string",
                            "enum": ["celsius", "fahrenheit"]
                        }
                    },
                    "required": ["location"]
                }
            }
        ]
    }
)

Example usage for a version using Cohere model:

import gantry

my_llm_app = gantry.get_application("my_llm_app")

my_llm_app.create_version(
    prompt_template="This is a prompt template. {{input1}} {{input2}}",
    description="My first version",
    model="command",
    model_params={"max_tokens": 500},
    prompt_inputs=["input1", "input2"],
)

Example output:

VersionDetails(
    config={
        "model": "text-davinci-002",
        "params": {
            "frequency_penalty": 0.0,
            "max_tokens": 16,
            "presence_penalty": 0.0,
            "temperature": 0.5,
            "top_p": 1.0
        },
        "prompt": "This is a prompt template. {{input1}} {{input2}}",
        "prompt_inputs": ["input1", "input2"],
    },
    description="My first version",
    app_name="my_llm_app",
    version_id=UUID("a1b2c3d4-e5f6-4a3b-8c2d-1e2f3a4b5c6d"),
    version_number=1,
)
IMPORTANT: ‘functions’ and ‘function_call’ are valid model_params only for the ‘openai-chat’ vendor.

Parameters
  • prompt_template (str) – The prompt template to use for this version. Denote variables like this: {{variable}}

  • description (str) – A description of this version.

  • model (str) – The model to use for this version. Must be a valid model for the chosen vendor (for example, an OpenAI completion or chat model, or a Cohere model, as in the examples above).

  • model_params (dict, optional) – A dictionary of model parameters to use for this prompt. See vendor’s documentation for more details.

  • prompt_inputs (List[str], optional) – A list of input names to use for this prompt, when using prompt variables. If not provided, the function will attempt to extract the input names from the prompt template.

Returns

A dataclass containing the configuration data, description, application name, version ID, and version number of the newly created version.

Return type

VersionDetails

get_version(version: Literal['prod', 'test', 'latest']) Optional[gantry.applications.llm.VersionDetails]#

Returns the prompt configuration data for a given version of this application.

Example usage:

import gantry

my_llm_app = gantry.get_application("my_llm_app")

my_llm_app.get_version("prod")

Example output:

VersionDetails(
    config={
        "model": "text-davinci-002",
        "params": {
            "frequency_penalty": 0.0,
            "max_tokens": 16,
            "presence_penalty": 0.0,
            "temperature": 0.5,
            "top_p": 1.0
        },
        "prompt": "This is a prompt template. {{input1}} {{input2}}",
        "prompt_inputs": ["input1", "input2"],
    },
    description="My first version",
    app_name="my_llm_app",
    version_id=UUID("a1b2c3d4-e5f6-4a3b-8c2d-1e2f3a4b5c6d"),
    version_number=1,
)
Parameters

version (Literal["prod", "test", "latest"]) – The version to get. Can be one of “prod”, “test”, or “latest”. When “latest” is used, the latest version will be returned, regardless of deployment status. When “prod” or “test” is used, the latest version that has been deployed to that environment will be returned.

Returns

A dataclass containing the configuration data, description, application name, version ID, and version number of the requested version. If no version is found, returns None.

Return type

Optional[VersionDetails]

log_llm_data(api_request: Dict, api_response: Dict, request_attributes: Optional[Dict] = None, response_attributes: Optional[Dict] = None, feedback: Optional[Dict] = None, selected_choice_index: int = 0, session_id: Optional[Union[str, uuid.UUID]] = None, tags: Optional[Dict] = None, version: Optional[int] = None, vendor: str = 'openai')#

Log a request/response LLM completion pair into Gantry. Several vendors are supported; see the documentation on supported vendors.

Example usage:

import gantry
from gantry.applications.llm_utils import fill_prompt
import openai
from openai.util import convert_to_dict

gantry.init()
my_llm_app = gantry.get_application("my-llm-app")

version = my_llm_app.get_version("test")
# "latest", "prod", or "test"
config = version.config
prompt = config['prompt']

def generate(values):
    filled_in_prompt = fill_prompt(prompt, values)
    request = {
        "model": "text-davinci-002",
        "prompt": filled_in_prompt,
    }
    results = openai.Completion.create(**request)

    my_llm_app.log_llm_data(
        api_request=request,
        api_response=convert_to_dict(results),
        request_attributes={"prompt_values": values},
        version=version.version_number,
    )

    return results
Parameters
  • api_request (dict) – The LLM completion request.

  • api_response (dict) – The LLM completion response.

  • request_attributes (Optional[dict]) – Additional inputs

  • response_attributes (Optional[dict]) – Additional outputs

  • feedback (Optional[dict]) – Optional feedback data. See https://docs.gantry.io/docs/logging-feedback-actuals for more information about logging feedback in Gantry.

  • selected_choice_index (Optional[int]) – The selected choice index. Defaults to 0.

  • session_id (Optional[str or uuid.UUID]) – Optional session to group data together.

  • tags (Optional[dict]) – Optional tags to add to the record.

  • version (Optional[int]) – The version number used.

  • vendor (str, defaults to 'openai') – The vendor the data is from. See Gantry documentation for supported vendors.

deploy_version(version_id: uuid.UUID, env: Literal['prod', 'test']) None#

Deploys a version in a specific environment.

Example usage:

import gantry

my_llm_app = gantry.get_application("my_llm_app")
version = my_llm_app.get_version("latest")
my_llm_app.deploy_version(version.version_id, "prod")
Parameters
  • version_id (uuid) – The version id.

  • env (str, 'prod' or 'test') – The target environment, can be ‘prod’ or ‘test’.

class gantry.Run(application, tags: Optional[Dict], name: str)#

Context manager for logging.

__init__(application, tags: Optional[Dict], name: str)#
merge_events()#

Merge prediction and feedback events into a single list of events.

gantry.log(application: str, version: Optional[Union[str, int]] = None, inputs: Optional[Union[dict, List[dict], pandas.core.frame.DataFrame, pandas.core.series.Series, numpy.ndarray]] = None, outputs: Optional[Union[Any, List[Any], dict, List[dict], pandas.core.frame.DataFrame, pandas.core.series.Series, numpy.ndarray]] = None, feedbacks: Optional[Union[dict, List[dict], pandas.core.frame.DataFrame, pandas.core.series.Series, numpy.ndarray]] = None, ignore_inputs: Optional[List[str]] = None, tags: Optional[Union[Dict[str, str], List[Dict[str, str]]]] = None, timestamps: Optional[Union[datetime.datetime, List[datetime.datetime], pandas.core.indexes.datetimes.DatetimeIndex, numpy.ndarray]] = None, sort_on_timestamp: bool = True, sample_rate: float = 1.0, row_tags: Optional[Union[Dict[str, str], List[Dict[str, str]], pandas.core.frame.DataFrame, pandas.core.series.Series, numpy.ndarray]] = None, global_tags: Optional[Dict[str, str]] = None, join_keys: Optional[Union[str, List[str], pandas.core.series.Series]] = None, as_batch: Optional[bool] = False, run_id: Optional[str] = None, run_tags: Optional[dict] = None)#

Alias for gantry.logger.client.Gantry.log()

A general log function to log inputs, outputs, and feedbacks, for single or multiple records, without being attached to a specific Application object. It is intended primarily for use in production contexts where avoiding initializing an application instance is preferable.

Parameters
  • application (str) – Name of the application. Gantry validates and monitors data by function.

  • version (optional, Union[int, str]) – Version of the function schema to use for validation by Gantry. If not provided, Gantry will use the latest version. If the version doesn't exist yet, Gantry will create it by auto-generating the schema based on data it has seen. Providing an int or its stringified value makes no difference (e.g. version=10 is the same as version='10').

  • inputs (Union[List[dict], pd.DataFrame]) – A list of prediction inputs. inputs[i] is a dict of the features for the i-th prediction to be logged.

  • outputs (Union[List[dict], pd.DataFrame]) – A list of prediction outputs. outputs[i] should be the application output for the prediction with features inputs[i].

  • feedbacks (Union[List[dict], pd.DataFrame]) – A list of feedbacks. feedbacks[i] is a dict of the features for the i-th prediction to be logged.

  • ignore_inputs (optional, List[str]) – A list of names of input features that should not be monitored.

  • timestamps (optional, Union[List[datetime.datetime], pd.DatetimeIndex, np.array[datetime.datetime]]) – A list of prediction timestamps. If specified, timestamps[i] should be the timestamp for the i-th prediction. If timestamps = None (default), then the prediction timestamp defaults to the time when the log call is made.

  • sort_on_timestamp (bool, defaults to True) – Only applies when timestamps are provided; if True, records are sorted by the given timestamps. Defaults to True.

  • sample_rate – Used for down-sampling. The probability, as a float between 0 and 1, that each record will be sent to Gantry.

  • as_batch (bool, defaults to False) – Whether to add batch metadata and tracking in the 'batch' section of the dashboard.

  • tags (optional, Optional[Union[Dict[str, str], List[Dict[str, str]]]]) – A tag is a label that you assign to your data. For example, you can specify which environment the data belongs to by setting an "env" tag like this: tags = {"env": "environment1"}. If not assigned, the Gantry client's environment value is used as the default environment. If this parameter is a dict, the tags apply to all records. Alternatively, you can pass a list of dicts to apply tags to each record independently. IMPORTANT: this parameter is deprecated and will be removed soon. Use row_tags and global_tags instead.

  • join_keys (optional, Union[List[str], pd.Series[str]]) – Provide keys to identify each record. These keys can be used later to provide feedback. If not provided, a random record key will be generated for each record.

  • row_tags (optional, Optional[Union[List[Dict[str, str]], pd.DataFrame, pd.Series, np.ndarray]]) –

    Specify row level tags. If provided, this parameter should be a list of tags to be applied to each of the records. row_tags[i] will contain a dictionary of strings containing the tags to attach to the i-th record. Alternatively, tags may be specified by passing in a DataFrame, Series, or Array, like inputs.

    For batch global tags, use the ‘global_tags’ parameter instead.

  • global_tags (optional, Dict[str, str]) – Specify a set of tags that will be attached to all ingested records from this batch. For record-specific tags, use the 'row_tags' parameter. Only used when log is not called within a run.

  • run_id (optional, str) – This should never be provided by the user. It will be populated automatically when logging within a run to group records together.

  • run_tags (optional, Dict[str, str]) – This should never be provided by the user. It will be populated automatically when logging within a run to provide global tags for all records in the run.

Returns

The batch_id will be None if records are not logged as a batch. The list of join_keys will be the record keys.

Return type

Tuple ([batch_id, list[join_keys]])
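
Example (a minimal sketch; the application name and records are placeholders):

import gantry

gantry.init()

gantry.log(
    application="my-app",
    inputs=[{'A': 100}, {'A': 101}],
    outputs=[{'B': 'foo'}, {'B': 'bar'}],
    join_keys=["12345", "67890"],
    global_tags={"env": "environment1"},
)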

gantry.log_from_data_connector(application: str, source_data_connector_name: str, timestamp: Optional[str] = None, inputs: Optional[List[str]] = None, outputs: Optional[List[str]] = None, feedbacks: Optional[List[str]] = None, join_key: Optional[str] = None, row_tags: Optional[List[str]] = None, global_tags: Optional[Dict[str, str]] = None, schedule: Optional[gantry.logger.types.Schedule] = None, pipeline_name: Optional[str] = None) gantry.logger.types.IngestFromDataConnectorResponse#

Alias for gantry.logger.client.Gantry.log_from_data_connector()

Function to ingest source tabular records from a registered source data connector into Gantry.

To log predictions using this function, the column names for both inputs and outputs must be passed. To log feedback using this function, the column names for both join_key and feedbacks must be passed.

Example:

# Record an application's predictions.
gantry.log_from_data_connector(
    application="foobar",
    source_data_connector_name="my_snowflake_connector",
    inputs=["column_A", "column_B"],
    outputs=["column_C"],
    timestamp="column_T",
    global_tags = {"env":"dev", "version": "1.0.0"},
    row_tags=["column_D"],
    join_key="column_E",
)

# Record an application's feedbacks.
# to a previously ingested prediction.
gantry.log_from_data_connector(
    application="foobar",
    source_data_connector_name="my_snowflake_connector",
    feedbacks=["column_E", "column_F"],
    timestamp="column_T",
    global_tags = {"env":"dev", "version": "1.0.0"},
    row_tags=["column_D"],
    join_key="column_E",
)

# Trigger scheduled ingestion every 8 hours from a data connector incrementally.
from gantry.logger.types import (
    Schedule,
    ScheduleFrequency,
    ScheduleType,
    ScheduleOptions,
)

gantry.log_from_data_connector(
    application="foobar",
    source_data_connector_name="my_snowflake_connector",
    inputs=["column_A", "column_B"],
    outputs=["column_C"],
    timestamp="column_T",
    global_tags = {"env":"dev", "version": "1.0.0"},
    row_tags=["column_D"],
    join_key="column_E",
    schedule=Schedule(
        start_on="2023-01-14T17:00:00.000000",
        frequency=ScheduleFrequency.EVERY_8_HOURS,
        type=ScheduleType.INCREMENTAL_APPEND,
        options=ScheduleOptions(watermark_key="column_T"),
    )
)

# If the data are expected to arrive late in your source table/view, use delay_time
# to specify how long to wait before triggering the scheduled ingestion.
gantry.log_from_data_connector(
    application="foobar",
    source_data_connector_name="my_snowflake_connector",
    inputs=["column_A", "column_B"],
    outputs=["column_C"],
    timestamp="column_T",
    global_tags = {"env":"dev", "version": "1.0.0"},
    row_tags=["column_D"],
    join_key="column_E",
    schedule=Schedule(
        start_on="2023-01-14T17:00:00.000000",
        frequency=ScheduleFrequency.EVERY_8_HOURS,
        type=ScheduleType.INCREMENTAL_APPEND,
        options=ScheduleOptions(
            delay_time=300, # Delay 5 minutes before triggering ingestion.
            watermark_key="column_T"
        ),
    )
)
Parameters
  • application (str) – Name of the application to ingest data into

  • source_data_connector_name (str) – Name of the registered source data connector to ingest data from.

  • timestamp (optional, str) – By default, the timestamp values will be filled with the timestamp at ingestion. Set this parameter to reference a different column.

  • inputs (optional, list[str]) – A list of column names for inputs.

  • outputs (optional, list[str]) – A list of column names for outputs.

  • feedbacks (optional, list[str]) – A list of column names for feedbacks.

  • join_key (optional, str) – A column name to use as the join key to identify each record.

  • row_tags (optional, list[str]) – A list of column names to use the values of as tags for each row.

  • global_tags (optional, Dict[str, str]) – A dictionary of string key and value pairs used to tag the entire ingestion from this data connector.

  • schedule (optional, Schedule) – An optional parameter to schedule the ingestion.

  • pipeline_name (optional, str) – An optional parameter to specify the ingestion pipeline's name.

gantry.ping() bool#

Alias for gantry.logger.client.Gantry.ping()

Pings the log store API server to check if it is alive. Returns True if alive, False if there is an error during ping process.

gantry.ready() bool#

Alias for gantry.logger.client.Gantry.ready()

Checks if the configured API key authenticates with the API. Returns True if ready, False otherwise.
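
For example, both checks can be used to verify connectivity and credentials before logging (a minimal sketch; the application name and record are placeholders):

import gantry

gantry.init(api_key="foobar")
if gantry.ping() and gantry.ready():
    gantry.log(application="my-app", inputs=[{'A': 1}], outputs=[{'B': 'foo'}])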

class gantry.JoinKey#
classmethod from_dict(dict_: Dict) str#

Utility function to retrieve a deterministic value from a dictionary. Intended to be used for setting join_key when logging to Gantry.

Parameters

dict_ (Dict) – The dictionary from which the join key should be generated.

Returns

The generated join key

Return type

str
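
For example, a deterministic join key can be derived from a record's own fields (a minimal sketch; the record is a placeholder and app is an existing Application):

import gantry

record = {"user_id": 42, "item_id": "abc"}
join_key = gantry.JoinKey.from_dict(record)  # the same dict always yields the same key

app.log(inputs=[record], outputs=[{"score": 0.9}], join_keys=[join_key])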

gantry.create_application(application_name: str, application_type: Optional[str] = None) gantry.applications.core.Application#

Alias for gantry.applications.client.ApplicationClient.create_application()

Create an Application object by name and type. If the application already exists, an exception will be raised. If your organization does not allow you to create an application of a certain type, there will be an exception raised as well.

Example usage:

import gantry
application = gantry.create_application("demo", "Custom")
Parameters
  • application_name (str) – the name of the application.

  • application_type (str, optional) – the type of the application. Defaults to Custom. The available application types are: Custom, Chat, and Completion.

Returns

An object representing the application.

Return type

gantry.Application

gantry.get_application(application_name: str) gantry.applications.core.Application#

Alias for gantry.applications.client.ApplicationClient.get_application()

Get an Application object by name.

Example usage:

import gantry
application = gantry.get_application("demo")
Parameters

application_name (str) – the name of the application.

Returns

An object representing the application.

Return type

gantry.Application

gantry.delete_application(application_name: str) None#

Alias for gantry.applications.client.ApplicationClient.delete_application()

Hard delete an Application.

Example usage:

import gantry
gantry.delete_application("demo")
Parameters

application_name (str) – the name of the application.

Returns

None

gantry.archive_application(application_name: str) None#

Alias for gantry.applications.client.ApplicationClient.archive_application()

Archive an Application.

Example usage:

import gantry
gantry.archive_application("demo")
Parameters

application_name (str) – the name of the application.

Returns

None

Automations#

An automation consists of a trigger and an action.

class gantry.automations.Automation(name: str, trigger: gantry.automations.triggers.triggers.Trigger, action: gantry.automations.actions.actions.Action, api_client: Optional[gantry.api_client.APIClient] = None, application: Optional[str] = None)#

Automation is the process of automatically executing tasks. It consists of 2 main components: Trigger and Action.

Trigger is the condition that, when met, causes the Action to be executed.

Currently, we support:

  • IntervalTrigger: based on time intervals and delays.

  • AggregationTrigger: the same idea as an Alert in the UI. It tracks whether an aggregation is outside of the given range for the evaluation window and triggers when the conditions are met.

  • SendSlackMessage (Action): This action sets up a notification setting that will send users a Slack notification when executed by the trigger.

  • Curator (Action): This action creates a new curator.

  • Note:
    • AggregationTrigger can only be paired with SendSlackMessage

    • IntervalTrigger can only be paired with Curator

To set up and start an automation process, there are a few steps:

  • Define the trigger
    from gantry.automations.triggers import AggregationTrigger
    from gantry.query.time_window import RelativeTimeWindow
    
    alert_trigger = AggregationTrigger(
        name = "alert-trigger",
        aggregation = "maximum",
        fields = ["inputs.speed"],
        lower_bound = 1,
        upper_bound = 5,
        evaluation_window=RelativeTimeWindow(
            window_length=datetime.timedelta(minutes=10)
        ),
        tags = None
    )
    
  • Define the action
    from gantry.automations.actions import SendSlackMessage
    # Set up action to send slack message to the specified webhook.
    slack_action = SendSlackMessage(
        name = "demo-slack",
        webhook_url="SLACK_WEBHOOK_URL"
    )
    
  • Define the automation and put trigger & action together
    from gantry.automations.automations import Automation
    # Define automation object and put trigger and action together.
    automation_alert = Automation(
        name = "alert-automation",
        trigger=alert_trigger,
        action=slack_action
    )
    
  • Add the automation to an application
    app.add_automation(automation_alert)
    
  • When you are done, stop the automation process
    automation_alert.stop()
    
__init__(name: str, trigger: gantry.automations.triggers.triggers.Trigger, action: gantry.automations.actions.actions.Action, api_client: Optional[gantry.api_client.APIClient] = None, application: Optional[str] = None)#

Initialize an automation object.

Parameters
  • name (str) – the name of the automation.

  • trigger (Trigger) – the trigger of the automation.

  • action (Action) – the action to execute when the trigger fires.

  • api_client (optional, APIClient) – the API client to use. Defaults to None, in which case the global API client is used.

  • application (optional, str) – the name of the application the automation is attached to. Defaults to None.

stop()#

Stop the automation process and delete all relevant actions.

Triggers#

A way of broadcasting certain events that take place within Gantry, such as a data threshold breach.

class gantry.automations.triggers.AggregationTrigger(name: str, aggregation: gantry.alerts.client.AlertsAggregation, fields: List[str], lower_bound: float, upper_bound: float, evaluation_window: gantry.query.time_window.RelativeTimeWindow, tags: Optional[Dict] = None)#
__init__(name: str, aggregation: gantry.alerts.client.AlertsAggregation, fields: List[str], lower_bound: float, upper_bound: float, evaluation_window: gantry.query.time_window.RelativeTimeWindow, tags: Optional[Dict] = None)#

Initialize an aggregation trigger.

Parameters
  • name (str) – the name of the trigger.

  • aggregation (AlertsAggregation) – the aggregation of the trigger.

  • fields (List[str]) – the fields of the trigger.

  • lower_bound (float) – the lower bound of the field.

  • upper_bound (float) – the upper bound of the field.

  • evaluation_window (gantry.query.time_window.RelativeTimeWindow) – evaluation window for the trigger.

  • tags (Optional[Dict]) – the tags of the trigger. Defaults to None.

class gantry.automations.triggers.IntervalTrigger(start_on: datetime.datetime, interval: datetime.timedelta, delay: datetime.timedelta = datetime.timedelta(0))#
__init__(start_on: datetime.datetime, interval: datetime.timedelta, delay: datetime.timedelta = datetime.timedelta(0))#

Initialize an interval trigger.

Parameters
  • start_on (datetime.datetime) – the start time of the interval.

  • interval (datetime.timedelta) – the interval of the trigger.

  • delay (datetime.timedelta) – the delay of the trigger. Defaults to 0.
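
Example (a minimal sketch; per the note above, an IntervalTrigger is typically paired with a Curator action):

import datetime
from gantry.automations.triggers import IntervalTrigger

# Fire every day, starting on 2023-01-15, with a 5-minute delay.
interval_trigger = IntervalTrigger(
    start_on=datetime.datetime(2023, 1, 15),
    interval=datetime.timedelta(days=1),
    delay=datetime.timedelta(minutes=5),
)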

class gantry.automations.triggers.QueriesAggregationTrigger(name: str, compare_fields: List[str], compare_aggregation: gantry.alerts.client.AlertsAggregation, trigger_range_type: str, queries: List[gantry.query.core.dataframe.GantryDataFrame], query_aggregation: List[gantry.alerts.client.AlertsAggregation], lower_bound: Optional[float] = None, upper_bound: Optional[float] = None)#
__init__(name: str, compare_fields: List[str], compare_aggregation: gantry.alerts.client.AlertsAggregation, trigger_range_type: str, queries: List[gantry.query.core.dataframe.GantryDataFrame], query_aggregation: List[gantry.alerts.client.AlertsAggregation], lower_bound: Optional[float] = None, upper_bound: Optional[float] = None)#

Initialize a trigger based on multiple queries. As of now, we only support triggers based on 2 queries.

Example:

Say I want to set up a trigger that:

  • Calculates max(inputs.latitude) of query1 and query2.

  • Calculates the difference between the 2 max values, i.e. max(inputs.latitude) of query1 - max(inputs.latitude) of query2.

  • Triggers an alert if the difference is within the range [0.1, 1.0].

first_query = app.get_query("first_query")
second_query = app.get_query("second_query")

query_trigger_alert = QueriesAggregationTrigger(
    name = "queries-trigger",
    compare_aggregation = AlertsAggregation.TOTAL_DIFFERENCE,
    compare_fields = ["inputs.latitude"],
    queries = [first_query, second_query],
    query_aggregation=[AlertsAggregation.MAX, AlertsAggregation.MAX],
    lower_bound = 0.1,
    upper_bound = 1.0,
    trigger_range_type="within",
)
Parameters
  • name (str) – the name of the trigger.

  • compare_fields (List[str]) – The fields to calculate the aggregated difference on between queries. As of now, we only support 1 field.

  • compare_aggregation (AlertsAggregation) – the aggregation of the difference between aggregated query values.

  • lower_bound (float) – the lower bound (inclusive) of the aggregated difference.

  • upper_bound (float) – the upper bound (inclusive) of the aggregated difference.

  • trigger_range_type (str) –

    the type of the trigger range. The options are:

    "within": triggered when aggregated difference is within the bounds

    "outside": triggered when aggregated difference is outside the bounds

    "above": triggered when aggregated difference is above the upper bound

    "below": triggered when aggregated difference is below the lower bound

  • queries (List[GantryDataFrame]) – the list of queries (in order) to compare.

  • query_aggregation (List[AlertsAggregation]) – the aggregation of the queries.

class gantry.automations.triggers.Trigger#

Parent class for all triggers.

__init__()#

Actions#

Actions that can be carried out when triggers fire, including Slack notifications.

class gantry.automations.actions.Action#

Parent class for all actions.

class gantry.automations.actions.SendSlackMessage(name: str, webhook_url: str, id: Optional[str] = None, notify_daily: bool = False, alert_id: Optional[str] = None)#
__init__(name: str, webhook_url: str, id: Optional[str] = None, notify_daily: bool = False, alert_id: Optional[str] = None)#

Initialize a slack notification action.

Parameters
  • name (str) – the name of the slack notification.

  • webhook_url (str) – the webhook url of the slack channel.

  • notify_daily (bool) – If True, send notifications daily; otherwise, send them immediately.

Curators#

Curators are a type of automation for collecting data from your production data stream.

gantry.automations.curators.get_curator(name: str) Curator#

Alias for gantry.automations.curators.curators.CuratorClient.get_curator()

Get a curator by name. If the curator does not exist, an exception will be raised.

Parameters

name (str) – the name of the curator to be retrieved.

Returns

A Curator object representing the curator corresponding to the provided name.

Return type

gantry.automations.curators.Curator

gantry.automations.curators.get_all_curators(application_name: Optional[str] = None) List[Curator]#

Alias for gantry.automations.curators.curators.CuratorClient.get_all_curators()

Returns a list of the curators in the organization if no application_name is passed, and all the curators associated with the provided application if a name is passed.

Parameters

application_name (str, optional) – defaults to None; if provided, the application must exist.

Returns

A list of the curators either in the organization or for a specific application.

Return type

List[Curator]

gantry.automations.curators.list_curators(application_name: Optional[str] = None) List[str]#

Alias for gantry.automations.curators.curators.CuratorClient.list_curators()

Returns a list of the names of all the curators in the organization if no application_name is passed, and all the curators associated with the provided application if a name is passed.

Parameters

application_name (str, optional) – defaults to None; if provided, the application must exist.

Returns

A list of the names of the curators either in the organization or for a specific application.

Return type

List[str]
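
Example (a minimal sketch; the application and curator names are placeholders):

from gantry.automations.curators import get_curator, list_curators

curator_names = list_curators(application_name="my-app")  # names only
if curator_names:
    curator = get_curator(curator_names[0])               # full Curator object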

class gantry.automations.curators.Curator(name: str, application_name: str, api_client: Optional[gantry.api_client.APIClient] = None, id: Optional[uuid.UUID] = None, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curation_delay: datetime.timedelta = datetime.timedelta(0), curate_past_intervals: bool = True, created_at: Optional[datetime.datetime] = None, selectors: Optional[List[gantry.automations.curators.selectors.Selector]] = None)#

A class representing a user defined curator, where the user provides the selection criteria using a list of selectors, as well as other metadata needed to create and manage the creation jobs.

__init__(name: str, application_name: str, api_client: Optional[gantry.api_client.APIClient] = None, id: Optional[uuid.UUID] = None, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curation_delay: datetime.timedelta = datetime.timedelta(0), curate_past_intervals: bool = True, created_at: Optional[datetime.datetime] = None, selectors: Optional[List[gantry.automations.curators.selectors.Selector]] = None)#

A Curator defines a process that runs periodically to curate a dataset. It uses selectors to select rows from the dataset and write them to a curated dataset. The curated dataset is a new dataset that is created by the curator. The curator will create the curated dataset if it does not already exist, or update it if it does exist.

Instantiating a curator does not create the curator in Gantry. To create the curator, call Curator.create after instantiating the curator. Curators that have been created in Gantry can be instantiated as a Curator object using Curator.from_curator_info, although it is recommended to use the module-level get_curator function instead.

Curators can be updated using Curator.update. This will update the curator in Gantry and also update the curator object in memory.

Curators can be deleted using Curator.delete. This will delete the curator in Gantry but will not delete the curator object in memory.

Parameters
  • api_client (Optional[APIClient], optional) – APIClient to use. Defaults to None, in which case the Curator will use the global APIClient.

  • id (Optional[uuid.UUID], optional) – The id of the curator. Defaults to None. The id only exists in Gantry for curators that have been created. Typically, you will not need to set this. But some methods, such as Curator.from_curator_info, will set this for you.

  • name (str) – The name of the curator.

  • curated_dataset_name (Optional[str], optional) – The name of the curated dataset. Defaults to None. If None, the curated dataset name will be the same as the curator name.

  • application_name (str) – The name of the application that the curator is running in.

  • start_on (Optional[datetime.datetime], optional) – The time at which the curator should start curating. Defaults to None. If None, the curator will start curating immediately, looking back one curation_interval.

  • curation_interval (datetime.timedelta, optional) – The interval at which the curator should curate. Defaults to datetime.timedelta(days=1).

  • curate_past_intervals (bool, optional) – Whether the curator should curate past intervals. Defaults to True. If True, the curator will immediately curate all intervals that have passed since the start_on time.

  • created_at (Optional[datetime.datetime], optional) – The time at which the curator was created. Defaults to None. The created_at time only exists in Gantry for curators that have been created. Typically, you will not need to set this. But some methods, such as Curator.from_curator_info, will set this for you.

  • selectors (Optional[List[Selector]], optional) – The selectors that the curator should use to curate. Defaults to None. If None, the request to create the curator will fail.

property enabled: bool#

Enabling or disabling must be done via the .enable() or .disable() methods.

update_curation_start_on(start_on: datetime.datetime) None#

Update the curation start_on time.

Parameters

start_on (datetime.datetime) – The new start on time.

update_curation_interval(curation_interval: datetime.timedelta) None#

Update the curation interval.

Parameters

curation_interval (datetime.timedelta) – The new curation interval.

update_curation_delay(curation_delay: datetime.timedelta) None#

Update the curation delay.

Parameters

curation_delay (datetime.timedelta) – The new curation delay.
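
For example, an existing curator's schedule can be adjusted in place (a minimal sketch; the curator name is a placeholder):

import datetime
from gantry.automations.curators import get_curator

curator = get_curator("demo_curator")
curator.update_curation_start_on(datetime.datetime(2023, 2, 1))
curator.update_curation_interval(datetime.timedelta(hours=6))
curator.update_curation_delay(datetime.timedelta(minutes=10))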

start() None#

Trigger to create the curator. This function is called from Automation.

stop() None#

Trigger to delete the curator. This function is called from Automation.

create(enable=True) gantry.automations.curators.curators.Curator#

Creates the Curator. By default, the Curator is also enabled upon creation. Use the enable parameter to change this.

Once created, the Curator will start curating the dataset according to the curation_interval. If curate_past_intervals is set to True, it will curate all past intervals and continue curating in the future. The Curator will exist in Gantry until it is deleted.

The results of the curation will be stored in a dataset with the name curated_dataset_name. If the dataset does not exist, it will be created. If it does exist, it will be appended to. Curation commits will be made by the user with the API key used when initializing gantry services.

The curated dataset can be accessed using the get_curated_dataset method.

Parameters

enable (Optional[bool]) – Whether to automatically enable the curator after creation. Defaults to True.

Returns

The created curator.

Return type

Curator

update(new_curator_name: Optional[str] = None, new_curated_dataset_name: Optional[str] = None, new_curation_interval: Optional[datetime.timedelta] = None, new_selectors: Optional[List[gantry.automations.curators.selectors.Selector]] = None, create_new_dataset: bool = False) gantry.automations.curators.curators.Curator#

Updates the curator. At least one of the update parameters must be provided. If a parameter is not provided, it will not be updated.

Parameters
  • new_curator_name (Optional[str], optional) – The new name of the curator. Defaults to None.

  • new_curated_dataset_name (Optional[str], optional) – The name of the new dataset to curate to. Defaults to None.

  • new_curation_interval (Optional[datetime.timedelta], optional) – The new curation interval. Defaults to None.

  • new_selectors (Optional[List[Selector]], optional) – The new selectors. Defaults to None.

  • create_new_dataset (bool, optional) – Whether to create a new dataset if the requested name does not correspond to an existing dataset. Defaults to False.

Raises
  • ValueError – If curator ID is not set.

  • ValueError – If no update parameters are provided.

Returns

The updated curator.

Return type

Curator

delete() str#

Deletes the curator using the curator id. The Curator object is not deleted, but the curator id is reset to None, as well as the created_at time.

Raises

ValueError – If curator id is not set.

Returns

A message indicating the curator was deleted. The message also contains the time the curator was deleted.

Return type

str

get_curated_dataset() gantry.dataset.gantry_dataset.GantryDataset#

Gets the curated dataset associated with the Curator.

Returns

A GantryDataset object representing the curated dataset.

Return type

GantryDataset

enable() str#

Enable Curator - Gantry will resume curating data using this Curator. Intervals that have elapsed since the curator was disabled will be backfilled.

disable() str#

Disable Curator - Gantry will stop curating data using this Curator.

class gantry.automations.curators.AscendedSortCurator(name: str, application_name: str, limit: int, sort_field: str, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#

Stock Curator that sorts the application field specified by sort_field in ascending order and adds up to the top limit results to curated_dataset_name every curation_interval.

__init__(name: str, application_name: str, limit: int, sort_field: str, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
Parameters
  • name (str) – The name of the curator.

  • application_name (str) – The name of the application that the curator is running in.

  • limit (int) – The maximum number of results to return.

  • sort_field (str) – The field to sort.

  • curated_dataset_name (Optional[str], optional) – The name of the curated dataset that the curator will write to. Defaults to None, in which case the name of the curator will be used.

  • start_on (Optional[datetime.datetime], optional) – The datetime to start the curator. Defaults to None, in which case the curator will start now, looking back one curation_interval.

  • curation_interval (datetime.timedelta, optional) – The interval of time considered during each run of the curator. Defaults to datetime.timedelta(days=1).

  • curate_past_intervals (bool, optional) – Whether or not to curate past intervals when start_on is in the past. Defaults to True.
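
Example (a minimal sketch; the application, field, and curator names are placeholders):

import datetime
from gantry.automations.curators import AscendedSortCurator

lowest_speed_curator = AscendedSortCurator(
    name="lowest-speed-curator",
    application_name="my-app",
    limit=100,
    sort_field="inputs.speed",
    curation_interval=datetime.timedelta(days=1),
)
lowest_speed_curator.create()  # registers (and by default enables) the curator in Gantry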

class gantry.automations.curators.BalancedStratificationCurator(name: str, application_name: str, limit: int, stratify_field: str, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#

Stock Curator that builds a balanced stratification of application records according to the field stratify_field and adds up to limit results to curated_dataset_name.

Currently, this curator only supports stratification of categorical fields. The limit per stratum is determined by the stratum with the fewest records in the current curation_interval.

__init__(name: str, application_name: str, limit: int, stratify_field: str, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
Parameters
  • name (str) – The name of the curator.

  • application_name (str) – The name of the application that the curator is running in.

  • limit (int) – The maximum number of results to return.

  • stratify_field (str) – The field to stratify.

  • curated_dataset_name (Optional[str], optional) – The name of the curated dataset that the curator will write to. Defaults to None, in which case the name of the curator will be used.

  • start_on (Optional[datetime.datetime], optional) – The datetime to start the curator. Defaults to None, in which case the curator will start now, looking back one curation_interval.

  • curation_interval (datetime.timedelta, optional) – The interval of time considered during each run of the curator. Defaults to datetime.timedelta(days=1).

  • curate_past_intervals (bool, optional) – Whether or not to curate past intervals when start_on is in the past. Defaults to True.

class gantry.automations.curators.BoundedRangeCurator(name: str, application_name: str, limit: int, bound_field: str, upper_bound: float, lower_bound: float, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True, sort_ascending: bool = True)#

Stock Curator that strictly bounds the application field specified by bound_field between upper_bound and lower_bound and adds up to limit results to curated_dataset_name every curation_interval. If the curation returns more results than limit, the results are sorted in ascending order by bound_field when sort_ascending is True (default) and in descending order when sort_ascending is False.

__init__(name: str, application_name: str, limit: int, bound_field: str, upper_bound: float, lower_bound: float, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True, sort_ascending: bool = True)#
Parameters
  • name (str) – The name of the curator.

  • application_name (str) – The name of the application that the curator is running in.

  • limit (int) – The maximum number of results to return.

  • bound_field (str) – The field to bound.

  • upper_bound (float) – Upper bound on the field.

  • lower_bound (float) – Lower bound on the field.

  • curated_dataset_name (Optional[str], optional) – The name of the curated dataset that the curator will write to. Defaults to None, in which case the name of the curator will be used.

  • start_on (Optional[datetime.datetime], optional) – The datetime to start the curator. Defaults to None, in which case the curator will start now, looking back one curation_interval.

  • curation_interval (datetime.timedelta, optional) – The interval of time considered during each run of the curator. Defaults to datetime.timedelta(days=1).

  • curate_past_intervals (bool, optional) – Whether or not to curate past intervals when start_on is in the past. Defaults to True.

  • sort_ascending (bool, optional) – Whether or not to sort the results in ascending order. Defaults to True.

class gantry.automations.curators.DescendedSortCurator(name: str, application_name: str, limit: int, sort_field: str, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#

Stock Curator that sorts the application field specified by sort_field in descending order and adds up to the top limit results to curated_dataset_name every curation_interval.

__init__(name: str, application_name: str, limit: int, sort_field: str, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
Parameters
  • name (str) – The name of the curator.

  • application_name (str) – The name of the application that the curator is running in.

  • limit (int) – The maximum number of results to return.

  • sort_field (str) – The field to sort.

  • curated_dataset_name (Optional[str], optional) – The name of the curated dataset that the curator will write to. Defaults to None, in which case the name of the curator will be used.

  • start_on (Optional[datetime.datetime], optional) – The datetime to start the curator. Defaults to None, in which case the curator will start now, looking back one curation_interval.

  • curation_interval (datetime.timedelta, optional) – The interval of time considered during each run of the curator. Defaults to datetime.timedelta(days=1).

  • curate_past_intervals (bool, optional) – Whether or not to curate past intervals when start_on is in the past. Defaults to True.

class gantry.automations.curators.NewestCurator(name: str, application_name: str, limit: int, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#

Stock Curator that sorts the application records in descending time-order according to the internally logged Gantry timestamp, __TIME, and adds up to the top limit results to curated_dataset_name every curation_interval.

__init__(name: str, application_name: str, limit: int, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
Parameters
  • name (str) – The name of the curator.

  • application_name (str) – The name of the application that the curator is running in.

  • limit (int) – The maximum number of results to return.

  • curated_dataset_name (Optional[str], optional) – The name of the curated dataset that the curator will write to. Defaults to None, in which case the name of the curator will be used.

  • start_on (Optional[datetime.datetime], optional) – The datetime to start the curator. Defaults to None, in which case the curator will start now, looking back one curation_interval.

  • curation_interval (datetime.timedelta, optional) – The interval of time considered during each run of the curator. Defaults to datetime.timedelta(days=1).

  • curate_past_intervals (bool, optional) – Whether or not to curate past intervals when start_on is in the past. Defaults to True.

class gantry.automations.curators.OldestCurator(name: str, application_name: str, limit: int, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#

Stock Curator that sorts the application records in ascending time-order according to the internally logged Gantry timestamp, __TIME, and adds up to the top limit results to curated_dataset_name every curation_interval.

__init__(name: str, application_name: str, limit: int, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
Parameters
  • name (str) – The name of the curator.

  • application_name (str) – The name of the application that the curator is running in.

  • limit (int) – The maximum number of results to return.

  • curated_dataset_name (Optional[str], optional) – The name of the curated dataset that the curator will write to. Defaults to None, in which case the name of the curator will be used.

  • start_on (Optional[datetime.datetime], optional) – The datetime to start the curator. Defaults to None, in which case the curator will start now, looking back one curation_interval.

  • curation_interval (datetime.timedelta, optional) – The interval of time considered during each run of the curator. Defaults to datetime.timedelta(days=1).

  • curate_past_intervals (bool, optional) – Whether or not to curate past intervals when start_on is in the past. Defaults to True.

class gantry.automations.curators.ProportionalStratificationCurator(name: str, application_name: str, limit: int, stratify_field: str, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#

Stock Curator that builds a proportional stratification of application records according to the field stratify_field and adds up to limit results to curated_dataset_name.

The proportion of records in each stratum is respected, but the limit per curation may be violated, unlike the StrictStratificationCurator, which will respect the limit at the (potential) cost of violating the proportion of records in each stratum.

Additionally, it is worth noting that proportional shares are rounded half to even (banker’s rounding), so 1.5 is rounded to 2 and 0.5 is rounded to 0.

Example

Let’s say there are 100 records: 51 with the value “A” for the field, 15 with “B”, 16 with “C”, 17 with “D”, and 1 with “E”.

If the limit is 10, the curator will return 11 records: 5 with “A”, 2 with “B”, 2 with “C”, 2 with “D”, and 0 with “E”.
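
The per-stratum counts above follow from rounding each stratum’s proportional share of the limit half to even; a minimal sketch of that arithmetic (illustration only, not the curator implementation):

counts = {"A": 51, "B": 15, "C": 16, "D": 17, "E": 1}
total = sum(counts.values())  # 100
limit = 10

# Python's built-in round() also rounds half to even, so round(1.5) == 2 and round(0.5) == 0.
allocation = {k: round(v * limit / total) for k, v in counts.items()}
# {"A": 5, "B": 2, "C": 2, "D": 2, "E": 0} -> 11 records in total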

__init__(name: str, application_name: str, limit: int, stratify_field: str, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
Parameters
  • name (str) – The name of the curator.

  • application_name (str) – The name of the application that the curator is running in.

  • limit (int) – The maximum number of results to return.

  • stratify_field (str) – The field to stratify.

  • curated_dataset_name (Optional[str], optional) – The name of the curated dataset that the curator will write to. Defaults to None, in which case the name of the curator will be used.

  • start_on (Optional[datetime.datetime], optional) – The datetime to start the curator. Defaults to None, in which case the curator will start now, looking back one curation_interval.

  • curation_interval (datetime.timedelta, optional) – The interval of time considered during each run of the curator. Defaults to datetime.timedelta(days=1).

  • curate_past_intervals (bool, optional) – Whether or not to curate past intervals when start_on is in the past. Defaults to True.

class gantry.automations.curators.StrictStratificationCurator(name: str, application_name: str, limit: int, stratify_field: str, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#

Stock Curator that builds a strict stratification of application records according to the field stratify_field and adds up to limit results to curated_dataset_name.

The limit of records per curation is respected, but the proportion of records in each stratum may be violated, unlike the ProportionalStratificationCurator, which will respect the proportion at the (potential) cost of violating the limit per curation.

Additionally, it is worth noting that the number of records to be added to each stratum is calculated exactly, leading to potentially fractional assignments. The integer part of each assignment is taken, and the total remainder is added, up to the limit, randomly.

Example

Let’s say there are 100 records: 51 with the value “A” for the field, 15 with “B”, 16 with “C”, 17 with “D”, and 1 with “E”.

If the limit is 10, the curator will return 10 records: 5 with “A”, 1 with “B”, 1 with “C”, 1 with “D”, and 0 with “E”, with the remaining 2 records selected at random from the records not yet chosen.
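
The per-stratum counts above follow from taking the integer part of each stratum’s exact share of the limit; a minimal sketch of that arithmetic (illustration only, not the curator implementation):

import math

counts = {"A": 51, "B": 15, "C": 16, "D": 17, "E": 1}
total = sum(counts.values())  # 100
limit = 10

# Integer part of each stratum's (possibly fractional) share of the limit.
allocation = {k: math.floor(v * limit / total) for k, v in counts.items()}
# {"A": 5, "B": 1, "C": 1, "D": 1, "E": 0} -> 8 records

# The remaining budget (2 records here) is then filled by records drawn at random.
remainder = limit - sum(allocation.values())  # 2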

__init__(name: str, application_name: str, limit: int, stratify_field: str, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
Parameters
  • name (str) – The name of the curator.

  • application_name (str) – The name of the application that the curator is running in.

  • limit (int) – The maximum number of results to return.

  • stratify_field (str) – The field to stratify.

  • curated_dataset_name (Optional[str], optional) – The name of the curated dataset that the curator will write to. Defaults to None, in which case the name of the curator will be used.

  • start_on (Optional[datetime.datetime], optional) – The datetime to start the curator. Defaults to None, in which case the curator will start now, looking back one curation_interval.

  • curation_interval (datetime.timedelta, optional) – The interval of time considered during each run of the curator. Defaults to datetime.timedelta(days=1).

  • curate_past_intervals (bool, optional) – Whether or not to curate past intervals when start_on is in the past. Defaults to True.

class gantry.automations.curators.UniformCurator(name: str, application_name: str, limit: int, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#

Stock Curator that uniformly samples application records and adds up to the top limit results to curated_dataset_name every curation_interval.
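
Example (a minimal construction sketch; the application name and look-back period are hypothetical placeholders). Setting start_on in the past with curate_past_intervals=True also backfills the intervals between start_on and now:

import datetime
from gantry.automations.curators import UniformCurator

# Uniformly sample up to 50 records per day, backfilling the past week.
curator = UniformCurator(
    name="daily-uniform-sample",
    application_name="my-app",
    limit=50,
    start_on=datetime.datetime.utcnow() - datetime.timedelta(days=7),
    curation_interval=datetime.timedelta(days=1),
    curate_past_intervals=True,
)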

__init__(name: str, application_name: str, limit: int, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
Parameters
  • name (str) – The name of the curator.

  • application_name (str) – The name of the application that the curator is running in.

  • limit (int) – The maximum number of results to return.

  • curated_dataset_name (Optional[str], optional) – The name of the curated dataset that the curator will write to. Defaults to None, in which case the name of the curator will be used.

  • start_on (Optional[datetime.datetime], optional) – The datetime to start the curator. Defaults to None, in which case the curator will start now, looking back one curation_interval.

  • curation_interval (datetime.timedelta, optional) – The interval of time considered during each run of the curator. Defaults to datetime.timedelta(days=1).

  • curate_past_intervals (bool, optional) – Whether or not to curate past intervals when start_on is in the past. Defaults to True.

Selectors and Filters#

A simple API for telling curators which data to sample when they run.

class gantry.automations.curators.selectors.OrderingDirection(value)#

Direction specifying how to order a sort.

ASCENDING = 'ascending'#
DESCENDING = 'descending'#
class gantry.automations.curators.selectors.SamplingMethod(value)#

Sampling method to use when selecting a subset of data.

Selected data will be sampled using one of the methods enumerated here.

uniform = 'uniform'#
ordered = 'ordered'#
stratified = 'stratified'#
class gantry.automations.curators.selectors.StratificationMode(value)#

Stratification mode to use when selecting a subset of data. Currently, stratification is only supported for categorical data.

Selected data will be stratified using one of the modes enumerated here.

In “strict” mode, the limit of records per curation is respected, but the proportion of records in each stratum may be violated, unlike the ProportionalStratificationCurator, which will respect the proportion at the (potential) cost of violating the limit per curation.

Additionally, it is worth noting that the number of records to be added to each stratum is calculated exactly, leading to potentially fractional assignments. The integer part of each assignment is taken, and the total remainder is added, up to the limit, randomly.

Example

Let’s say there are 100 records: 51 with the value “A” for the field, 15 with “B”, 16 with “C”, 17 with “D”, and 1 with “E”.

If the limit is 10, the curator will return 10 records: 5 with “A”, 1 with “B”, 1 with “C”, 1 with “D”, and 0 with “E”, with the remaining 2 records selected at random from the records not yet chosen.

In “proportional” mode, the proportion of records in each stratum is respected, but the limit per curation may be violated, unlike the StrictStratificationCurator, which will respect the limit at the (potential) cost of violating the proportion of records in each stratum.

Additionally, it is worth noting that proportional shares are rounded half to even (banker’s rounding), so 1.5 is rounded to 2 and 0.5 is rounded to 0.

Example

Let’s say there are 100 records: 51 with the value “A” for the field, 15 with “B”, 16 with “C”, 17 with “D”, and 1 with “E”.

If the limit is 10, the curator will return 11 records: 5 with “A”, 2 with “B”, 2 with “C”, 2 with “D”, and 0 with “E”.

In “balanced” mode, the number of records per stratum is balanced with respect to the least represented stratum.

strict = 'strict'#
proportional = 'proportional'#
balanced = 'balanced'#
class gantry.automations.curators.selectors.TagFilter(*, name: str, value: str)#

Filter for tags

name: str#
value: str#
class gantry.automations.curators.selectors.Filter(*, field: str)#

Base class for filters. All filters must have a field.

field: str#
class gantry.automations.curators.selectors.BoundsFilter(*, field: str, upper: Optional[Union[float, int]] = None, inclusive_upper: Optional[Union[float, int]] = None, lower: Optional[Union[float, int]] = None, inclusive_lower: Optional[Union[float, int]] = None)#

Filter for bounds. Must have either an upper or lower bound; both can be specified. Default bounds are exclusive, but inclusive bounds can be given instead.

upper: Optional[Union[float, int]]#
inclusive_upper: Optional[Union[float, int]]#
lower: Optional[Union[float, int]]#
inclusive_lower: Optional[Union[float, int]]#
classmethod validate_bounds(values)#
field: str#
class gantry.automations.curators.selectors.EqualsFilter(*, field: str, equals: Union[bool, float, int, str])#

Filter for equality. Must have an equals field.

The equals field can be a boolean, string, float, or int.

equals: Union[bool, float, int, str]#
field: str#
class gantry.automations.curators.selectors.ContainsFilter(*, field: str, contains: str)#

Filter for string containment. The specified field must be a string.

contains: str#
field: str#
class gantry.automations.curators.selectors.Sampler#

Base class for samplers.

class gantry.automations.curators.selectors.UniformSampler(*, sample: typing.Literal[<SamplingMethod.uniform: 'uniform'>] = SamplingMethod.uniform)#

Sampler for uniform sampling.

This sampler will select a subset of the data uniformly at random.

sample: Literal[<SamplingMethod.uniform: 'uniform'>]#
class gantry.automations.curators.selectors.OrderedSampler(*, sample: typing.Literal[<SamplingMethod.ordered: 'ordered'>] = SamplingMethod.ordered, field: str, sort: typing.Literal[<OrderingDirection.ASCENDING: 'ascending'>, <OrderingDirection.DESCENDING: 'descending'>])#

Sampler for ordered sampling.

This sampler will select a subset of the data ordered by the specified field. Fields can be ordered in ascending or descending order.

sample: Literal[<SamplingMethod.ordered: 'ordered'>]#
field: str#
sort: Literal[<OrderingDirection.ASCENDING: 'ascending'>, <OrderingDirection.DESCENDING: 'descending'>]#
class gantry.automations.curators.selectors.StratifiedSampler(*, field: str, sample: typing.Literal[<SamplingMethod.stratified: 'stratified'>] = SamplingMethod.stratified, mode: typing.Literal[<StratificationMode.strict: 'strict'>, <StratificationMode.proportional: 'proportional'>, <StratificationMode.balanced: 'balanced'>] = StratificationMode.proportional)#

Sampler for stratified sampling.

This sampler will select a subset of the data stratified by the specified field. Stratification is only supported for categorical data. The mode specifies how to stratify the data. See the documentation for StratificationMode for more details.

field: str#
sample: Literal[<SamplingMethod.stratified: 'stratified'>]#
mode: Literal[<StratificationMode.strict: 'strict'>, <StratificationMode.proportional: 'proportional'>, <StratificationMode.balanced: 'balanced'>]#
class gantry.automations.curators.selectors.Selector(*, method: typing.Union[gantry.automations.curators.selectors.OrderedSampler, gantry.automations.curators.selectors.UniformSampler, gantry.automations.curators.selectors.StratifiedSampler] = OrderedSampler(sample=<SamplingMethod.ordered: 'ordered'>, field='__time', sort=<OrderingDirection.DESCENDING: 'descending'>), limit: int, filters: typing.List[typing.Union[gantry.automations.curators.selectors.BoundsFilter, gantry.automations.curators.selectors.EqualsFilter, gantry.automations.curators.selectors.ContainsFilter]] = [], tags: typing.List[gantry.automations.curators.selectors.TagFilter] = [])#

Base class for selectors. All selectors must have a method and a limit. The method specifies how to select a subset of the data, and the limit specifies the number of records to select. The filters field specifies any filters to apply to the data before selecting a subset.
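
For illustration, a minimal sketch of building a Selector (the field names, values, and tag below are hypothetical placeholders; how a particular curator consumes the selector is not shown here):

from gantry.automations.curators.selectors import (
    BoundsFilter,
    EqualsFilter,
    OrderedSampler,
    OrderingDirection,
    Selector,
    TagFilter,
)

# Select up to 25 records with the highest "confidence", keeping only records
# with confidence >= 0.5, "language" equal to "en", and the tag env=production.
selector = Selector(
    method=OrderedSampler(field="confidence", sort=OrderingDirection.DESCENDING),
    limit=25,
    filters=[
        BoundsFilter(field="confidence", inclusive_lower=0.5),
        EqualsFilter(field="language", equals="en"),
    ],
    tags=[TagFilter(name="env", value="production")],
)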

method: Union[gantry.automations.curators.selectors.OrderedSampler, gantry.automations.curators.selectors.UniformSampler, gantry.automations.curators.selectors.StratifiedSampler]#
limit: int#
filters: List[Union[gantry.automations.curators.selectors.BoundsFilter, gantry.automations.curators.selectors.EqualsFilter, gantry.automations.curators.selectors.ContainsFilter]]#
tags: List[gantry.automations.curators.selectors.TagFilter]#
classmethod validate_limit(value)#

Alerts Aggregations#

class gantry.alerts.client.AlertsAggregation(value)#

Allowed aggregations for Gantry Alerts.

The full list of aggregations are:

TOTAL_DIFFERENCE
PERCENT_DIFFERENCE
TOTAL
MAX
MIN
MEAN
STD
PDF
QUANTILES
PERCENT_NULL
PERCENT_TRUE
PERCENT_FALSE
PERCENT_TRUE_NOT_NULL
PERCENT_FALSE_NOT_NULL
CATEGORY_PERCENTS
ACCURACY
MAE
MSE
MAPE
MAX_ERROR
CONFUSION_MATRIX
F1
R2
PRECISION
RECALL
ROC_AUC_SCORE
PR_AUC_SCORE
D1
DINF
KL
KS
CATEGORICAL_D1
CATEGORICAL_DINF
CATEGORICAL_KL
X2
ROC_CURVE_PLOT
PR_CURVE_PLOT
DISTRIBUTION_SKETCH
CATEGORICAL_SKETCH
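
These names are members of the AlertsAggregation enum; a minimal sketch of referencing one (assuming standard Enum member access):

from gantry.alerts.client import AlertsAggregation

# Reference an aggregation by member name.
aggregation = AlertsAggregation.MEAN
print(aggregation.name)  # prints: MEAN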

DataFrame and Series#

Learn about Gantry’s abstractions for querying data from your production data stream.

class gantry.query.core.dataframe.GantryDataFrame(api_client: gantry.api_client.APIClient, application: str, start_time: Union[str, datetime.datetime], end_time: Union[str, datetime.datetime], version: Optional[Union[str, int]] = None, env: Optional[str] = None, filters: Optional[List[dict]] = None, tags: Optional[dict] = None, relative_time_window: Optional[datetime.timedelta] = None, relative_time_delay: Optional[datetime.timedelta] = None)#

Two-dimensional dict that supports querying Gantry data.
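
For illustration, a minimal usage sketch (it assumes df is a GantryDataFrame already obtained for your application, e.g. from a Gantry query; obtaining one is not shown here):

preview = df.head(10)    # first 10 entries as a pandas DataFrame (includes join_id)
summary = df.describe()  # basic stats on the dataframe
all_rows = df.fetch()    # every entry in the time window as a pandas DataFrame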

property dtypes: pandas.core.series.Series#

Return the column types

head(n: int = 5) pandas.core.frame.DataFrame#

Fetch the top n entries. Results will include an additional column, join_id, whose values can be used to log and join feedback events to your Gantry application.

Parameters

n (int, defaults to 5) – Number of entries to fetch

Returns

A pandas dataframe object.

tail(n: int = 5) pandas.core.frame.DataFrame#

Fetch the last n entries. Results will include an additional column, join_id, whose values can be used to log and join feedback events to your Gantry application.

Parameters

n (int, defaults to 5) – Number of entries to fetch

Returns

A pandas dataframe object.

fetch() pandas.core.frame.DataFrame#

Fetch all entries. Results will include an additional column, join_id, whose values can be used to log and join feedback events to your Gantry application.

Returns

A pandas dataframe object.

describe() pandas.core.frame.DataFrame#

Print basic stats on the dataframe.

class gantry.query.core.dataframe.GantrySeries(name: str, id: str, datatype: str, series_type: str, parent_dataframe: gantry.query.core.dataframe.GantryDataFrame)#

Gantry Series object, similar to Pandas Series.

Operations on Gantry Series return objects that can be used as masks for Gantry Series or Gantry Dataframes.

# Operations on series return masks
mask = some_series.contains("apples")
apples_series = some_series[mask]

apples_series.fetch()
isin(other: List[str])#

For string Series, whether an entry is in a list of strings.

Parameters

other (list[str]) – the list of strings

contains(other: str)#

For string Series, whether an entry contains a string

Parameters

other (str) – string to compare

notnull()#

Filters out null values.

notna()#

Alias for notnull

isnull()#

Filters out non-null values (i.e., null values remain).

isna()#

Alias for isnull

head(n: int = 5) pandas.core.frame.DataFrame#

Fetch the top n entries

Parameters

n (int, defaults to 5) – Number of entries to fetch

Returns

A pandas dataframe object.

tail(n: int = 5) pandas.core.frame.DataFrame#

Fetch the last n entries

Parameters

n (int, defaults to 5) – Number of entries to fetch

Returns

A pandas dataframe object.

fetch() pandas.core.frame.DataFrame#

Fetch all entries

Returns

A pandas dataframe object.

mean(num_points: int = 1, group_by: Optional[str] = None) Union[float, pandas.core.frame.DataFrame]#

Runs on int and float series only. Get the computed average of this GantrySeries, if available

Parameters
  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

  • group_by (str, defaults to None) – column to use as group_by. If None then no group_by operation is performed and a single result is returned.

Returns

Float with mean if num_points=1 and group_by=None (the default values), else a pd.DataFrame.
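
For illustration, a minimal sketch (it assumes some_series is a numeric GantrySeries and that "env" is a column available to group by; both are hypothetical here):

overall_mean = some_series.mean()                 # single float
binned_means = some_series.mean(num_points=7)     # pd.DataFrame, window divided into 7 points
means_by_env = some_series.mean(group_by="env")   # pd.DataFrame with one mean per "env" value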

std(num_points: int = 1, group_by: Optional[str] = None) Union[float, pandas.core.frame.DataFrame]#

Runs on int and float series only. Get the standard deviation for this GantrySeries, if available

Parameters
  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

  • group_by (str, defaults to None) – column to use as group_by. If None then no group_by operation is performed and a single result is returned.

Returns

Float with std if num_points=1 and group_by=None (the default values), else a pd.DataFrame.

median(num_points: int = 1) Union[float, pandas.core.frame.DataFrame]#

Runs on numeric series only. Get the median for this GantrySeries, if available

Parameters

num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

Returns

Float with median in case num_points=1 (the default value), else a pd.DataFrame.

count(num_points: int = 1, group_by: Optional[str] = None, limit: int = 20) Union[float, pandas.core.frame.DataFrame]#

Get the number of available data points for this GantrySeries, if available. Works on all series types.

Parameters
  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

  • group_by (str, defaults to None) – column to use as group_by. If None then no group_by operation is performed and a single result is returned.

  • limit (int, defaults to 20) – maximum number of unique categories to return. If you have many unique values, increase this number to be greater than the number of unique categories. You can use the .unique() query to determine the number of categories and pass it here.

Returns

Float with count if num_points=1 and group_by=None (the default values), else a pd.DataFrame.

min(num_points: int = 1, group_by: Optional[str] = None) Union[float, datetime.datetime, pandas.core.frame.DataFrame]#

Runs on int, float, and datetime series only. Get the minimum for this GantrySeries, if available.

Parameters
  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

  • group_by (str, defaults to None) – column to use as group_by. If None then no group_by operation is performed and a single result is returned.

Returns

Float with min value if num_points=1 and group_by=None (the default values), else a pd.DataFrame.

max(num_points: int = 1, group_by: Optional[str] = None) Union[float, datetime.datetime, pandas.core.frame.DataFrame]#

Runs on int, float, and datetime series only. Get the maximum for this GantrySeries, if available.

Parameters
  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

  • group_by (str, defaults to None) – column to use as group_by. If None then no group_by operation is performed and a single result is returned.

Returns

Float with max value if num_points=1 and group_by=None (the default values), else a pd.DataFrame.

histogram()#

Runs on bool, int, float and str series only. Get histogram of categories for this GantrySeries, if available.

Gets the histogram of percentages from [0, 1] for available data values for this GantrySeries.

Returns

Dict[str, float] histogram

unique() Set[str]#

Runs on bool and str series only. Get the unique values in this GantrySeries, if available.

Returns

Set[str] containing all the unique values that occur in this series.

quantile(quantile_vals: List[float] = [0.5])#

Runs on int and float series only. Get quantiles for this GantrySeries, if available.

Parameters

quantile_vals – list of requested quantiles. Float values in the list should be in the range [0, 1].

Returns

List[float] of length len(quantile_vals), where the order of the outputs matches the order of the input quantile_vals.

pdf()#

Get requested probability density function for this GantrySeries, if available.

cdf()#

Get requested cumulative distribution function for this GantrySeries, if available.

describe() pandas.core.frame.DataFrame#

Runs on int, float, and bool series only. Return basic stats on the series.

Returns

A pandas dataframe with summary information.

percent_true(dropna: bool = False, num_points: int = 1, group_by: Optional[str] = None)#

Runs on boolean series only. Percent true, optionally drop null values.

Parameters
  • data_node (GantrySeries) – GantrySeries which will be calculated

  • dropna (bool, defaults to False) – if True, first drops NaN values before calculating result.

  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

  • group_by (str, defaults to None) – column to use as group_by. If None then no group_by operation is performed and a single result is returned.

Returns

Float with true percentage if num_points=1 and group_by=None (by default), else pd.DataFrame.

percent_false(dropna: bool = False, num_points: int = 1, group_by: Optional[str] = None)#

Runs on boolean series only. Percent false, optionally drop null values.

Parameters
  • data_node (GantrySeries) – GantrySeries which will be calculated

  • dropna (bool, defaults to False) – if True, first drops NaN values before calculating result.

  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

  • group_by (str, defaults to None) – column to use as group_by. If None then no group_by operation is performed and a single result is returned.

Returns

Float with false percentage if num_points=1 and group_by=None (by default), else pd.DataFrame.

percent_null(num_points: int = 1, group_by: Optional[str] = None)#

Runs on boolean series only. Percent null/NaN.

Parameters
  • data_node (GantrySeries) – GantrySeries which will be calculated

  • dropna (bool, defaults to False) – if True, drop rows with NaN values in result.

  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

  • group_by (str, defaults to None) – column to use as group_by. If None then no group_by operation is performed and a single result is returned.

Returns

Float with null percentage if num_points=1 and group_by=None (by default), else pd.DataFrame.

percent_true_not_null(dropna: bool = False, num_points: int = 1, group_by: Optional[str] = None)#

Runs on boolean series only. Percent true of not null values, optionally drop null values before calculating result.

Parameters
  • data_node (GantrySeries) – GantrySeries which will be calculated

  • dropna (bool, defaults to False) – if True, first drops NaN values before calculating result.

  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

  • group_by (str, defaults to None) – column to use as group_by. If None then no group_by operation is performed and a single result is returned.

Returns

Float with true percentage if num_points=1 and group_by=None (by default), else pd.DataFrame.

percent_false_not_null(dropna: bool = False, num_points: int = 1, group_by: Optional[str] = None)#

Runs on boolean series only. Percent false of not null values, optionally drop null values before calculating result.

Parameters
  • data_node (GantrySeries) – GantrySeries which will be calculated

  • dropna (bool, defaults to False) – if True, first drops NaN values before calculating result.

  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

  • group_by (str, defaults to None) – column to use as group_by. If None then no group_by operation is performed and a single result is returned.

Returns

Float with false percentage if num_points=1 and group_by=None (by default), else pd.DataFrame.

Datasets#

A lightweight data container with simple versioning semantics that integrates seamlessly with Gantry curators.

class gantry.dataset.GantryDataset(api_client: gantry.api_client.APIClient, dataset_name: str, dataset_id: uuid.UUID, bucket_name: str, aws_region: str, dataset_s3_prefix: str, workspace: str)#

A class representing a lightweight container around your data that provides simple version control semantics for helping to make ML pipelines reproducible.

list_versions() List[Dict[str, Any]]#

This method will list all the versions of the dataset. Each dataset version is a snapshot of the dataset at a particular point in time. The result will be sorted from latest to earliest.

Example usage:

>>> import gantry.dataset as gdataset
>>> dataset = gdataset.get_dataset(dataset_name)
>>> dataset.list_versions()
[{'version_id': '7f100fca-f080-4daf-82d1-575e34234930',
  'dataset': 'dataset_name',
  'message': 'version notes',
  'created_at': 'Thu, 16 Feb 2023 00:16:33 GMT',
  'created_by': '04c894c7-a853-4fec-a024-66f5aae07b06',
  'is_latest_version': True},
 {'version_id': '300cdb42-a8c9-4b6e-a16d-4f6c925eb25d',
  'dataset': 'dataset_name',
  'message': 'version notes',
  'created_at': 'Thu, 16 Feb 2023 00:16:18 GMT',
  'created_by': '04c894c7-a853-4fec-a024-66f5aae07b06',
  'is_latest_version': False},
 {'version_id': '51e7771d-5ba9-4bdf-ab61-6d6ec9f7e066',
  'dataset': 'dataset_name',
  'message': 'initial dataset commit',
  'created_at': 'Thu, 16 Feb 2023 00:14:34 GMT',
  'created_by': '04c894c7-a853-4fec-a024-66f5aae07b06',
  'is_latest_version': False}]
Returns

dataset versions from latest to earliest.

Return type

List[Dict[str, Any]]

pull(version_id: Optional[Union[str, uuid.UUID]] = None, forced: bool = False) Optional[Dict[str, Any]]#

Pull a specific version of the dataset from dataset server to local working directory.

If a version ID is provided, this method will pull the snapshot of your dataset corresponding to that version ID into the working directory. If a version ID is not provided, this method will pull the latest version. If forced is set to True, any local changes will be discarded. This method only changes the files in your local dataset folder; pulling an older version will not affect the version history.

Example usage:

>>> import gantry.dataset as gdataset
>>> dataset = gdataset.get_dataset(dataset_name)
# pull the latest version of your dataset
>>> dataset.pull()
# pull a specific version of your dataset
>>> dataset.pull("b787034a-798b-4bb3-a726-0e197ddb8aff")
Parameters
  • version_id (str, optional) – target version ID, defaults to None which will pull the latest version.

  • forced (bool) – whether to discard local changes or not when pulling, defaults to False.

Returns

A dictionary with metadata representing the version pulled if successful, None otherwise.

Return type

Optional[Dict[str, Any]]

get_diff() Dict[str, List[str]]#

Show the local changes that have not been pushed to the server yet. This method will return a dictionary with three keys: new_files, modified_files, and deleted_files. Each key will have a list of files that have been added, modified, or deleted respectively.

Example usage:

>>> import gantry.dataset as gdataset
>>> dataset = gdataset.get_dataset(dataset_name)
>>> dataset.pull()
# make some changes to the dataset
>>> dataset.get_diff()
{'new_files': ['tabular_manifests/new_file.csv', 'artifacts/image.png'],
 'modified_files': ['tabular_manifests/modified_file_1.csv'],
 'deleted_files': ['tabular_manifests/deleted_file.csv']}
Returns

a dictionary representing the diff, which looks like this:

{
    "new_files": List[str],
    "modified_files": List[str],
    "deleted_files": List[str],
}

Return type

dict

push_version(message: str) Dict[str, Any]#

This method will create a new dataset version in the remote server with a version message. The new version will include all the local changes that have not been pushed. If there are no local changes, this method will return the current version.

To avoid race conditions, this method will check that the HEAD of the dataset is up to date. If the HEAD is not up to date, this method will raise a DatasetHeadOutOfDateException. In that case you can stash your local changes, pull the latest version of the dataset, and try again. See the documentation for stash(), pull(), and restore() for more details.

Example usage:

>>> import gantry.dataset as gdataset
>>> dataset = gdataset.get_dataset(dataset_name)
>>> dataset.pull()
# make some changes to the dataset
>>> dataset.get_diff()
{'new_files': ['tabular_manifests/new_file.csv', 'artifacts/image.png'],
'modified_files': ['tabular_manifests/modified_file_1.csv'],
'deleted_files': ['tabular_manifests/deleted_file.csv']}
>>> dataset.push_version("new version notes")
{'version_id': '09575ee7-0407-44b8-ae88-765a8270b17a',
'dataset': 'dataset_name',
'message': 'new version notes',
'created_at': 'Wed, 15 Feb 2023 22:17:55 GMT',
'created_by': '04c894c7-a853-4fec-a024-66f5aae07b06',
'is_latest_version': True}
# After pushing the changes, the diff should be empty
>>> dataset.get_diff()
{'new_files':[],
'modified_files':[],
'deleted_files':[]}
Parameters

message (str) – a non-empty string representing a message to associate with the created version.

Returns

a dictionary with metadata representing the version created.

Return type

Dict[str, Any]

push_gantry_dataframe(gantrydf: gantry.query.core.dataframe.GantryDataFrame, filename: str = '')#

This method will take a GantryDataFrame as input, save its contents to a new file in your dataset’s tabular_manifests folder, and create a new version. Note that all these operations happen on the remote server; if you have your dataset checked out locally, please make sure you call pull() after this operation to get the latest data.

Example usage:

>>> import gantry.dataset as gdataset
>>> dataset = gdataset.get_dataset(dataset_name)
>>> dataset.pull()
>>> os.listdir(workspace + dataset_name)
['.dataset_metadata', 'dataset_config.yaml', 'README.md', 'tabular_manifests']
>>> os.listdir(workspace + dataset_name + '/tabular_manifests')
[]
>>> gantrydf = gquery.query(
...   application=application,
...   start_time=start,
...   end_time=end,
...   version="1.1",
... )
>>>
>>> _ = dataset.push_gantry_dataframe(gantrydf)
>>>
>>> _ = dataset.pull()
>>> os.listdir(workspace + dataset_name + '/tabular_manifests')
['GantryDataAutoSave__4390b343-b802-4add-ab51-f86e53979c73_2023-02-13_12:09_to_2023-02-13_17:09_rows_0_69.csv']
Parameters
  • gantrydf (GantryDataFrame) – the GantryDataFrame that needs to be added to the dataset.

  • filename (str) – the name to be given to the file with data from the GantryDataFrame.

Returns

Latest commit info.

push_file(file: IO, filename: Optional[str] = None, message: Optional[str] = None, parent_version_id: Optional[Union[str, uuid.UUID]] = None) Dict[str, Any]#

This method will add a new file to the dataset in Gantry server and create a new version of the dataset. With this method you can easily add new files to the dataset without having to pull the dataset locally. If you have your dataset checked out locally, please make sure you call pull() after this operation to get the latest data. The difference between this method and push_tabular_file() is that this method can handle both tabular and non-tabular files. If you are uploading a tabular file, please use push_tabular_file() instead.
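
Example usage (a minimal sketch; the file path and version message are hypothetical placeholders):

>>> import gantry.dataset as gdataset
>>> dataset = gdataset.get_dataset(dataset_name)
>>> with open("artifacts/image.png", "rb") as f:
...     dataset.push_file(f, message="add example image")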

Parameters
  • file (IO) – the file to be uploaded

  • filename (str, Optional) – the name of the file to be uploaded. Defaults to the name of the file passed in the file parameter.

  • message (str, Optional) – the version message that will be associated with the upload of the file. Defaults to a generic message if not set

  • parent_version_id (uuid.UUID, Optional) – If specified, SDK will check whether you are making changes on top of the latest version, if not the operation will fail.

Returns

A dictionary with metadata representing the version created.

Return type

Dict[str, Any]

push_tabular_file(file: IO, filename: Optional[str] = None, message: Optional[str] = None, parent_version_id: Optional[Union[str, uuid.UUID]] = None) Dict[str, Any]#

This method will add a tabular file to the dataset’s tabular_manifests folder and create a new version of the dataset on the dataset server. With this method you can easily add new tabular files to an existing dataset without having to pull the dataset locally. If you have your dataset checked out locally, please make sure you call pull() after this operation to get the latest data. If you are trying to add a non-tabular file, please use push_file() instead. Note: the tabular filename must end with .csv.

Example usage:

>>> import gantry.dataset as gdataset
>>> dataset = gdataset.get_dataset(dataset_name)
>>> dataset.push_tabular_file(
...     open("{file_to_be_added}.csv"),
...     "{dataset_file_name}.csv",
...     "version info"
...    )
{'version_id': '2b575ee7-0407-44b8-ae88-765a8270b17a',
'dataset': 'dataset_name',
'message': 'version info',
'created_at': 'Wed, 15 Feb 2023 22:17:55 GMT',
'created_by': '04c894c7-a853-4fec-a024-66f5aae07b06',
'is_latest_version': True}
Parameters
  • file (IO) – the csv file to be uploaded

  • filename (str, Optional) – the name of the file to be uploaded. Defaults to the local file name of the file. Name must end with .csv.

  • message (str, optional) – the version message that will be associated with the upload of the file. Defaults to a generic message if not set

  • parent_version_id (uuid.UUID, optional) – If specified, SDK will check whether you are making changes on top of the latest version, if not the operation will fail.

Returns

A dictionary with metadata representing the version created.

Return type

Dict[str, Any]

push_dataframe(dataframe: pandas.core.frame.DataFrame, filename: Optional[str] = None, message: Optional[str] = None, parent_version_id: Optional[Union[str, uuid.UUID]] = None) Dict[str, Any]#

Add a dataframe to the dataset’s tabular_manifests folder as a csv file and create a new version of the dataset on the dataset server. With this method you can easily add a new dataframe to an existing dataset without having to pull the dataset locally. If you have your dataset checked out locally, please make sure you call pull() after this operation.

Example usage:

>>> import gantry.dataset as gdataset
>>> dataset = gdataset.get_dataset(dataset_name)
>>> dataset.push_dataframe(
...     pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]}),
...     "dataframe.csv",
...     "version info"
... )
{'version_id': 'fa575ee7-0407-44b8-ae88-765a8270b17a',
'dataset': 'dataset_name',
'message': 'version info',
'created_at': 'Wed, 15 Feb 2023 22:17:55 GMT',
'created_by': '04c894c7-a853-4fec-a024-66f5aae07b06',
'is_latest_version': True}
Parameters
  • dataframe (pd.DataFrame) – the dataframe to be uploaded

  • filename (str, Optional) – the name for the csv that will be uploaded. Defaults to dataframe-<random-hash>.csv

  • message (str, Optional) – the version message that will be associated with the upload of the file. Defaults to a generic message if not set

  • parent_version_id (uuid.UUID, Optional) – If specified, SDK will check whether you are making changes on top of the latest version, if not the operation will fail.

Returns

A dictionary with metadata representing the version created.

Return type

Dict[str, Any]

rollback(version_id: Union[str, uuid.UUID], forced: bool = False) Optional[Dict[str, Any]]#

Rollback the dataset data to a previous version and create a new dataset version based on the target version id. With this method you can easily roll back to any previous version of your dataset and continue working on top of it. Note that to do this, you must have your local dataset folder up to date. This method will also automatically pull the latest version to your local dataset folder after the rollback. If forced is set to True, any local changes will be discarded.

Example usage:

>>> import gantry.dataset as gdataset
>>> dataset = gdataset.get_dataset(dataset_name)
>>> dataset.pull()
>>> dataset.list_versions()
[{'version_id': 'd6f42bd0-b7a9-4089-88fb-be7cee6d4a83',
'dataset': 'dataset_name',
'message': 'version info',
'created_at': 'Wed, 08 Feb 2023 22:02:02 GMT',
'created_by': 'db459d6d-c83b-496d-b659-e48bca971156',
'is_latest_version': True},
{'version_id': 'b787034a-798b-4bb3-a726-0e197ddb8aff',
'dataset': 'dataset_name',
'message': 'initial dataset commit',
'created_at': 'Wed, 08 Feb 2023 22:00:26 GMT',
'created_by': 'db459d6d-c83b-496d-b659-e48bca971156',
'is_latest_version': False}]
>>> dataset.rollback("b787034a-798b-4bb3-a726-0e197ddb8aff")
{'version_id': '23bc4d35-0df2-424c-9156-d5ca105eb4c1',
'dataset': 'dataset_name',
'message': 'Rollback dataset to version: b787034a-798b-4bb3-a726-0e197ddb8aff',
'created_at': 'Thu, 09 Feb 2023 00:36:07 GMT',
'created_by': 'db459d6d-c83b-496d-b659-e48bca971156',
'is_latest_version': True}
>>> dataset.list_versions()
[{'version_id': '23bc4d35-0df2-424c-9156-d5ca105eb4c1',
'dataset': 'dataset_name',
'message': 'Rollback dataset to version: b787034a-798b-4bb3-a726-0e197ddb8aff',
'created_at': 'Thu, 09 Feb 2023 00:36:07 GMT',
'created_by': 'db459d6d-c83b-496d-b659-e48bca971156',
'is_latest_version': True},
{'version_id': 'd6f42bd0-b7a9-4089-88fb-be7cee6d4a83',
'dataset': 'dataset_name',
'message': 'version info',
'created_at': 'Wed, 08 Feb 2023 22:02:02 GMT',
'created_by': 'db459d6d-c83b-496d-b659-e48bca971156',
'is_latest_version': False},
{'version_id': 'b787034a-798b-4bb3-a726-0e197ddb8aff',
'dataset': 'dataset_name',
'message': 'initial dataset commit',
'created_at': 'Wed, 08 Feb 2023 22:00:26 GMT',
'created_by': 'db459d6d-c83b-496d-b659-e48bca971156',
'is_latest_version': False}]
Parameters
  • version_id (str) – ID of the version to rollback to.

  • forced (bool) – whether to discard local changes when rolling back.

Returns

A dictionary with metadata representing the version rolled back to if successful, None otherwise.

Return type

Optional[Dict[str, Any]]

stash() None#

This method will stash all the changes in your local dataset folder to a temporary folder. You can use this method to save your changes before the pull() method. Later on you can use the restore() method to restore your changes. Note that if you run stash() multiple times, the changes will be overwritten and only the latest stash will be kept.

Example usage:

>>> import gantry.dataset as gdataset
>>> dataset = gdataset.get_dataset(dataset_name)
>>> dataset.pull()
# make some changes to your dataset
>>> dataset.get_diff()
{'new_files': ['tabular_manifests/new_file.csv', 'artifacts/image.png'],
'modified_files': ['tabular_manifests/modified_file_1.csv'],
'deleted_files': ['tabular_manifests/deleted_file.csv']}
>>> dataset.stash()
>>> dataset.get_diff()
{'new_files': [],
'modified_files': [],
'deleted_files': []}
Returns

None

restore() None#

This method will restore the changes from the stash to your local dataset folder. Note that if you have made changes to your local dataset folder after the stash() method, those changes could be overwritten by this method.

Example usage:

>>> import gantry.dataset as gdataset
>>> dataset = gdataset.get_dataset(dataset_name)
>>> dataset.pull()
# make some changes to your local dataset folder
>>> dataset.get_diff()
{'new_files': ['tabular_manifests/new_file.csv', 'artifacts/image.png'],
'modified_files': ['tabular_manifests/modified_file_1.csv'],
'deleted_files': ['tabular_manifests/deleted_file.csv']}
>>> dataset.stash()
>>> dataset.get_diff()
{'new_files': [],
'modified_files': [],
'deleted_files': []}
>>> dataset.restore()
>>> dataset.get_diff()
{'new_files': ['tabular_manifests/new_file.csv', 'artifacts/image.png'],
'modified_files': ['tabular_manifests/modified_file_1.csv'],
'deleted_files': ['tabular_manifests/deleted_file.csv']}
Returns

None

delete() None#

Mark the dataset as deleted. For now this will only hide the dataset from the list_datasets API results; it will not delete the dataset data from the server. We are working on implementing a hard-delete feature.

Returns

None

pull_huggingface_dataset_by_version(version_id: Optional[str] = None, hf_dataset_name: Optional[str] = None, streaming=False)#

This function will pull the dataset tabular files from the Gantry server and convert them to a HuggingFace dataset. This method will not pull data to your local dataset folder; it only loads the dataset’s tabular data from the remote server into a HuggingFace dataset object.

If no version_id is specified, it will load the latest version of the dataset. If streaming is set to True, it will return an IterableDataset or IterableDatasetDict instead of a Dataset or DatasetDict.

Refer to HuggingFace Documentation for more details about streaming.

Example usage:

import gantry.dataset as gdataset

dataset = gdataset.get_dataset(dataset_name)

# download your dataset as huggingface dataset
hf_dataset = dataset.pull_huggingface_dataset_by_version(version_id="version_id")

# pull the dataset as IterableDataset with streaming
hf_dataset = dataset.pull_huggingface_dataset_by_version(version_id="version_id", streaming=True) # noqa: E501
Parameters
  • version_id (Optional[str], optional) – dataset version id to load. Defaults to latest.

  • hf_dataset_name (Optional[str], optional) – param to overwrite the huggingface dataset name. Note this param must be a valid python identifier.

  • streaming (bool, default False) – If set to True, the data files are not downloaded; instead, the data is streamed progressively while iterating over the dataset, and an IterableDataset or IterableDatasetDict is returned.

get_huggingface_dataset(hf_dataset_name: Optional[str] = None, streaming: bool = False)#

This function will convert your local dataset folder to a HuggingFace dataset. Internally the Gantry SDK will create a HuggingFace load script based on the dataset config and then load the dataset into a HuggingFace pyarrow dataset. If streaming is set to True, it will return an IterableDataset or IterableDatasetDict instead of a Dataset or DatasetDict.

Refer to HuggingFace documentation for more details about streaming.

Example usage:

import gantry.dataset as gdataset

dataset = gdataset.get_dataset(dataset_name)
dataset.pull()

# load the dataset into huggingface dataset
hf_dataset = dataset.get_huggingface_dataset()

# load the dataset into huggingface dataset with streaming
hf_dataset = dataset.get_huggingface_dataset(streaming=True)
Parameters
  • version_id (Optional[str], optional) – dataset version id to load. Defaults to latest.

  • hf_dataset_name (Optional[str], optional) – param to overwrite the huggingface dataset name. Note this param must be a valid python identifier.

  • streaming (bool, default False) – If set to True, the data files are not downloaded; instead, the data is streamed progressively while iterating over the dataset, and an IterableDataset or IterableDatasetDict is returned.

gantry.dataset.create_dataset(name: str, bucket_name: Optional[str] = None, app_name: Optional[str] = None) gantry.dataset.gantry_dataset.GantryDataset#

Alias for gantry.dataset.client.GantryDatasetClient.create_dataset()

Create a dataset with the provided name. If app_name is provided, we will use the application schema to set the dataset schema. bucket_name can be provided if you want to store data in your own bucket; please contact Gantry support if you want to use this feature. If not provided, we will use a Gantry-managed bucket.

Example usage:

import gantry.dataset as gdataset
# Create an empty dataset with the name "dataset_name"
gdataset.create_dataset("dataset_name")
# Create a dataset with the name "dataset_name" and set the schema to the schema of the
# application "app_name"
gdataset.create_dataset("dataset_with_app", app_name="app_name")
Parameters
  • name (str) – dataset name

  • bucket_name (str) – Provide bucket name if you want to use your own bucket. If not provided we will use a gantry managed bucket.

  • app_name (Optional[str], optional) – gantry application name which will be used to set dataset schema if provided.

Returns

an object representing the created dataset.

Return type

gantry.dataset.GantryDataset

gantry.dataset.get_dataset(name: str, app_name: Optional[str] = None) gantry.dataset.gantry_dataset.GantryDataset#

Alias for gantry.dataset.client.GantryDatasetClient.get_dataset()

Get a dataset object by name. If the dataset is marked as deleted, a warning will be logged. If the dataset does not exist, a gantry.exceptions.DatasetNotFoundError will be raised.

Example usage:

import gantry.dataset as gdataset
dataset = gdataset.get_dataset("dataset_name", "test_app")
Parameters
  • name (str) – the name of the dataset.

  • app_name (str) – the name of the application the dataset belongs to.

Returns

An object representing the dataset.

Return type

gantry.dataset.GantryDataset

gantry.dataset.list_dataset_versions(dataset_name: str) List[Dict[str, Any]]#

Alias for gantry.dataset.client.GantryDatasetClient.list_dataset_versions()

List all versions of a dataset. Each dataset version is a snapshot of the dataset at a particular point in time. The result will be sorted from latest to earliest.

Example usage:

import gantry.dataset as gdataset
gdataset.list_dataset_versions("dataset_name")

Output example:

[{'version_id': 'd6f42bd0-b7a9-4089-88fb-be7cee6d4a83',
'dataset': 'dataset_name',
'message': 'version info',
'created_at': 'Wed, 08 Feb 2023 22:02:02 GMT',
'created_by': 'db459d6d-c83b-496d-b659-e48bca971156',
'is_latest_version': True},
{'version_id': 'b787034a-798b-4bb3-a726-0e197ddb8aff',
'dataset': 'dataset_name',
'message': 'initial dataset commit',
'created_at': 'Wed, 08 Feb 2023 22:00:26 GMT',
'created_by': 'db459d6d-c83b-496d-b659-e48bca971156',
'is_latest_version': False}]
Parameters

dataset_name (str) – the name of the dataset.

Returns

list of dictionaries with metadata.

Return type

List[Dict[str, Any]]

gantry.dataset.list_datasets(include_deleted: bool = False, model_node_id: Optional[str] = None) List[Dict[str, Any]]#

Alias for gantry.dataset.client.GantryDatasetClient.list_datasets()

List all datasets. If include_deleted is set to True, deleted datasets will be included in the result list.

Example usage:

import gantry.dataset as gdataset
gdataset.list_datasets()

Output example:

[{'name': 'dataset_0',
'dataset_id': '44e8dff0-1e4c-4484-843c-ca3c585d405d',
'created_at': 'Sun, 12 Feb 2023 00:08:12 GMT'},
{'name': 'dataset_1',
'dataset_id': '3adb66fa-9dc7-4a60-86b6-83389f567186',
'created_at': 'Sun, 12 Feb 2023 00:05:15 GMT'},
{'name': 'dataset_2',
'dataset_id': '0a5b5706-2060-4601-8c5c-22900f06d54a',
'created_at': 'Wed, 08 Feb 2023 22:00:26 GMT'}]
Parameters

include_deleted (bool) – whether to include deleted datasets in the result, defaults to False.

Returns

List of dictionaries, each representing one dataset and associated metadata.

Return type

List[Dict[str, Any]]

gantry.dataset.delete_dataset(name: str) None#

Alias for gantry.dataset.client.GantryDatasetClient.delete_dataset()

Mark the dataset as deleted. For now this will only hide the dataset from the list_datasets API results; it will not delete the dataset data from the server. We will release the hard-deletion feature later.

Example usage:

import gantry.dataset as gdataset
gdataset.delete_dataset("dataset_name")
Parameters

name (str) – the name of the dataset to be deleted.

Returns

None

gantry.dataset.set_working_directory(working_dir: str) None#

Alias for gantry.dataset.client.GantryDatasetClient.set_working_directory()

Set the working directory for the dataset client. This is the directory where the local copy of the dataset will be stored.

Example usage:

import gantry.dataset as gdataset
gdataset.set_working_directory("your working directory")
Parameters

working_dir (str) – absolute path of the directory used to store local copy of datasets.

Returns

None

Queries#

Time Window#

Define a time window for querying data.

class gantry.query.time_window.TimeWindow(start_time: Union[str, datetime.datetime], end_time: Union[str, datetime.datetime])#
__init__(start_time: Union[str, datetime.datetime], end_time: Union[str, datetime.datetime])#

Initialize a standard time window.

Parameters
  • start_time (Union[str, datetime.datetime]) – the start time of the time window.

  • end_time (Union[str, datetime.datetime]) – the end time of the time window.

class gantry.query.time_window.RelativeTimeWindow(window_length: datetime.timedelta, offset: Optional[datetime.timedelta] = None)#
__init__(window_length: datetime.timedelta, offset: Optional[datetime.timedelta] = None)#

Initialize a relative time window.

Parameters
  • window_length (datetime.timedelta) – the length of the time window.

  • offset (Optional[datetime.timedelta]) – the offset of the time window. Defaults to None.
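
For illustration, a minimal construction sketch (the specific dates and durations are arbitrary):

import datetime
from gantry.query.time_window import TimeWindow, RelativeTimeWindow

# A fixed window covering the first week of February 2023.
fixed_window = TimeWindow(
    start_time=datetime.datetime(2023, 2, 1),
    end_time=datetime.datetime(2023, 2, 8),
)

# A relative window one day long, with a one-hour offset.
relative_window = RelativeTimeWindow(
    window_length=datetime.timedelta(days=1),
    offset=datetime.timedelta(hours=1),
)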

Metrics#

gantry.query.metric.main.accuracy_score(outputs: gantry.query.core.dataframe.GantrySeries, feedback: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1) pandas.core.frame.DataFrame#

Alias for gantry.query.core.GantryMetric.accuracy_score()

Categorical metric - accuracy

In multilabel classification, this computes the set of |outputs| which exactly match the available |feedback|.

Parameters
  • outputs (GantrySeries) – predictions as a GantrySeries

  • feedback (GantrySeries) – labels to compare against as a GantrySeries

  • dropna (bool, defaults to False) – if True, drop rows with NaN values in result

  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

Returns

(pd.DataFrame) pd.DataFrame of shape (num_points, 1) accuracy score
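
For illustration, a minimal sketch (it assumes outputs and feedback are GantrySeries from the same application, e.g. columns of a GantryDataFrame; obtaining them is not shown here):

from gantry.query.metric.main import accuracy_score

# Overall accuracy over the series' time window, as a (1, 1) DataFrame.
overall = accuracy_score(outputs, feedback)

# Accuracy over the window split into 7 points, dropping rows with NaN values in the result.
by_point = accuracy_score(outputs, feedback, dropna=True, num_points=7)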

gantry.query.metric.main.mean_squared_error(outputs: gantry.query.core.dataframe.GantrySeries, feedback: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1, multioutput: Literal['uniform_average', 'raw_values'] = 'uniform_average', squared: bool = True) pandas.core.frame.DataFrame#

Alias for gantry.query.core.GantryMetric.mean_squared_error()

Regression metric - mean squared error

Parameters
  • outputs (GantrySeries) – predictions as a GantrySeries

  • feedback (GantrySeries) – labels to compare against as a GantrySeries

  • dropna (bool, defaults to False) – if True, drop rows with NaN values in result

  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into.

  • multioutput ("uniform_average" or "raw_values") – type of averaging to use when computing the metric. Defaults to “uniform_average”.

  • squared (bool, defaults to True) – if True, return the squared error.

Returns

(pd.DataFrame) pd.DataFrame of shape (num_points, 1) mean_squared_error
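
A minimal usage sketch; outputs_series and feedback_series are hypothetical numeric GantrySeries assumed to exist already:

import gantry.query.metric.main as gmetric

# Hypothetical inputs: outputs_series / feedback_series are existing GantrySeries.
mse = gmetric.mean_squared_error(
    outputs_series,
    feedback_series,
    multioutput="uniform_average",
)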

gantry.query.metric.main.confusion_matrix(outputs: gantry.query.core.dataframe.GantrySeries, feedback: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1) → pandas.core.frame.DataFrame#

Alias for gantry.query.core.GantryMetric.confusion_matrix()

Categorical metric - confusion matrix

The confusion matrix is a matrix \(C\) where \(C_{i, j}\) represents the number of times a data point from class \(i\) was predicted to be in class \(j\).

Parameters
  • outputs (GantrySeries) – predictions as a GantrySeries

  • feedback (GantrySeries) – labels to compare against as a GantrySeries.

  • dropna (bool, defaults to False) – if True, drop rows with NaN values in result.

  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

Returns

(pd.DataFrame) pd.DataFrame of shape (num_points, 1) confusion_matrix
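
A minimal usage sketch; outputs_series and feedback_series are hypothetical categorical GantrySeries assumed to exist already:

import gantry.query.metric.main as gmetric

# Hypothetical inputs: outputs_series / feedback_series are existing GantrySeries.
cm = gmetric.confusion_matrix(outputs_series, feedback_series)
print(cm)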

gantry.query.metric.main.f1_score(outputs: gantry.query.core.dataframe.GantrySeries, feedback: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1, average: Literal['micro'] = 'micro') → pandas.core.frame.DataFrame#

Alias for gantry.query.core.GantryMetric.f1_score()

Categorical metric - F1 score

It is computed as the harmonic mean of precision and recall: F1 = 2 * (precision * recall) / (precision + recall). In multiclass classification, this is the average of the F1 score for all available classes.

Parameters
  • outputs (GantrySeries) – predictions as a GantrySeries

  • feedback (GantrySeries) – labels to compare against as a GantrySeries.

  • dropna (bool, defaults to False) – if True, drop rows with NaN values in result.

  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

  • average ("micro") – type of averaging to use when computing the metric. Currently only “micro” supported, which is the default value.

Returns

(pd.DataFrame) pd.DataFrame of shape (num_points, 1) f1_score

gantry.query.metric.main.r2_score(outputs: gantry.query.core.dataframe.GantrySeries, feedback: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1, multioutput: Literal['uniform_average', 'raw_values', 'variance_weighted'] = 'uniform_average') → float#

Alias for gantry.query.core.GantryMetric.r2_score()

Regression metric - R^2 coefficient of determination

Parameters
  • outputs (GantrySeries) – predictions as a GantrySeries

  • feedback (GantrySeries) – labels to compare against as a GantrySeries.

  • dropna (bool, defaults to False) – if True, drop rows with NaN values in result.

  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

  • multioutput ("uniform_average", "raw_values", "variance_weighted") – type of averaging to use when computing the metric. Defaults to “uniform_average”.

Returns

(float) R^2 score.
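
A minimal usage sketch; outputs_series and feedback_series are hypothetical numeric GantrySeries assumed to exist already:

import gantry.query.metric.main as gmetric

# Hypothetical inputs: outputs_series / feedback_series are existing GantrySeries.
r2 = gmetric.r2_score(outputs_series, feedback_series)
print(r2)  # a single float, unlike the DataFrame-returning metrics above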

gantry.query.metric.main.precision_score(outputs: gantry.query.core.dataframe.GantrySeries, feedback: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1, average: Literal['micro'] = 'micro') → pandas.core.frame.DataFrame#

Alias for gantry.query.core.GantryMetric.precision_score()

Categorical metric - precision score

precision = (number of true positives) / ((number of true positives) + (number of false positives))

Parameters
  • outputs (GantrySeries) – predictions as a GantrySeries

  • feedback (GantrySeries) – labels to compare against as a GantrySeries.

  • dropna (bool, defaults to False) – if True, drop rows with NaN values in result.

  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

  • average ("micro") – type of averaging to use when computing the metric. Currently only “micro” supported, which is the default value.

Returns

(pd.DataFrame) pd.DataFrame of shape (num_points, 1) precision_score

gantry.query.metric.main.recall_score(outputs: gantry.query.core.dataframe.GantrySeries, feedback: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1, average: Literal['micro'] = 'micro') → pandas.core.frame.DataFrame#

Alias for gantry.query.core.GantryMetric.recall_score()

Categorical metric - recall score

recall = (number of true positives) / ((number of true positives) + (number of false negatives))

Parameters
  • outputs (GantrySeries) – predictions as a GantrySeries

  • feedback (GantrySeries) – labels to compare against as a GantrySeries.

  • dropna (bool, defaults to False) – if True, drop rows with NaN values in result.

  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

  • average ("micro") – type of averaging to use when computing the metric. Currently only “micro” supported, which is the default value.

Returns

(pd.DataFrame) pd.DataFrame of shape (num_points, 1) recall_score

gantry.query.metric.main.roc_auc_score(outputs: gantry.query.core.dataframe.GantrySeries, feedback: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1) → pandas.core.frame.DataFrame#

Alias for gantry.query.core.GantryMetric.roc_auc_score()

Classification score metric - the area under the ROC curve

Parameters
  • outputs (GantrySeries) – predictions as a GantrySeries

  • feedback (GantrySeries) – labels to compare against as a GantrySeries.

  • dropna (bool, defaults to False) – if True, drop rows with NaN values in result.

  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

Returns

(pd.DataFrame) pd.DataFrame of shape (num_points, 1) roc_auc_score

gantry.query.metric.main.percent_null(data_node: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1) → pandas.core.frame.DataFrame#

Alias for gantry.query.core.GantryMetric.percent_null()

Percent null/NaN

Parameters
  • data_node (GantrySeries) – GantrySeries which will be calculated

  • dropna (bool, defaults to False) – if True, drop rows with NaN values in result.

  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

Returns

(pd.DataFrame) pd.DataFrame of shape (num_points, 1) percent_null
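
A minimal usage sketch; feature_series is a hypothetical GantrySeries for a single feature, assumed to exist already:

import gantry.query.metric.main as gmetric

# Hypothetical input: feature_series is an existing GantrySeries.
pct_null = gmetric.percent_null(feature_series, num_points=7)
print(pct_null)  # pd.DataFrame of shape (7, 1)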

gantry.query.metric.main.percent_true(data_node: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1) → pandas.core.frame.DataFrame#

Alias for gantry.query.core.GantryMetric.percent_true()

Percent true

Parameters
  • data_node (GantrySeries) – GantrySeries which will be calculated

  • dropna (bool, defaults to False) – if True, first drops NaN values before calculating result.

  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

Returns

(pd.DataFrame) pd.DataFrame of shape (num_points, 1) percent_true

gantry.query.metric.main.percent_false(data_node: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1) → pandas.core.frame.DataFrame#

Alias for gantry.query.core.GantryMetric.percent_false()

Percent false

Parameters
  • data_node (GantrySeries) – GantrySeries which will be calculated

  • dropna (bool, defaults to False) – if True, first drops NaN values before calculating result.

  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

Returns

(pd.DataFrame) pd.DataFrame of shape (num_points, 1) percent_false

gantry.query.metric.main.percent_true_not_null(data_node: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1) → pandas.core.frame.DataFrame#

Alias for gantry.query.core.GantryMetric.percent_true_not_null()

Percent true after excluding null values

Parameters
  • data_node (GantrySeries) – GantrySeries which will be calculated

  • dropna (bool, defaults to False) – if True, first drops NaN values before calculating result.

  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

Returns

(pd.DataFrame) pd.DataFrame of shape (num_points, 1) percent_true_not_null

gantry.query.metric.main.percent_false_not_null(data_node: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1) → pandas.core.frame.DataFrame#

Alias for gantry.query.core.GantryMetric.percent_false_not_null()

Percent false after excluding null values

Parameters
  • data_node (GantrySeries) – GantrySeries which will be calculated

  • dropna (bool, defaults to False) – if True, first drops NaN values before calculating result.

  • num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into

Returns

(pd.DataFrame) pd.DataFrame of shape (num_points, 1) percent_false_not_null

Distances#

gantry.query.distance.main.d1(feat1: gantry.query.core.dataframe.GantrySeries, feat2: gantry.query.core.dataframe.GantrySeries) → float#

Alias for gantry.query.core.GantryDistance.d1()

Computes the D1 distance between the input features' distributions.

Parameters
  • feat1 (GantrySeries) – feature as a GantrySeries

  • feat2 (GantrySeries) – feature to compute dist with as a GantrySeries

Returns: float d1 distance
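
A minimal usage sketch; reference_series and production_series are hypothetical GantrySeries for the same feature over two different views of the data, assumed to exist already:

import gantry.query.distance.main as gdistance

# Hypothetical inputs: reference_series / production_series are existing GantrySeries.
drift = gdistance.d1(reference_series, production_series)
print(drift)  # float D1 distance between the two distributions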

gantry.query.distance.main.dinf(feat1: gantry.query.core.dataframe.GantrySeries, feat2: gantry.query.core.dataframe.GantrySeries) → float#

Alias for gantry.query.core.GantryDistance.dinf()

Computes the maximum distance between the input features' distributions.

Parameters
  • feat1 (GantrySeries) – feature as a GantrySeries

  • feat2 (GantrySeries) – feature to compute dist with as a GantrySeries

Returns: float d_inf distance

gantry.query.distance.main.ks(feat1: gantry.query.core.dataframe.GantrySeries, feat2: gantry.query.core.dataframe.GantrySeries) → float#

Alias for gantry.query.core.GantryDistance.ks()

Performs the one-sample Kolmogorov-Smirnov test for goodness of fit between the input features' distributions.

Parameters
  • feat1 (GantrySeries) – feature as a GantrySeries

  • feat2 (GantrySeries) – feature to compute dist with as a GantrySeries

Returns: float ks distance measure

gantry.query.distance.main.kl(feat1: gantry.query.core.dataframe.GantrySeries, feat2: gantry.query.core.dataframe.GantrySeries) → float#

Alias for gantry.query.core.GantryDistance.kl()

Gets the Kullback-Leibler divergence between the input features' distributions.

Parameters
  • feat1 (GantrySeries) – feature as a GantrySeries

  • feat2 (GantrySeries) – feature to compute dist with as a GantrySeries

Returns: float kl divergence