Applications and Logging#
- gantry.init(api_key: Optional[str] = None, logging_level: Optional[typing_extensions.Literal["DEBUG", "INFO", "WARNING", "CRITICAL", "ERROR"]] = None, environment: Optional[str] = None, send_in_background: Optional[bool] = None)#
Initialize gantry services. This is always the first step in using the SDK.
Example:
import gantry

gantry.init(api_key="foobar")
- Parameters
api_key (optional, str) – Your Gantry API Key. You can also set this parameter by setting the env variable GANTRY_API_KEY.
logging_level (optional, str) – Set logging level for Gantry system logging. You can also set this parameter by setting the env variable GANTRY_LOGGING_LEVEL. Options are: DEBUG, INFO, WARNING, CRITICAL or ERROR. If not specified, it defaults to INFO.
environment (optional, str) – Set the value for the environment label attached to instrumented data. You can also set this parameter by setting the env variable GANTRY_ENVIRONMENT. If not provided, it defaults to “dev”. The environment is a tag attached to data; to override this value at ingestion time, set the ‘env’ tag in the tags parameter.
send_in_background (optional, bool) – Set whether Gantry logging methods should send data in the background (asynchronously). You can also set this value by setting the env variable GANTRY_SEND_IN_BACKGROUND. If not provided, it defaults to True unless running in an AWS Lambda.
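For instance, a minimal sketch that sets these options explicitly rather than through environment variables (the API key and environment name below are placeholders):

import gantry

# Placeholder values; substitute your own key and environment name.
gantry.init(
    api_key="YOUR_GANTRY_API_KEY",
    logging_level="WARNING",
    environment="staging",
    send_in_background=True,
)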
- class gantry.Application(name: str, api_client: gantry.api_client.APIClient, id: Optional[uuid.UUID] = None, organization_id: Optional[uuid.UUID] = None)#
A class to represent a single application. This class should not be instantiated directly; instead, obtain an instance by calling gantry.create_application("my-app") or gantry.get_application("my-app").
- __init__(name: str, api_client: gantry.api_client.APIClient, id: Optional[uuid.UUID] = None, organization_id: Optional[uuid.UUID] = None)#
Note: Please use gantry.create_application() to create an application instead of defining an application directly by calling this method.
- log(inputs: Optional[Union[dict, List[dict], pandas.core.frame.DataFrame, pandas.core.series.Series, numpy.ndarray]] = None, outputs: Optional[Union[Any, List[Any], dict, List[dict], pandas.core.frame.DataFrame, pandas.core.series.Series, numpy.ndarray]] = None, feedbacks: Optional[Union[dict, List[dict], pandas.core.frame.DataFrame, pandas.core.series.Series, numpy.ndarray]] = None, ignore_inputs: Optional[List[str]] = None, timestamps: Optional[Union[datetime.datetime, List[datetime.datetime], pandas.core.indexes.datetimes.DatetimeIndex, numpy.ndarray]] = None, sort_on_timestamp: bool = True, sample_rate: float = 1.0, as_batch: Optional[bool] = False, version: Optional[Union[str, int]] = None, tags: Optional[Union[Dict[str, str], List[Dict[str, str]]]] = None, row_tags: Optional[Union[Dict[str, str], List[Dict[str, str]], pandas.core.frame.DataFrame, pandas.core.series.Series, numpy.ndarray]] = None, global_tags: Optional[Dict[str, str]] = None, join_keys: Optional[Union[str, List[str], pandas.core.series.Series]] = None)#
Ingests an event or a batch of events containing predictions (inputs and outputs), feedback, or both.
Example:
app.log(
    inputs=[{'A': 100}, {'A': 101}],
    outputs=[{'B': 'foo'}, {'B': 'bar'}],
    version=1,
    feedbacks=[{'B': 'bar'}, {'B': 'foo'}],
    global_tags={"env": "environment1"},
    join_keys=["12345", "67890"],
)
- Parameters
inputs (optional, Union[dict, List[dict], pd.DataFrame, pd.Series, np.ndarray]) – A list of prediction inputs. inputs[i] is a dict of the features for the i-th prediction to be logged.
outputs (optional, Union[Any, List[Any], dict, List[dict], pd.DataFrame, pd.Series, np.ndarray]) – A list of prediction outputs. outputs[i] should be the application output for the prediction with features inputs[i].
feedbacks (optional, Union[dict, List[dict], pd.DataFrame, pd.Series, np.ndarray]) – A list of feedbacks. feedbacks[i] is a dict of the features for the i-th prediction to be logged.
ignore_inputs (optional, List[str]) – A list of names of input features that should not be logged.
timestamps (optional, Union[List[datetime.datetime], pd.DatetimeIndex, np.array[datetime.datetime]]) – A list of prediction timestamps. If specified, timestamps[i] should be the timestamp for the i-th prediction. If timestamps = None (default), the prediction timestamp defaults to the time when the log call is made.
sort_on_timestamp (bool, defaults to True) – Only applies when timestamps are provided. If True, records are sorted by the given timestamps. Defaults to True.
sample_rate (optional, float) – Used for down-sampling. The probability, as a float, that each record will be sent to Gantry. Defaults to 1.0.
as_batch (bool, defaults to False) – Whether to add batch metadata and tracking in the ‘batch’ section of the dashboard
version (optional, Union[int, str]) – Version of the function schema to use for validation by Gantry. If not provided, Gantry will use the latest version. If the version doesn’t exist yet, Gantry will create it by auto-generating the schema based on data it has seen. Providing an int or its stringified value makes no difference (e.g. version=10 is the same as version='10').
tags (optional, Union[Dict[str, str], List[Dict[str, str]]]) – A tag is a label that you assign to your data. For example, you can specify which environment the data belongs to by setting an "env" tag, e.g. tags = {"env": "environment1"}. If not assigned, the Gantry client’s environment value is used as the default environment. If this parameter is a dict, the tags apply to all records. Alternatively, you can pass a list of dicts to apply tags to each record independently. IMPORTANT: this parameter is deprecated and will be removed soon. Use row_tags and global_tags instead.
row_tags (optional, Union[List[Dict[str, str]], pd.DataFrame, pd.Series, np.ndarray]) – Specify row-level tags. If provided, this parameter should be a list of tags to be applied to each of the records: row_tags[i] is a dictionary of strings containing the tags to attach to the i-th record. Alternatively, tags may be specified by passing in a DataFrame, Series, or ndarray, like inputs. For batch-wide tags, use the ‘global_tags’ parameter instead.
global_tags (optional, Dict[str, str]) – Specify a set of tags that will be attached to all ingested records from this batch. For record specific tags, you can use ‘row_tags’ param. Only used when log is not called within a run.
join_keys (optional, Union[str, List[str], pd.Series[str]]) – Provide keys to identify each record. These keys can be used later to provide feedback. If not provided, a random record key will be generated for each record.
- Returns
The batch_id will be None if records are not logged as a batch. The list of join_keys contains the records’ keys.
- Return type
Tuple[batch_id, list[join_keys]]
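As a sketch of how the returned join keys can be reused (values are taken from the example above), feedback can be attached to previously logged predictions by passing the same join_keys:

# Log predictions and keep the join keys returned by Gantry.
batch_id, join_keys = app.log(
    inputs=[{'A': 100}, {'A': 101}],
    outputs=[{'B': 'foo'}, {'B': 'bar'}],
)

# Later, attach feedback (actuals) to those same records.
app.log(
    feedbacks=[{'B': 'bar'}, {'B': 'foo'}],
    join_keys=join_keys,
)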
- query(time_window: Optional[Union[gantry.query.time_window.TimeWindow, gantry.query.time_window.RelativeTimeWindow]] = None, version: Optional[Union[str, int]] = None, env: Optional[str] = None, filters: Optional[List[dict]] = None, tags: Optional[dict] = None)#
Query for a window of data for this application, with given parameters.
Example:
import datetime
from gantry.query.time_window import RelativeTimeWindow

time_window = RelativeTimeWindow(
    window_length=datetime.timedelta(days=1),
    offset=datetime.timedelta(minutes=1),
)
query = app.query(
    time_window,
    tags={"champion": "messi"},
    filters=[
        {
            "feature_name": "inputs.speed",
            "lower_bound": 0,
            "upper_bound": 3
        }
    ],
)
- Parameters
time_window (optional, Union[TimeWindow, RelativeTimeWindow]) – A time window object. If a TimeWindow is passed in, the query is saved with fixed timestamps. If a RelativeTimeWindow is passed in, the query is saved with a relative time window and offset.
version (optional, Union[int, str]) – The version of the application to query.
env (optional, str) – The environment of the application to query.
filters (optional, list[dict]) – A list of filters to apply to the query.
tags (optional, dict) – A dictionary of tags to apply to the query.
- Returns
Gantry Dataframe with all the query information to fetch data.
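A sketch using a fixed window instead of a relative one; the TimeWindow constructor arguments shown (start_time, end_time) are assumptions based on its described behavior, not confirmed signatures:

import datetime
from gantry.query.time_window import TimeWindow  # assumed import path, mirroring RelativeTimeWindow

# Assumed constructor arguments: a fixed start and end timestamp.
window = TimeWindow(
    start_time=datetime.datetime(2023, 1, 1),
    end_time=datetime.datetime(2023, 1, 2),
)
query = app.query(window, env="dev", version=1)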
- save_query(name: str, query: gantry.query.core.dataframe.GantryDataFrame)#
Save a query to the Gantry database.
Example:
query = app.query(....)
app.save_query("demo_query", query)
- Parameters
name (str) – The name of the query.
query (GantryDataFrame) – The query to save.
- get_query(name: str) gantry.query.core.dataframe.GantryDataFrame #
Get a saved query from the Gantry database.
Example:
query = app.get_query("demo_query")
- Parameters
name (str) – The name of the query.
- Returns
Gantry Dataframe with all the query information to fetch data.
- list_queries()#
List all saved queries for this application. Example:
queries = app.list_queries()
for query in queries:
    print(query["name"])
- Returns
A list of queries (in Python dict).
- start_run(name: str, tags: Optional[dict] = None)#
Start a run for this application.
Example:
with app.start_run(name="Demo_run", tags={"champion": "messi"}) as run:
    app.log(inputs=inputs, outputs=outputs, feedbacks=feedback, join_keys=join_keys)
    app.log(inputs={...}, outputs={...}, join_keys='12345')
    app.log(....)
- Parameters
name (str) – Name of the run.
tags (optional, dict) – A dictionary of tags to apply to the run.
- Returns
A Run object.
- get_run(name: Optional[str] = None, tags: Optional[dict] = None)#
Get a run in this application, filtered by tags or name.
Example:
job_info = app.get_run(name="Demo_run_prediction")
jobs_info = app.get_run(tags={"champion": "argentina"})
- Parameters
name (optional, str) – Name of the run.
tags (optional, dict) – Tags of the run.
- Returns
An ingestion job object associated with the requested run.
- add_automation(automation: gantry.automations.automations.Automation)#
Add an automation to this application.
Example:
automation = Automation(...)
app.add_automation(automation)
- Parameters
automation (Automation) – An Automation object.
- Returns
None
- get_automation(automation_name: Optional[str] = None)#
Get an automation by name
Example:
automation = app.get_automation(automation_name="demo_automation")
- Parameters
automation_name (optional, str) – Name of the automation.
- Returns
An Automation object.
- list_automations()#
List all automations for this application.
Example:
automations = app.list_automations()
for automation in automations:
    print(automation["name"])
- Returns
A list of Automation objects.
- list_workspaces()#
Get all workspaces for this application.
Example:
workspaces = app.list_workspaces()
for workspace in workspaces:
    print(workspace["name"])
- Returns
A list of Workspace objects.
- get_schema()#
Get the schema for this application.
Example:
schema = app.get_schema()
print(schema["func_name"])  # -> application name
- Returns
The application schema (in Python dict).
- create_dataset(name: str)#
Create a dataset for this application.
Example:
dataset = app.create_dataset(name="demo_dataset")
- Parameters
name (str) – The name of the dataset.
- Returns
An object representing the created dataset.
- Return type
- list_datasets(include_deleted: bool = False) List[Dict[str, Any]] #
List all datasets for this application.
Example:
datasets = app.list_datasets()
for dataset in datasets:
    print(dataset["name"])
- Parameters
include_deleted (optional, bool) – Will include deleted datasets if set to true and will omit them otherwise.
- Returns
List of dictionaries, each representing one dataset and associated metadata.
- Return type
List[Dict[str, Any]]
- class gantry.CompletionApplication(name: str, api_client: gantry.api_client.APIClient, id: typing.Optional[uuid.UUID] = None, organization_id: typing.Optional[uuid.UUID] = None, log_store_factory: typing.Callable[[gantry.api_client.APIClient], gantry.logger.stores.BaseLogStore] = <class 'gantry.applications.llm.CompletionAPILogStore'>)#
A class representing a completion application, a subclass of Application. This has special logging methods for logging completions.
- __init__(name: str, api_client: gantry.api_client.APIClient, id: typing.Optional[uuid.UUID] = None, organization_id: typing.Optional[uuid.UUID] = None, log_store_factory: typing.Callable[[gantry.api_client.APIClient], gantry.logger.stores.BaseLogStore] = <class 'gantry.applications.llm.CompletionAPILogStore'>)#
Note: Please use gantry.create_application() to create an application instead of defining an application directly by calling this method.
- create_version(prompt_template: str, description: str, model: str, model_params: Optional[dict] = None, prompt_inputs: Optional[List[str]] = None) gantry.applications.llm.VersionDetails #
Creates a new version of a prompt template associated to this application.
Example usage for a version using OpenAI Completion model:
import gantry

my_llm_app = gantry.get_application("my_llm_app")
my_llm_app.create_version(
    prompt_template="This is a prompt template. {{input1}} {{input2}}",
    description="My first version",
    model="text-davinci-001",
    model_params={"temperature": 0.5},
    prompt_inputs=["input1", "input2"],
)
Example usage for a version using OpenAI Chat model with function support:
import gantry

my_llm_app = gantry.get_application("my_llm_app")
my_llm_app.create_version(
    prompt_template=(
        "Assistant: you are a helpful assistant\n\n"
        "User: What's the weather in Boston?"
    ),
    description="My first version",
    model="gpt-4",
    model_params={
        "temperature": 1,
        "function_call": "auto",
        "functions": [
            {
                "name": "get_current_weather",
                "description": "Get the current weather in a given location",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "location": {
                            "type": "string",
                            "description": "The city and state, e.g. San Francisco, CA"
                        },
                        "unit": {
                            "type": "string",
                            "enum": ["celsius", "fahrenheit"]
                        }
                    },
                    "required": ["location"]
                }
            }
        ]
    }
)
Example usage for a version using Cohere model:
import gantry

my_llm_app = gantry.get_application("my_llm_app")
my_llm_app.create_version(
    prompt_template="This is a prompt template. {{input1}} {{input2}}",
    description="My first version",
    model="command",
    model_params={"max_tokens": 500},
    prompt_inputs=["input1", "input2"],
)
Example output:
VersionDetails(
    config={
        "model": "text-davinci-002",
        "params": {
            "frequency_penalty": 0.0,
            "max_tokens": 16,
            "presence_penalty": 0.0,
            "temperature": 0.5,
            "top_p": 1.0
        },
        "prompt": "This is a prompt template. {{input1}} {{input2}}",
        "prompt_inputs": ["input1", "input2"],
    },
    description="My first version",
    app_name="my_llm_app",
    version_id=UUID("a1b2c3d4-e5f6-4a3b-8c2d-1e2f3a4b5c6d"),
    version_number=1,
)
IMPORTANT: ‘functions’ and ‘function_call’ are valid model_params only for the ‘openai-chat’ vendor.
- Parameters
prompt_template (str) – The prompt template to use for this version. Denote variables like this: {{variable}}
description (str) – A description of this version.
model (str) – The model to use for this version. Must be a valid completion model from a supported vendor (for example, OpenAI or Cohere).
model_params (dict, optional) – A dictionary of model parameters to use for this prompt. See vendor’s documentation for more details.
prompt_inputs (List[str], optional) – A list of input names to use for this prompt, when using prompt variables. If not provided, the function will attempt to extract the input names from the prompt template.
- Returns
A dataclass containing the configuration data, description, application name, version ID, and version number of the newly created version.
- Return type
VersionDetails
- get_version(version: Literal['prod', 'test', 'latest']) Optional[gantry.applications.llm.VersionDetails] #
Returns the prompt configuration data for a given version of this application.
Example usage:
import gantry

my_llm_app = gantry.get_application("my_llm_app")
my_llm_app.get_version("prod")
Example output:
VersionDetails(
    config={
        "model": "text-davinci-002",
        "params": {
            "frequency_penalty": 0.0,
            "max_tokens": 16,
            "presence_penalty": 0.0,
            "temperature": 0.5,
            "top_p": 1.0
        },
        "prompt": "This is a prompt template. {{input1}} {{input2}}",
        "prompt_inputs": ["input1", "input2"],
    },
    description="My first version",
    app_name="my_llm_app",
    version_id=UUID("a1b2c3d4-e5f6-4a3b-8c2d-1e2f3a4b5c6d"),
    version_number=1,
)
- Parameters
version (Literal["prod", "test", "latest"]) – The version to get. Can be one of “prod”, “test”, or “latest”. When “latest” is used, the latest version will be returned, regardless of deployment status. When “prod” or “test” is used, the latest version that has been deployed to that environment will be returned.
- Returns
A dataclass containing the configuration data, description, application name, version ID, and version number of the requested version. If no version is found, returns None.
- Return type
Optional[VersionDetails]
- log_llm_data(api_request: Dict, api_response: Dict, request_attributes: Optional[Dict] = None, response_attributes: Optional[Dict] = None, feedback: Optional[Dict] = None, selected_choice_index: int = 0, session_id: Optional[Union[str, uuid.UUID]] = None, tags: Optional[Dict] = None, version: Optional[int] = None, vendor: str = 'openai')#
Log a request/response LLM completion pair into Gantry. Data from several vendors is supported; see the documentation on supported vendors.
Example usage:
import gantry
from gantry.applications.llm_utils import fill_prompt
import openai
from openai.util import convert_to_dict

gantry.init()

my_llm_app = gantry.get_application("my-llm-app")
version = my_llm_app.get_version("test")  # "latest", "prod", or "test"
config = version.config
prompt = config['prompt']

def generate(values):
    filled_in_prompt = fill_prompt(prompt, values)

    request = {
        "model": "text-davinci-002",
        "prompt": filled_in_prompt,
    }

    results = openai.Completion.create(**request)

    my_llm_app.log_llm_data(
        api_request=request,
        api_response=convert_to_dict(results),
        request_attributes={"prompt_values": values},
        version=version.version_number,
    )

    return results
- Parameters
api_request (dict) – The LLM completion request.
api_response (dict) – The LLM completion response.
request_attributes (Optional[dict]) – Additional inputs
response_attributes (Optional[dict]) – Additional outputs
feedback (Optional[dict]) – Optional feedback data. See https://docs.gantry.io/docs/logging-feedback-actuals for more information about logging feedback in Gantry.
selected_choice_index (Optional[int]) – The selected choice index. Defaults to 0.
session_id (Optional[str or uuid.UUID]) – Optional session to group data together.
tags (Optional[dict]) – Optional tags to add to the record.
version (Optional[int]) – The version number used.
vendor (str, defaults to 'openai') – The vendor the data is from. See Gantry documentation for supported vendors.
- deploy_version(version_id: uuid.UUID, env: Literal['prod', 'test']) None #
Deploys a version in a specific environment.
Example usage:
import gantry

my_llm_app = gantry.get_application("my_llm_app")
version = my_llm_app.get_version("latest")
my_llm_app.deploy_version(version.version_id, "prod")
- Parameters
version_id (uuid) – The version id.
env (str, 'prod' or 'test') – The target environment, can be ‘prod’ or ‘test’.
- class gantry.Run(application, tags: Optional[Dict], name: str)#
Context manager for logging.
- __init__(application, tags: Optional[Dict], name: str)#
- merge_events()#
Merge prediction and feedback events into a single list of events.
- gantry.log(application: str, version: Optional[Union[str, int]] = None, inputs: Optional[Union[dict, List[dict], pandas.core.frame.DataFrame, pandas.core.series.Series, numpy.ndarray]] = None, outputs: Optional[Union[Any, List[Any], dict, List[dict], pandas.core.frame.DataFrame, pandas.core.series.Series, numpy.ndarray]] = None, feedbacks: Optional[Union[dict, List[dict], pandas.core.frame.DataFrame, pandas.core.series.Series, numpy.ndarray]] = None, ignore_inputs: Optional[List[str]] = None, tags: Optional[Union[Dict[str, str], List[Dict[str, str]]]] = None, timestamps: Optional[Union[datetime.datetime, List[datetime.datetime], pandas.core.indexes.datetimes.DatetimeIndex, numpy.ndarray]] = None, sort_on_timestamp: bool = True, sample_rate: float = 1.0, row_tags: Optional[Union[Dict[str, str], List[Dict[str, str]], pandas.core.frame.DataFrame, pandas.core.series.Series, numpy.ndarray]] = None, global_tags: Optional[Dict[str, str]] = None, join_keys: Optional[Union[str, List[str], pandas.core.series.Series]] = None, as_batch: Optional[bool] = False, run_id: Optional[str] = None, run_tags: Optional[dict] = None)#
Alias for
gantry.logger.client.Gantry.log()
A general log function to log inputs, outputs, and feedbacks, for single or multiple records, that is not attached to a specific application. This is intended primarily for use in production contexts where avoiding initializing an instance of the application is preferable.
- Parameters
application (str) – Name of the application. Gantry validates and monitors data by function.
version (optional, Union[int, str]) – Version of the function schema to use for validation by Gantry. If not provided, Gantry will use the latest version. If the version doesn’t exist yet, Gantry will create it by auto-generating the schema based on data it has seen. Providing an int or its value stringified has no difference (e.g. version=10 will be the same as version=’10’).
inputs (Union[List[dict], pd.DataFrame]) – A list of prediction inputs. inputs[i] is a dict of the features for the i-th prediction to be logged.
outputs (Union[List[dict], pd.DataFrame]) – A list of prediction outputs. outputs[i] should be the application output for the prediction with features inputs[i].
feedbacks (Union[List[dict], pd.DataFrame]) – A list of feedbacks. feedbacks[i] is a dict of the features for the i-th prediction to be logged.
ignore_inputs (optional, List[str]) – A list of names of input features that should not be monitored.
timestamps (optional, Union[List[datetime.datetime], pd.DatetimeIndex, np.array[datetime.datetime]]) – A list of prediction timestamps. If specified, timestamps[i] should be the timestamp for the i-th prediction. If timestamps = None (default), the prediction timestamp defaults to the time when the log call is made.
sort_on_timestamp (bool, defaults to True) – Only applies when timestamps are provided. If True, records are sorted by the given timestamps. Defaults to True.
sample_rate – Used for down-sampling. The probability as a float that each record will be sent to Gantry.
as_batch (bool, defaults to False) – Whether to add batch metadata and tracking in the ‘batch’ section of the dashboard
tags (optional, Union[Dict[str, str], List[Dict[str, str]]]) – A tag is a label that you assign to your data. For example, you can specify which environment the data belongs to by setting an "env" tag, e.g. tags = {"env": "environment1"}. If not assigned, the Gantry client’s environment value is used as the default environment. If this parameter is a dict, the tags apply to all records. Alternatively, you can pass a list of dicts to apply tags to each record independently. IMPORTANT: this parameter is deprecated and will be removed soon. Use row_tags and global_tags instead.
join_keys (optional, Union[str, List[str], pd.Series[str]]) – Provide keys to identify each record. These keys can be used later to provide feedback. If not provided, a random record key will be generated for each record.
row_tags (optional, Optional[Union[List[Dict[str, str]], pd.DataFrame, pd.Series, np.ndarray]]) –
Specify row level tags. If provided, this parameter should be a list of tags to be applied to each of the records. row_tags[i] will contain a dictionary of strings containing the tags to attach to the i-th record. Alternatively, tags may be specified by passing in a DataFrame, Series, or Array, like inputs.
For batch global tags, use the ‘global_tags’ parameter instead.
global_tags (optional, Dict[str, str]) – Specify a set of tags that will be attached to all ingested records from this batch. For record specific tags, you can use ‘row_tags’ param. Only used when log is not called within a run.
run_id (optional, str) – This should never be provided by the user. It will be populated automatically when logging within a run to group records together.
run_tags (optional, Dict[str, str]) – This should never be provided by the user. It will be populated automatically when logging within a run to provide global tags for all records in the run.
- Returns
The batch_id will be None if records are not logged as a batch. The list of join_keys contains the records’ keys.
- Return type
Tuple[batch_id, list[join_keys]]
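A minimal sketch of logging without first constructing an Application object (the application name and values below are placeholders):

import gantry

gantry.init(api_key="YOUR_GANTRY_API_KEY")

# Log a single prediction against the application by name.
gantry.log(
    application="my-app",
    inputs={"A": 100},
    outputs={"B": "foo"},
    join_keys="12345",
    global_tags={"env": "dev"},
)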
- gantry.log_from_data_connector(application: str, source_data_connector_name: str, timestamp: Optional[str] = None, inputs: Optional[List[str]] = None, outputs: Optional[List[str]] = None, feedbacks: Optional[List[str]] = None, join_key: Optional[str] = None, row_tags: Optional[List[str]] = None, global_tags: Optional[Dict[str, str]] = None, schedule: Optional[gantry.logger.types.Schedule] = None, pipeline_name: Optional[str] = None) gantry.logger.types.IngestFromDataConnectorResponse #
Alias for
gantry.logger.client.Gantry.log_from_data_connector()
Function to ingest source tabular records from a registered source data connector into Gantry.
To log predictions using this function, column names for both inputs and outputs must be passed. To log feedback using this function, column names for both join_key and feedbacks must be passed.
Example:
# Record an application's predictions.
gantry.log_from_data_connector(
    application="foobar",
    source_data_connector_name="my_snowflake_connector",
    inputs=["column_A", "column_B"],
    outputs=["column_C"],
    timestamp="column_T",
    global_tags={"env": "dev", "version": "1.0.0"},
    row_tags=["column_D"],
    join_key="column_E",
)

# Record an application's feedbacks
# to a previously ingested prediction.
gantry.log_from_data_connector(
    application="foobar",
    source_data_connector_name="my_snowflake_connector",
    feedbacks=["column_E", "column_F"],
    timestamp="column_T",
    global_tags={"env": "dev", "version": "1.0.0"},
    row_tags=["column_D"],
    join_key="column_E",
)

# Trigger scheduled ingestion every 8 hours from a data connector incrementally.
from gantry.logger.types import (
    Schedule,
    ScheduleFrequency,
    ScheduleType,
    ScheduleOptions,
)

gantry.log_from_data_connector(
    application="foobar",
    source_data_connector_name="my_snowflake_connector",
    inputs=["column_A", "column_B"],
    outputs=["column_C"],
    timestamp="column_T",
    global_tags={"env": "dev", "version": "1.0.0"},
    row_tags=["column_D"],
    join_key="column_E",
    schedule=Schedule(
        start_on="2023-01-14T17:00:00.000000",
        frequency=ScheduleFrequency.EVERY_8_HOURS,
        type=ScheduleType.INCREMENTAL_APPEND,
        options=ScheduleOptions(watermark_key="column_T"),
    ),
)

# If the data are expected to arrive late in your source table/view, use delay_time
# to specify how long to wait before triggering the scheduled ingestion.
gantry.log_from_data_connector(
    application="foobar",
    source_data_connector_name="my_snowflake_connector",
    inputs=["column_A", "column_B"],
    outputs=["column_C"],
    timestamp="column_T",
    global_tags={"env": "dev", "version": "1.0.0"},
    row_tags=["column_D"],
    join_key="column_E",
    schedule=Schedule(
        start_on="2023-01-14T17:00:00.000000",
        frequency=ScheduleFrequency.EVERY_8_HOURS,
        type=ScheduleType.INCREMENTAL_APPEND,
        options=ScheduleOptions(
            delay_time=300,  # Delay 5 minutes before triggering ingestion.
            watermark_key="column_T",
        ),
    ),
)
- Parameters
application (str) – Name of the application to ingest data into
source_data_connector_name (str) – Name of the registered source data connector to ingest data from.
timestamp (optional, str) – by default, the timestamp values will be filled with the timestamp at ingestion. Set this parameter to reference a different column.
inputs (optional, list[str]) – A list of column names for inputs.
outputs (optional, list[str]) – A list of column names for outputs.
feedbacks (optional, list[str]) – A list of column names for feedbacks.
join_key (optional, str) – A column name to use as the join key to identify each record.
row_tags (optional, list[str]) – A list of column names to use the values of as tags for each row.
global_tags (optional, dict[str, str]) – A dictionary of string key and value pairs to tag the entire ingestion from this data connector.
schedule (optional, Schedule) – An optional parameter to schedule the ingestion.
pipeline_name (optional, str) – An optional parameter to specify the ingestion pipeline’s name
- gantry.ping() bool #
Alias for
gantry.logger.client.Gantry.ping()
Pings the log store API server to check if it is alive. Returns True if alive, False if there is an error during ping process.
- gantry.ready() bool #
Alias for
gantry.logger.client.Gantry.ready()
Checks if the configured API key authenticates with the API. Returns True if ready, False otherwise.
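A small sketch of a health check before logging (the API key is a placeholder):

import gantry

gantry.init(api_key="YOUR_GANTRY_API_KEY")

# Verify the log store is reachable and the API key authenticates.
if gantry.ping() and gantry.ready():
    print("Gantry is reachable and ready")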
- class gantry.JoinKey#
- classmethod from_dict(dict_: Dict) str #
Utility function to retrieve a deterministic value from a dictionary. Intended to be used for setting join_key when logging to Gantry.
- Parameters
dict_ (Dict) – The dictionary from which the join key should be generated
- Returns
The generated join key
- Return type
str
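A short sketch of generating a deterministic join key from a record's identifying fields (field names and application name are illustrative):

import gantry

# The same dictionary always produces the same join key.
join_key = gantry.JoinKey.from_dict({"user_id": "u-123", "request_id": "r-456"})

app = gantry.get_application("my-app")
app.log(inputs={"A": 100}, outputs={"B": "foo"}, join_keys=join_key)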
- gantry.create_application(application_name: str, application_type: Optional[str] = None) gantry.applications.core.Application #
Alias for
gantry.applications.client.ApplicationClient.create_application()
Create an Application object by name and type. If the application already exists, an exception will be raised. An exception will also be raised if your organization does not allow you to create an application of the given type.
Example usage:
import gantry

application = gantry.create_application("demo", "Custom")
- Parameters
application_name (str) – the name of the application.
application_type (str, optional) – the type of the application. Defaults to Custom. You have the following options for application type: Custom, Chat, Completion.
- Returns
An object representing the application.
- Return type
Application
- gantry.get_application(application_name: str) gantry.applications.core.Application #
Alias for
gantry.applications.client.ApplicationClient.get_application()
Get an Application object by name.
Example usage:
import gantry

application = gantry.get_application("demo")
- Parameters
application_name (str) – the name of the application.
- Returns
An object representing the application.
- Return type
Application
- gantry.delete_application(application_name: str) None #
Alias for
gantry.applications.client.ApplicationClient.delete_application()
Hard delete an Application.
Example usage:
import gantry

gantry.delete_application("demo")
- Parameters
application_name (str) – the name of the application.
- Returns
None
- gantry.archive_application(application_name: str) None #
Alias for
gantry.applications.client.ApplicationClient.archive_application()
Archive an Application.
Example usage:
import gantry

gantry.archive_application("demo")
- Parameters
application_name (str) – the name of the application.
- Returns
None
Automations#
An automation consists of a trigger and an action.
- class gantry.automations.Automation(name: str, trigger: gantry.automations.triggers.triggers.Trigger, action: gantry.automations.actions.actions.Action, api_client: Optional[gantry.api_client.APIClient] = None, application: Optional[str] = None)#
Automation is the process of automatically executing tasks. It consists of 2 main components: Trigger and Action.
Trigger is the condition that, when met, cause the Action to be executed.
Currently, the following are supported:
IntervalTrigger: based on time intervals & delays.
AggregationTrigger: the same idea as an Alert in the UI. It tracks whether an aggregation falls outside the given range for the evaluation window and triggers when the condition is met.
SendSlackMessage (Action): This action sets up a notification setting that will send users a Slack notification when executed by the trigger.
Curator (Action): This action creates a new curator.
- Note:
AggregationTrigger can only be paired with SendSlackMessage
IntervalTrigger can only be paired with Curator
To set up and start an automation process, there are a few steps:
- Define the trigger
import datetime

from gantry.automations.triggers import AggregationTrigger
from gantry.query.time_window import RelativeTimeWindow

alert_trigger = AggregationTrigger(
    name="alert-trigger",
    aggregation="maximum",
    fields=["inputs.speed"],
    lower_bound=1,
    upper_bound=5,
    evaluation_window=RelativeTimeWindow(
        window_length=datetime.timedelta(minutes=10)
    ),
    tags=None,
)
- Define the action
from gantry.automations.actions import SendSlackMessage

# Set up action to send slack message to the specified webhook.
slack_action = SendSlackMessage(
    name="demo-slack",
    webhook_url="SLACK_WEBHOOK_URL",
)
- Define the automation and put trigger & action together
from gantry.automations.automations import Automation

# Define automation object and put trigger and action together.
automation_alert = Automation(
    name="alert-automation",
    trigger=alert_trigger,
    action=slack_action,
)
- Add the automation to an application
app.add_automation(automation_alert)
- When you are done, stop the automation process
automation_alert.stop()
- __init__(name: str, trigger: gantry.automations.triggers.triggers.Trigger, action: gantry.automations.actions.actions.Action, api_client: Optional[gantry.api_client.APIClient] = None, application: Optional[str] = None)#
Initialize an automation object.
- Parameters
name (str) – the name of the automation.
trigger (gantry.automations.triggers.Trigger) – the trigger of the automation.
action (gantry.automations.actions.Action) – the action of the automation.
- stop()#
Stop the automation process and delete all relevant actions.
Triggers#
A way of broadcasting certain events that take place within Gantry, such as data threshold breach.
- class gantry.automations.triggers.AggregationTrigger(name: str, aggregation: gantry.alerts.client.AlertsAggregation, fields: List[str], lower_bound: float, upper_bound: float, evaluation_window: gantry.query.time_window.RelativeTimeWindow, tags: Optional[Dict] = None)#
- __init__(name: str, aggregation: gantry.alerts.client.AlertsAggregation, fields: List[str], lower_bound: float, upper_bound: float, evaluation_window: gantry.query.time_window.RelativeTimeWindow, tags: Optional[Dict] = None)#
Initialize an aggregation trigger.
- Parameters
name (str) – the name of the trigger.
aggregation (AlertsAggregation) – the aggregation of the trigger.
fields (List[str]) – the fields of the trigger.
lower_bound (float) – the lower bound of the field.
upper_bound (float) – the upper bound of the field.
evaluation_window (gantry.query.time_window.RelativeTimeWindow) – the evaluation window for the trigger.
tags (Optional[Dict]) – the tags of the trigger. Defaults to None.
- class gantry.automations.triggers.IntervalTrigger(start_on: datetime.datetime, interval: datetime.timedelta, delay: datetime.timedelta = datetime.timedelta(0))#
- __init__(start_on: datetime.datetime, interval: datetime.timedelta, delay: datetime.timedelta = datetime.timedelta(0))#
Initialize an interval trigger.
- Parameters
start_on (datetime.datetime) – the start time of the interval.
interval (datetime.timedelta) – the interval of the trigger.
delay (datetime.timedelta) – the delay of the trigger. Defaults to 0.
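A hedged sketch pairing an IntervalTrigger with a curator action, per the note above that IntervalTrigger can only be paired with Curator. Whether a stock curator such as NewestCurator can be passed directly as the action is an assumption here, and all names are illustrative:

import datetime

from gantry.automations.automations import Automation
from gantry.automations.triggers import IntervalTrigger
from gantry.automations.curators import NewestCurator  # assumed to be usable as the Curator action

# Fire every 8 hours, starting at a fixed time.
interval_trigger = IntervalTrigger(
    start_on=datetime.datetime(2023, 1, 14, 17, 0, 0),
    interval=datetime.timedelta(hours=8),
)

# Curate up to the newest 100 records each interval.
curator_action = NewestCurator(
    name="newest-records-curator",
    application_name="my-app",
    limit=100,
)

curation_automation = Automation(
    name="curation-automation",
    trigger=interval_trigger,
    action=curator_action,
)
app.add_automation(curation_automation)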
- class gantry.automations.triggers.QueriesAggregationTrigger(name: str, compare_fields: List[str], compare_aggregation: gantry.alerts.client.AlertsAggregation, trigger_range_type: str, queries: List[gantry.query.core.dataframe.GantryDataFrame], query_aggregation: List[gantry.alerts.client.AlertsAggregation], lower_bound: Optional[float] = None, upper_bound: Optional[float] = None)#
- __init__(name: str, compare_fields: List[str], compare_aggregation: gantry.alerts.client.AlertsAggregation, trigger_range_type: str, queries: List[gantry.query.core.dataframe.GantryDataFrame], query_aggregation: List[gantry.alerts.client.AlertsAggregation], lower_bound: Optional[float] = None, upper_bound: Optional[float] = None)#
Initialize a trigger based on multiple queries. As of now, we only support trigger based on 2 queries.
Example:
Say I want to set up a trigger that:
Calculates max(inputs.latitude) of query1 and query2.
Calculates the difference between the two max values, as in: max(inputs.latitude) of query1 - max(inputs.latitude) of query2.
Triggers an alert if the difference is within the range [0.1, 1.0].
from gantry.alerts.client import AlertsAggregation
from gantry.automations.triggers import QueriesAggregationTrigger

first_query = app.get_query("first_query")
second_query = app.get_query("second_query")

query_trigger_alert = QueriesAggregationTrigger(
    name="queries-trigger",
    compare_aggregation=AlertsAggregation.TOTAL_DIFFERENCE,
    compare_fields=["inputs.latitude"],
    queries=[first_query, second_query],
    query_aggregation=[AlertsAggregation.MAX, AlertsAggregation.MAX],
    lower_bound=0.1,
    upper_bound=1.0,
    trigger_range_type="within",
)
- Parameters
name (str) – the name of the trigger.
compare_fields (List[str]) – The fields to calculate the aggregated difference on between queries. As of now, we only support 1 field.
compare_aggregation (AlertsAggregation) – the aggregation of the difference between aggregated query values.
lower_bound (float) – the lower bound (inclusive) of the aggregated difference.
upper_bound (float) – the upper bound (inclusive) of the aggregated difference.
trigger_range_type (str) –
the type of the trigger range. The options are:
"within": triggered when the aggregated difference is within the bounds
"outside": triggered when the aggregated difference is outside the bounds
"above": triggered when the aggregated difference is above the upper bound
"below": triggered when the aggregated difference is below the lower bound
queries (List[GantryDataFrame]) – the list of queries (in order) to compare.
query_aggregation (List[AlertsAggregation]) – the aggregation of the queries.
Actions#
Actions that can be carried out when triggers fire, including Slack notifications.
- class gantry.automations.actions.Action#
Parent class for all actions.
- class gantry.automations.actions.SendSlackMessage(name: str, webhook_url: str, id: Optional[str] = None, notify_daily: bool = False, alert_id: Optional[str] = None)#
- __init__(name: str, webhook_url: str, id: Optional[str] = None, notify_daily: bool = False, alert_id: Optional[str] = None)#
Initialize a slack notification action.
- Parameters
name (str) – the name of the slack notification.
webhook_url (str) – the webhook url of the slack channel.
notify_daily (bool) – If true, send notification daily. Otherwise, send immediately.
Curators#
Curators are a type of automation for collecting data from your production data stream.
- gantry.automations.curators.get_curator(name: str) Curator #
Alias for
gantry.automations.curators.curators.CuratorClient.get_curator()
Get a curator by name. If the curator does not exist, an exception will be raised.
- Parameters
name (str) – the name of the curator to be retrieved.
- Returns
A Curator object representing the curator corresponding to the provided name.
- Return type
Curator
- gantry.automations.curators.get_all_curators(application_name: Optional[str] = None) List[Curator] #
Alias for
gantry.automations.curators.curators.CuratorClient.get_all_curators()
Returns a list of the curators in the organization if no application_name is passed, and all the curators associated with the provided application if a name is passed.
- Parameters
application_name (str, optional) – defaults to None; the application must exist.
- Returns
A list of the curators either in the organization or for a specific application.
- Return type
List[Curator]
- gantry.automations.curators.list_curators(application_name: Optional[str] = None) List[str] #
Alias for
gantry.automations.curators.curators.CuratorClient.list_curators()
Returns a list of the names of all the curators in the organization if no application_name is passed, and the names of all the curators associated with the provided application if a name is passed.
- Parameters
application_name (str, optional) – defaults to None; the application must exist.
- Returns
A list of the names of the curators either in the organization or for a specific application.
- Return type
List[str]
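A brief sketch of listing and fetching curators (the application name is a placeholder):

from gantry.automations.curators import get_curator, list_curators

# Names of all curators registered for a given application.
curator_names = list_curators(application_name="my-app")

# Fetch a full Curator object by name.
if curator_names:
    curator = get_curator(curator_names[0])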
- class gantry.automations.curators.Curator(name: str, application_name: str, api_client: Optional[gantry.api_client.APIClient] = None, id: Optional[uuid.UUID] = None, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curation_delay: datetime.timedelta = datetime.timedelta(0), curate_past_intervals: bool = True, created_at: Optional[datetime.datetime] = None, selectors: Optional[List[gantry.automations.curators.selectors.Selector]] = None)#
A class representing a user defined curator, where the user provides the selection criteria using a list of selectors, as well as other metadata needed to create and manage the creation jobs.
- __init__(name: str, application_name: str, api_client: Optional[gantry.api_client.APIClient] = None, id: Optional[uuid.UUID] = None, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curation_delay: datetime.timedelta = datetime.timedelta(0), curate_past_intervals: bool = True, created_at: Optional[datetime.datetime] = None, selectors: Optional[List[gantry.automations.curators.selectors.Selector]] = None)#
A Curator defines a process that runs periodically to curate a dataset. It uses selectors to select rows and write them to a curated dataset. The curated dataset is a new dataset that is created by the curator. The curator will create the curated dataset if it does not already exist, or update it if it does exist.
Instantiating a curator does not create the curator in Gantry. To create the curator, call Curator.create after instantiating the curator. Curators that have been created in Gantry can be instantiated as a Curator object using Curator.from_curator_info, although it is recommended to use the module-level get_curator function instead.
Curators can be updated using Curator.update. This will update the curator in Gantry and also update the curator object in memory.
Curators can be deleted using Curator.delete. This will delete the curator in Gantry but will not delete the curator object in memory.
- Parameters
api_client (Optional[APIClient], optional) – APIClient to use. Defaults to None, in which case the Curator will use the global APIClient.
id (Optional[uuid.UUID], optional) – The id of the curator. Defaults to None. The id only exists in Gantry for curators that have been created. Typically, you will not need to set this. But some methods, such as Curator.from_curator_info, will set this for you.
name (str) – The name of the curator.
curated_dataset_name (Optional[str], optional) – The name of the curated dataset. Defaults to None. If None, the curated dataset name will be the same as the curator name.
application_name (str) – The name of the application that the curator is running in.
start_on (Optional[datetime.datetime], optional) – The time at which the curator should start curating. Defaults to None. If None, the curator will start curating immediately, looking back one curation_interval.
curation_interval (datetime.timedelta, optional) – The interval at which the curator should curate. Defaults to datetime.timedelta(days=1).
curate_past_intervals (bool, optional) – Whether the curator should curate past intervals. Defaults to True. If True, the curator will immediately curate all intervals that have passed since the start_on time.
created_at (Optional[datetime.datetime], optional) – The time at which the curator was created. Defaults to None. The created_at time only exists in Gantry for curators that have been created. Typically, you will not need to set this. But some methods, such as Curator.from_curator_info, will set this for you.
selectors (Optional[List[Selector]], optional) – The selectors that the curator should use to curate. Defaults to None. If None, the request to create the curator will fail.
- property enabled: bool#
Enabling or disabling must be done via the .enable() or .disable() methods.
- update_curation_start_on(start_on: datetime.datetime) None #
Update the curation start_on time.
- Parameters
start_on (datetime.datetime) – The new start_on time.
- update_curation_interval(curation_interval: datetime.timedelta) None #
Update the curation interval.
- Parameters
curation_interval (datetime.timedelta) – The new curation interval.
- update_curation_delay(curation_delay: datetime.timedelta) None #
Update the curation delay.
- Parameters
curation_delay (datetime.timedelta) – The new curation delay.
- start() None #
Trigger to create the curator. This function is called from Automation.
- stop() None #
Trigger to delete the curator. This function is called from Automation.
- create(enable=True) gantry.automations.curators.curators.Curator #
Creates the Curator. By default, the Curator is also enabled upon creation. Use the enable parameter to change this.
Once created, the Curator will start curating the dataset according to the curation_interval. If curate_past_intervals is set to True, it will curate all past intervals and continue curating in the future. The Curator will exist in Gantry until it is deleted.
The results of the curation will be stored in a dataset with the name curated_dataset_name. If the dataset does not exist, it will be created. If it does exist, it will be appended to. Curation commits will be made by the user with the api key used when initializing gantry services.
The curated dataset can be accessed using the get_curated_dataset method.
- Parameters
enable (Optional[bool]) – Whether to automatically enable the curator after creation. Defaults to True.
- Returns
The created curator.
- Return type
Curator
- update(new_curator_name: Optional[str] = None, new_curated_dataset_name: Optional[str] = None, new_curation_interval: Optional[datetime.timedelta] = None, new_selectors: Optional[List[gantry.automations.curators.selectors.Selector]] = None, create_new_dataset: bool = False) gantry.automations.curators.curators.Curator #
Updates the curator. At least one of the update parameters must be provided. If a parameter is not provided, it will not be updated.
- Parameters
new_curator_name (Optional[str], optional) – The new name of the curator. Defaults to None.
new_curated_dataset_name (Optional[str], optional) – The name of the new dataset to curate to. Defaults to None.
new_curation_interval (Optional[datetime.timedelta], optional) – The new curation interval. Defaults to None.
new_selectors (Optional[List[Selector]], optional) – The new selectors. Defaults to None.
create_new_dataset (bool, optional) – Whether to create a new dataset if the requested name does not correspond to an existing dataset. Defaults to False.
- Raises
ValueError – If curator ID is not set.
ValueError – If no update parameters are provided.
- Returns
The updated curator.
- Return type
Curator
- delete() str #
Deletes the curator using the curator id. The Curator object is not deleted, but the curator id is reset to None, as well as the created_at time.
- Raises
ValueError – If curator id is not set.
- Returns
- A message indicating the curator was deleted. The message also contains the
time the curator was deleted.
- Return type
str
- get_curated_dataset() gantry.dataset.gantry_dataset.GantryDataset #
Gets the curated dataset associated with the Curator.
- Returns
A GantryDataset object representing the curated dataset.
- Return type
GantryDataset
- enable() str #
Enable Curator - Gantry will resume curating data using this Curator. Intervals that have elapsed since the curator was disabled will be backfilled.
- disable() str #
Disable Curator - Gantry will stop curating data using this Curator.
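A hedged sketch of a typical lifecycle using the methods documented above (curator and dataset names are placeholders):

import datetime

from gantry.automations.curators import get_curator

# Retrieve an existing curator by name.
curator = get_curator("demo-curator")

# Narrow the curation interval and point it at a different curated dataset.
curator.update(
    new_curation_interval=datetime.timedelta(hours=6),
    new_curated_dataset_name="demo-curated-dataset",
    create_new_dataset=True,
)

# Access the dataset the curator writes to, then pause curation.
dataset = curator.get_curated_dataset()
curator.disable()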
- class gantry.automations.curators.AscendedSortCurator(name: str, application_name: str, limit: int, sort_field: str, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
Stock Curator that sorts the application field specified by sort_field in ascending order and adds up to the top limit results to curated_dataset_name every curation_interval.
- __init__(name: str, application_name: str, limit: int, sort_field: str, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
- Parameters
name (str) – The name of the curator.
application_name (str) – The name of the application that the curator is running in.
limit (int) – The maximum number of results to return.
sort_field (str) – The field to sort.
curated_dataset_name (Optional[str], optional) – The name of the curated dataset that the curator will write to. Defaults to None, in which case the name of the curator will be used.
start_on (Optional[datetime.datetime], optional) – The datetime to start the curator. Defaults to None, in which case the curator will start now, looking back one curation_interval.
curation_interval (datetime.timedelta, optional) – The interval of time considered during each run of the curator. Defaults to datetime.timedelta(days=1).
curate_past_intervals (bool, optional) – Whether or not to curate past intervals when start_on is in the past. Defaults to True.
- class gantry.automations.curators.BalancedStratificationCurator(name: str, application_name: str, limit: int, stratify_field: str, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
Stock Curator that builds a balanced stratification of application records according to the field stratify_field and adds up to limit results to curated_dataset_name.
Currently, this curator only supports stratification of categorical fields, and the limit per stratum is determined by the stratum with the fewest records in the current curation_interval.
- __init__(name: str, application_name: str, limit: int, stratify_field: str, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
- Parameters
name (str) – The name of the curator.
application_name (str) – The name of the application that the curator is running in.
limit (int) – The maximum number of results to return.
stratify_field (str) – The field to stratify.
curated_dataset_name (Optional[str], optional) – The name of the curated dataset that the curator will write to. Defaults to None, in which case the name of the curator will be used.
start_on (Optional[datetime.datetime], optional) – The datetime to start the curator. Defaults to None, in which case the curator will start now, looking back one curation_interval.
curation_interval (datetime.timedelta, optional) – The interval of time considered during each run of the curator. Defaults to datetime.timedelta(days=1).
curate_past_intervals (bool, optional) – Whether or not to curate past intervals when start_on is in the past. Defaults to True.
- class gantry.automations.curators.BoundedRangeCurator(name: str, application_name: str, limit: int, bound_field: str, upper_bound: float, lower_bound: float, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True, sort_ascending: bool = True)#
Stock Curator that strictly bounds the application field specified by bound_field between upper_bound and lower_bound and adds up to limit results to curated_dataset_name every curation_interval. If the curation returns more results than limit, the results are sorted in ascending order by bound_field when sort_ascending is True (default) and in descending order when sort_ascending is False.
- __init__(name: str, application_name: str, limit: int, bound_field: str, upper_bound: float, lower_bound: float, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True, sort_ascending: bool = True)#
- Parameters
name (str) – The name of the curator.
application_name (str) – The name of the application that the curator is running in.
limit (int) – The maximum number of results to return.
bound_field (str) – The field to bound.
upper_bound (float) – Upper bound on the field.
lower_bound (float) – Lower bound on the field.
curated_dataset_name (Optional[str], optional) – The name of the curated dataset that the curator will write to. Defaults to None, in which case the name of the curator will be used.
start_on (Optional[datetime.datetime], optional) – The datetime to start the curator. Defaults to None, in which case the curator will start now, looking back one curation_interval.
curation_interval (datetime.timedelta, optional) – The interval of time considered during each run of the curator. Defaults to datetime.timedelta(days=1).
curate_past_intervals (bool, optional) – Whether or not to curate past intervals when start_on is in the past. Defaults to True.
sort_ascending (bool, optional) – Whether or not to sort the results in ascending order. Defaults to True.
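As an illustration, a sketch instantiating and creating a BoundedRangeCurator with the parameters described above (names and field values are placeholders):

import datetime

from gantry.automations.curators import BoundedRangeCurator

# Keep up to 100 records per interval whose inputs.speed falls within [0.0, 3.0].
speed_curator = BoundedRangeCurator(
    name="speed-in-range-curator",
    application_name="my-app",
    limit=100,
    bound_field="inputs.speed",
    lower_bound=0.0,
    upper_bound=3.0,
    curation_interval=datetime.timedelta(hours=6),
)

# Register the curator in Gantry and enable it immediately.
speed_curator.create(enable=True)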
- class gantry.automations.curators.DescendedSortCurator(name: str, application_name: str, limit: int, sort_field: str, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
Stock Curator that sorts the application field specified by sort_field in descending order and adds up to the top limit results to curated_dataset_name every curation_interval.
- __init__(name: str, application_name: str, limit: int, sort_field: str, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
- Parameters
name (str) – The name of the curator.
application_name (str) – The name of the application that the curator is running in.
limit (int) – The maximum number of results to return.
sort_field (str) – The field to sort.
curated_dataset_name (Optional[str], optional) – The name of the curated dataset that the curator will write to. Defaults to None, in which case the name of the curator will be used.
start_on (Optional[datetime.datetime], optional) – The datetime to start the curator. Defaults to None, in which case the curator will start now, looking back one curation_interval.
curation_interval (datetime.timedelta, optional) – The interval of time considered during each run of the curator. Defaults to datetime.timedelta(days=1).
curate_past_intervals (bool, optional) – Whether or not to curate past intervals when start_on is in the past. Defaults to True.
- class gantry.automations.curators.NewestCurator(name: str, application_name: str, limit: int, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
Stock Curator that sorts the application records in descending time-order according to the internally logged Gantry timestamp, __TIME, and adds up to the top limit results to curated_dataset_name every curation_interval.
- __init__(name: str, application_name: str, limit: int, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
- Parameters
name (str) – The name of the curator.
application_name (str) – The name of the application that the curator is running in.
limit (int) – The maximum number of results to return.
curated_dataset_name (Optional[str], optional) – The name of the curated dataset that the curator will write to. Defaults to None, in which case the name of the curator will be used.
start_on (Optional[datetime.datetime], optional) – The datetime to start the curator. Defaults to None, in which case the curator will start now, looking back one curation_interval.
curation_interval (datetime.timedelta, optional) – The interval of time considered during each run of the curator. Defaults to datetime.timedelta(days=1).
curate_past_intervals (bool, optional) – Whether or not to curate past intervals when start_on is in the past. Defaults to True.
- class gantry.automations.curators.OldestCurator(name: str, application_name: str, limit: int, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
Stock Curator that sorts the application records in ascending time-order according to the internally logged Gantry timestamp, __TIME, and adds up to the top limit results to curated_dataset_name every curation_interval.
- __init__(name: str, application_name: str, limit: int, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
- Parameters
name (str) – The name of the curator.
application_name (str) – The name of the application that the curator is running in.
limit (int) – The maximum number of results to return.
curated_dataset_name (Optional[str], optional) – The name of the curated dataset that the curator will write to. Defaults to None, in which case the name of the curator will be used.
start_on (Optional[datetime.datetime], optional) – The datetime to start the curator. Defaults to None, in which case the curator will start now, looking back one curation_interval.
curation_interval (datetime.timedelta, optional) – The interval of time considered during each run of the curator. Defaults to datetime.timedelta(days=1).
curate_past_intervals (bool, optional) – Whether or not to curate past intervals when start_on is in the past. Defaults to True.
- class gantry.automations.curators.ProportionalStratificationCurator(name: str, application_name: str, limit: int, stratify_field: str, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
Stock Curator that builds a proportional stratification of application records according to the field stratify_field and adds up to limit results to curated_dataset_name.
The proportion of records in each stratum is respected, but the limit per curation may be violated, unlike the StrictStratificationCurator, which will respect the limit at the (potential) cost of violating the proportion of records in each stratum.
Additionally, it is worth noting that proportions are rounded half to even, so 1.5 will be rounded to 2 and 0.5 will be rounded to 0.
Example
Let's say there are 100 records: 51 with the value "A" for the field, 15 with "B", 16 with "C", 17 with "D", and 1 with "E".
If the limit is 10, the curator will return 11 records: 5 with "A", 2 with "B", 2 with "C", 2 with "D", and 0 with "E".
- __init__(name: str, application_name: str, limit: int, stratify_field: str, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
- Parameters
name (str) – The name of the curator.
application_name (str) – The name of the application that the curator is running in.
limit (int) – The maximum number of results to return.
stratify_field (str) – The field to stratify.
curated_dataset_name (Optional[str], optional) – The name of the curated dataset that the curator will write to. Defaults to None, in which case the name of the curator will be used.
start_on (Optional[datetime.datetime], optional) – The datetime to start the curator. Defaults to None, in which case the curator will start now, looking back one curation_interval.
curation_interval (datetime.timedelta, optional) – The interval of time considered during each run of the curator. Defaults to datetime.timedelta(days=1).
curate_past_intervals (bool, optional) – Whether or not to curate past intervals when start_on is in the past. Defaults to True.
- class gantry.automations.curators.StrictStratificationCurator(name: str, application_name: str, limit: int, stratify_field: str, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
Stock Curator that builds a strict stratification of application records according to the field stratify_field and adds up to limit results to curated_dataset_name.
The limit of records per curation is respected, but the proportion of records in each stratum may be violated, unlike the ProportionalStratificationCurator, which will respect the proportion at the (potential) cost of violating the limit per curation.
Additionally, it is worth noting that the number of records to be added to each stratum is calculated exactly, leading to potentially fractional assignments. The integer part of the assignment is taken, and the total remainder is added, up to the limit, randomly.
Example
Let's say there are 100 records: 51 with the value "A" for the field, 15 with "B", 16 with "C", 17 with "D", and 1 with "E".
If the limit is 10, the curator will return 10 records: 5 with "A", 1 with "B", 1 with "C", 1 with "D", and 0 with "E"; the remaining 2 records will be randomly selected from the remaining records.
- __init__(name: str, application_name: str, limit: int, stratify_field: str, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
- Parameters
name (str) – The name of the curator.
application_name (str) – The name of the application that the curator is running in.
limit (int) – The maximum number of results to return.
stratify_field (str) – The field to stratify.
curated_dataset_name (Optional[str], optional) – The name of the curated dataset that the curator will write to. Defaults to None, in which case the name of the curator will be used.
start_on (Optional[datetime.datetime], optional) – The datetime to start the curator. Defaults to None, in which case the curator will start now, looking back one curation_interval.
curation_interval (datetime.timedelta, optional) – The interval of time considered during each run of the curator. Defaults to datetime.timedelta(days=1).
curate_past_intervals (bool, optional) – Whether or not to curate past intervals when start_on is in the past. Defaults to True.
- class gantry.automations.curators.UniformCurator(name: str, application_name: str, limit: int, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
Stock Curator that uniformly samples application records and adds up to the top limit results to curated_dataset_name every curation_interval.
- __init__(name: str, application_name: str, limit: int, curated_dataset_name: Optional[str] = None, start_on: Optional[datetime.datetime] = None, curation_interval: datetime.timedelta = datetime.timedelta(days=1), curate_past_intervals: bool = True)#
- Parameters
name (str) – The name of the curator.
application_name (str) – The name of the application that the curator is running in.
limit (int) – The maximum number of results to return.
curated_dataset_name (Optional[str], optional) – The name of the curated dataset that the curator will write to. Defaults to None, in which case the name of the curator will be used.
start_on (Optional[datetime.datetime], optional) – The datetime to start the curator. Defaults to None, in which case the curator will start now, looking back one curation_interval.
curation_interval (datetime.timedelta, optional) – The interval of time considered during each run of the curator. Defaults to datetime.timedelta(days=1).
curate_past_intervals (bool, optional) – Whether or not to curate past intervals when start_on is in the past. Defaults to True.
Selectors and Filters#
A simple API for telling curators which data to sample when they run.
- class gantry.automations.curators.selectors.OrderingDirection(value)#
Direction specifying how to order a sort.
- ASCENDING = 'ascending'#
- DESCENDING = 'descending'#
- class gantry.automations.curators.selectors.SamplingMethod(value)#
Sampling method to use when selecting a subset of data.
Selected data will be sampled using one of the methods enumerated here.
- uniform = 'uniform'#
- ordered = 'ordered'#
- stratified = 'stratified'#
- class gantry.automations.curators.selectors.StratificationMode(value)#
Stratification mode to use when selecting a subset of data. Currently, stratification is only supported for categorical data.
Selected data will be stratified using one of the modes enumerated here.
In “strict” mode, the limit of records per curation is respected, but the proportion of records in each stratum may be violated, unlike the ProportionalStratificationCurator, which will respect the proportion at the (potential) cost of violating the limit per curation.
Additionally, it is worth noting that the number of records to be added to each stratum is calculated exactly, leading to potentially fractional assignments. The integer part of the assignment is taken, and the total remainder is added, up to the limit, randomly.
Example
Let's say there are 100 records: 51 with the value "A" for the field, 15 with "B", 16 with "C", 17 with "D", and 1 with "E".
If the limit is 10, the curator will return 10 records: 5 with "A", 1 with "B", 1 with "C", 1 with "D", and 0 with "E"; the remaining 2 records will be randomly selected from the remaining records.
In “proportional” mode, the proportion of records in each stratum is respected, but the limit per curation may be violated, unlike the StrictStratificationCurator, which will respect the limit at the (potential) cost of violating the proportion of records in each stratum.
Additionally, it is worth noting that proportions are rounded half to even, so 1.5 will be rounded to 2 and 0.5 will be rounded to 0.
Example
Let's say there are 100 records: 51 with the value "A" for the field, 15 with "B", 16 with "C", 17 with "D", and 1 with "E".
If the limit is 10, the curator will return 11 records: 5 with "A", 2 with "B", 2 with "C", 2 with "D", and 0 with "E".
In “balanced” mode, the number of records per stratum is balanced with respect to the least represented stratum.
- strict = 'strict'#
- proportional = 'proportional'#
- balanced = 'balanced'#
- class gantry.automations.curators.selectors.TagFilter(*, name: str, value: str)#
Filter for tags
- name: str#
- value: str#
- class gantry.automations.curators.selectors.Filter(*, field: str)#
Base class for filters. All filters must have a field.
- field: str#
- class gantry.automations.curators.selectors.BoundsFilter(*, field: str, upper: Optional[Union[float, int]] = None, inclusive_upper: Optional[Union[float, int]] = None, lower: Optional[Union[float, int]] = None, inclusive_lower: Optional[Union[float, int]] = None)#
Filter for bounds. Must have either an upper or lower bound; both can be specified. Default bounds are exclusive, but inclusive bounds can be given instead.
- upper: Optional[Union[float, int]]#
- inclusive_upper: Optional[Union[float, int]]#
- lower: Optional[Union[float, int]]#
- inclusive_lower: Optional[Union[float, int]]#
- classmethod validate_bounds(values)#
- field: str#
- class gantry.automations.curators.selectors.EqualsFilter(*, field: str, equals: Union[bool, float, int, str])#
Filter for equality. Must have an equals field.
The equals field can be a boolean, string, float, or int.
- equals: Union[bool, float, int, str]#
- field: str#
- class gantry.automations.curators.selectors.ContainsFilter(*, field: str, contains: str)#
Filter for string containment. The specified field must be a string.
- contains: str#
- field: str#
- class gantry.automations.curators.selectors.Sampler#
Base class for samplers.
- class gantry.automations.curators.selectors.UniformSampler(*, sample: typing.Literal[<SamplingMethod.uniform: 'uniform'>] = SamplingMethod.uniform)#
Sampler for uniform sampling.
This sampler will select a subset of the data uniformly at random.
- sample: Literal[<SamplingMethod.uniform: 'uniform'>]#
- class gantry.automations.curators.selectors.OrderedSampler(*, sample: typing.Literal[<SamplingMethod.ordered: 'ordered'>] = SamplingMethod.ordered, field: str, sort: typing.Literal[<OrderingDirection.ASCENDING: 'ascending'>, <OrderingDirection.DESCENDING: 'descending'>])#
Sampler for ordered sampling.
This sampler will select a subset of the data ordered by the specified field. Fields can be ordered in ascending or descending order.
- sample: Literal[<SamplingMethod.ordered: 'ordered'>]#
- field: str#
- sort: Literal[<OrderingDirection.ASCENDING: 'ascending'>, <OrderingDirection.DESCENDING: 'descending'>]#
- class gantry.automations.curators.selectors.StratifiedSampler(*, field: str, sample: typing.Literal[<SamplingMethod.stratified: 'stratified'>] = SamplingMethod.stratified, mode: typing.Literal[<StratificationMode.strict: 'strict'>, <StratificationMode.proportional: 'proportional'>, <StratificationMode.balanced: 'balanced'>] = StratificationMode.proportional)#
Sampler for stratified sampling.
This sampler will select a subset of the data stratified by the specified field. Stratification is only supported for categorical data. The mode specifies how to stratify the data. See the documentation for StratificationMode for more details.
- field: str#
- sample: Literal[<SamplingMethod.stratified: 'stratified'>]#
- mode: Literal[<StratificationMode.strict: 'strict'>, <StratificationMode.proportional: 'proportional'>, <StratificationMode.balanced: 'balanced'>]#
- class gantry.automations.curators.selectors.Selector(*, method: typing.Union[gantry.automations.curators.selectors.OrderedSampler, gantry.automations.curators.selectors.UniformSampler, gantry.automations.curators.selectors.StratifiedSampler] = OrderedSampler(sample=<SamplingMethod.ordered: 'ordered'>, field='__time', sort=<OrderingDirection.DESCENDING: 'descending'>), limit: int, filters: typing.List[typing.Union[gantry.automations.curators.selectors.BoundsFilter, gantry.automations.curators.selectors.EqualsFilter, gantry.automations.curators.selectors.ContainsFilter]] = [], tags: typing.List[gantry.automations.curators.selectors.TagFilter] = [])#
Base class for selectors. All selectors must have a method and a limit. The method specifies how to select a subset of the data, and the limit specifies the number of records to select. The filters field specifies any filters to apply to the data before selecting a subset.
- method: Union[gantry.automations.curators.selectors.OrderedSampler, gantry.automations.curators.selectors.UniformSampler, gantry.automations.curators.selectors.StratifiedSampler]#
- limit: int#
- filters: List[Union[gantry.automations.curators.selectors.BoundsFilter, gantry.automations.curators.selectors.EqualsFilter, gantry.automations.curators.selectors.ContainsFilter]]#
- tags: List[gantry.automations.curators.selectors.TagFilter]#
- classmethod validate_limit(value)#
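As an illustrative sketch of how these pieces compose (the field names "latency_ms" and "status" and the tag "env" are hypothetical, and passing the selector to a curator is not shown here):
from gantry.automations.curators.selectors import (
    BoundsFilter,
    EqualsFilter,
    OrderedSampler,
    OrderingDirection,
    Selector,
    TagFilter,
)

# Select up to 50 "error" records with latency above 100 from the "prod" environment,
# ordered from highest latency to lowest.
selector = Selector(
    method=OrderedSampler(field="latency_ms", sort=OrderingDirection.DESCENDING),
    limit=50,
    filters=[
        BoundsFilter(field="latency_ms", lower=100.0),
        EqualsFilter(field="status", equals="error"),
    ],
    tags=[TagFilter(name="env", value="prod")],
)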
Alerts Aggregations#
- class gantry.alerts.client.AlertsAggregation(value)#
Allowed aggregations for Gantry Alerts.
The full list of aggregations is:
TOTAL_DIFFERENCE, PERCENT_DIFFERENCE, TOTAL, MAX, MIN, MEAN, STD, PDF, QUANTILES, PERCENT_NULL, PERCENT_TRUE, PERCENT_FALSE, PERCENT_TRUE_NOT_NULL, PERCENT_FALSE_NOT_NULL, CATEGORY_PERCENTS, ACCURACY, MAE, MSE, MAPE, MAX_ERROR, CONFUSION_MATRIX, F1, R2, PRECISION, RECALL, ROC_AUC_SCORE, PR_AUC_SCORE, D1, DINF, KL, KS, CATEGORICAL_D1, CATEGORICAL_DINF, CATEGORICAL_KL, X2, ROC_CURVE_PLOT, PR_CURVE_PLOT, DISTRIBUTION_SKETCH, CATEGORICAL_SKETCH
DataFrame and Series#
Learn about Gantry’s abstractions for querying data from your production data stream.
- class gantry.query.core.dataframe.GantryDataFrame(api_client: gantry.api_client.APIClient, application: str, start_time: Union[str, datetime.datetime], end_time: Union[str, datetime.datetime], version: Optional[Union[str, int]] = None, env: Optional[str] = None, filters: Optional[List[dict]] = None, tags: Optional[dict] = None, relative_time_window: Optional[datetime.timedelta] = None, relative_time_delay: Optional[datetime.timedelta] = None)#
Two-dimensional dict that supports querying Gantry data.
- property dtypes: pandas.core.series.Series#
Return the column types
- head(n: int = 5) pandas.core.frame.DataFrame #
Fetch the top n entries. Results will include an additional column, join_id, whose values can be used to log and join feedback events to your Gantry application.
- Parameters
n (int, defaults to 5) – Number of entries to fetch
- Returns
A pandas dataframe object.
- tail(n: int = 5) pandas.core.frame.DataFrame #
Fetch the last n entries. Results will include an additional column, join_id, whose values can be used to log and join feedback events to your Gantry application.
- Parameters
n (int, defaults to 5) – Number of entries to fetch
- Returns
A pandas dataframe object.
- fetch() pandas.core.frame.DataFrame #
Fetch all entries. Results will include an additional column, join_id, whose values can be used to log and join feedback events to your Gantry application.
- Returns
A pandas dataframe object.
- describe() pandas.core.frame.DataFrame #
Print basic stats on the dataframe.
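As a short sketch (assuming df is a GantryDataFrame already obtained from a query over some time window):
# `df` is assumed to be a GantryDataFrame returned by a query
preview = df.head(10)   # pandas DataFrame with the first 10 entries plus a join_id column
full = df.fetch()       # pandas DataFrame with every entry in the queried window
df.describe()           # basic stats on the dataframe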
- class gantry.query.core.dataframe.GantrySeries(name: str, id: str, datatype: str, series_type: str, parent_dataframe: gantry.query.core.dataframe.GantryDataFrame)#
Gantry Series object, similar to Pandas Series.
Operations on Gantry Series return objects that can be used as masks for Gantry Series or Gantry Dataframes.
# Operations on series return masks
mask = some_series.contains("apples")
apples_series = some_series[mask]
apples_series.fetch()
- isin(other: List[str])#
For string Series, whether an entry is in a list of strings.
- Parameters
other (list[str]) – the list of strings
- contains(other: str)#
For string Series, whether an entry contains a string
- Parameters
other (str) – string to compare
- notnull()#
Filters out null values.
- notna()#
Alias for notnull
- isnull()#
Filters out non-null values (i.e., null values remain).
- isna()#
Alias for isnull
- head(n: int = 5) pandas.core.frame.DataFrame #
Fetch the top n entries
- Parameters
n (int, defaults to 5) – Number of entries to fetch
- Returns
A pandas dataframe object.
- tail(n: int = 5) pandas.core.frame.DataFrame #
Fetch the last n entries
- Parameters
n (int, defaults to 5) – Number of entries to fetch
- Returns
A pandas dataframe object.
- fetch() pandas.core.frame.DataFrame #
Fetch all entries
- Returns
A pandas dataframe object.
- mean(num_points: int = 1, group_by: Optional[str] = None) Union[float, pandas.core.frame.DataFrame] #
Runs on int and float series only. Get the computed average of this GantrySeries, if available
- Parameters
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
group_by (str, defaults to None) – column to use as group_by. If None then no group_by operation is performed and a single result is returned.
- Returns
Float with mean if num_points=1 and group_by=None (the default values), else a pd.DataFrame.
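For example, assuming latency_ms is a numeric GantrySeries and "model_version" is a column in the same application (both names are illustrative):
overall = latency_ms.mean()                           # single float over the full time window
weekly = latency_ms.mean(num_points=7)                # pd.DataFrame: window split into 7 points
by_model = latency_ms.mean(group_by="model_version")  # pd.DataFrame: one mean per model_version value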
- std(num_points: int = 1, group_by: Optional[str] = None) Union[float, pandas.core.frame.DataFrame] #
Runs on int and float series only. Get the standard deviation for this GantrySeries, if available
- Parameters
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
group_by (str, defaults to None) – column to use as group_by. If None then no group_by operation is performed and a single result is returned.
- Returns
Float with std if num_points=1 and group_by=None (the default values), else a pd.DataFrame.
- median(num_points: int = 1) Union[float, pandas.core.frame.DataFrame] #
Runs on numeric series only. Get the median for this GantrySeries, if available
- Parameters
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
- Returns
Float with median in case num_points=1 (the default value), else a pd.DataFrame.
- count(num_points: int = 1, group_by: Optional[str] = None, limit: int = 20) Union[float, pandas.core.frame.DataFrame] #
Get the number of available data points for this GantrySeries, if available. Works on all series types.
- Parameters
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
group_by (str, defaults to None) – column to use as group_by. If None then no group_by operation is performed and a single result is returned.
limit (int, defaults to 20) – maximum number of unique categories to return. If you have many unique values, increase this number to be greater than the number of unique categories. You can use the .unique() query to determine the number of categories and pass it here.
- Returns
Float with count if num_points=1 and group_by=None (the default values), else a pd.DataFrame.
- min(num_points: int = 1, group_by: Optional[str] = None) Union[float, datetime.datetime, pandas.core.frame.DataFrame] #
Runs on int, float, and datetime series only. Get the minimum for this GantrySeries, if available.
- Parameters
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
group_by (str, defaults to None) – column to use as group_by. If None then no group_by operation is performed and a single result is returned.
- Returns
Float with min value if num_points=1 and group_by=None (the default values), else a pd.DataFrame.
- max(num_points: int = 1, group_by: Optional[str] = None) Union[float, datetime.datetime, pandas.core.frame.DataFrame] #
Runs on int, float, and datetime series only. Get the maximum for this GantrySeries, if available.
- Parameters
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
group_by (str, defaults to None) – column to use as group_by. If None then no group_by operation is performed and a single result is returned.
- Returns
Float with max value if num_points=1 and group_by=None (the default values), else a pd.DataFrame.
- histogram()#
Runs on bool, int, float and str series only. Get histogram of categories for this GantrySeries, if available.
Gets the histogram of percentages from [0, 1] for available data values for this GantrySeries.
- Returns
Dict[str, float] histogram
- unique() Set[str] #
Runs on bool and str series only. Get the unique values in this GantrySeries, if available.
- Returns
Set[str] containing all the unique values that occur in this series.
- quantile(quantile_vals: List[float] = [0.5])#
Runs on int and float series only. Get quantiles for this GantrySeries, if available.
- Parameters
quantile_vals – list of requested quantiles. Float values in the list should be in the range [0, 1].
- Returns
List[float] of len(quantile_vals), where the order of the outputs matches the input quantile_vals order.
- pdf()#
Get requested probability density function for this GantrySeries, if available.
- cdf()#
Get requested cumulative density function for this GantrySeries, if available.
- describe() pandas.core.frame.DataFrame #
Runs on int, float, and bool series only. Return basic stats on the series.
- Returns
A pandas dataframe with summary information.
- percent_true(dropna: bool = False, num_points: int = 1, group_by: Optional[str] = None)#
Runs on boolean series only. Percent true, optionally drop null values.
- Parameters
data_node (GantrySeries) – GantrySeries which will be calculated
dropna (bool, defaults to False) – if True, first drops NaN values before calculating result.
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
group_by (str, defaults to None) – column to use as group_by. If None then no group_by operation is performed and a single result is returned.
- Returns
- Float with true percentage if num_points=1 and group_by=None (by default),
else pd.DataFrame.
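A brief sketch, assuming is_fraud is a boolean GantrySeries and "region" is a column in the same application (names are illustrative):
overall = is_fraud.percent_true(dropna=True)          # float: share of true values, ignoring nulls
by_region = is_fraud.percent_true(group_by="region")  # pd.DataFrame: one percentage per region value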
- percent_false(dropna: bool = False, num_points: int = 1, group_by: Optional[str] = None)#
Runs on boolean series only. Percent false, optionally drop null values.
- Parameters
data_node (GantrySeries) – GantrySeries which will be calculated
dropna (bool, defaults to False) – if True, first drops NaN values before calculating result.
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
group_by (str, defaults to None) – column to use as group_by. If None then no group_by operation is performed and a single result is returned.
- Returns
- Float with false percentage if num_points=1 and group_by=None (by default),
else pd.DataFrame.
- percent_null(num_points: int = 1, group_by: Optional[str] = None)#
Runs on boolean series only. Percent null/NaN.
- Parameters
data_node (GantrySeries) – GantrySeries which will be calculated
dropna (bool, defaults to False) – if True, drop rows with NaN values in result.
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
group_by (str, defaults to None) – column to use as group_by. If None then no group_by operation is performed and a single result is returned.
- Returns
- Float with null percentage if num_points=1 and group_by=None (by default),
else pd.DataFrame.
- percent_true_not_null(dropna: bool = False, num_points: int = 1, group_by: Optional[str] = None)#
Runs on boolean series only. Percent true of not null values, optionally drop null values before calculating result.
- Parameters
data_node (GantrySeries) – GantrySeries which will be calculated
dropna (bool, defaults to False) – if True, first drops NaN values before calculating result.
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
group_by (str, defaults to None) – column to use as group_by. If None then no group_by operation is performed and a single result is returned.
- Returns
- Float with true percentage if num_points=1 and group_by=None (by default),
else pd.DataFrame.
- percent_false_not_null(dropna: bool = False, num_points: int = 1, group_by: Optional[str] = None)#
Runs on boolean series only. Percent false of not null values, optionally drop null values before calculating result.
- Parameters
data_node (GantrySeries) – GantrySeries which will be calculated
dropna (bool, defaults to False) – if True, first drops NaN values before calculating result.
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
group_by (str, defaults to None) – column to use as group_by. If None then no group_by operation is performed and a single result is returned.
- Returns
- Float with false percentage if num_points=1 and group_by=None (by default),
else pd.DataFrame.
Datasets#
A lightweight data container with simple versioning semantics that integrates seamlessly with Gantry curators.
- class gantry.dataset.GantryDataset(api_client: gantry.api_client.APIClient, dataset_name: str, dataset_id: uuid.UUID, bucket_name: str, aws_region: str, dataset_s3_prefix: str, workspace: str)#
A class representing a lightweight container around your data that provides simple version control semantics for helping to make ML pipelines reproducible.
- list_versions() List[Dict[str, Any]] #
This method will list all the versions of the dataset. Each dataset version is a snapshot of the dataset at a particular point in time. The result will be sorted from latest to earliest.
Example usage:
>>> import gantry.dataset as gdataset
>>> dataset = gdataset.get_dataset(dataset_name)
>>> dataset.list_versions()
[{'version_id': '7f100fca-f080-4daf-82d1-575e34234930', 'dataset': 'dataset_name', 'message': 'version notes', 'created_at': 'Thu, 16 Feb 2023 00:16:33 GMT', 'created_by': '04c894c7-a853-4fec-a024-66f5aae07b06', 'is_latest_version': True},
 {'version_id': '300cdb42-a8c9-4b6e-a16d-4f6c925eb25d', 'dataset': 'dataset_name', 'message': 'version notes', 'created_at': 'Thu, 16 Feb 2023 00:16:18 GMT', 'created_by': '04c894c7-a853-4fec-a024-66f5aae07b06', 'is_latest_version': False},
 {'version_id': '51e7771d-5ba9-4bdf-ab61-6d6ec9f7e066', 'dataset': 'dataset_name', 'message': 'initial dataset commit', 'created_at': 'Thu, 16 Feb 2023 00:14:34 GMT', 'created_by': '04c894c7-a853-4fec-a024-66f5aae07b06', 'is_latest_version': False}]
- Returns
dataset versions from latest to earliest.
- Return type
List[Dict[str, Any]]
- pull(version_id: Optional[Union[str, uuid.UUID]] = None, forced: bool = False) Optional[Dict[str, Any]] #
Pull a specific version of the dataset from the dataset server to the local working directory.
If a version ID is provided, this method will pull the snapshot of your dataset at that version to the working directory. If a version ID is not provided, this method will pull the latest version. If forced is set to True, any local changes will be discarded. This method will only change the files in your local dataset folder; pulling an older version will not affect the version history.
Example usage:
>>> import gantry.dataset as gdataset
>>> dataset = gdataset.get_dataset(dataset_name)
# pull the latest version of your dataset
>>> dataset.pull()
# pull a specific version of your dataset
>>> dataset.pull("b787034a-798b-4bb3-a726-0e197ddb8aff")
- Parameters
version_id (str, optional) – target version ID, defaults to None, which will pull the latest version.
forced (bool) – whether to discard local changes or not when pulling, defaults to False.
- Returns
A dictionary with metadata representing the version pulled if successful, None otherwise.
- Return type
Optional[Dict[str, Any]]
- get_diff() Dict[str, List[str]] #
Show the local changes that have not been pushed to the server yet. This method will return a dictionary with three keys: new_files, modified_files, and deleted_files. Each key will have a list of files that have been added, modified, or deleted respectively.
Example usage:
>>> import gantry.dataset as gdataset
>>> dataset = gdataset.get_dataset(dataset_name)
>>> dataset.pull()
# make some changes to the dataset
>>> dataset.get_diff()
{'new_files': ['tabular_manifests/new_file.csv', 'artifacts/image.png'],
 'modified_files': ['tabular_manifests/modified_file_1.csv'],
 'deleted_files': ['tabular_manifests/deleted_file.csv']}
- Returns
a dictionary representing the diff, which looks like this:
{
    "new_files": List[str],
    "modified_files": List[str],
    "deleted_files": List[str],
}
- Return type
dict
- push_version(message: str) Dict[str, Any] #
This method will create a new dataset version in the remote server with a version message. The new version will include all the local changes that have not been pushed. If there are no local changes, this method will return the current version.
To avoid race conditions, this method will check that the HEAD of the dataset is up to date. If the HEAD is not up to date, this method will raise a DatasetHeadOutOfDateException. In that case you could stash your local changes, pull the latest version of the dataset, and try again. Check out the documentation for stash(), pull(), and restore() for more details.
Example usage:
>>> import gantry.dataset as gdataset
>>> dataset = gdataset.get_dataset(dataset_name)
>>> dataset.pull()
# make some changes to the dataset
>>> dataset.get_diff()
{'new_files': ['tabular_manifests/new_file.csv', 'artifacts/image.png'],
 'modified_files': ['tabular_manifests/modified_file_1.csv'],
 'deleted_files': ['tabular_manifests/deleted_file.csv']}
>>> dataset.push_version("new version notes")
{'version_id': '09575ee7-0407-44b8-ae88-765a8270b17a', 'dataset': 'dataset_name', 'message': 'new version notes', 'created_at': 'Wed, 15 Feb 2023 22:17:55 GMT', 'created_by': '04c894c7-a853-4fec-a024-66f5aae07b06', 'is_latest_version': True}
# After pushing the changes, the diff should be empty
>>> dataset.get_diff()
{'new_files': [], 'modified_files': [], 'deleted_files': []}
- Parameters
message (str) – a non-empty string representing a message to associate with the created version.
- Returns
a dictionary with metadata representing the version created.
- Return type
Dict[str, Any]
- push_gantry_dataframe(gantrydf: gantry.query.core.dataframe.GantryDataFrame, filename: str = '')#
This method will take a GantryDataFrame as input, save its contents to a new file in your dataset's tabular_manifests folder, and create a new version. Note that all these operations happen on the remote server; if you have your dataset checked out locally, please make sure you call pull() after this operation to get the latest data.
Example usage:
>>> import gantry.dataset as gdataset
>>> dataset = gdataset.get_dataset(dataset_name)
>>> dataset.pull()
>>> os.listdir(workspace + dataset_name)
['.dataset_metadata', 'dataset_config.yaml', 'README.md', 'tabular_manifests']
>>> os.listdir(workspace + dataset_name + '/tabular_manifests')
[]
>>> gantrydf = gquery.query(
...     application=application,
...     start_time=start,
...     end_time=end,
...     version="1.1",
... )
>>>
>>> _ = dataset.push_gantry_dataframe(gantrydf)
>>>
>>> _ = dataset.pull()
>>> os.listdir(workspace + dataset_name + '/tabular_manifests')
['GantryDataAutoSave__4390b343-b802-4add-ab51-f86e53979c73_2023-02-13_12:09_to_2023-02-13_17:09_rows_0_69.csv']
- Parameters
gantrydf (GantryDataFrame) – the GantryDataFrame that needs to be added to the dataset.
filename (str) – the name to be given to the file with data from the GantryDataFrame.
- Returns
Latest commit info.
- push_file(file: IO, filename: Optional[str] = None, message: Optional[str] = None, parent_version_id: Optional[Union[str, uuid.UUID]] = None) Dict[str, Any] #
This method will add a new file to the dataset on the Gantry server and create a new version of the dataset. With this method you can easily add new files to the dataset without having to pull the dataset locally. If you have your dataset checked out locally, please make sure you call pull() after this operation to get the latest data. The difference between this method and push_tabular_file() is that this method can handle both tabular and non-tabular files. If you are uploading a tabular file, please use push_tabular_file() instead.
- Parameters
file (IO) – the file to be uploaded
filename (str, Optional) – the name of the file to be uploaded. Defaults to the name of the file passed in the file parameter.
message (str, Optional) – the version message that will be associated with the upload of the file. Defaults to a generic message if not set.
parent_version_id (uuid.UUID, Optional) – If specified, the SDK will check whether you are making changes on top of the latest version; if not, the operation will fail.
- Returns
A dictionary with metadata representing the version created.
- Return type
Dict[str, Any]
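A minimal usage sketch (the file path, filename, and version message are illustrative):
>>> import gantry.dataset as gdataset
>>> dataset = gdataset.get_dataset(dataset_name)
>>> with open("artifacts/image.png", "rb") as f:
...     dataset.push_file(f, filename="image.png", message="add image artifact")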
- push_tabular_file(file: IO, filename: Optional[str] = None, message: Optional[str] = None, parent_version_id: Optional[Union[str, uuid.UUID]] = None) Dict[str, Any] #
This method will add a tabular file to the dataset's tabular_manifests folder and create a new version of the dataset on the dataset server. With this method you can easily add new tabular files to an existing dataset without having to pull the dataset locally. If you have your dataset checked out locally, please make sure you call pull() after this operation to get the latest data. If you are trying to add a non-tabular file, please use push_file() instead. Note: the tabular filename must end with .csv.
Example usage:
>>> import gantry.dataset as gdataset
>>> dataset = gdataset.get_dataset(dataset_name)
>>> dataset.push_tabular_file(
...     open("{file_to_be_added}.csv"),
...     "{dataset_file_name}.csv",
...     "version info"
... )
{'version_id': '2b575ee7-0407-44b8-ae88-765a8270b17a', 'dataset': 'dataset_name', 'message': 'version info', 'created_at': 'Wed, 15 Feb 2023 22:17:55 GMT', 'created_by': '04c894c7-a853-4fec-a024-66f5aae07b06', 'is_latest_version': True}
- Parameters
file (IO) – the csv file to be uploaded
filename (str, Optional) – the name of the file to be uploaded. Defaults to the local file name of the file. Name must end with .csv.
message (str, optional) – the version message that will be associated with the upload of the file. Defaults to a generic message if not set.
parent_version_id (uuid.UUID, optional) – If specified, the SDK will check whether you are making changes on top of the latest version; if not, the operation will fail.
- Returns
A dictionary with metadata representing the version created.
- Return type
Dict[str, Any]
- push_dataframe(dataframe: pandas.core.frame.DataFrame, filename: Optional[str] = None, message: Optional[str] = None, parent_version_id: Optional[Union[str, uuid.UUID]] = None) Dict[str, Any] #
Add a dataframe to the dataset's tabular_manifests folder as a csv file and create a new version of the dataset on the dataset server. With this method you can easily add a new dataframe to an existing dataset without having to pull the dataset locally. If you have your dataset checked out locally, please make sure you call pull() after this operation.
Example usage:
>>> import gantry.dataset as gdataset
>>> dataset = gdataset.get_dataset(dataset_name)
>>> dataset.push_dataframe(
...     pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]}),
...     "dataframe.csv",
...     "version info"
... )
{'version_id': 'fa575ee7-0407-44b8-ae88-765a8270b17a', 'dataset': 'dataset_name', 'message': 'version info', 'created_at': 'Wed, 15 Feb 2023 22:17:55 GMT', 'created_by': '04c894c7-a853-4fec-a024-66f5aae07b06', 'is_latest_version': True}
- Parameters
dataframe (pd.DataFrame) – the dataframe to be uploaded
filename (str, Optional) – the name for the csv that will be uploaded. Defaults to dataframe-<random-hash>.csv
message (str, Optional) – the version message that will be associated with the upload of the file. Defaults to a generic message if not set.
parent_version_id (uuid.UUID, Optional) – If specified, the SDK will check whether you are making changes on top of the latest version; if not, the operation will fail.
- Returns
A dictionary with metadata representing the version created.
- Return type
Dict[str, Any]
- rollback(version_id: Union[str, uuid.UUID], forced: bool = False) Optional[Dict[str, Any]] #
Roll back the dataset data to a previous version and create a new dataset version based on the target version id. With this method you can easily roll back to any previous version of your dataset and continue working on top of it. Note that to do this, you must have your local dataset folder up to date. Also, this method will automatically pull the latest version to the local dataset folder after the rollback. If forced is set to True, any local changes will be discarded.
Example usage:
>>> import gantry.dataset as gdataset
>>> dataset = gdataset.get_dataset(dataset_name)
>>> dataset.pull()
>>> dataset.list_versions()
[{'version_id': 'd6f42bd0-b7a9-4089-88fb-be7cee6d4a83', 'dataset': 'dataset_name', 'message': 'version info', 'created_at': 'Wed, 08 Feb 2023 22:02:02 GMT', 'created_by': 'db459d6d-c83b-496d-b659-e48bca971156', 'is_latest_version': True},
 {'version_id': 'b787034a-798b-4bb3-a726-0e197ddb8aff', 'dataset': 'dataset_name', 'message': 'initial dataset commit', 'created_at': 'Wed, 08 Feb 2023 22:00:26 GMT', 'created_by': 'db459d6d-c83b-496d-b659-e48bca971156', 'is_latest_version': False}]
>>> dataset.rollback("b787034a-798b-4bb3-a726-0e197ddb8aff")
{'version_id': '23bc4d35-0df2-424c-9156-d5ca105eb4c1', 'dataset': 'dataset_name', 'message': 'Rollback dataset to version: b787034a-798b-4bb3-a726-0e197ddb8aff', 'created_at': 'Thu, 09 Feb 2023 00:36:07 GMT', 'created_by': 'db459d6d-c83b-496d-b659-e48bca971156', 'is_latest_version': True}
>>> dataset.list_versions()
[{'version_id': '23bc4d35-0df2-424c-9156-d5ca105eb4c1', 'dataset': 'dataset_name', 'message': 'Rollback dataset to version: b787034a-798b-4bb3-a726-0e197ddb8aff', 'created_at': 'Thu, 09 Feb 2023 00:36:07 GMT', 'created_by': 'db459d6d-c83b-496d-b659-e48bca971156', 'is_latest_version': True},
 {'version_id': 'd6f42bd0-b7a9-4089-88fb-be7cee6d4a83', 'dataset': 'dataset_name', 'message': 'version info', 'created_at': 'Wed, 08 Feb 2023 22:02:02 GMT', 'created_by': 'db459d6d-c83b-496d-b659-e48bca971156', 'is_latest_version': False},
 {'version_id': 'b787034a-798b-4bb3-a726-0e197ddb8aff', 'dataset': 'dataset_name', 'message': 'initial dataset commit', 'created_at': 'Wed, 08 Feb 2023 22:00:26 GMT', 'created_by': 'db459d6d-c83b-496d-b659-e48bca971156', 'is_latest_version': False}]
- Parameters
version_id (str) – ID of the version to rollback to.
forced (bool) – whether to discard local changes when rolling back.
- Returns
A dictionary with metadata representing the version rolled back to if successful, None otherwise.
- Return type
Optional[Dict[str, Any]]
- stash() None #
This method will stash all the changes in your local dataset folder to a temporary folder. You can use this method to save your changes before the pull() method. Later on you can use the restore() method to restore your changes. Note that if you run stash() multiple times, the changes will be overwritten and only the latest stash will be kept.
Example usage:
>>> import gantry.dataset as gdataset
>>> dataset = gdataset.get_dataset(dataset_name)
>>> dataset.pull()
# make some changes to your dataset
>>> dataset.get_diff()
{'new_files': ['tabular_manifests/new_file.csv', 'artifacts/image.png'],
 'modified_files': ['tabular_manifests/modified_file_1.csv'],
 'deleted_files': ['tabular_manifests/deleted_file.csv']}
>>> dataset.stash()
>>> dataset.get_diff()
{'new_files': [], 'modified_files': [], 'deleted_files': []}
- Returns
None
- restore() None #
This method will restore the changes from the stash to your local dataset folder. Note that if you have made changes to your local dataset folder after the stash() method, those changes could be overwritten by this method.
Example usage:
>>> import gantry.dataset as gdataset
>>> dataset = gdataset.get_dataset(dataset_name)
>>> dataset.pull()
# make some changes to your local dataset folder
>>> dataset.get_diff()
{'new_files': ['tabular_manifests/new_file.csv', 'artifacts/image.png'],
 'modified_files': ['tabular_manifests/modified_file_1.csv'],
 'deleted_files': ['tabular_manifests/deleted_file.csv']}
>>> dataset.stash()
>>> dataset.get_diff()
{'new_files': [], 'modified_files': [], 'deleted_files': []}
>>> dataset.restore()
>>> dataset.get_diff()
{'new_files': ['tabular_manifests/new_file.csv', 'artifacts/image.png'],
 'modified_files': ['tabular_manifests/modified_file_1.csv'],
 'deleted_files': ['tabular_manifests/deleted_file.csv']}
- Returns
None
- delete() None #
Mark the dataset as deleted. For now this will only hide the dataset from the list_datasets API results; it will not delete the dataset data from the server. We are working on implementing a hard delete feature.
- Returns
None
- pull_huggingface_dataset_by_version(version_id: Optional[str] = None, hf_dataset_name: Optional[str] = None, streaming=False)#
This function will pull the dataset's tabular files from the Gantry server and convert them to a HuggingFace dataset. This method will not pull data to your local dataset folder; it only loads the dataset's tabular data from the remote server into a HuggingFace dataset object.
If no version_id is specified, it will load the latest version of the dataset. If streaming is set to True, it will return an IterableDataset or IterableDatasetDict instead of a Dataset or DatasetDict.
Refer to HuggingFace Documentation for more details about streaming.
Example usage:
import gantry.dataset as gdataset
dataset = gdataset.get_dataset(dataset_name)
# download your dataset as huggingface dataset
hf_dataset = dataset.pull_huggingface_dataset_by_version(version_id="version_id")
# pull the dataset as IterableDataset with streaming
hf_dataset = dataset.pull_huggingface_dataset_by_version(version_id="version_id", streaming=True)
- Parameters
version_id (Optional[str], optional) – dataset version id to load. Defaults to latest.
hf_dataset_name (Optional[str], optional) – param to overwrite the huggingface dataset name. Note this param must be a valid python identifier.
streaming (bool, defaults to False) – If set to True, don't download the data files. Instead, the data is streamed progressively while iterating over the dataset. An IterableDataset or IterableDatasetDict is returned in this case.
- get_huggingface_dataset(hf_dataset_name: Optional[str] = None, streaming: bool = False)#
This function will convert your local dataset folder to a HuggingFace dataset. Internally the Gantry SDK will create a HuggingFace loading script based on the dataset config and then load the dataset into a HuggingFace pyarrow dataset. If streaming is set to True, it will return an IterableDataset or IterableDatasetDict instead of a Dataset or DatasetDict.
Refer to HuggingFace documentation for more details about streaming.
Example usage:
import gantry.dataset as gdataset
dataset = gdataset.get_dataset(dataset_name)
dataset.pull()
# load the dataset into huggingface dataset
hf_dataset = dataset.get_huggingface_dataset()
# load the dataset into huggingface dataset with streaming
hf_dataset = dataset.get_huggingface_dataset(streaming=True)
- Parameters
hf_dataset_name (Optional[str], optional) – param to overwrite the huggingface dataset name. Note this param must be a valid python identifier.
streaming (bool, defaults to False) – If set to True, don't download the data files. Instead, the data is streamed progressively while iterating over the dataset. An IterableDataset or IterableDatasetDict is returned in this case.
- gantry.dataset.create_dataset(name: str, bucket_name: Optional[str] = None, app_name: Optional[str] = None) gantry.dataset.gantry_dataset.GantryDataset #
Alias for gantry.dataset.client.GantryDatasetClient.create_dataset()
Create a dataset with the provided name. If app_name is provided, we will use the application schema to set the dataset schema. bucket_name can be provided if you want to store data in your own bucket; please contact Gantry support if you want to use this feature. If not provided, we will use a Gantry-managed bucket.
Example usage:
import gantry.dataset as gdataset
# Create an empty dataset with the name "dataset_name"
gdataset.create_dataset("dataset_name")
# Create a dataset with the name "dataset_with_app" and set the schema to the schema of the
# application "app_name"
gdataset.create_dataset("dataset_with_app", app_name="app_name")
- Parameters
name (str) – dataset name
bucket_name (str) – Provide bucket name if you want to use your own bucket. If not provided we will use a gantry managed bucket.
app_name (Optional[str], optional) – gantry application name which will be used to set dataset schema if provided.
- Returns
an object representing the created dataset.
- Return type
GantryDataset
- gantry.dataset.get_dataset(name: str, app_name: Optional[str] = None) gantry.dataset.gantry_dataset.GantryDataset #
Alias for gantry.dataset.client.GantryDatasetClient.get_dataset()
Get a dataset object by name. If the dataset is marked as deleted, a warning will be logged. If the dataset does not exist, a gantry.exceptions.DatasetNotFoundError will be raised.
Example usage:
import gantry.dataset as gdataset
dataset = gdataset.get_dataset("dataset_name", "test_app")
- Parameters
name (str) – the name of the dataset.
app_name (str) – the name of the application the dataset belongs to.
- Returns
An object representing the dataset.
- Return type
GantryDataset
- gantry.dataset.list_dataset_versions(dataset_name: str) List[Dict[str, Any]] #
Alias for
gantry.dataset.client.GantryDatasetClient.list_dataset_versions()
List all versions of a dataset. Each dataset version is a snapshot of the dataset at a particular point in time. The result will be sorted from latest to earliest.
Example usage:
import gantry.dataset as gdataset
gdataset.list_dataset_versions("dataset_name")
Output example:
[{'version_id': 'd6f42bd0-b7a9-4089-88fb-be7cee6d4a83', 'dataset': 'dataset_name', 'message': 'version info', 'created_at': 'Wed, 08 Feb 2023 22:02:02 GMT', 'created_by': 'db459d6d-c83b-496d-b659-e48bca971156', 'is_latest_version': True},
 {'version_id': 'b787034a-798b-4bb3-a726-0e197ddb8aff', 'dataset': 'dataset_name', 'message': 'initial dataset commit', 'created_at': 'Wed, 08 Feb 2023 22:00:26 GMT', 'created_by': 'db459d6d-c83b-496d-b659-e48bca971156', 'is_latest_version': False}]
- Parameters
dataset_name (str) – the name of the dataset.
- Returns
list of dictionaries with metadata.
- Return type
List[Dict[str, Any]]
- gantry.dataset.list_datasets(include_deleted: bool = False, model_node_id: Optional[str] = None) List[Dict[str, Any]] #
Alias for
gantry.dataset.client.GantryDatasetClient.list_datasets()
List all datasets. If include_deleted is set to True, deleted datasets will be included in the result list.
Example usage:
import gantry.dataset as gdataset
gdataset.list_datasets()
Output example:
[{'name': 'dataset_0', 'dataset_id': '44e8dff0-1e4c-4484-843c-ca3c585d405d', 'created_at': 'Sun, 12 Feb 2023 00:08:12 GMT'},
 {'name': 'dataset_1', 'dataset_id': '3adb66fa-9dc7-4a60-86b6-83389f567186', 'created_at': 'Sun, 12 Feb 2023 00:05:15 GMT'},
 {'name': 'dataset_2', 'dataset_id': '0a5b5706-2060-4601-8c5c-22900f06d54a', 'created_at': 'Wed, 08 Feb 2023 22:00:26 GMT'},
- Parameters
include_deleted (bool) – whether to include deleted datasets in the result, defaults to False.
- Returns
List of dictionaries, each representing one dataset and associated metadata.
- Return type
List[Dict[str, Any]]
- gantry.dataset.delete_dataset(name: str) None #
Alias for
gantry.dataset.client.GantryDatasetClient.delete_dataset()
Mark the dataset as deleted. For now this will only hide the dataset from the list_datasets API results; it will not delete the dataset data from the server. We will release a hard deletion feature later.
Example usage:
import gantry.dataset as gdataset
gdataset.delete_dataset("dataset_name")
- Parameters
name (str) – the name of the dataset to be deleted.
- Returns
None
- gantry.dataset.set_working_directory(working_dir: str) None #
Alias for
gantry.dataset.client.GantryDatasetClient.set_working_directory()
Set the working directory for the dataset client. This is the directory where the local copy of the dataset will be stored.
Example usage:
import gantry.dataset as gdataset
gdataset.set_working_directory("your working directory")
- Parameters
working_dir (str) – absolute path of the directory used to store local copy of datasets.
- Returns
None
Queries#
Time Window#
Define a time window for querying data.
- class gantry.query.time_window.TimeWindow(start_time: Union[str, datetime.datetime], end_time: Union[str, datetime.datetime])#
- __init__(start_time: Union[str, datetime.datetime], end_time: Union[str, datetime.datetime])#
Initialize a standard time window.
- Parameters
start_time (Union[str, datetime.datetime]) – the start time of the time window.
end_time (Union[str, datetime.datetime]) – the end time of the time window.
- class gantry.query.time_window.RelativeTimeWindow(window_length: datetime.timedelta, offset: Optional[datetime.timedelta] = None)#
- __init__(window_length: datetime.timedelta, offset: Optional[datetime.timedelta] = None)#
Initialize a relative time window.
- Parameters
window_length (datetime.timedelta) – the length of the time window.
offset (Optional[datetime.timedelta]) – the offset of the time window. Defaults to None.
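For illustration, both kinds of window can be constructed directly; how a window is passed to a query is not shown here:
import datetime
from gantry.query.time_window import TimeWindow, RelativeTimeWindow

# A fixed two-day window
fixed = TimeWindow(
    start_time=datetime.datetime(2023, 2, 13),
    end_time=datetime.datetime(2023, 2, 15),
)

# A rolling 24-hour window, offset so it ends one hour in the past
relative = RelativeTimeWindow(
    window_length=datetime.timedelta(days=1),
    offset=datetime.timedelta(hours=1),
)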
Metrics#
- gantry.query.metric.main.accuracy_score(outputs: gantry.query.core.dataframe.GantrySeries, feedback: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1) pandas.core.frame.DataFrame #
Alias for
gantry.query.core.GantryMetric.accuracy_score()
Categorical metric - accuracy
In multilabel classification, this computes the set of |outputs| which exactly match the available |feedback|.
- Parameters
outputs (GantrySeries) – predictions as a GantrySeries
feedback (GantrySeries) – labels to compare against as a GantrySeries
dropna (bool, defaults to False) – if True, drop rows with NaN values in result
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
- Returns
(pd.DataFrame) pd.DataFrame of shape (num_points, 1) accuracy score
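A short sketch, assuming predictions and labels are GantrySeries holding the application's outputs and feedback respectively:
from gantry.query.metric.main import accuracy_score

overall = accuracy_score(predictions, labels)               # DataFrame of shape (1, 1)
weekly = accuracy_score(predictions, labels, num_points=7)  # DataFrame of shape (7, 1)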
- gantry.query.metric.main.mean_squared_error(outputs: gantry.query.core.dataframe.GantrySeries, feedback: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1, multioutput: Literal['uniform_average', 'raw_values'] = 'uniform_average', squared: bool = True) pandas.core.frame.DataFrame #
Alias for
gantry.query.core.GantryMetric.mean_squared_error()
Regression metric - mean squared error
- Parameters
outputs (GantrySeries) – predictions as a GantrySeries
feedback (GantrySeries) – labels to compare against as a GantrySeries
dropna (bool, defaults to False) – if True, drop rows with NaN values in result
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into.
multioutput ("uniform_average" or "raw_values") – type of averaging to use when computing the metric. Defaults to “uniform_average”.
squared (bool, defaults to True) – if True, return the squared error.
- Returns
(pd.DataFrame) pd.DataFrame of shape (num_points, 1) mean_squared_error
- gantry.query.metric.main.confusion_matrix(outputs: gantry.query.core.dataframe.GantrySeries, feedback: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1) pandas.core.frame.DataFrame #
Alias for
gantry.query.core.GantryMetric.confusion_matrix()
Categorical metric - confusion matrix
The confusion matrix is a matrix \(C\) where \(C_{i, j}\) represents the number of times a data point from class \(i\) was predicted to be in class \(j\).
- Parameters
outputs (GantrySeries) – predictions as a GantrySeries
feedback (GantrySeries) – labels to compare against as a GantrySeries.
dropna (bool, defaults to False) – if True, drop rows with NaN values in result.
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
- Returns
- (pd.DataFrame) pd.DataFrame of shape (num_points, 1)
confusion_matrix
- gantry.query.metric.main.f1_score(outputs: gantry.query.core.dataframe.GantrySeries, feedback: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1, average: Literal['micro'] = 'micro') pandas.core.frame.DataFrame #
Alias for
gantry.query.core.GantryMetric.f1_score()
Categorical metric - F1 score
It is computed as the harmonic mean of precision and recall: F1 = 2 * (precision * recall) / (precision + recall). In multiclass classification, this is the average of the F1 score for all available classes.
- Parameters
outputs (GantrySeries) – predictions as a GantrySeries
feedback (GantrySeries) – labels to compare against as a GantrySeries.
dropna (bool, defaults to False) – if True, drop rows with NaN values in result.
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
average ("micro") – type of averaging to use when computing the metric. Currently only “micro” supported, which is the default value.
- Returns
(pd.DataFrame) pd.DataFrame of shape (num_points, 1) f1_score
- gantry.query.metric.main.r2_score(outputs: gantry.query.core.dataframe.GantrySeries, feedback: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1, multioutput: Literal['uniform_average', 'raw_values', 'variance_weighted'] = 'uniform_average') float #
Alias for
gantry.query.core.GantryMetric.r2_score()
Regression metric - R^2 coefficient of determination
- Parameters
outputs (GantrySeries) – predictions as a GantrySeries
feedback (GantrySeries) – labels to compare against as a GantrySeries.
dropna (bool, defaults to False) – if True, drop rows with NaN values in result.
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
multioutput ("uniform_average", "raw_values", "variance_weighted") – type of averaging to use when computing the metric. Defaults to “uniform_average”.
- Returns
(float) R^2 score.
- gantry.query.metric.main.precision_score(outputs: gantry.query.core.dataframe.GantrySeries, feedback: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1, average: Literal['micro'] = 'micro') pandas.core.frame.DataFrame #
Alias for
gantry.query.core.GantryMetric.precision_score()
Categorical metric - precision score
precision = (number of true positives) / ((number of true positives) + (number of false positives))
- Parameters
outputs (GantrySeries) – predictions as a GantrySeries
feedback (GantrySeries) – labels to compare against as a GantrySeries.
dropna (bool, defaults to False) – if True, drop rows with NaN values in result.
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
average ("micro") – type of averaging to use when computing the metric. Currently only “micro” supported, which is the default value.
- Returns
(pd.DataFrame) pd.DataFrame of shape (num_points, 1) precision_score
- gantry.query.metric.main.recall_score(outputs: gantry.query.core.dataframe.GantrySeries, feedback: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1, average: Literal['micro'] = 'micro') pandas.core.frame.DataFrame #
Alias for
gantry.query.core.GantryMetric.recall_score()
Categorical metric - recall score
recall = (number of true positives) / ((number of true positives) + (number of false negatives))
- Parameters
outputs (GantrySeries) – predictions as a GantrySeries
feedback (GantrySeries) – labels to compare against as a GantrySeries.
dropna (bool, defaults to False) – if True, drop rows with NaN values in result.
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
average ("micro") – type of averaging to use when computing the metric. Currently only “micro” supported, which is the default value.
- Returns
(pd.DataFrame) pd.DataFrame of shape (num_points, 1) recall_score
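Since precision_score and recall_score share the same calling convention, one hedged sketch covers both (preds and labels as in the earlier sketches):
from gantry.query.metric.main import precision_score, recall_score

precision = precision_score(preds, labels, dropna=True)  # drop NaN rows from the result
recall = recall_score(preds, labels, dropna=True)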
- gantry.query.metric.main.roc_auc_score(outputs: gantry.query.core.dataframe.GantrySeries, feedback: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1) pandas.core.frame.DataFrame #
Alias for
gantry.query.core.GantryMetric.roc_auc_score()
Classification score metric - the area under the ROC curve
- Parameters
outputs (GantrySeries) – predictions as a GantrySeries
feedback (GantrySeries) – labels to compare against as a GantrySeries.
dropna (bool, defaults to False) – if True, drop rows with NaN values in result.
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
- Returns
(pd.DataFrame) pd.DataFrame of shape (num_points, 1) roc_auc_score
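A sketch assuming the outputs series holds classification scores (for example probabilities) rather than hard class labels; the column names are placeholders:
from gantry.query.metric.main import roc_auc_score

scores = df["outputs.score"]          # GantrySeries of classification scores (placeholder column)
binary_labels = df["feedback.label"]  # GantrySeries of binary labels (placeholder column)
auc = roc_auc_score(scores, binary_labels)  # DataFrame of shape (num_points, 1)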
- gantry.query.metric.main.percent_null(data_node: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1) pandas.core.frame.DataFrame #
Alias for
gantry.query.core.GantryMetric.percent_null()
Percent null/NaN
- Parameters
data_node (GantrySeries) – GantrySeries on which the metric is calculated
dropna (bool, defaults to False) – if True, drop rows with NaN values in result.
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
- Returns
(pd.DataFrame) pd.DataFrame of shape (num_points, 1) percent_null
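A minimal sketch, assuming feature is any GantrySeries selected from a queried GantryDataFrame (placeholder column name):
from gantry.query.metric.main import percent_null

feature = df["inputs.age"]          # GantrySeries to inspect (placeholder column)
null_rate = percent_null(feature)   # DataFrame of shape (1, 1) with the percent of null values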
- gantry.query.metric.main.percent_true(data_node: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1) pandas.core.frame.DataFrame #
Alias for
gantry.query.core.GantryMetric.percent_true()
Percent true
- Parameters
data_node (GantrySeries) – GantrySeries on which the metric is calculated
dropna (bool, defaults to False) – if True, first drops NaN values before calculating result.
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
- Returns
(pd.DataFrame) pd.DataFrame of shape (num_points, 1) percent_true
- gantry.query.metric.main.percent_false(data_node: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1) pandas.core.frame.DataFrame #
Alias for
gantry.query.core.GantryMetric.percent_false()
Percent false
- Parameters
data_node (GantrySeries) – GantrySeries on which the metric is calculated
dropna (bool, defaults to False) – if True, first drops NaN values before calculating result.
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
- Returns
(pd.DataFrame) pd.DataFrame of shape (num_points, 1) percent_false
- gantry.query.metric.main.percent_true_not_null(data_node: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1) pandas.core.frame.DataFrame #
Alias for
gantry.query.core.GantryMetric.percent_true_not_null()
Percent true after excluding null values
- Parameters
data_node (GantrySeries) – GantrySeries on which the metric is calculated
dropna (bool, defaults to False) – if True, first drops NaN values before calculating result.
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
- Returns
(pd.DataFrame) pd.DataFrame of shape (num_points, 1) percent_true_not_null
- gantry.query.metric.main.percent_false_not_null(data_node: gantry.query.core.dataframe.GantrySeries, dropna: bool = False, num_points: int = 1) pandas.core.frame.DataFrame #
Alias for
gantry.query.core.GantryMetric.percent_false_not_null()
Percent false after excluding null values
- Parameters
data_node (GantrySeries) – GantrySeries on which the metric is calculated
dropna (bool, defaults to False) – if True, first drops NaN values before calculating result.
num_points (int, defaults to 1) – number of points to divide the time window of the GantrySeries into
- Returns
(pd.DataFrame) pd.DataFrame of shape (num_points, 1) percent_false_not_null
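The four boolean coverage metrics above share the same calling convention, so a single hedged sketch covers them, assuming flag is a boolean GantrySeries (placeholder column name):
from gantry.query.metric.main import (
    percent_true,
    percent_false,
    percent_true_not_null,
    percent_false_not_null,
)

flag = df["inputs.is_fraud"]                 # boolean GantrySeries (placeholder column)

pct_true = percent_true(flag)                # percent of True values over all rows
pct_false = percent_false(flag)              # percent of False values over all rows
pct_true_nn = percent_true_not_null(flag)    # percent of True values after excluding nulls
pct_false_nn = percent_false_not_null(flag)  # percent of False values after excluding nulls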
Distances#
- gantry.query.distance.main.d1(feat1: gantry.query.core.dataframe.GantrySeries, feat2: gantry.query.core.dataframe.GantrySeries) float #
Alias for
gantry.query.core.GantryDistance.d1()
Computes the D1 distance between the input features' distributions.
- Parameters
feat1 (GantrySeries) – feature as a GantrySeries
feat2 (GantrySeries) – feature to compute the distance with, as a GantrySeries
- Returns
(float) D1 distance.
- gantry.query.distance.main.dinf(feat1: gantry.query.core.dataframe.GantrySeries, feat2: gantry.query.core.dataframe.GantrySeries) float #
Alias for
gantry.query.core.GantryDistance.dinf()
Computes the maximum distance between the input features' distributions.
- Parameters
feat1 (GantrySeries) – feature as a GantrySeries
feat2 (GantrySeries) – feature to compute the distance with, as a GantrySeries
- Returns
(float) d_inf distance.
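A hedged sketch comparing the same feature across two queried windows (for example a reference window and a production window); the variable names and column access are placeholders:
from gantry.query.distance.main import d1, dinf

reference = reference_df["inputs.age"]    # GantrySeries from a reference window (placeholder)
production = production_df["inputs.age"]  # same feature from a production window (placeholder)

drift_d1 = d1(reference, production)      # D1 distance between the two distributions
drift_dinf = dinf(reference, production)  # maximum distance between the two distributions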
- gantry.query.distance.main.ks(feat1: gantry.query.core.dataframe.GantrySeries, feat2: gantry.query.core.dataframe.GantrySeries) float #
Alias for
gantry.query.core.GantryDistance.ks()
Performs the Kolmogorov-Smirnov test for goodness of fit between the input features' distributions.
- Parameters
feat1 (GantrySeries) – feature as a GantrySeries
feat2 (GantrySeries) – feature to compute the distance with, as a GantrySeries
- Returns
(float) Kolmogorov-Smirnov test statistic.
- gantry.query.distance.main.kl(feat1: gantry.query.core.dataframe.GantrySeries, feat2: gantry.query.core.dataframe.GantrySeries) float #
Alias for
gantry.query.core.GantryDistance.kl()
Computes the Kullback-Leibler divergence between the input features' distributions.
- Parameters
feat1 (GantrySeries) – feature as a GantrySeries
feat2 (GantrySeries) – feature to compute the distance with, as a GantrySeries
- Returns
(float) Kullback-Leibler divergence.
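The statistical measures follow the same pattern; a brief sketch reusing the reference and production series from the previous sketch:
from gantry.query.distance.main import ks, kl

ks_stat = ks(reference, production)  # Kolmogorov-Smirnov statistic (float)
kl_div = kl(reference, production)   # Kullback-Leibler divergence (float)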