Core

Core objects for the FAST validation server. In normal operation, these expect to be running in an isolated remote environment, communicating asynchronously via "log" objects in an S3 directory bucket. They are not intended for local use. See the validation commands in upload.cli, or their Pythonic interfaces in validation, for local alternatives.

ValidationManager

ValidationManager(bucket: Bucket, index: FileIndex, label: Label, *, n_threads: int = 1)

Helper class for managing validation worker threads. Should typically only be used by ValidationSession.

queue_validation

queue_validation(keys: Sequence[str]) -> None

Check whether a set of uploaded keys are valid targets for validation, and, if so, submit them to the thread pool for validation. Should typically only be called from ValidationSession._pipe_loop_inner() in response to a client message that it has successfully uploaded more files.

update_validation_results

update_validation_results() -> dict[str, Any]

Check our validation tasks to see if any have completed. Return a dict like {object_key: validation_result} for each completed task. Also remove any completed tasks from self.futures.
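The queue-then-poll pattern described for queue_validation and update_validation_results can be sketched with the standard library's concurrent.futures. This is a minimal illustration, not the actual implementation: MiniManager, fake_validate, the "skip already-queued keys" rule, and the shape of each result are assumptions. Only the futures attribute and the {object_key: validation_result} return shape come from the descriptions above.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Sequence


def fake_validate(key: str) -> dict:
    # Hypothetical stand-in for a real per-file validation task.
    return {"status": "ok", "message": f"validated {key}"}


class MiniManager:
    """Illustrative only; not the real ValidationManager."""

    def __init__(self, n_threads: int = 1) -> None:
        self.pool = ThreadPoolExecutor(max_workers=n_threads)
        self.futures: dict[str, Future] = {}

    def queue_validation(self, keys: Sequence[str]) -> None:
        for key in keys:
            # Assumed eligibility rule: skip keys that are already queued.
            if key not in self.futures:
                self.futures[key] = self.pool.submit(fake_validate, key)

    def update_validation_results(self) -> dict[str, Any]:
        # Collect finished keys first so the dict is not mutated
        # while we are iterating over it.
        done = [key for key, fut in self.futures.items() if fut.done()]
        # Future.result() re-raises if the task itself raised, so a real
        # task function should catch its own errors and return a report.
        return {key: self.futures.pop(key).result() for key in done}
```

Calling update_validation_results() once per main-loop iteration returns only the tasks that finished since the previous call, because completed entries are removed from futures as they are reported.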

ValidationSession

ValidationSession(vstate: ValidationState, manager: ValidationManager, logger: S3TSVWriter, settings: ValPipeSettings, identifiers: ValIdent, index: FileIndex, label: Label)

Top-level handler object for the validation pipeline.

Caution

ValidationSession.from_launch_parameters() should typically be preferred to ValidationSession.__init__(). Call __init__() directly only if you have a special reason to manually construct one or more of the support objects it accepts as arguments.

agent_id instance-attribute

agent_id: str

Randomly generated identifier for this particular session

az_id instance-attribute

az_id: str

AWS Availability Zone ID of our control bucket (and ideally also whatever we're running on)

cb_name instance-attribute

cb_name: str

name of associated control bucket

crashed property

crashed: bool

Does the pipeline appear to have encountered an unhandled exception?

dataset instance-attribute

dataset: str

name of dataset

delivery_id instance-attribute

delivery_id: str

id of specific delivery

keepalive_threshold instance-attribute

keepalive_threshold: float

After how many seconds without writing any sort of message (keepalive or otherwise) should we write a keepalive message to our log?
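The keepalive rule above can be sketched as a small timer that is reset on every write. This is a hedged illustration only: the KeepaliveClock class, its method names, the injectable clock, and the use of >= rather than > at the boundary are assumptions, not part of the documented API.

```python
import time
from typing import Callable


class KeepaliveClock:
    """Illustrative only; tracks time since the last log write."""

    def __init__(
        self,
        keepalive_threshold: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.keepalive_threshold = keepalive_threshold
        self.clock = clock
        self.last_write = clock()

    def record_write(self) -> None:
        # Call after writing any message, keepalive or otherwise.
        self.last_write = self.clock()

    def keepalive_due(self) -> bool:
        # True once keepalive_threshold seconds have passed with no write.
        return self.clock() - self.last_write >= self.keepalive_threshold
```

Because every write resets the timer, a session that is already logging regularly never emits a keepalive; one is written only after a quiet period.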

last_lock_timestamp class-attribute instance-attribute

last_lock_timestamp: float | None = None

When was the last time we wrote a lock (if at all)?

lock_staleness_threshold class-attribute instance-attribute

lock_staleness_threshold: int = 3600

Default staleness threshold for locks (can be overridden by a value in fetched netconf_params).

loop_rate instance-attribute

loop_rate: float

How many seconds should we wait between iterations of the main loop (including polling validation threads and the client log)?

missing_timeout instance-attribute

missing_timeout: float

How many seconds can elapse between messages from the client before we start getting suspicious?

n_val_threads instance-attribute

n_val_threads: int

How many threads can we spawn at once for file validations?

running property

running: bool

Have we actually launched the pipeline, and is it still going?

tb_name instance-attribute

tb_name: str

name of associated transfer bucket

transfer_timeout instance-attribute

transfer_timeout: float

How many seconds can elapse between messages from the client before we decide they've stopped without telling us?
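The relationship between missing_timeout and transfer_timeout can be sketched as a two-level check on how long the client has been silent. This is an illustration under stated assumptions: the function name, the returned state strings, and the expectation that missing_timeout is no larger than transfer_timeout are hypothetical and not taken from the pipeline itself.

```python
def classify_client_silence(
    seconds_since_last_message: float,
    *,
    missing_timeout: float,
    transfer_timeout: float,
) -> str:
    """Illustrative only; map client silence onto a coarse state."""
    # Check the larger threshold first (assumes missing_timeout is the
    # smaller of the two, which the attribute docs imply but do not state).
    if seconds_since_last_message >= transfer_timeout:
        return "stopped"      # hypothetical label: client presumed gone
    if seconds_since_last_message >= missing_timeout:
        return "suspicious"   # hypothetical label: worth flagging
    return "ok"
```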

transfer_type instance-attribute

transfer_type: TransferType

Is this a staging or sample transfer?

acquire_lock

acquire_lock(*, refresh: bool = False, category: str = 'init') -> tuple[bool | dict, Exception | None]

Attempt to acquire the lock object in order to make sure another instance of the pipeline isn't running for this dataset and delivery. If refresh is True, write the lock file even if already held.

Returns:

  • result ( bool | dict ) –

    True if successful, dict giving reason for failure if not, formatted for logging

  • exception ( Exception | None ) –

    None if successful, encountered exception if not
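The (result, exception) return convention can be sketched against an in-memory stand-in for the lock object. Everything here beyond that convention is an assumption made for illustration: the try_acquire function, the LockHeldError class, the dict-based store, the keys of the failure dict, and the rule that a stale lock may be taken over are all hypothetical. The category parameter is omitted.

```python
import time
from typing import Optional, Union


class LockHeldError(RuntimeError):
    """Hypothetical exception type for this sketch."""


def try_acquire(
    store: dict,
    owner: str,
    *,
    refresh: bool = False,
    now: Optional[float] = None,
    staleness_threshold: float = 3600,
) -> tuple[Union[bool, dict], Optional[Exception]]:
    """Illustrative only; mimics the (result, exception) convention."""
    now = time.time() if now is None else now
    current = store.get("lock")
    if current is not None:
        held_by_us = current["owner"] == owner
        stale = now - current["timestamp"] > staleness_threshold
        if held_by_us and not refresh:
            return True, None  # already held; nothing to rewrite
        if not held_by_us and not stale:
            exc = LockHeldError(f"lock held by {current['owner']}")
            return {"reason": "lock held", "owner": current["owner"]}, exc
    # No lock, our own lock with refresh=True, or (assumed) a stale lock.
    store["lock"] = {"owner": owner, "timestamp": now}
    return True, None
```

A caller would typically unpack both values and log the dict on failure, for example: result, exc = try_acquire(store, "agent-1"), then treat result is True as success.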

from_launch_parameters classmethod

from_launch_parameters(dataset: str, delivery_id: str, transfer_type: TransferType, settings: ValPipeSettings = MPt(conf.VAL_PIPE_SETTINGS), cb_name: str | None = None, tb_name: str | None = None) -> ValidationSession

Constructor for ValidationSession. Accepts simple string arguments that, in normal system operation, will be known to and can easily be remotely passed by the pipeline launch script. Also performs some basic validation of system state (e.g. label match between accepted and user-submitted labels).

Should typically be preferred to directly calling ValidationSession.__init__().

Parameters:

  • dataset (str) –

    dataset from the label

  • delivery_id (str) –

    delivery_id from the label

  • transfer_type (TransferType) –

    "sample" or "staging" as appropriate

  • settings (ValPipeSettings, default: MappingProxyType(VAL_PIPE_SETTINGS) ) –

    ValPipeSettings

  • cb_name (str | None, default: None ) –

    Name of the control bucket, or None (default). If None, uses standard name construction rules.

  • tb_name (str | None, default: None ) –

    Name of the transfer bucket, or None (default). If None, uses standard name construction rules.

Returns:

  • ValidationSession

    The constructed ValidationSession.

release_lock

release_lock() -> None

Release the lock object if held. Does nothing if lock is not held.

add_filetype_classification

add_filetype_classification(typename: str, filetype: Filetype, index: DataFrame, *, missing_filetypes_ok: bool) -> pd.DataFrame

Helper function for parse_index_file(). Return a copy of index with its 'type' column populated with typename where a file matches the naming pattern defined in filetype. Raise an exception if the filetype is entirely absent in the index, or if its naming pattern collides with an already-populated filetype's.
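The classification step can be sketched with pandas. This is a hedged illustration rather than the real helper: it assumes the naming pattern is a regular expression, that the index has a 'filename' column (hypothetical; only the 'type' column is named above), and that missing_filetypes_ok suppresses the "entirely absent" error. The function name classify_filetype is also hypothetical.

```python
import pandas as pd


def classify_filetype(
    typename: str,
    pattern: str,
    index: pd.DataFrame,
    *,
    missing_filetypes_ok: bool,
) -> pd.DataFrame:
    """Illustrative only; tag rows whose filename matches `pattern`."""
    out = index.copy()  # never mutate the caller's DataFrame
    matches = out["filename"].str.fullmatch(pattern)
    if not matches.any():
        if missing_filetypes_ok:
            return out
        raise ValueError(f"no files of type {typename!r} in index")
    # A match on a row that already has a type means two patterns overlap.
    collisions = matches & out["type"].notna()
    if collisions.any():
        names = out.loc[collisions, "filename"].tolist()
        raise ValueError(f"pattern for {typename!r} collides on {names}")
    out.loc[matches, "type"] = typename
    return out
```

Applying the function once per filetype in a label, feeding each call the previous call's output, populates the 'type' column incrementally and surfaces overlapping patterns as soon as they occur.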

load_data

load_data(filetype_spec: Filetype, key: str, bucket: str | Bucket | None = None, *, debug: bool = False) -> tuple[ParquetFile | AsdfFile | HDUList | None, dict | None]

Load an object into memory using the method defined for its standard.

Returns:

  • data ( ParquetFile | AsdfFile | HDUList | None ) –

    ParquetFile, AsdfFile, or HDUList as appropriate. Or, if loading fails, None.

  • exception_report ( dict | None ) –

    None if loading succeeds. Information on exception if loading fails.
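The "data or exception report" return shape can be sketched with a generic wrapper. This is an illustration only: load_or_report is a hypothetical name, json.loads stands in for the real Parquet, ASDF, or FITS loaders, and the keys of the report dict are assumptions.

```python
import json
from typing import Any, Callable, Optional


def load_or_report(
    loader: Callable[[bytes], Any], raw: bytes
) -> tuple[Optional[Any], Optional[dict]]:
    """Illustrative only; return (data, None) or (None, report)."""
    try:
        return loader(raw), None
    except Exception as exc:
        # Hypothetical report layout; the real keys are not documented here.
        report = {"exception": type(exc).__name__, "message": str(exc)}
        return None, report


# json.loads is only a stand-in loader for this sketch.
data, report = load_or_report(json.loads, b'{"a": 1}')
bad_data, bad_report = load_or_report(json.loads, b"not json")
```

Exactly one element of the returned pair is None, so a caller can branch on the report without wrapping the call in its own try/except.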

parse_index_file

parse_index_file(file: str | Path | IOBase, label: Label) -> FileIndex

Validate and preprocess a client-uploaded index file, adding filetypes from a label. Primarily intended as a helper function for ValidationSession._init_launch_objs().

validate_file

validate_file(key: str, bucket: Bucket, *, checksum: str | None, spec: Filetype | None) -> dict[str, str | dict | None]

Handler function for individual file validation. Checks that the object is actually present; if it is and the defined filetype implies that data-level validation should be performed, performs that validation. Returns a report properly formatted for use by ValidationManager, whether the validation succeeded, failed, or crashed. In normal operation, this is intended to be submitted by ValidationSession to its thread manager for asynchronous execution.
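The control flow described above (presence check, optional content check, and a report in every case) can be sketched as follows. This is a minimal illustration under stated assumptions: a plain dict stands in for the bucket, the status strings "ok", "failed", and "crashed" are hypothetical, the checksum argument is omitted, and validate_object and looks_like_fits are invented names.

```python
from typing import Callable, Optional


def looks_like_fits(raw: bytes) -> dict:
    # Hypothetical content check: FITS files begin with a SIMPLE card.
    if raw.startswith(b"SIMPLE"):
        return {"status": "ok", "message": "FITS header found"}
    return {"status": "failed", "message": "not a FITS file"}


def validate_object(
    key: str,
    objects: dict,
    *,
    content_check: Optional[Callable[[bytes], dict]] = None,
) -> dict:
    """Illustrative only; always returns a report, never raises."""
    try:
        if key not in objects:
            return {"status": "failed", "message": f"{key} not found"}
        if content_check is None:
            # No data-level validation is implied for this file.
            return {"status": "ok", "message": "object present"}
        return content_check(objects[key])
    except Exception as exc:
        # A crash inside the check becomes a report instead of propagating.
        return {"status": "crashed", "message": f"{type(exc).__name__}: {exc}"}
```

Returning a report even on a crash matters when the function runs in a worker thread, since an uncaught exception would otherwise surface only when the future's result is read.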

validate_file_content

validate_file_content(bucket: str | Bucket | None, key: str, *, spec: Filetype, debug: bool = False) -> dict

Perform data-level validation as specified by filetype. Intended primarily to be called by validate_file().

This function should only be called if we are performing some task that requires actually loading some of the contents of a file, even if it's just validating the basic file format (e.g. 'is this valid FITS?').

Returns:

  • dict

    dict like {'status': status, 'message': message}.