Client
ClientFile
dataclass
ClientFile(abspath: str, relpath: str, state: Literal['ready', 'pending', 'failed', 'transferring', 'done', 'valid', 'invalid'], retries: int, max_retries: int, message: str | None = None, checksum: str | None = None)
Representation of a file potentially intended for transfer.
checksum
class-attribute
instance-attribute
checksum: str | None = None
CRC32 bytes, base64-encoded, for example 'yH2GHg==', if provided
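A checksum in this format can be produced with the standard library. The following is a minimal sketch, assuming the four CRC32 bytes are serialized big-endian before base64 encoding (the convention S3 uses for its CRC32 checksums). The helper names `crc32_base64` and `crc32_base64_of_file` are hypothetical and not part of this module.

```python
import base64
import zlib


def crc32_base64(data: bytes) -> str:
    """Return the CRC32 of `data` as base64-encoded bytes."""
    # zlib.crc32 returns an unsigned 32-bit integer in Python 3;
    # the mask is a defensive no-op.
    crc = zlib.crc32(data) & 0xFFFFFFFF
    # Assumption: the 4 checksum bytes are big-endian.
    raw = crc.to_bytes(4, byteorder="big")
    return base64.b64encode(raw).decode("ascii")


def crc32_base64_of_file(path: str, chunk_size: int = 1 << 20) -> str:
    """Compute the same checksum for a file without loading it whole."""
    crc = 0
    with open(path, "rb") as handle:
        # Feed the running CRC back in as the starting value for each chunk.
        while chunk := handle.read(chunk_size):
            crc = zlib.crc32(chunk, crc)
    return base64.b64encode(crc.to_bytes(4, byteorder="big")).decode("ascii")
```

The result is always eight characters long, because four bytes encode to six base64 characters plus two `=` padding characters, matching the shape of 'yH2GHg=='.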
max_retries
instance-attribute
max_retries: int
How many times may we retry a transfer?
message
class-attribute
instance-attribute
message: str | None = None
Details, if any, about failure / invalidity / etc.
retries
instance-attribute
retries: int
How many times have we attempted to retry a failed transfer?
state
instance-attribute
state: Literal['ready', 'pending', 'failed', 'transferring', 'done', 'valid', 'invalid']
Definitions of values:
ready: file has not yet been transferred to S3 or queued for transfer, but we plan to transfer it.
pending: file is queued for transfer to S3.
failed: file failed to transfer to S3 due to network or other error. This state does not indicate validation failure: a 'failed' file is not available to the validator at all. If retries < max_retries, the file is eligible for another attempt at transfer.
transferring: file is currently transferring to S3.
done: file has been transferred to S3 but not yet validated.
valid: file has been marked as present and valid by the validator.
invalid: file has been marked as missing/invalid by the validator, or the validation process appears to be irregular/invalid (e.g., the validator reports it valid despite us not transferring it).
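The retry rule for 'failed' files can be written as a small predicate. This is a sketch only: `FileRecord` is a hypothetical stand-in that carries a subset of the ClientFile fields, and `can_retry` and `awaiting_validation` are illustrative names, not methods of this module.

```python
from dataclasses import dataclass
from typing import Literal

State = Literal[
    "ready", "pending", "failed", "transferring", "done", "valid", "invalid"
]


@dataclass
class FileRecord:
    """Hypothetical stand-in with only the fields this sketch needs."""

    relpath: str
    state: State
    retries: int
    max_retries: int


def can_retry(record: FileRecord) -> bool:
    """A 'failed' file is eligible for another attempt while retries < max_retries."""
    return record.state == "failed" and record.retries < record.max_retries


def awaiting_validation(record: FileRecord) -> bool:
    """'done' means transferred to S3 but not yet checked by the validator."""
    return record.state == "done"
```

Note that 'invalid' is never retryable under this rule: it is a validator verdict, whereas 'failed' describes a transfer that never reached the validator.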
DataRoot
DataRoot(source: Path | Bucket)
Compatibility layer for S3 -> S3 and local -> S3 operations.
cp
cp(source: ClientFile, destination_bucket: Bucket) -> str | None
Upload a file from local to S3, or initiate S3-to-S3 object copy.
ls
ls(key: str | Path | None = None) -> tuple[str, ...]
List the contents of a directory or prefix.
NothingToDoError
Bases: ValueError
All files have already been validated, so there is nothing to do.
UploadClient
UploadClient(*, dataset: str, delivery_id: str, file_index: DataFrame, transfer_type: TransferType, source: str | Path | Bucket, lambda_client_config: dict | None = None, own_agent_id: str | None = None, n_threads: int = 1, debug: bool = False, max_retries: int = 2)
Manager class for the data-provider side of the file transfer and validation pipeline. Calls a Lambda to start the validation pipeline, communicates with the validation pipeline via S3, uploads / transfers files, and reports status to the user.
done
property
done: bool
Have we either finished our work or (possibly prematurely) otherwise entered a terminal state?
n_complete
property
n_complete: int
How many files have either been transferred and undergone validation or entirely failed to transfer after max retries?
next_file
property
next_file: ClientFile | None
Next file available for transfer.
transfer_complete
property
transfer_complete: bool
Are we totally done with our transfer process (but possibly still awaiting validation)?
validation_complete
property
validation_complete: bool
Do we have nothing left to do at all -- i.e., all files have either successfully transferred and undergone validation or failed to transfer after max retries?
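One way to read the completion properties is as checks over per-file states. The sketch below is an interpretation, not the class's actual implementation. It assumes 'valid' and 'invalid' both count as "undergone validation", and that a 'failed' file with retries >= max_retries has "entirely failed". `FileStatus` and the three functions are hypothetical names that mirror the properties.

```python
from typing import Iterable, NamedTuple


class FileStatus(NamedTuple):
    """Hypothetical minimal view of a file's transfer state."""

    state: str
    retries: int
    max_retries: int


def _exhausted(f: FileStatus) -> bool:
    # Assumption: a failed file with no retries left will not be attempted again.
    return f.state == "failed" and f.retries >= f.max_retries


def is_complete(f: FileStatus) -> bool:
    """Validated (either verdict) or failed with no retries left."""
    return f.state in ("valid", "invalid") or _exhausted(f)


def n_complete(files: Iterable[FileStatus]) -> int:
    return sum(1 for f in files if is_complete(f))


def transfer_complete(files: Iterable[FileStatus]) -> bool:
    """Nothing is waiting, queued, in flight, or retryable; 'done' files may still await validation."""
    return all(f.state in ("done", "valid", "invalid") or _exhausted(f) for f in files)


def validation_complete(files: Iterable[FileStatus]) -> bool:
    """Every file is complete in the n_complete sense."""
    return all(is_complete(f) for f in files)
```

Under this reading, validation_complete implies transfer_complete, but not the reverse: a batch in which every file is 'done' has finished transferring while still awaiting validation.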
acquire_lock
acquire_lock(*, refresh: bool = False) -> bool
Attempt to acquire transfer lock ('owned' object in control bucket). Raises an exception if lock cannot be acquired. If refresh is True, write the lock file even if already held.
cmessage
cmessage(text: str, mtype: Literal['warning', 'error', 'info', 'complete', 'success']) -> None
Print a message in a predefined message-category color.
cognito_authenticate
cognito_authenticate() -> None
Retrieve tokens/creds from Cognito for AWS operations.
cprint
cprint(renderable: str, *, padded: bool = True) -> None
Print a message to our virtual console.
crash
crash(exception: BaseException) -> None
Call if client has entered an invalid state ('crashed'). Logs the crash to S3 if possible, informs the user about the crash, and quits.
dump_state
dump_state() -> None
Dump transfer/validation state to working directory as a CSV file.
excformat
excformat(exc: BaseException) -> str
Format an exception for printing / logging.
initiate_transfer
initiate_transfer() -> None
Attempt to start the validation pipeline.
quit
quit(_exception: BaseException | None = None) -> None
Call if client has entered a stopped state, finished or not. Logs the exit to S3 (if possible), stops all polling resources, releases lock if relevant, and marks state as quit. Note that the object should not be restarted. Construct a new UploadClient if you wish to fully 'reboot' within a single interpreter session.
release_lock
release_lock() -> None
Release the transfer lock. Should generally only be called from self.quit(); a running client should always hold the lock.
transfer_next_file
transfer_next_file() -> None | ClientFile
If possible, queue the next file for transfer. If there are no more files to transfer, print a warning and do nothing. If the client has crashed or is not yet ready/connected, print an error and raise an exception.
update
update() -> bool
Update the state of the client. Returns True if the update itself indicates that the client should stop, False otherwise.
Prints a warning and does nothing if the client is not running or has crashed.
Otherwise, calls self.reader.update() and acts on any new information found in the log. If self.reader.update() fails, crashes the client. Updates any files marked valid or invalid by the validator. If the validator has prematurely stopped or validation is complete, initiates quit sequence.
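The documented members suggest a simple polling loop: queue a file when one is available, call update(), and stop when update() returns True or done becomes true. The sketch below is written against a structural Protocol rather than the real class, so it makes no claim about how UploadClient is constructed or how it is intended to be driven. `ClientLike`, `run_client`, `poll_interval`, and `max_polls` are hypothetical names.

```python
import time
from typing import Optional, Protocol


class ClientLike(Protocol):
    """Hypothetical structural type covering only the members used below."""

    @property
    def done(self) -> bool: ...

    @property
    def next_file(self) -> Optional[object]: ...

    def transfer_next_file(self) -> Optional[object]: ...

    def update(self) -> bool: ...


def run_client(
    client: ClientLike, poll_interval: float = 1.0, max_polls: int = 1000
) -> int:
    """Drive the client until it reports it should stop; return the number of polls."""
    polls = 0
    while not client.done and polls < max_polls:
        # Only queue a transfer when a file is available, which avoids
        # the "no more files" warning from transfer_next_file().
        if client.next_file is not None:
            client.transfer_next_file()
        polls += 1
        # update() returns True when the update itself says the client should stop.
        if client.update():
            break
        time.sleep(poll_interval)
    return polls
```

The `max_polls` cap is a safety net for the sketch, so that a client which never reaches a terminal state cannot loop forever.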
UploadClientError
UploadClientError(*args: Any)
Bases: ClientError
Alias for botocore's ClientError. ClientError does a lot of work tied to running botocore operations when it is initialized, which makes it very inconvenient to instantiate directly.