
Client

ClientFile dataclass

ClientFile(abspath: str, relpath: str, state: Literal['ready', 'pending', 'failed', 'transferring', 'done', 'valid', 'invalid'], retries: int, max_retries: int, message: str | None = None, checksum: str | None = None)

Representation of a file potentially intended for transfer.

checksum class-attribute instance-attribute

checksum: str | None = None

CRC32 bytes, base64-encoded, for example 'yH2GHg==', if provided
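A value in this format can be produced with the standard library alone. The following is a minimal sketch, assuming the four CRC32 bytes are packed big-endian before base64 encoding (the convention S3 uses for its CRC32 checksums; this page does not state the byte order). The helper names crc32_b64 and crc32_b64_file are hypothetical and not part of this module.

```python
import base64
import zlib


def crc32_b64(data: bytes) -> str:
    """Return the CRC32 of `data` as base64 text (hypothetical helper)."""
    crc = zlib.crc32(data) & 0xFFFFFFFF
    # Assumption: 4 bytes, big-endian, then base64-encoded.
    return base64.b64encode(crc.to_bytes(4, "big")).decode("ascii")


def crc32_b64_file(path: str, chunk_size: int = 1 << 20) -> str:
    """Same checksum, computed incrementally so large files are not loaded whole."""
    crc = 0
    with open(path, "rb") as fh:
        while chunk := fh.read(chunk_size):
            # zlib.crc32 accepts the running value as its second argument.
            crc = zlib.crc32(chunk, crc)
    return base64.b64encode((crc & 0xFFFFFFFF).to_bytes(4, "big")).decode("ascii")
```

The result is always an 8-character string ending in '==', since 4 bytes encode to 6 base64 characters plus two padding characters.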

max_retries instance-attribute

max_retries: int

Maximum number of times a failed transfer may be retried.

message class-attribute instance-attribute

message: str | None = None

Details, if any, about failure / invalidity / etc.

retries instance-attribute

retries: int

How many times we have so far retried a failed transfer.

state instance-attribute

state: Literal['ready', 'pending', 'failed', 'transferring', 'done', 'valid', 'invalid']

Definitions of values

ready: file has not yet been transferred to S3 or queued for transfer, but we plan to transfer it.

pending: file is queued for transfer to S3.

failed: file failed to transfer to S3 due to network or other error. This state does not indicate validation failure: a 'failed' file is not available to the validator at all. If retries < max_retries, the file is eligible for another attempt at transfer.

transferring: file is currently transferring to S3.

done: file has been transferred to S3 but not yet validated.

valid: file has been marked as present and valid by the validator.

invalid: file has been marked as missing/invalid by the validator, or the validation process appears to be irregular/invalid (e.g., the validator reports it valid despite us not transferring it).
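The retry rule for 'failed' files can be expressed as a pair of small predicates. This is a sketch under stated assumptions, not the module's own code: FileSketch is a hypothetical stand-in that carries only the ClientFile fields used here, and can_retry and is_terminal are illustrative names.

```python
from dataclasses import dataclass
from typing import Literal

State = Literal["ready", "pending", "failed", "transferring", "done", "valid", "invalid"]


@dataclass
class FileSketch:
    """Hypothetical stand-in with only the fields this example needs."""

    relpath: str
    state: State
    retries: int
    max_retries: int


def can_retry(f: FileSketch) -> bool:
    """A 'failed' file may be attempted again while retries < max_retries."""
    return f.state == "failed" and f.retries < f.max_retries


def is_terminal(f: FileSketch) -> bool:
    """Assumed reading: no further client action is expected for this file."""
    if f.state in ("valid", "invalid"):
        return True
    return f.state == "failed" and f.retries >= f.max_retries
```

Under this reading, 'done' is not terminal, because the file is still waiting on the validator.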

DataRoot

DataRoot(source: Path | Bucket)

Compatibility layer for S3 -> S3 and local -> S3 operations.

cp

cp(source: ClientFile, destination_bucket: Bucket) -> str | None

Upload a file from local to S3, or initiate S3-to-S3 object copy.

ls

ls(key: str | Path | None = None) -> tuple[str, ...]

List the contents of a directory or prefix.

NothingToDoError

Bases: ValueError

All files have already been validated.

UploadClient

UploadClient(*, dataset: str, delivery_id: str, file_index: DataFrame, transfer_type: TransferType, source: str | Path | Bucket, lambda_client_config: dict | None = None, own_agent_id: str | None = None, n_threads: int = 1, debug: bool = False, max_retries: int = 2)

Manager class for the data-provider side of the file transfer and validation pipeline. Calls a Lambda to start the validation pipeline, communicates with the validation pipeline via S3, uploads / transfers files, and reports status to the user.

done property

done: bool

Have we either finished our work or (possibly prematurely) otherwise entered a terminal state?

n_complete property

n_complete: int

How many files have either been transferred and undergone validation or entirely failed to transfer after max retries?

next_file property

next_file: ClientFile | None

Next file available for transfer.

transfer_complete property

transfer_complete: bool

Are we totally done with our transfer process (but possibly still awaiting validation)?

validation_complete property

validation_complete: bool

Do we have nothing left to do at all -- i.e., all files have either successfully transferred and undergone validation or failed to transfer after max retries?
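The relationship between n_complete, transfer_complete, and validation_complete can be illustrated by deriving all three from per-file states. This is one plausible reading of the descriptions above, not the class's actual implementation; FileStatus and the three functions are hypothetical.

```python
from typing import NamedTuple, Sequence


class FileStatus(NamedTuple):
    """Hypothetical per-file summary: state plus retry counters."""

    state: str
    retries: int
    max_retries: int


def _exhausted(f: FileStatus) -> bool:
    # Failed and out of retries: the client will not try this file again.
    return f.state == "failed" and f.retries >= f.max_retries


def n_complete(files: Sequence[FileStatus]) -> int:
    """Files that were validated (either verdict) or failed for good."""
    return sum(1 for f in files if f.state in ("valid", "invalid") or _exhausted(f))


def transfer_complete(files: Sequence[FileStatus]) -> bool:
    """No file is waiting, queued, in flight, or still eligible for retry."""
    return all(f.state in ("done", "valid", "invalid") or _exhausted(f) for f in files)


def validation_complete(files: Sequence[FileStatus]) -> bool:
    """Every file has reached a final outcome."""
    return n_complete(files) == len(files)


files = [
    FileStatus("valid", 0, 2),   # transferred and validated
    FileStatus("done", 1, 2),    # transferred, awaiting validation
    FileStatus("failed", 2, 2),  # out of retries
]
```

With this sample list, transfer is complete but validation is not, since the 'done' file is still awaiting the validator.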

acquire_lock

acquire_lock(*, refresh: bool = False) -> bool

Attempt to acquire transfer lock ('owned' object in control bucket). Raises an exception if lock cannot be acquired. If refresh is True, write the lock file even if already held.

cmessage

cmessage(text: str, mtype: Literal['warning', 'error', 'info', 'complete', 'success']) -> None

Print a message in a predefined message-category color.

cognito_authenticate

cognito_authenticate() -> None

Retrieve tokens/creds from Cognito for AWS operations.

cprint

cprint(renderable: str, *, padded: bool = True) -> None

Print a message to our virtual console.

crash

crash(exception: BaseException) -> None

Call if client has entered an invalid state ('crashed'). Logs the crash to S3 if possible, informs the user about the crash, and quits.

dump_state

dump_state() -> None

Dump transfer/validation state to working directory as a CSV file.

excformat

excformat(exc: BaseException) -> str

Format an exception for printing / logging.

initiate_transfer

initiate_transfer() -> None

Attempt to start the validation pipeline.

quit

quit(_exception: BaseException | None = None) -> None

Call if client has entered a stopped state, finished or not. Logs the exit to S3 (if possible), stops all polling resources, releases lock if relevant, and marks state as quit. Note that the object should not be restarted. Construct a new UploadClient if you wish to fully 'reboot' within a single interpreter session.

release_lock

release_lock() -> None

Release the transfer lock. Should generally only be called from self.quit(); a running client should always hold the lock.

transfer_next_file

transfer_next_file() -> None | ClientFile

If possible, queue the next file for transfer. If there are no more files to transfer, print a warning and do nothing. If the client has crashed or is not yet ready/connected, print an error and raise an exception.

update

update() -> bool

Update the state of the client. Returns True if the update itself indicates that the client should stop, False otherwise.

Prints a warning and does nothing if the client is not running or has crashed.

Otherwise, calls self.reader.update() and acts on any new information found in the log. If self.reader.update() fails, crashes the client. Updates any files marked valid or invalid by the validator. If the validator has prematurely stopped or validation is complete, initiates quit sequence.

UploadClientError

UploadClientError(*args: Any)

Bases: ClientError

Stand-in for botocore's ClientError. ClientError's initializer does a good deal of work tied to running botocore operations, which makes it very inconvenient to instantiate directly.