Core
Core objects for the FAST validation server. In normal operation, these expect to be running in an isolated remote environment, communicating asynchronously via "log" objects in an S3 directory bucket. They are not intended for local use. See the validation commands in upload.cli, or their Pythonic interfaces in validation, for local alternatives.
ValidationManager
ValidationManager(bucket: Bucket, index: FileIndex, label: Label, *, n_threads: int = 1)
Helper class for managing validation worker threads. Should typically only
be used by ValidationSession.
queue_validation
queue_validation(keys: Sequence[str]) -> None
Check whether a set of uploaded keys are valid targets for validation, and, if so, submit them to the thread pool for validation. Should typically only be called from ValidationSession._pipe_loop_inner() in response to a client message that it has successfully uploaded more files.
update_validation_results
update_validation_results() -> dict[str, Any]
Check our validation tasks to see if any have completed. Return a dict like {object_key: validation_result} for each completed task. Also remove any completed tasks from self.futures.
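The polling behavior described above can be sketched with the standard library's `concurrent.futures`. This is only an illustration, not the actual `ValidationManager` code: the `FuturePoller` class, its `submit` and `collect_completed` methods, and the `{"status": ..., "message": ...}` crash report are hypothetical names and shapes chosen for the example.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any


class FuturePoller:
    """Hypothetical stand-in for the future-tracking part of a manager."""

    def __init__(self, n_threads: int = 1) -> None:
        self.pool = ThreadPoolExecutor(max_workers=n_threads)
        # Maps object key -> pending Future (mirrors `self.futures` above).
        self.futures: dict[str, Future] = {}

    def submit(self, key: str, func, *args: Any) -> None:
        # Skip keys that already have a task in flight.
        if key not in self.futures:
            self.futures[key] = self.pool.submit(func, *args)

    def collect_completed(self) -> dict[str, Any]:
        # Snapshot the finished keys first so the dict can be mutated safely.
        done = [key for key, future in self.futures.items() if future.done()]
        results: dict[str, Any] = {}
        for key in done:
            future = self.futures.pop(key)
            exc = future.exception()
            if exc is not None:
                # Assumption: report a crash as a result instead of raising.
                results[key] = {"status": "crashed", "message": repr(exc)}
            else:
                results[key] = future.result()
        return results
```

Calling `collect_completed()` once per loop iteration keeps the caller non-blocking: tasks that are still running stay in `futures` and are picked up on a later pass.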
ValidationSession
ValidationSession(vstate: ValidationState, manager: ValidationManager, logger: S3TSVWriter, settings: ValPipeSettings, identifiers: ValIdent, index: FileIndex, label: Label)
Top-level handler object for the validation pipeline.
Caution
ValidationSession.from_launch_parameters() should typically be preferred to ValidationSession.__init__(). Call __init__() directly only if you have a special reason to manually construct one or more of the support objects it accepts as arguments.
agent_id
instance-attribute
agent_id: str
randomly generated identifier for this particular session
az_id
instance-attribute
az_id: str
AWS Availability Zone ID of our control bucket (and ideally also whatever we're running on)
cb_name
instance-attribute
cb_name: str
name of associated control bucket
crashed
property
crashed: bool
Does the pipeline appear to have encountered an unhandled exception?
dataset
instance-attribute
dataset: str
name of dataset
delivery_id
instance-attribute
delivery_id: str
id of specific delivery
keepalive_threshold
instance-attribute
keepalive_threshold: float
After how many seconds without writing any sort of message (keepalive or otherwise) should we write a keepalive message to our log?
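One way to picture the keepalive rule is a small tracker that records the time of the last write and reports when the threshold has elapsed. This is a sketch under assumptions, not the session's actual logic: `KeepaliveTracker`, `note_write`, `keepalive_due`, and the choice of `>=` for the comparison are all hypothetical.

```python
import time


class KeepaliveTracker:
    """Hypothetical helper: decide when a keepalive message is due."""

    def __init__(self, keepalive_threshold: float, clock=time.monotonic) -> None:
        self.keepalive_threshold = keepalive_threshold
        self._clock = clock  # injectable so the logic can be tested
        self._last_write = clock()

    def note_write(self) -> None:
        # Call after writing any message, keepalive or otherwise.
        self._last_write = self._clock()

    def keepalive_due(self) -> bool:
        # Assumption: a keepalive is due once the threshold is reached.
        return self._clock() - self._last_write >= self.keepalive_threshold
```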
last_lock_timestamp
class-attribute
instance-attribute
last_lock_timestamp: float | None = None
When was the last time we wrote a lock (if at all)?
lock_staleness_threshold
class-attribute
instance-attribute
lock_staleness_threshold: int = 3600
Default staleness threshold for locks (can be overridden by a value in fetched netconf_params).
loop_rate
instance-attribute
loop_rate: float
How many seconds should we wait between iterations of the main loop (including polling validation threads and the client log)?
missing_timeout
instance-attribute
missing_timeout: float
How many seconds can elapse between messages from the client before we start getting suspicious?
n_val_threads
instance-attribute
n_val_threads: int
How many threads can we spawn at once for file validations?
running
property
running: bool
Have we actually launched the pipeline, and is it still going?
tb_name
instance-attribute
tb_name: str
name of associated transfer bucket
transfer_timeout
instance-attribute
transfer_timeout: float
How many seconds can elapse between messages from the client before we decide they've stopped without telling us?
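The relationship between missing_timeout and transfer_timeout can be illustrated with a small classifier. This is a sketch only: the function name, the state strings, and the assumption that missing_timeout is the shorter of the two thresholds are not taken from the source.

```python
def classify_client_silence(
    elapsed: float, missing_timeout: float, transfer_timeout: float
) -> str:
    """Hypothetical helper: map seconds since the last client message to a state."""
    if elapsed >= transfer_timeout:
        return "stopped"  # assume the client quit without telling us
    if elapsed >= missing_timeout:
        return "suspicious"  # late, but not yet treated as gone
    return "ok"
```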
transfer_type
instance-attribute
transfer_type: TransferType
Is this a staging or sample transfer?
acquire_lock
acquire_lock(*, refresh: bool = False, category: str = 'init') -> tuple[bool | dict, Exception | None]
Attempt to acquire the lock object in order to make sure another instance of the pipeline isn't running for this dataset and delivery. If refresh is True, write the lock file even if already held.
Returns:
- result (bool | dict) – True if successful, dict giving reason for failure if not, formatted for logging
- exception (Exception | None) – None if successful, encountered exception if not
from_launch_parameters
classmethod
from_launch_parameters(dataset: str, delivery_id: str, transfer_type: TransferType, settings: ValPipeSettings = MPt(conf.VAL_PIPE_SETTINGS), cb_name: str | None = None, tb_name: str | None = None) -> ValidationSession
Constructor for ValidationSession. Accepts simple string arguments that, in normal system operation, will be known to and can easily be remotely passed by the pipeline launch script. Also performs some basic validation of system state (e.g. label match between accepted and user-submitted labels).
Should typically be preferred to directly calling ValidationSession.__init__().
Parameters:
- dataset (str) – dataset from the label
- delivery_id (str) – delivery_id from the label
- transfer_type (TransferType) – "sample" or "staging" as appropriate
- settings (ValPipeSettings, default: MappingProxyType(VAL_PIPE_SETTINGS)) – ValPipeSettings
- cb_name (str | None, default: None) – Name of the control bucket, or None (default). If None, uses standard name construction rules.
- tb_name (str | None, default: None) – Name of the transfer bucket, or None (default). If None, uses standard name construction rules.
Returns:
- ValidationSession – A ValidationSession initialized from passed parameters
release_lock
release_lock() -> None
Release the lock object if held. Does nothing if lock is not held.
add_filetype_classification
add_filetype_classification(typename: str, filetype: Filetype, index: DataFrame, *, missing_filetypes_ok: bool) -> pd.DataFrame
Helper function for parse_index_file(). Return a copy of index with its
'type' column populated with typename where a file matches the naming
pattern defined in filetype. Raise an exception if the filetype is
entirely absent in the index, or if its naming pattern collides with an
already-populated filetype's.
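The classification rule above (tag matching rows, fail on absence, fail on collision) can be sketched without pandas by using a list of dicts in place of the DataFrame. This is an illustration under assumptions: the `classify` function, the 'filename' column, the use of a regular expression as the naming pattern, and the reading of `missing_ok` as suppressing the "entirely absent" error are all hypothetical.

```python
import re


def classify(
    rows: list[dict], typename: str, pattern: str, *, missing_ok: bool = False
) -> list[dict]:
    """Return a copy of rows with 'type' set to typename where 'filename' matches."""
    regex = re.compile(pattern)
    out = [dict(row) for row in rows]  # copy; leave the input untouched
    matched = [row for row in out if regex.fullmatch(row["filename"])]
    if not matched and not missing_ok:
        raise ValueError(f"no files match filetype {typename!r}")
    # A match on a row that already has a type means two patterns overlap.
    collisions = [row["filename"] for row in matched if row.get("type") is not None]
    if collisions:
        raise ValueError(f"{typename!r} collides with existing types: {collisions}")
    for row in matched:
        row["type"] = typename
    return out
```

Applying the function once per filetype in the label, feeding each result into the next call, would build up the 'type' column incrementally while catching overlapping patterns as soon as they appear.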
load_data
load_data(filetype_spec: Filetype, key: str, bucket: str | Bucket | None = None, *, debug: bool = False) -> tuple[ParquetFile | AsdfFile | HDUList | None, dict | None]
Load an object into memory using the method defined for its standard.
Returns:
- data (ParquetFile | AsdfFile | HDUList | None) – ParquetFile, AsdfFile, or HDUList as appropriate. Or, if loading fails, None.
- exception_report (dict | None) – None if loading succeeds. Information on exception if loading fails.
parse_index_file
parse_index_file(file: str | Path | IOBase, label: Label) -> FileIndex
Validate and preprocess a client-uploaded index file, adding filetypes from a label. Primarily intended as a helper function for ValidationSession._init_launch_objs().
validate_file
validate_file(key: str, bucket: Bucket, *, checksum: str | None, spec: Filetype | None) -> dict[str, str | dict | None]
Handler function for individual file validation. Checks that the object is actually present; if it is and the defined filetype implies that data-level validation should be performed, perform that validation. Return a report properly formatted for use by ValidationManager, whether the validation succeeded, failed, or crashed. In normal operation, this is intended to be submitted by ValidationSession to its thread manager for asynchronous execution.
validate_file_content
validate_file_content(bucket: str | Bucket | None, key: str, *, spec: Filetype, debug: bool = False) -> dict
Perform data-level validation as specified by filetype. Intended primarily to be called by validate_file().
This function should only be called if we are performing some task that requires actually loading some of the contents of a file, even if it's just validating the basic file format (e.g. 'is this valid FITS?').
Returns:
- dict – dict like {'status': status, 'message': message}.