DataBlob

airt.client.DataBlob

A class for importing and processing data from sources such as CSV files, databases, and AWS S3 buckets.

Currently, the only way to instantiate the DataBlob class is to call one of the static methods from_local, from_mysql, from_clickhouse, or from_s3, which import the data from:

  • a local CSV file,

  • a MySQL database,

  • a ClickHouse database, and

  • an AWS S3 bucket in the Parquet file format, respectively.

We plan to support other databases and storage media in the future.

For uploading the data from a local CSV file, the from_local method can be used; it only requires the relative or absolute path of the CSV file as input.
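
A minimal sketch of such a call, assuming a local file named events.csv (a placeholder name):

```python
from pathlib import Path

from airt.client import DataBlob

# "events.csv" is a placeholder for your local CSV file
db = DataBlob.from_local(path=Path("events.csv"))
```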

For establishing the connection to a MySQL database, the from_mysql method can be used; it requires parameters like host, port, database name, and table name. If access to the MySQL database requires authentication, the credentials can either be passed as parameters to the from_mysql method or stored in the environment variables AIRT_CLIENT_DB_USERNAME and AIRT_CLIENT_DB_PASSWORD.
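
A minimal sketch, with placeholder host, database, table, and credential values, supplying the credentials via the environment variables named above:

```python
import os

from airt.client import DataBlob

# Placeholder credentials; alternatively, pass username/password
# directly to from_mysql
os.environ["AIRT_CLIENT_DB_USERNAME"] = "db_username"
os.environ["AIRT_CLIENT_DB_PASSWORD"] = "db_password"

db = DataBlob.from_mysql(
    host="host_name",          # placeholder connection details
    database="database_name",
    table="table_name",
)
```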

For establishing the connection to a ClickHouse database, the from_clickhouse method can be used; it requires parameters like host, port, database name, and protocol. If access to the ClickHouse database requires authentication, the credentials can either be passed as parameters to the from_clickhouse method or stored in the environment variables CLICKHOUSE_USERNAME and CLICKHOUSE_PASSWORD.
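
A minimal sketch with the same kind of placeholder values; per the from_clickhouse reference below, protocol can be native or http, and index_column and timestamp_column are also required:

```python
import os

from airt.client import DataBlob

# Placeholder credentials; alternatively, pass username/password
# directly to from_clickhouse
os.environ["CLICKHOUSE_USERNAME"] = "db_username"
os.environ["CLICKHOUSE_PASSWORD"] = "db_password"

db = DataBlob.from_clickhouse(
    host="host_name",          # placeholder connection details
    database="database_name",
    table="table_name",
    protocol="native",
    index_column="index_column_name",
    timestamp_column="timestamp_column_name",
)
```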

Similarly, for establishing a connection to an AWS S3 bucket, the from_s3 method can be used; it requires parameters like the S3 URI, access_key, and secret_key. The AWS S3 access_key and secret_key can either be passed as parameters to the from_s3 method or stored in the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
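
A minimal sketch with a placeholder bucket URI and placeholder keys supplied via the environment:

```python
import os

from airt.client import DataBlob

# Placeholder keys; alternatively, pass access_key/secret_key
# directly to from_s3
os.environ["AWS_ACCESS_KEY_ID"] = "access_key"
os.environ["AWS_SECRET_ACCESS_KEY"] = "secret_key"

db = DataBlob.from_s3(uri="s3://bucket/events.parquet")
```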

All function calls to the library are asynchronous and return immediately.

To manage completion, the returned object provides methods to check the status (is_ready), block until completion (wait), and display an interactive progress bar (progress_bar).

Below are code examples for accessing the above status methods:

An example to check the status of a from_s3 call:

```python
data_blob = DataBlob.from_s3(
    uri="s3://bucket/events.parquet"
)
data_blob.is_ready()

data_source = data_blob.from_parquet(
    index_column="index_column_name",
    sort_by="sort_by_column_name"
)
data_source.is_ready()
```

An example to display an interactive progress bar:

```python
data_blob = DataBlob.from_s3(
    uri="s3://bucket/events.parquet"
)
data_blob.progress_bar()

data_source = data_blob.from_parquet(
    index_column="index_column_name",
    sort_by="sort_by_column_name"
)
data_source.progress_bar()
```
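
If blocking until completion is preferred over polling, the wait method (documented below) can be used instead, shown here with the same placeholder URI:

```python
data_blob = DataBlob.from_s3(
    uri="s3://bucket/events.parquet"
)
data_blob.wait()  # blocks until the remote pull completes
```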

__init__(self, id, type=None, source=None, datasources=None, total_steps=None, completed_steps=None, folder_size=None, disabled=None, pulled_on=None, user_id=None, tags=None, error=None) special

Constructs a new DataBlob instance.

Warning

Do not construct this object directly by calling the constructor, please use from_s3, from_mysql, from_clickhouse or from_local methods instead.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| id | int | Datablob id in the server. | *required* |
| type | Optional[str] | Datablob type in the server. | None |
| source | Optional[str] | Datablob source. | None |
| datasources | Optional[List[str]] | The list of datasources created using the datablob. | None |
| total_steps | Optional[int] | Number of steps needed to upload the datablob to the server. | None |
| completed_steps | Optional[int] | Number of steps completed while uploading the datablob to the server. | None |
| folder_size | Optional[int] | The size of the uploaded datablob in bytes. | None |
| disabled | Optional[bool] | Flag to indicate the active status of the datablob. | None |
| pulled_on | Optional[str] | Datablob last pulled date. | None |
| user_id | Optional[int] | The id of the user who created the datablob. | None |
| tags | Optional[List] | Tag names associated with the datablob. | None |
| error | Optional[str] | Error message while processing the datablob. | None |
Source code in airt/client.py
def __init__(
    self,
    id: int,
    type: Optional[str] = None,
    source: Optional[str] = None,
    datasources: Optional[List[str]] = None,
    total_steps: Optional[int] = None,
    completed_steps: Optional[int] = None,
    folder_size: Optional[int] = None,
    disabled: Optional[bool] = None,
    pulled_on: Optional[str] = None,
    user_id: Optional[int] = None,
    tags: Optional[List] = None,
    error: Optional[str] = None,
):
    """Constructs a new DataBlob instance.

    Warning:
        Do not construct this object directly by calling the constructor, please use `from_s3`,
        `from_mysql`, `from_clickhouse` or `from_local` methods instead.

    Args:
        id: Datablob id in the server.
        type: Datablob type in the server.
        source: Datablob source.
        datasources: The list of datasources created using the datablob.
        total_steps: Number of steps needed to upload the datablob to the server.
        completed_steps: Number of steps completed while uploading the datablob to the server.
        folder_size: The size of the uploaded datablob in bytes.
        disabled: Flag to indicate the active status of the datablob.
        pulled_on: Datablob last pulled date.
        user_id: The id of the user who created the datablob.
        tags: Tag names associated with the datablob.
        error: Error message while processing the datablob.
    """
    self.id = id
    self.type = type
    self.source = source
    self.datasources = datasources
    self.total_steps = total_steps
    self.completed_steps = completed_steps
    self.folder_size = folder_size
    self.disabled = disabled
    self.pulled_on = pulled_on
    self.user_id = user_id
    self.tags = tags
    self.error = error

as_df(dbx) staticmethod

Return the details of datablob instances as a pandas dataframe.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dbx | List[DataBlob] | List of datablob instances. | *required* |

Returns:

| Type | Description |
| --- | --- |
| DataFrame | Details of all the datablobs in a dataframe. |

Exceptions:

| Type | Description |
| --- | --- |
| ConnectionError | If the server address is invalid or not reachable. |

An example to get the details of the available datablobs:

```python
dbx = DataBlob.ls()
DataBlob.as_df(dbx)
```
Source code in airt/client.py
@staticmethod
def as_df(dbx: List["DataBlob"]) -> pd.DataFrame:
    """Return the details of datablob instances as a pandas dataframe.

    Args:
        dbx: List of datablob instances.

    Returns:
        Details of all the datablobs in a dataframe.

    Raises:
        ConnectionError: If the server address is invalid or not reachable.

    An example to get the details of available datablobs:

    ```python
    dbx = DataBlob.ls()
    DataBlob.as_df(dbx)
    ```
    """
    db_lists = get_attributes_from_instances(dbx, DataBlob.ALL_DB_COLS)  # type: ignore

    for db in db_lists:
        db = DataBlob._get_tag_name_and_datasource_id(db)

    lists_df = generate_df(db_lists, DataBlob.BASIC_DB_COLS)
    df = add_ready_column(lists_df)

    df = df.rename(columns=DataBlob.COLS_TO_RENAME)

    return df

delete(self)

Delete a datablob from the server.

Returns:

| Type | Description |
| --- | --- |
| DataFrame | A pandas DataFrame encapsulating the details of the deleted datablob. |

Exceptions:

| Type | Description |
| --- | --- |
| ConnectionError | If the server address is invalid or not reachable. |

An example to delete a datablob from the server:

```python
db.delete()
```
Source code in airt/client.py
@patch
def delete(self: DataBlob) -> pd.DataFrame:
    """Delete a datablob from the server.

    Returns:
        A pandas DataFrame encapsulating the details of the deleted datablob.

    Raises:
        ConnectionError: If the server address is invalid or not reachable.

    An example to delete a datablob from the server:

    ```python
    db.delete()
    ```
    """

    response = Client._delete_data(relative_url=f"/datablob/{self.id}")

    response = DataBlob._get_tag_name_and_datasource_id(response)

    df = pd.DataFrame([response])[DataBlob.BASIC_DB_COLS]

    df = df.rename(columns=DataBlob.COLS_TO_RENAME)

    return add_ready_column(df)

details(self)

Return details of a datablob.

Returns:

| Type | Description |
| --- | --- |
| DataFrame | The datablob details as a pandas dataframe. |

Exceptions:

| Type | Description |
| --- | --- |
| ConnectionError | If the server address is invalid or not reachable. |

An example to get details of a datablob from the server:

```python
db.details()
```
Source code in airt/client.py
@patch
def details(self: DataBlob) -> pd.DataFrame:
    """Return details of a datablob.

    Returns:
        The datablob details as a pandas dataframe.

    Raises:
        ConnectionError: If the server address is invalid or not reachable.

    An example to get details of a datablob from the server:

    ```python
    db.details()
    ```
    """

    details = Client._get_data(relative_url=f"/datablob/{self.id}")

    details = DataBlob._get_tag_name_and_datasource_id(details)

    details_df = pd.DataFrame([details])[DataBlob.ALL_DB_COLS]

    details_df = details_df.rename(columns=DataBlob.COLS_TO_RENAME)

    return add_ready_column(details_df)

from_clickhouse(*, host, database, table, protocol, index_column, timestamp_column, port=0, username=None, password=None, filters=None, tag=None) staticmethod

Create and return a datablob that encapsulates the data from a ClickHouse database.

If the database requires authentication, pass the username/password as parameters or store it in the CLICKHOUSE_USERNAME and CLICKHOUSE_PASSWORD environment variables.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| host | str | Remote database host name. | *required* |
| database | str | Database name. | *required* |
| table | str | Table name. | *required* |
| protocol | str | Protocol to use (native/http). | *required* |
| index_column | str | The column to use as index (row labels). | *required* |
| timestamp_column | str | Timestamp column name. | *required* |
| port | int | Host port number. If not passed, then the default value 0 will be used. | 0 |
| username | Optional[str] | Database username. If not passed, then the value set in the environment variable CLICKHOUSE_USERNAME will be used, else the default value "root" will be used. | None |
| password | Optional[str] | Database password. If not passed, then the value set in the environment variable CLICKHOUSE_PASSWORD will be used, else the default value "" will be used. | None |
| filters | Optional[Dict[str, Any]] | Any additional parameters to be used while importing the data, as a dict. | None |
| tag | Optional[str] | A string to tag the datablob. If not passed, then the tag latest will be assigned to the datablob. | None |

Returns:

| Type | Description |
| --- | --- |
| DataBlob | An instance of the DataBlob class. |

Exceptions:

| Type | Description |
| --- | --- |
| ValueError | If parameters to the API are invalid. |
| ConnectionError | If the server address is invalid or not reachable. |

An example to create a datablob from a ClickHouse database:

```python
db = DataBlob.from_clickhouse(
    host="host_name",
    database="database_name",
    table="table_name",
    protocol="native",
    index_column="index_column_name",
    timestamp_column="timestamp_column_name"
)
```
Source code in airt/client.py
@staticmethod
def from_clickhouse(
    *,
    host: str,
    database: str,
    table: str,
    protocol: str,
    index_column: str,
    timestamp_column: str,
    port: int = 0,
    username: Optional[str] = None,
    password: Optional[str] = None,
    filters: Optional[Dict[str, Any]] = None,
    tag: Optional[str] = None,
) -> "DataBlob":
    """Create and return a datablob that encapsulates the data from a ClickHouse database.

    If the database requires authentication, pass the username/password as parameters or store it in
    the **CLICKHOUSE_USERNAME** and **CLICKHOUSE_PASSWORD** environment variables.

    Args:
        host: Remote database host name.
        database: Database name.
        table: Table name.
        protocol: Protocol to use (native/http).
        index_column: The column to use as index (row labels).
        timestamp_column: Timestamp column name.
        port: Host port number. If not passed, then the default value **0** will be used.
        username: Database username. If not passed, then the value set in the environment variable
            **CLICKHOUSE_USERNAME** will be used else the default value "root" will be used.
        password: Database password. If not passed, then the value set in the environment variable
            **CLICKHOUSE_PASSWORD** will be used else the default value "" will be used.
        filters: Any additional parameters to be used while importing the data as a dict.
        tag: A string to tag the datablob. If not passed, then the tag **latest** will be assigned to the datablob.

    Returns:
       An instance of the `DataBlob` class.

    Raises:
        ValueError: If parameters to the API are invalid.
        ConnectionError: If the server address is invalid or not reachable.

    An example to create a Datablob from a database:

    ```python
        db = DataBlob.from_clickhouse(
            host="host_name",
            database="database_name",
            table="table_name",
            protocol="native",
            index_column="index_column_name",
            timestamp_column="timestamp_column_name"
        )
    ```
    """
    username = (
        username
        if username is not None
        else os.environ.get("CLICKHOUSE_USERNAME", "root")
    )

    password = (
        password
        if password is not None
        else os.environ.get("CLICKHOUSE_PASSWORD", "")
    )

    _body = dict(
        host=host,
        database=database,
        table=table,
        protocol=protocol,
        port=port,
        username=username,
        password=password,
        index_column=index_column,
        timestamp_column=timestamp_column,
        filters=filters,
        tag=tag,
    )

    response = Client._post_data(
        relative_url=f"/datablob/from_clickhouse", data=_body
    )

    return DataBlob(
        id=response["id"], type=response["type"], source=response["source"]
    )

from_csv(self, *, index_column, sort_by, deduplicate_data=False, blocksize='256MB', **kwargs)

Process the CSV data and return a datasource object.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| index_column | str | The column to use as index (row labels). | *required* |
| sort_by | Union[str, List[str]] | The column(s) to sort the data. Can either be a string or a sequence of strings. | *required* |
| deduplicate_data | bool | If set to True (default value False), then duplicate rows are removed while uploading. | False |
| blocksize | str | Data split size in bytes. If None, then the split size is set to 256MB. | '256MB' |
| kwargs |  | Any additional parameters to be used while processing the data. | {} |

Returns:

| Type | Description |
| --- | --- |
| DataSource | An instance of the DataSource class. |

Exceptions:

| Type | Description |
| --- | --- |
| ValueError | If the CSV file processing fails. |
| ConnectionError | If the server address is invalid or not reachable. |

An example for processing a CSV datablob:

```python
data_source_csv = db.from_csv(
    index_column="index_column_name",
    sort_by="sort_by_column_name"
)
```
Source code in airt/client.py
@patch
def from_csv(
    self: DataBlob,
    *,
    index_column: str,
    sort_by: Union[str, List[str]],
    deduplicate_data: bool = False,
    blocksize: str = "256MB",
    **kwargs,
) -> DataSource:
    """Process the CSV data and return a datasource object.

    Args:
        index_column: The column to use as index (row labels).
        sort_by: The column(s) to sort the data. Can either be a string or a sequence of strings.
        deduplicate_data: If set to **True** (default value **False**), then duplicate rows are removed while uploading.
        blocksize: Data split size in bytes. If None, then the split size is set to **256MB**.
        kwargs: Any additional parameters to be used while processing the data.

    Returns:
        An instance of the `DataSource` class.

    Raises:
        ValueError: If the CSV file processing fails.
        ConnectionError: If the server address is invalid or not reachable.

    An example for processing a csv datablob:

    ```python
        data_source_csv = db.from_csv(
            index_column="index_column_name",
            sort_by="sort_by_column_name"
        )
    ```
    """
    data = dict(
        deduplicate_data=deduplicate_data,
        index_column=index_column,
        sort_by=sort_by,
        blocksize=blocksize,
        kwargs=kwargs,
    )
    response = Client._post_data(
        relative_url=f"/datablob/{self.id}/from_csv", data=data
    )

    return DataSource(id=response["id"])

from_local(path, tag=None, show_progress=True) staticmethod

Create and return a datablob from a local file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | Union[str, pathlib.Path] | The relative or absolute path to a local CSV file or to a directory containing the CSV files. | *required* |
| tag | Optional[str] | A string to tag the datablob. If not passed, then the tag latest will be assigned to the datablob. | None |
| show_progress | Optional[bool] | Flag to set the progress bar visibility. If not passed, then the default value True will be used. | True |

Returns:

| Type | Description |
| --- | --- |
| DataBlob | An instance of the DataBlob class. |

Exceptions:

| Type | Description |
| --- | --- |
| ValueError | If parameters to the API are invalid. |
| ConnectionError | If the server address is invalid or not reachable. |

An example to create a datablob from a local file:

```python
db = DataBlob.from_local(path=Path('path-to-local-file'))
```
Source code in airt/client.py
@staticmethod
def from_local(
    path: Union[str, Path],
    tag: Optional[str] = None,
    show_progress: Optional[bool] = True,
) -> "DataBlob":
    """Create and return a datablob from local file.

    Args:
        path: The relative or absolute path to a local CSV file or to a directory containing the CSV files.
        tag: A string to tag the datablob. If not passed, then the tag **latest** will be assigned to the datablob.
        show_progress: Flag to set the progressbar visibility. If not passed, then the default value **True** will be used.

    Returns:
       An instance of the `DataBlob` class.

    Raises:
        ValueError: If parameters to the API are invalid.
        ConnectionError: If the server address is invalid or not reachable.

    An example to create a Datablob from a local file:

    ```python
        db = DataBlob.from_local(path=Path('path-to-local-file'))
    ```
    """
    path = Path(path)

    # Step 1: get presigned URL
    _path = f"local:{str(path)}"

    response = Client._post_data(
        relative_url=f"/datablob/from_local/start", data=dict(path=_path, tag=tag)
    )

    # Step 2: download the csv to the s3 bucket
    files = list(path.glob("*")) if path.is_dir() else [path]

    # Initiate progress bar
    t = tqdm(total=len(files), disable=not show_progress)

    for fname in files:
        with open(fname, "rb") as f:
            s3_response = requests.post(
                response["presigned"]["url"],
                files={"file": f},
                data=response["presigned"]["fields"],
            )

            if not s3_response.status_code == 204:
                raise ValueError(s3_response.text)

        t.update()

    t.close()
    return DataBlob(id=response["id"], type=response["type"])

from_mysql(*, host, database, table, port=3306, username=None, password=None, tag=None) staticmethod

Create and return a datablob that encapsulates the data from a MySQL database.

If the database requires authentication, pass the username/password as parameters or store it in the AIRT_CLIENT_DB_USERNAME and AIRT_CLIENT_DB_PASSWORD environment variables.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| host | str | Remote database host name. | *required* |
| database | str | Database name. | *required* |
| table | str | Table name. | *required* |
| port | int | Host port number. If not passed, then the default value 3306 will be used. | 3306 |
| username | Optional[str] | Database username. If not passed, then the value set in the environment variable AIRT_CLIENT_DB_USERNAME will be used, else the default value "root" will be used. | None |
| password | Optional[str] | Database password. If not passed, then the value set in the environment variable AIRT_CLIENT_DB_PASSWORD will be used, else the default value "" will be used. | None |
| tag | Optional[str] | A string to tag the datablob. If not passed, then the tag latest will be assigned to the datablob. | None |

Returns:

| Type | Description |
| --- | --- |
| DataBlob | An instance of the DataBlob class. |

Exceptions:

| Type | Description |
| --- | --- |
| ValueError | If parameters to the API are invalid. |
| ConnectionError | If the server address is invalid or not reachable. |

An example to create a datablob from a MySQL database:

```python
db = DataBlob.from_mysql(
    host="host_name",
    database="database_name",
    table="table_name"
)
```
Source code in airt/client.py
@staticmethod
def from_mysql(
    *,
    host: str,
    database: str,
    table: str,
    port: int = 3306,
    username: Optional[str] = None,
    password: Optional[str] = None,
    tag: Optional[str] = None,
) -> "DataBlob":
    """Create and return a datablob that encapsulates the data from a mysql database.

    If the database requires authentication, pass the username/password as parameters or store it in
    the **AIRT_CLIENT_DB_USERNAME** and **AIRT_CLIENT_DB_PASSWORD** environment variables.

    Args:
        host: Remote database host name.
        database: Database name.
        table: Table name.
        port: Host port number. If not passed, then the default value **3306** will be used.
        username: Database username. If not passed, then the value set in the environment variable
            **AIRT_CLIENT_DB_USERNAME** will be used else the default value "root" will be used.
        password: Database password. If not passed, then the value set in the environment variable
            **AIRT_CLIENT_DB_PASSWORD** will be used else the default value "" will be used.
        tag: A string to tag the datablob. If not passed, then the tag **latest** will be assigned to the datablob.

    Returns:
       An instance of the `DataBlob` class.

    Raises:
        ValueError: If parameters to the API are invalid.
        ConnectionError: If the server address is invalid or not reachable.

    An example to create a Datablob from a database:

    ```python
        db = DataBlob.from_mysql(
            host="host_name",
            database="database_name",
            table="table_name"
        )
    ```
    """
    username = (
        username
        if username is not None
        else os.environ.get(CLIENT_DB_USERNAME, "root")
    )

    password = (
        password if password is not None else os.environ.get(CLIENT_DB_PASSWORD, "")
    )

    _body = dict(
        host=host,
        port=port,
        username=username,
        password=password,
        database=database,
        table=table,
        tag=tag,
    )

    response = Client._post_data(relative_url=f"/datablob/from_mysql", data=_body)

    return DataBlob(
        id=response["id"], type=response["type"], source=response["source"]
    )

from_parquet(self, *, index_column, sort_by, deduplicate_data=False, blocksize='256MB', **kwargs)

Process the parquet data and return a datasource object.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| index_column | str | The column to use as index (row labels). | *required* |
| sort_by | Union[str, List[str]] | The column(s) to sort the data. Can either be a string or a sequence of strings. | *required* |
| deduplicate_data | bool | If set to True (default value False), then duplicate rows are removed while uploading. | False |
| blocksize | str | Data split size in bytes. If None, then the split size is set to 256MB. | '256MB' |
| kwargs |  | Any additional parameters to be used while processing the data. | {} |

Returns:

| Type | Description |
| --- | --- |
| DataSource | An instance of the DataSource class. |

Exceptions:

| Type | Description |
| --- | --- |
| ValueError | If processing of the parquet file fails. |
| ConnectionError | If the server address is invalid or not reachable. |

An example for processing a parquet datablob:

```python
data_source_parquet = db.from_parquet(
    index_column="index_column_name",
    sort_by="sort_by_column_name"
)
```
Source code in airt/client.py
@patch
def from_parquet(
    self: DataBlob,
    *,
    index_column: str,
    sort_by: Union[str, List[str]],
    deduplicate_data: bool = False,
    blocksize: str = "256MB",
    **kwargs,
) -> DataSource:
    """Process the parquet data and return a datasource object.

    Args:
        index_column: The column to use as index (row labels).
        sort_by: The column(s) to sort the data. Can either be a string or a sequence of strings.
        deduplicate_data: If set to **True** (default value **False**), then duplicate rows are removed while uploading.
        blocksize: Data split size in bytes. If None, then the split size is set to **256MB**.
        kwargs: Any additional parameters to be used while processing the data.

    Returns:
        An instance of the `DataSource` class.

    Raises:
        ValueError: If processing of the parquet file fails.
        ConnectionError: If the server address is invalid or not reachable.

    An example for processing a parquet datablob:

    ```python
        data_source_parquet = db.from_parquet(
            index_column="index_column_name",
            sort_by="sort_by_column_name"
        )
    ```
    """
    data = dict(
        deduplicate_data=deduplicate_data,
        index_column=index_column,
        sort_by=sort_by,
        blocksize=blocksize,
        kwargs=kwargs,
    )
    response = Client._post_data(
        relative_url=f"/datablob/{self.id}/from_parquet", data=data
    )

    return DataSource(id=response["id"])

from_s3(*, uri, access_key=None, secret_key=None, tag=None) staticmethod

Create and return a datablob that encapsulates the data from an AWS S3 bucket.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| uri | str | AWS S3 bucket URI. | *required* |
| access_key | Optional[str] | Access key for the S3 bucket. If None (default value), then the value from the AWS_ACCESS_KEY_ID environment variable is used. | None |
| secret_key | Optional[str] | Secret key for the S3 bucket. If None (default value), then the value from the AWS_SECRET_ACCESS_KEY environment variable is used. | None |
| tag | Optional[str] | A string to tag the datablob. If not passed, then the tag latest will be assigned to the datablob. | None |

Returns:

| Type | Description |
| --- | --- |
| DataBlob | An instance of the DataBlob class. |

Exceptions:

| Type | Description |
| --- | --- |
| ValueError | If parameters to the API are invalid. |
| ConnectionError | If the server address is invalid or not reachable. |

An example to create a datablob from an AWS S3 bucket:

```python
db = DataBlob.from_s3(
    uri="s3://bucket/events.parquet"
)
```
Source code in airt/client.py
@staticmethod
def from_s3(
    *,
    uri: str,
    access_key: Optional[str] = None,
    secret_key: Optional[str] = None,
    tag: Optional[str] = None,
) -> "DataBlob":
    """Create and return a datablob that encapsulates the data from an AWS S3 bucket.

    Args:
        uri: AWS S3 bucket uri.
        access_key: Access key for the S3 bucket. If **None** (default value), then the value
            from **AWS_ACCESS_KEY_ID** environment variable is used.
        secret_key: Secret key for the S3 bucket. If **None** (default value), then the value
            from **AWS_SECRET_ACCESS_KEY** environment variable is used.
        tag: A string to tag the datablob. If not passed, then the tag **latest** will be assigned to the datablob.

    Returns:
        An instance of the `DataBlob` class.

    Raises:
        ValueError: If parameters to the API are invalid.
        ConnectionError: If the server address is invalid or not reachable.

    An example to create a Datablob from an AWS S3 bucket:

    ```python
        db = DataBlob.from_s3(
            uri="s3://bucket/events.parquet"
        )
    ```
    """
    access_key = (
        access_key if access_key is not None else os.environ["AWS_ACCESS_KEY_ID"]
    )
    secret_key = (
        secret_key
        if secret_key is not None
        else os.environ["AWS_SECRET_ACCESS_KEY"]
    )

    response = Client._post_data(
        relative_url="/datablob/from_s3",
        data=dict(uri=uri, access_key=access_key, secret_key=secret_key, tag=tag),
    )

    return DataBlob(
        id=response["id"], type=response["type"], source=response["source"]
    )

is_ready(self)

Check if the method's progress is complete.

Info

This method will check the progress only if the datablob is created using from_s3 or from_mysql methods.

Returns:

| Type | Description |
| --- | --- |
| bool | True if the progress is completed, else False. |

```python
db.is_ready()
```
Source code in airt/client.py
def is_ready(self) -> bool:
    """Check if the method's progress is complete.

    !!! info

        This method will check the progress only if the datablob is created using `from_s3`
        or `from_mysql` methods.

    Returns:
        **True** if the progress is completed, else **False**.

    ```python

    db.is_ready()
    ```
    """
    if self.type in ["local"]:
        return True

    progress_status = ProgressStatus(relative_url=f"/datablob/{self.id}")

    return progress_status.is_ready()

ls(offset=0, limit=100, disabled=False, completed=False) staticmethod

Return the list of DataBlob instances available in the server.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| offset | int | The number of datablobs to offset at the beginning. If None, then the default value 0 will be used. | 0 |
| limit | int | The maximum number of datablobs to return from the server. If None, then the default value 100 will be used. | 100 |
| disabled | bool | If set to True, then only the deleted datablobs will be returned. Else, the default value False will be used to return only the list of active datablobs. | False |
| completed | bool | If set to True, then only the datablobs that are successfully downloaded to the server will be returned. Else, the default value False will be used to return all the datablobs. | False |

Returns:

| Type | Description |
| --- | --- |
| List[DataBlob] | A list of DataBlob instances available in the server. |

Exceptions:

| Type | Description |
| --- | --- |
| ConnectionError | If the server address is invalid or not reachable. |

An example to list the available datablobs:

```python
DataBlob.ls()
```
Source code in airt/client.py
@staticmethod
def ls(
    offset: int = 0,
    limit: int = 100,
    disabled: bool = False,
    completed: bool = False,
) -> List["DataBlob"]:
    """Return the list of DataBlob instances available in the server.

    Args:
        offset: The number of datablobs to offset at the beginning. If **None**,
            then the default value **0** will be used.
        limit: The maximum number of datablobs to return from the server. If **None**,
            then the default value **100** will be used.
        disabled: If set to **True**, then only the deleted datablobs will be returned.
            Else, the default value **False** will be used to return only the list
            of active datablobs.
        completed: If set to **True**, then only the datablobs that are successfully downloaded
            to the server will be returned. Else, the default value **False** will be used to
            return all the datablobs.

    Returns:
        A list of DataBlob instances available in the server.

    Raises:
        ConnectionError: If the server address is invalid or not reachable.

    An example to list the available datablobs:

    ```python
    DataBlob.ls()
    ```
    """
    lists = Client._get_data(
        relative_url=f"/datablob/?disabled={disabled}&completed={completed}&offset={offset}&limit={limit}"
    )

    dbx = [
        DataBlob(
            id=db["id"],
            type=db["type"],
            source=db["source"],
            datasources=db["datasources"],
            total_steps=db["total_steps"],
            completed_steps=db["completed_steps"],
            folder_size=db["folder_size"],
            disabled=db["disabled"],
            pulled_on=db["pulled_on"],
            user_id=db["user_id"],
            tags=db["tags"],
            error=db["error"],
        )
        for db in lists
    ]

    return dbx

progress_bar(self, sleep_for=5, timeout=0)

Blocks the execution and displays a progress bar showing the remote action progress.

Info

This method will check the progress only if the datablob is created using from_s3 or from_mysql methods.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sleep_for | Union[int, float] | The time interval in seconds between successive API calls. | 5 |
| timeout | int | The maximum time allowed in seconds for the asynchronous call to complete. If exceeded, the progress bar will be terminated. | 0 |

Exceptions:

| Type | Description |
| --- | --- |
| ConnectionError | If the server address is invalid or not reachable. |
| TimeoutError | In case of connection timeout. |

```python
db.progress_bar()
```
Source code in airt/client.py
def progress_bar(self, sleep_for: Union[int, float] = 5, timeout: int = 0):
    """Blocks the execution and displays a progress bar showing the remote action progress.

    !!! info

        This method will check the progress only if the datablob is created using `from_s3`
        or `from_mysql` methods.

    Args:
        sleep_for: The time interval in seconds between successive API calls.
        timeout: The maximum time allowed in seconds for the asynchronous call to complete. If exceeded,
            the progress bar will be terminated.

    Raises:
        ConnectionError: If the server address is invalid or not reachable.
        TimeoutError: in case of connection timeout.

    ```python

    db.progress_bar()
    ```
    """
    if self.type not in ["local"]:
        progress_status = ProgressStatus(
            relative_url=f"/datablob/{self.id}",
            sleep_for=sleep_for,
            timeout=timeout,
        )

        progress_status.progress_bar()

tag(self, name)

Tag an existing datablob in the server.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | A string to tag the datablob. | *required* |

Returns:

| Type | Description |
| --- | --- |
| DataFrame | A pandas dataframe with the details of the tagged datablob. |

Exceptions:

| Type | Description |
| --- | --- |
| ConnectionError | If the server address is invalid or not reachable. |

An example to tag an existing datablob:

```python
db.tag(name="v1.0")
```
Source code in airt/client.py
@patch
def tag(self: DataBlob, name: str) -> pd.DataFrame:
    """Tag an existing datablob in the server.

    Args:
        name: A string to tag the datablob.

    Returns:
        A pandas dataframe with the details of the tagged datablob.

    Raises:
        ConnectionError: If the server address is invalid or not reachable.

    An example to tag an existing datablob:

    ```python
    db.tag(name="v1.0")
    ```
    """
    response = Client._post_data(
        relative_url=f"/datablob/{self.id}/tag", data=dict(name=name)
    )

    response = DataBlob._get_tag_name_and_datasource_id(response)

    df = pd.DataFrame([response])[DataBlob.BASIC_DB_COLS]

    df = df.rename(columns=DataBlob.COLS_TO_RENAME)

    return add_ready_column(df)

wait(self, sleep_for=1, timeout=0)

Blocks execution while waiting for the remote action to complete.

Info

This method will check the progress only if the datablob is created using from_s3 or from_mysql methods.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sleep_for | Union[int, float] | The time interval in seconds between successive API calls. | 1 |
| timeout | int | The maximum time allowed in seconds for the asynchronous call to complete. If exceeded, a TimeoutError will be raised. | 0 |

Exceptions:

| Type | Description |
| --- | --- |
| ConnectionError | If the server address is invalid or not reachable. |
| TimeoutError | In case of timeout. |

```python
db.wait()
```
Source code in airt/client.py
def wait(self, sleep_for: Union[int, float] = 1, timeout: int = 0):
    """Blocks execution while waiting for the remote action to complete.

    !!! info

        This method will check the progress only if the datablob is created using `from_s3`
        or `from_mysql` methods.

    Args:
        sleep_for: The time interval in seconds between successive API calls.
        timeout: The maximum time allowed in seconds for the asynchronous call to complete. If exceeded,
            a TimeoutError will be raised.

    Raises:
        ConnectionError: If the server address is invalid or not reachable.
        TimeoutError: in case of timeout.

    ```python

    db.wait()
    ```
    """
    if self.type not in ["local"]:
        progress_status = ProgressStatus(
            relative_url=f"/datablob/{self.id}",
            sleep_for=sleep_for,
            timeout=timeout,
        )

        progress_status.wait()