ccat_data_transfer package
Submodules
ccat_data_transfer.ccat_data_transfer module
ccat_data_transfer.transfer_manager module
- class ccat_data_transfer.transfer_manager.DataTransferTask
Bases: CCATEnhancedSQLAlchemyTask
Base class for data transfer tasks.
- __init__()
- get_retry_count(session, data_transfer_id)
Get current retry count for this data transfer operation.
- reset_state_on_failure(session, data_transfer_id, exc)
Reset data transfer state for retry.
- mark_permanent_failure(session, data_transfer_id, exc)
Mark data transfer as permanently failed.
- get_operation_info(args, kwargs)
Get additional context for data transfer tasks.
- on_failure(exc, task_id, args, kwargs, einfo)
Handle task failure with recovery for specific error cases.
- ignore_result = False
If enabled the worker won’t store task state and return values for this task. Defaults to the task_ignore_result setting.
- priority = None
Default task priority.
- rate_limit = None
Rate limit for this task type. Examples: None (no rate limit), ‘100/s’ (hundred tasks a second), ‘100/m’ (hundred tasks a minute), ‘100/h’ (hundred tasks an hour).
- reject_on_worker_lost = True
Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT). Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.
Warning: Enabling this can cause message loops; make sure you know what you’re doing.
- request_stack = <celery.utils.threads._LocalStack object>
Task request stack, the current request will be the topmost.
- serializer = 'json'
The name of a serializer that is registered with kombu.serialization.registry. Default is ‘json’.
- store_errors_even_if_ignored = False
When enabled errors will be stored even if the task is otherwise configured to ignore results.
- track_started = True
If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.
Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.
The application default can be overridden using the task_track_started setting.
- typing = True
Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.
- ccat_data_transfer.transfer_manager.transfer_transfer_packages(verbose: bool = False, session: Session = None) → None
Find not yet transferred data transfer packages and schedule their transfer.
- Parameters:
verbose (bool, optional) – If True, sets the logging level to DEBUG. Default is False.
session (Session, optional) – An existing database session to use. If None, a new session will be created.
- Return type:
None
Notes
- Updates the logging level if verbose is True.
- Retrieves pending data transfers from the database.
- Schedules Celery tasks for file transfers.
- Updates data transfer statuses in the database.
- Logs information about the transfer process.
- Handles database errors and unexpected exceptions.
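For orientation, a minimal invocation sketch; the scheduling context (cron, Celery beat, or similar) is an assumption, not part of the package:

```python
# Hypothetical one-shot invocation, e.g. from a cron job or a Celery beat entry.
from ccat_data_transfer.transfer_manager import transfer_transfer_packages

# verbose=True switches logging to DEBUG; with session=None a new session is created.
transfer_transfer_packages(verbose=True)
```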
ccat_data_transfer.data_integrity_manager module
- class ccat_data_transfer.data_integrity_manager.UnpackTask
Bases: CCATEnhancedSQLAlchemyTask
Base class for unpacking tasks.
- __init__()
- get_retry_count(session, data_transfer_id)
Get current retry count for this unpack operation.
- reset_state_on_failure(session, data_transfer_id, exc)
Reset unpack state for retry.
- mark_permanent_failure(session, data_transfer_id, exc)
Mark unpack as permanently failed.
- get_operation_info(args, kwargs)
Get additional context for unpack tasks.
- ignore_result = False
If enabled the worker won’t store task state and return values for this task. Defaults to the task_ignore_result setting.
- priority = None
Default task priority.
- rate_limit = None
Rate limit for this task type. Examples: None (no rate limit), ‘100/s’ (hundred tasks a second), ‘100/m’ (hundred tasks a minute), ‘100/h’ (hundred tasks an hour).
- reject_on_worker_lost = True
Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT). Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.
Warning: Enabling this can cause message loops; make sure you know what you’re doing.
- request_stack = <celery.utils.threads._LocalStack object>
Task request stack, the current request will be the topmost.
- serializer = 'json'
The name of a serializer that is registered with kombu.serialization.registry. Default is ‘json’.
- store_errors_even_if_ignored = False
When enabled errors will be stored even if the task is otherwise configured to ignore results.
- track_started = True
If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.
Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.
The application default can be overridden using the task_track_started setting.
- typing = True
Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.
- ccat_data_transfer.data_integrity_manager.unpack_and_verify_files(verbose: bool = False, session: Session = None) → None
Unpack transferred files and verify their xxHash checksums.
This function retrieves all completed data transfers that are pending unpacking, and schedules Celery tasks to unpack and verify each package.
- Parameters:
verbose (bool, optional) – If True, sets logging level to DEBUG. Default is False.
session (Session, optional) – An existing database session to use. If None, a new session will be created.
- Return type:
None
- Raises:
SQLAlchemyError – If there’s an issue with database operations.
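The verification step can be pictured with the xxhash library. A minimal sketch, assuming xxh64 hex digests and streamed reads; the package’s actual variant and chunking may differ:

```python
import xxhash

def verify_xxhash(path: str, expected_hex: str, chunk_size: int = 1 << 20) -> bool:
    """Stream a file through xxh64 and compare against an expected hex digest."""
    digest = xxhash.xxh64()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest() == expected_hex
```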
ccat_data_transfer.archive_manager module
- class ccat_data_transfer.archive_manager.LongTermArchiveTask
Bases: CCATEnhancedSQLAlchemyTask
Base class for long term archive tasks.
- __init__()
- get_retry_count(session, operation_id)
Get current retry count for this operation.
- reset_state_on_failure(session, long_term_archive_transfer_id, exc)
Reset long term archive transfer state for retry.
- mark_permanent_failure(session, long_term_archive_transfer_id, exc)
Mark long term archive transfer as permanently failed.
- get_operation_info(args, kwargs)
Get additional context for long term archive tasks.
- ignore_result = False
If enabled the worker won’t store task state and return values for this task. Defaults to the task_ignore_result setting.
- priority = None
Default task priority.
- rate_limit = None
Rate limit for this task type. Examples: None (no rate limit), ‘100/s’ (hundred tasks a second), ‘100/m’ (hundred tasks a minute), ‘100/h’ (hundred tasks an hour).
- reject_on_worker_lost = True
Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT). Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.
Warning: Enabling this can cause message loops; make sure you know what you’re doing.
- request_stack = <celery.utils.threads._LocalStack object>
Task request stack, the current request will be the topmost.
- serializer = 'json'
The name of a serializer that is registered with kombu.serialization.registry. Default is ‘json’.
- store_errors_even_if_ignored = False
When enabled errors will be stored even if the task is otherwise configured to ignore results.
- track_started = True
If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.
Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.
The application default can be overridden using the task_track_started setting.
- typing = True
Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.
- ccat_data_transfer.archive_manager.transfer_raw_data_packages_to_long_term_archive(verbose: bool = False, site_name: str | None = None) → None
Schedule long term archive transfer tasks for pending raw data packages using the new DataLocation system.
- Parameters:
verbose (bool) – If True, sets logging to DEBUG level. Defaults to False.
site_name (Optional[str]) – If provided, only schedules transfers for the specified site.
- Raises:
SQLAlchemyError – If there’s an issue with database operations.
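A minimal usage sketch; the site name is a placeholder, not a real deployment:

```python
from ccat_data_transfer.archive_manager import (
    transfer_raw_data_packages_to_long_term_archive,
)

# Schedule LTA transfers for all sites with pending packages...
transfer_raw_data_packages_to_long_term_archive()

# ...or restrict scheduling to a single site ("SITE_A" is a hypothetical name).
transfer_raw_data_packages_to_long_term_archive(verbose=True, site_name="SITE_A")
```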
ccat_data_transfer.deletion_manager module
Deletion Manager for CCAT Data Transfer System
This module implements deletion logic for RawDataFiles, RawDataPackages, and DataTransferPackages across all DataLocations (SOURCE, BUFFER, LONG_TERM_ARCHIVE, PROCESSING), using the new Site/DataLocation architecture and the automatic queue discovery system. It ensures that data is deleted only once it is safely archived in LTA, that retention periods and disk thresholds are respected, and that processing files are kept as long as active staging jobs need them.
The deletion manager now supports bulk operations to efficiently handle large numbers of files and packages, reducing the overhead of individual task scheduling.
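The bulk approach can be sketched as grouping deletable copies by location and submitting one Celery task per group. The attribute and task names below are hypothetical, not the package’s actual API:

```python
from collections import defaultdict

def schedule_bulk_deletions(deletable_copies, bulk_delete_task):
    """Submit one Celery task per location instead of one task per file."""
    ids_by_location = defaultdict(list)
    for copy in deletable_copies:
        ids_by_location[copy.data_location_id].append(copy.id)  # names assumed
    for location_id, copy_ids in ids_by_location.items():
        # A single message carries the whole ID list for that location.
        bulk_delete_task.delay(location_id, copy_ids)
```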
- class ccat_data_transfer.deletion_manager.DeletionTask
Bases: CCATEnhancedSQLAlchemyTask
Base class for deletion tasks.
- __init__()
- get_retry_count(session, operation_id)
Get current retry count for this operation.
- reset_state_on_failure(session, physical_copy_id, exc)
Reset deletion state for retry.
- mark_permanent_failure(session, physical_copy_id, exc)
Mark deletion as permanently failed.
- get_operation_info(args, kwargs)
Get additional context for deletion tasks.
- ignore_result = False
If enabled the worker won’t store task state and return values for this task. Defaults to the task_ignore_result setting.
- priority = None
Default task priority.
- rate_limit = None
Rate limit for this task type. Examples: None (no rate limit), ‘100/s’ (hundred tasks a second), ‘100/m’ (hundred tasks a minute), ‘100/h’ (hundred tasks an hour).
- reject_on_worker_lost = True
Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT). Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.
Warning: Enabling this can cause message loops; make sure you know what you’re doing.
- request_stack = <celery.utils.threads._LocalStack object>
Task request stack, the current request will be the topmost.
- serializer = 'json'
The name of a serializer that is registered with kombu.serialization.registry. Default is ‘json’.
- store_errors_even_if_ignored = False
When enabled errors will be stored even if the task is otherwise configured to ignore results.
- track_started = True
If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.
Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.
The application default can be overridden using the task_track_started setting.
- typing = True
Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.
- ccat_data_transfer.deletion_manager.delete_data_packages(verbose=False)
Main entry point for deletion operations.
- ccat_data_transfer.deletion_manager.delete_raw_data_packages_bulk(verbose=False)
Bulk deletion of raw data packages and their associated files from source locations.
This function finds raw data packages that have been fully archived in LTA and can be safely deleted from source locations. It schedules bulk deletion tasks for both the packages and their associated raw data files, taking into account that SOURCE and BUFFER locations can be on different computers.
- ccat_data_transfer.deletion_manager.schedule_bulk_file_deletions(session: Session, packages: List[RawDataPackage], package_location: DataLocation)
Schedule bulk deletion of raw data files associated with packages.
This function handles the fact that SOURCE and BUFFER locations can be on different computers, so it schedules separate bulk deletion tasks for each unique source location where the files exist.
- ccat_data_transfer.deletion_manager.find_deletable_raw_data_packages_by_location(session: Session) → Dict[DataLocation, List[RawDataPackage]]
Find raw data packages that can be safely deleted, grouped by location.
Uses the new deletion condition functions to determine if packages can be deleted based on the corrected deletion logic.
- Returns:
Dictionary mapping DataLocation to list of deletable RawDataPackage objects
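A hypothetical driver loop tying this function to schedule_bulk_file_deletions; it assumes an open SQLAlchemy Session named session:

```python
from ccat_data_transfer.deletion_manager import (
    find_deletable_raw_data_packages_by_location,
    schedule_bulk_file_deletions,
)

# session is assumed to be an open SQLAlchemy Session.
deletable = find_deletable_raw_data_packages_by_location(session)
for location, packages in deletable.items():
    schedule_bulk_file_deletions(session, packages, location)
```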
- ccat_data_transfer.deletion_manager.delete_data_transfer_packages(verbose=False)
Manage deletions across all archives based on archival status and transfer completion.
- ccat_data_transfer.deletion_manager.get_physical_copy_context(session: Session, physical_copy_id: int) → dict
Get extended context information about a physical copy and its related records.
- Parameters:
session (Session) – The database session to use.
physical_copy_id (int) – The ID of the PhysicalCopy to get context for.
- Returns:
Dictionary containing detailed information about the physical copy and related records.
- Return type:
dict
- ccat_data_transfer.deletion_manager.find_deletable_data_transfer_packages(session) → list[tuple[DataTransferPackage, DataLocation]]
Find all DataTransferPackages that can be safely deleted from their respective DataLocations.
Uses the new deletion condition functions to determine if packages can be deleted based on the corrected deletion logic.
- Returns:
List of tuples containing (DataTransferPackage, DataLocation) pairs that can be deleted.
- ccat_data_transfer.deletion_manager.find_deletable_processing_raw_data_files(session) → list[RawDataFilePhysicalCopy]
Find RawDataFilePhysicalCopy objects in PROCESSING locations that are not needed by any active StagingJob.
- ccat_data_transfer.deletion_manager.delete_processing_raw_data_files(verbose=False)
Delete raw data files from processing locations if not needed by any active staging job.
- ccat_data_transfer.deletion_manager.delete_staged_raw_data_files_from_processing(verbose=False)
Delete RawDataFiles from processing areas when their associated staging jobs are completed.
This function finds RawDataFiles in PROCESSING locations that were staged as part of completed staging jobs (jobs with active=False) and schedules them for bulk deletion, following the same pattern as SOURCE bulk deletion.
- ccat_data_transfer.deletion_manager.find_deletable_staged_raw_data_files_by_location(session: Session) → Dict[DataLocation, List[RawDataFilePhysicalCopy]]
Find RawDataFilePhysicalCopy objects in PROCESSING locations that can be deleted, grouped by location.
A file can be deleted if:
1. It’s in a PROCESSING location
2. It’s part of a RawDataPackage that has been staged (STAGED status)
3. All staging jobs for that package are completed (active=False)
- Returns:
Dictionary mapping DataLocation to list of deletable RawDataFilePhysicalCopy objects
- ccat_data_transfer.deletion_manager.get_disk_usage(location: DataLocation) → float
Get the current disk usage percentage from Redis.
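A minimal sketch of such a Redis lookup using redis-py; the key layout is an assumption, not the package’s documented schema:

```python
import redis

def get_disk_usage_sketch(location_name: str) -> float:
    """Read a usage percentage from Redis; the key name is a placeholder."""
    client = redis.Redis(host="localhost", port=6379, decode_responses=True)
    value = client.get(f"disk_usage:{location_name}")  # hypothetical key
    return float(value) if value is not None else 0.0
```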
- ccat_data_transfer.deletion_manager.find_completed_long_term_archive_transfers(session, raw_data_package_id, location_id=None)
Find completed long term archive transfers for a raw data package.
- Parameters:
session (sqlalchemy.orm.Session) – Database session
raw_data_package_id (int) – ID of the raw data package
location_id (int, optional) – ID of the specific location to filter for
- Returns:
List of completed LongTermArchiveTransfer objects
- Return type:
list
- ccat_data_transfer.deletion_manager.get_long_term_archive_transfer_status_counts(session, raw_data_package_id)
Get counts of long term archive transfers by status for a raw data package.
- Parameters:
session (sqlalchemy.orm.Session) – Database session
raw_data_package_id (int) – ID of the raw data package
- Returns:
Dictionary mapping status values to counts
- Return type:
dict
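A hedged usage sketch, checking whether every LTA transfer for a package has completed. The "COMPLETED" key is assumed from the surrounding docstrings; the real values may be enum members:

```python
# session and raw_data_package_id as in the Parameters above.
counts = get_long_term_archive_transfer_status_counts(session, raw_data_package_id)
total = sum(counts.values())
# "COMPLETED" is an assumed status key; adjust to the actual status enum.
fully_archived = total > 0 and counts.get("COMPLETED", 0) == total
```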
- ccat_data_transfer.deletion_manager.can_delete_raw_data_package_from_source_buffer(session: Session, raw_data_package: RawDataPackage, source_location: DataLocation) → bool
Check if a RawDataPackage can be deleted from the SOURCE site.
Conditions:
1. Must be a BUFFER location type
2. Must be at a SOURCE site
3. Must exist in at least one LTA DataLocation (not just the LTA site buffer)
- ccat_data_transfer.deletion_manager.can_delete_raw_data_package_from_lta_buffer(session: Session, raw_data_package: RawDataPackage, lta_buffer_location: DataLocation) → bool
Check if a RawDataPackage can be deleted from the LTA site buffer.
Conditions:
1. Must be a BUFFER location at an LTA site
2. Must exist in an LTA DataLocation at the same site
- ccat_data_transfer.deletion_manager.can_delete_data_transfer_package_from_source_buffer(session: Session, package: DataTransferPackage, source_buffer: DataLocation) → bool
Check if a DataTransferPackage can be deleted from the SOURCE site buffer.
Conditions:
1. Must be a BUFFER location at a SOURCE site
2. Must have a completed DataTransfer with unpack_status=COMPLETED to an LTA site
3. ALL transfers from this source buffer must be completed and unpacked
- ccat_data_transfer.deletion_manager.can_delete_data_transfer_package_from_lta_buffer(session: Session, package: DataTransferPackage, lta_buffer: DataLocation) → bool
Check if a DataTransferPackage can be deleted from an LTA site buffer.
Conditions:
1. Must be a BUFFER location at an LTA site
2. Must be synced to ALL other LTA site buffers (using round-robin logic)
- ccat_data_transfer.deletion_manager.mark_raw_data_files_for_deletion(session: Session, raw_data_package: RawDataPackage, source_location: DataLocation) → None
When a RawDataPackage is deleted from SOURCE, mark its associated RawDataFiles as DELETION_POSSIBLE. Uses a bulk update to avoid looping over a potentially massive number of PhysicalCopies.
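The bulk update can be pictured as a single UPDATE statement in SQLAlchemy; the column names and the package-to-file relationship used for filtering are assumptions:

```python
# One UPDATE instead of loading every PhysicalCopy row into memory.
(
    session.query(RawDataFilePhysicalCopy)
    .filter(
        RawDataFilePhysicalCopy.data_location_id == source_location.id,
        # Restrict to this package's files; the exact column/join is hypothetical.
        RawDataFilePhysicalCopy.raw_data_package_id == raw_data_package.id,
    )
    .update({"status": "DELETION_POSSIBLE"}, synchronize_session=False)
)
session.commit()
```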
- ccat_data_transfer.deletion_manager.process_deletion_possible_raw_data_files(session: Session, location: DataLocation) → None
Process RawDataFiles marked as DELETION_POSSIBLE based on retention and disk buffer logic.
- ccat_data_transfer.deletion_manager.is_lta_site(site: Site) → bool
Check if a site has LTA DataLocations.
- ccat_data_transfer.deletion_manager.is_source_site(site: Site) → bool
Check if a site has SOURCE DataLocations.
- ccat_data_transfer.deletion_manager.get_buffer_status_for_location(location_name: str) → dict
Get buffer status from Redis for a specific location.
- ccat_data_transfer.deletion_manager.should_delete_based_on_buffer_status(location: DataLocation, buffer_status: dict) → bool
Enhanced buffer status checking with location-specific logic.
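A minimal sketch of threshold-style logic, assuming the status dict carries a usage_percent field; the real schema and per-location rules may differ:

```python
def should_delete_sketch(buffer_status: dict, high_water_pct: float = 90.0) -> bool:
    """Delete once usage crosses a high-water mark; the field name is assumed."""
    return buffer_status.get("usage_percent", 0.0) >= high_water_pct
```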