
DataSource

airt.client.DataSource

A class for managing datasources and training ML models on them.

To instantiate the DataSource class, please call the DataBlob.to_datasource method of the DataBlob class.

The DataSource class has two categories of methods:

  • Methods for managing the datasources.
  • A method for training a model against a datasource.

Methods like delete, ls, details, head, etc. can be used to manage a datasource and retrieve additional details about it.

The train method can be used to train a new model against a datasource.

All calls to the library are asynchronous and return immediately.

The returned object provides a method to check the status of the remote action (is_ready) and a method to display an interactive progress bar (progress_bar); calling them reports the progress of the remote action.

Below is an example for training a new model and monitoring its progress:

model = datasource.train(
    client_column="user_id",
    target_column="event_type",
    target="*purchase",
    predict_after=timedelta(hours=3)
)

model.progress_bar()

For more information, please refer to the documentation of the train method.
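The same asynchronous pattern can also be driven manually by polling is_ready. Below is a minimal polling helper; wait_until_ready is a hypothetical sketch, not part of the airt library, and works with any object exposing an is_ready() method (such as a DataSource or Model instance):

```python
import time

def wait_until_ready(resource, sleep_for=5, timeout=300):
    """Poll resource.is_ready() until it returns True or `timeout` seconds pass.

    `resource` is any object with an is_ready() -> bool method.
    Returns True once the remote action is complete, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if resource.is_ready():
            return True
        time.sleep(sleep_for)
    return False
```

With a DataSource instance ds, wait_until_ready(ds) would block for at most five minutes before giving up.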

dtypes: DataFrame property readonly

Return the dtypes of the datasource.

Returns:

Type Description
DataFrame

A pandas dataframe containing the dtypes.

Exceptions:

Type Description
ConnectionError

If the server address is invalid or not reachable.

An example to get the dtypes of a datasource:

ds.dtypes

__init__(self, uuid, datablob=None, folder_size=None, no_of_rows=None, error=None, disabled=None, created=None, pulled_on=None, user=None, hash=None, region=None, cloud_provider=None, tags=None, total_steps=None, completed_steps=None) special

Constructs a new DataSource instance.

Warning

Do not construct this object directly by calling the constructor; please use the DataBlob.to_datasource method instead.

Parameters:

Name Type Description Default
uuid str

The datasource uuid in the server.

required
datablob Optional[str]

The datablob uuid in the server.

None
folder_size Optional[int]

The size of the uploaded datasource in bytes.

None
no_of_rows Optional[int]

The number of records in the datasource.

None
error Optional[str]

Error message while processing the datasource.

None
disabled Optional[bool]

Flag to indicate the active status of the datasource.

None
created Optional[str]

The datasource creation date.

None
pulled_on Optional[str]

The datasource last pulled date.

None
user Optional[str]

The uuid of the user who created the datasource.

None
hash Optional[str]

The datasource hash.

None
region Optional[str]

AWS bucket region.

None
cloud_provider Optional[str]

The name of the cloud storage provider where the datasource is stored.

None
tags Optional[List[Dict[str, str]]]

Tag names associated with the datasource.

None
total_steps Optional[int]

The number of steps needed to upload the datasource to the server.

None
completed_steps Optional[int]

The number of steps completed while uploading the datasource to the server.

None
Source code in airt/client.py
def __init__(
    self,
    uuid: str,
    datablob: Optional[str] = None,
    folder_size: Optional[int] = None,
    no_of_rows: Optional[int] = None,
    error: Optional[str] = None,
    disabled: Optional[bool] = None,
    created: Optional[str] = None,
    pulled_on: Optional[str] = None,
    user: Optional[str] = None,
    hash: Optional[str] = None,
    region: Optional[str] = None,
    cloud_provider: Optional[str] = None,
    tags: Optional[List[Dict[str, str]]] = None,
    total_steps: Optional[int] = None,
    completed_steps: Optional[int] = None,
):
    """Constructs a new `DataSource` instance.

    Warning:
        Do not construct this object directly by calling the constructor; please use the `DataBlob.to_datasource` method instead.

    Args:
        uuid: The datasource uuid in the server.
        datablob: The datablob uuid in the server.
        folder_size: The size of the uploaded datasource in bytes.
        no_of_rows: The number of records in the datasource.
        error: Error message while processing the datasource.
        disabled: Flag to indicate the active status of the datasource.
        created: The datasource creation date.
        pulled_on: The datasource last pulled date.
        user: The uuid of the user who created the datasource.
        hash: The datasource hash.
        region: AWS bucket region.
        cloud_provider: The name of the cloud storage provider where the datasource is stored.
        tags: Tag names associated with the datasource.
        total_steps: The number of steps needed to upload the datasource to the server.
        completed_steps: The number of steps completed while uploading the datasource to the server.
    """
    self.uuid = uuid
    self.datablob = datablob
    self.folder_size = folder_size
    self.no_of_rows = no_of_rows
    self.error = error
    self.disabled = disabled
    self.created = created
    self.pulled_on = pulled_on
    self.user = user
    self.hash = hash
    self.region = region
    self.cloud_provider = cloud_provider
    self.tags = tags
    self.total_steps = total_steps
    self.completed_steps = completed_steps

as_df(dsx) staticmethod

Return the details of DataSource instances as a pandas dataframe.

Parameters:

Name Type Description Default
dsx List[DataSource]

List of DataSource instances.

required

Returns:

Type Description
DataFrame

Details of the datasources in a dataframe.

Exceptions:

Type Description
ConnectionError

If the server address is invalid or not reachable.

An example to get the details of the DataSource instances:

dsx = DataSource.ls()
DataSource.as_df(dsx)
Source code in airt/client.py
@staticmethod
def as_df(dsx: List["DataSource"]) -> pd.DataFrame:
    """Return the details of `DataSource` instances as a pandas dataframe.

    Args:
        dsx: List of `DataSource` instances.

    Returns:
        Details of the datasources in a dataframe.

    Raises:
        ConnectionError: If the server address is invalid or not reachable.

    An example to get the details of the `DataSource` instances:

    ```python
    dsx = DataSource.ls()
    DataSource.as_df(dsx)
    ```
    """

    ds_lists = [{i: getattr(ds, i) for i in DataSource.ALL_DS_COLS} for ds in dsx]

    for ds in ds_lists:
        ds["tags"] = get_values_from_item(ds["tags"], "name")

    lists_df = generate_df(ds_lists, DataSource.BASIC_DS_COLS)
    df = add_ready_column(lists_df)

    df = df.rename(columns=DataSource.COLS_TO_RENAME)

    return df

delete(self)

Delete a datasource from the server.

Returns:

Type Description
DataFrame

A pandas DataFrame encapsulating the details of the deleted datasource.

Exceptions:

Type Description
ConnectionError

If the server address is invalid or not reachable.

An example to delete a datasource from the server:

ds.delete()
Source code in airt/client.py
@patch
def delete(self: DataSource) -> pd.DataFrame:
    """Delete a datasource from the server.

    Returns:
        A pandas DataFrame encapsulating the details of the deleted datasource.

    Raises:
        ConnectionError: If the server address is invalid or not reachable.

    An example to delete a datasource from the server:

    ```python
    ds.delete()
    ```
    """

    response = Client._delete_data(relative_url=f"/datasource/{self.uuid}")

    response["tags"] = get_values_from_item(response["tags"], "name")

    df = pd.DataFrame([response])[DataSource.BASIC_DS_COLS]

    df = df.rename(columns=DataSource.COLS_TO_RENAME)

    return add_ready_column(df)

details(self)

Return details of a datasource.

Returns:

Type Description
DataFrame

The datasource details as a pandas dataframe.

Exceptions:

Type Description
ConnectionError

If the server address is invalid or not reachable.

An example to get the details of a datasource from the server:

ds.details()
Source code in airt/client.py
@patch
def details(self: DataSource) -> pd.DataFrame:
    """Return details of a datasource.

    Returns:
        The datasource details as a pandas dataframe.

    Raises:
        ConnectionError: If the server address is invalid or not reachable.

    An example to get the details of a datasource from the server:

    ```python
    ds.details()
    ```
    """

    response = Client._get_data(relative_url=f"/datasource/{self.uuid}")

    response["tags"] = get_values_from_item(response["tags"], "name")

    df = pd.DataFrame([response])[DataSource.ALL_DS_COLS]

    df = df.rename(columns=DataSource.COLS_TO_RENAME)

    return add_ready_column(df)

head(self)

Return the first few rows of the datasource.

Returns:

Type Description
DataFrame

The first few rows of the datasource as a pandas dataframe.

Exceptions:

Type Description
ConnectionError

If the server address is invalid or not reachable.

An example to show the first few records of the data source:

ds.head()
Source code in airt/client.py
@patch
def head(self: DataSource) -> pd.DataFrame:
    """Return the first few rows of the datasource.

    Returns:
        The first few rows of the datasource as a pandas dataframe.

    Raises:
        ConnectionError: If the server address is invalid or not reachable.

    An example to show the first few records of the data source:

    ```python
    ds.head()
    ```
    """
    response = Client._get_data(relative_url=f"/datasource/{self.uuid}/head")
    df = dict_to_df(response)

    return df

is_ready(self)

Check if the method's progress is complete.

Returns:

Type Description
bool

True if the progress is completed, else False.

An example to check the progress status:

ds.is_ready()
Source code in airt/client.py
@patch
def is_ready(
    self: DataSource,
) -> bool:
    """Check if the method's progress is complete.

    Returns:
        **True** if the progress is completed, else **False**.

    ```python
    ds.is_ready()
    ```
    """
    progress_status = ProgressStatus(relative_url=f"/datasource/{self.uuid}")

    return progress_status.is_ready()

ls(offset=0, limit=100, disabled=False, completed=False) staticmethod

Return the list of DataSource instances available in the server.

Parameters:

Name Type Description Default
offset int

The number of datasources to offset at the beginning. If None, then the default value 0 will be used.

0
limit int

The maximum number of datasources to return from the server. If None, then the default value 100 will be used.

100
disabled bool

If set to True, then only the deleted datasources will be returned. Else, the default value False will be used to return only the list of active datasources.

False
completed bool

If set to True, then only the datasources that are successfully processed in the server will be returned. Else, the default value False will be used to return all the datasources.

False

Returns:

Type Description
List[DataSource]

A list of DataSource instances available in the server.

Exceptions:

Type Description
ConnectionError

If the server address is invalid or not reachable.

An example to list the available datasources:

DataSource.ls()
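Because ls returns at most limit datasources per call, larger collections can be gathered page by page. The helper below is a generic pagination sketch, not part of the library; fetch_page stands in for a call such as lambda offset, limit: DataSource.ls(offset=offset, limit=limit):

```python
def fetch_all(fetch_page, page_size=100):
    """Collect every item by calling fetch_page(offset, limit) repeatedly
    until a page shorter than page_size signals the end of the listing."""
    items, offset = [], 0
    while True:
        page = fetch_page(offset, page_size)
        items.extend(page)
        if len(page) < page_size:
            return items
        offset += page_size
```

The short-page stopping rule avoids an extra round trip when the total count is not known in advance.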
Source code in airt/client.py
@staticmethod
def ls(
    offset: int = 0,
    limit: int = 100,
    disabled: bool = False,
    completed: bool = False,
) -> List["DataSource"]:
    """Return the list of `DataSource` instances available in the server.

    Args:
        offset: The number of datasources to offset at the beginning. If **None**,
            then the default value **0** will be used.
        limit: The maximum number of datasources to return from the server. If **None**,
            then the default value **100** will be used.
        disabled: If set to **True**, then only the deleted datasources will be returned.
            Else, the default value **False** will be used to return only the list
            of active datasources.
        completed: If set to **True**, then only the datasources that are successfully processed
            in the server will be returned. Else, the default value **False** will be used to
            return all the datasources.

    Returns:
        A list of `DataSource` instances available in the server.

    Raises:
        ConnectionError: If the server address is invalid or not reachable.

    An example to list the available datasources:

    ```python
    DataSource.ls()
    ```
    """
    lists = Client._get_data(
        relative_url=f"/datasource/?disabled={disabled}&completed={completed}&offset={offset}&limit={limit}"
    )

    dsx = [
        DataSource(
            uuid=ds["uuid"],
            datablob=ds["datablob"],
            folder_size=ds["folder_size"],
            no_of_rows=ds["no_of_rows"],
            region=ds["region"],
            cloud_provider=ds["cloud_provider"],
            error=ds["error"],
            disabled=ds["disabled"],
            created=ds["created"],
            pulled_on=ds["pulled_on"],
            user=ds["user"],
            hash=ds["hash"],
            tags=ds["tags"],
            total_steps=ds["total_steps"],
            completed_steps=ds["completed_steps"],
        )
        for ds in lists
    ]

    return dsx

progress_bar(self, sleep_for=5, timeout=0)

Blocks the execution and displays a progress bar showing the remote action progress.

Parameters:

Name Type Description Default
sleep_for Union[int, float]

The time interval in seconds between successive API calls.

5
timeout int

The maximum time allowed in seconds for the asynchronous call to complete. If exceeded, the progress bar will be terminated.

0

Exceptions:

Type Description
ConnectionError

If the server address is invalid or not reachable.

TimeoutError

If the asynchronous call does not complete within the given timeout.

ds.progress_bar()
Source code in airt/client.py
@patch
def progress_bar(self: DataSource, sleep_for: Union[int, float] = 5, timeout: int = 0):
    """Blocks the execution and displays a progress bar showing the remote action progress.

    Args:
        sleep_for: The time interval in seconds between successive API calls.
        timeout: The maximum time allowed in seconds for the asynchronous call to complete. If
            exceeded, the progress bar will be terminated.

    Raises:
        ConnectionError: If the server address is invalid or not reachable.
        TimeoutError: If the asynchronous call does not complete within the given timeout.

    ```python
    ds.progress_bar()
    ```
    """
    progress_status = ProgressStatus(
        relative_url=f"/datasource/{self.uuid}", sleep_for=sleep_for, timeout=timeout
    )

    progress_status.progress_bar()

tag(self, name)

Tag an existing datasource in the server.

Parameters:

Name Type Description Default
name str

A string to tag the datasource.

required

Returns:

Type Description
DataFrame

A pandas dataframe with the details of the tagged datasource.

Exceptions:

Type Description
ConnectionError

If the server address is invalid or not reachable.

An example to tag an existing datasource:

ds.tag(name="v1.0")
Source code in airt/client.py
@patch
def tag(self: DataSource, name: str) -> pd.DataFrame:
    """Tag an existing datasource in the server.

    Args:
        name: A string to tag the datasource.

    Returns:
        A pandas dataframe with the details of the tagged datasource.

    Raises:
        ConnectionError: If the server address is invalid or not reachable.

    An example to tag an existing datasource:

    ```python
    ds.tag(name="v1.0")
    ```
    """
    response = Client._post_data(
        relative_url=f"/datasource/{self.uuid}/tag", json=dict(name=name)
    )

    response["tags"] = get_values_from_item(response["tags"], "name")

    df = pd.DataFrame([response])[DataSource.BASIC_DS_COLS]
    df = df.rename(columns=DataSource.COLS_TO_RENAME)

    return add_ready_column(df)

train(self, *, client_column, timestamp_column=None, target_column, target, predict_after)

Train a model against the datasource.

This method trains the model for predicting which clients are most likely to have a specified event in the future.

The call to this method is asynchronous, and the progress can be checked using the progress_bar or is_ready methods available in the DataSource class.

For more model specific information, please check the documentation of Model class.

Parameters:

Name Type Description Default
client_column str

The column name that uniquely identifies the users/clients.

required
timestamp_column Optional[str]

The timestamp column indicating the time of an event. If not passed, then the default value None will be used.

None
target_column str

Target column name that indicates the type of the event.

required
target str

Target event name to train and make predictions. You can pass the target event as a string or as a regular expression for predicting more than one event. For example, passing *checkout will train a model to predict any checkout event.

required
predict_after timedelta

Time delta in hours of the expected target event.

required

Returns:

Type Description
Model

An instance of the Model class.

Exceptions:

Type Description
ValueError

If the input parameters to the API are invalid.

ConnectionError

If the server address is invalid or not reachable.

An example to predict which users will perform a purchase event ("*purchase") 3 hours before they actually do it:

from datetime import timedelta

model = ds.train(
    client_column="user_id",
    target_column="event_type",
    target="*purchase",
    predict_after=timedelta(hours=3)
)

model.progress_bar()
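Note that predict_after is a datetime.timedelta; the client converts it to a whole number of seconds with int(predict_after.total_seconds()) before posting it to the API. A small illustration of that conversion (predict_after_seconds is a hypothetical helper name used only for this example):

```python
from datetime import timedelta

def predict_after_seconds(predict_after: timedelta) -> int:
    # Same conversion the client applies before sending the request.
    return int(predict_after.total_seconds())

predict_after_seconds(timedelta(hours=3))  # 10800
predict_after_seconds(timedelta(days=1))   # 86400
```

Any timedelta resolution works, since total_seconds() flattens days, hours, and minutes into one value.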
Source code in airt/client.py
@patch
def train(
    self: DataSource,
    *,
    client_column: str,
    timestamp_column: Optional[str] = None,
    target_column: str,
    target: str,
    predict_after: timedelta,
) -> Model:
    """Train a model against the datasource.

    This method trains the model for predicting which clients are most likely to have a specified
    event in the future.

    The call to this method is asynchronous, and the progress can be checked using the `progress_bar`
    or `is_ready` methods available in the `DataSource` class.

    For more model specific information, please check the documentation of `Model` class.

    Args:
        client_column: The column name that uniquely identifies the users/clients.
        timestamp_column: The timestamp column indicating the time of an event. If not passed,
            then the default value **None** will be used.
        target_column: Target column name that indicates the type of the event.
        target: Target event name to train and make predictions. You can pass the target event as a string or as a
            regular expression for predicting more than one event. For example, passing ***checkout** will
            train a model to predict any checkout event.
        predict_after: Time delta in hours of the expected target event.

    Returns:
        An instance of the `Model` class.

    Raises:
        ValueError: If the input parameters to the API are invalid.
        ConnectionError: If the server address is invalid or not reachable.

    An example to predict which users will perform a purchase event **("*purchase")** 3 hours before they actually do it:

    ```python
    from datetime import timedelta

    model = ds.train(
        client_column="user_id",
        target_column="event_type",
        target="*purchase",
        predict_after=timedelta(hours=3)
    )

    model.progress_bar()
    ```
    """
    response = Client._post_data(
        relative_url="/model/train",
        json=dict(
            data_uuid=self.uuid,
            client_column=client_column,
            target_column=target_column,
            target=target,
            predict_after=int(predict_after.total_seconds()),
        ),
    )

    return Model(uuid=response["uuid"])

wait(self, sleep_for=1, timeout=0)

Blocks execution while waiting for the remote action to complete.

Parameters:

Name Type Description Default
sleep_for Union[int, float]

The time interval in seconds between successive API calls.

1
timeout int

The maximum time allowed in seconds for the asynchronous call to complete. If exceeded, the wait will be terminated.

0

Exceptions:

Type Description
ConnectionError

If the server address is invalid or not reachable.

TimeoutError

If the asynchronous call does not complete within the given timeout.

ds.wait()
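Since wait raises TimeoutError when the allowed time is exceeded, callers that prefer retrying over failing can wrap it. wait_with_retries below is a hypothetical wrapper, not part of the library; resource is any object exposing a wait(sleep_for=..., timeout=...) method, such as a DataSource instance:

```python
def wait_with_retries(resource, retries=3, sleep_for=1, timeout=60):
    """Call resource.wait(...) up to `retries` times, swallowing TimeoutError.

    Returns True as soon as one wait() call completes, and False if
    every attempt times out.
    """
    for _ in range(retries):
        try:
            resource.wait(sleep_for=sleep_for, timeout=timeout)
            return True
        except TimeoutError:
            continue
    return False
```

ConnectionError is deliberately not caught here, so genuine server problems still surface immediately.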
Source code in airt/client.py
@patch
def wait(self: DataSource, sleep_for: Union[int, float] = 1, timeout: int = 0):
    """Blocks execution while waiting for the remote action to complete.

    Args:
        sleep_for: The time interval in seconds between successive API calls.
        timeout: The maximum time allowed in seconds for the asynchronous call to complete. If
            exceeded, the wait will be terminated.

    Raises:
        ConnectionError: If the server address is invalid or not reachable.
        TimeoutError: If the asynchronous call does not complete within the given timeout.

    ```python
    ds.wait()
    ```
    """

    progress_status = ProgressStatus(
        relative_url=f"/datasource/{self.uuid}", sleep_for=sleep_for, timeout=timeout
    )

    progress_status.wait()