ccat_data_transfer package

Subpackages

Submodules

ccat_data_transfer.ccat_data_transfer module

ccat_data_transfer.transfer_manager module

class ccat_data_transfer.transfer_manager.DataTransferTask

Bases: CCATEnhancedSQLAlchemyTask

Base class for data transfer tasks.

__init__()
get_retry_count(session, data_transfer_id)

Get current retry count for this data transfer operation.

reset_state_on_failure(session, data_transfer_id, exc)

Reset data transfer state for retry.

mark_permanent_failure(session, data_transfer_id, exc)

Mark data transfer as permanently failed.

get_operation_info(args, kwargs)

Get additional context for data transfer tasks.

on_failure(exc, task_id, args, kwargs, einfo)

Handle task failure with recovery for specific error cases.

ignore_result = False

If enabled the worker won’t store task state and return values for this task. Defaults to the :setting:`task_ignore_result` setting.

priority = None

Default task priority.

rate_limit = None

None (no rate limit), ‘100/s’ (hundred tasks a second), ‘100/m’ (hundred tasks a minute),`’100/h’` (hundred tasks an hour)

Type:

Rate limit for this task type. Examples

reject_on_worker_lost = True

Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., :sig:`KILL`/:sig:`INT`, etc).

Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.

Warning: Enabling this can cause message loops; make sure you know what you’re doing.

request_stack = <celery.utils.threads._LocalStack object>

Task request stack, the current request will be the topmost.

serializer = 'json'

The name of a serializer that are registered with kombu.serialization.registry. Default is ‘json’.

store_errors_even_if_ignored = False

When enabled errors will be stored even if the task is otherwise configured to ignore results.

track_started = True

If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.

Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.

The application default can be overridden using the :setting:`task_track_started` setting.

typing = True

Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.

ccat_data_transfer.transfer_manager.transfer_transfer_packages(verbose: bool = False, session: Session = None) None

Find not yet transferred data transfer packages and schedule their transfer.

Parameters:

verbose (bool, optional) – If True, sets the logging level to DEBUG. Default is False.

Return type:

None

Notes

  • Updates the logging level if verbose is True.

  • Retrieves pending data transfers from the database.

  • Schedules Celery tasks for file transfers.

  • Updates data transfer statuses in the database.

  • Logs information about the transfer process.

  • Handles database errors and unexpected exceptions.

ccat_data_transfer.data_integrity_manager module

class ccat_data_transfer.data_integrity_manager.UnpackTask

Bases: CCATEnhancedSQLAlchemyTask

Base class for unpacking tasks.

__init__()
get_retry_count(session, data_transfer_id)

Get current retry count for this unpack operation.

reset_state_on_failure(session, data_transfer_id, exc)

Reset unpack state for retry.

mark_permanent_failure(session, data_transfer_id, exc)

Mark unpack as permanently failed.

get_operation_info(args, kwargs)

Get additional context for unpack tasks.

ignore_result = False

If enabled the worker won’t store task state and return values for this task. Defaults to the :setting:`task_ignore_result` setting.

priority = None

Default task priority.

rate_limit = None

None (no rate limit), ‘100/s’ (hundred tasks a second), ‘100/m’ (hundred tasks a minute),`’100/h’` (hundred tasks an hour)

Type:

Rate limit for this task type. Examples

reject_on_worker_lost = True

Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., :sig:`KILL`/:sig:`INT`, etc).

Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.

Warning: Enabling this can cause message loops; make sure you know what you’re doing.

request_stack = <celery.utils.threads._LocalStack object>

Task request stack, the current request will be the topmost.

serializer = 'json'

The name of a serializer that are registered with kombu.serialization.registry. Default is ‘json’.

store_errors_even_if_ignored = False

When enabled errors will be stored even if the task is otherwise configured to ignore results.

track_started = True

If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.

Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.

The application default can be overridden using the :setting:`task_track_started` setting.

typing = True

Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.

ccat_data_transfer.data_integrity_manager.unpack_and_verify_files(verbose: bool = False, session: Session = None) None

Unpack transferred files and verify their xxHash checksums.

This function retrieves all completed data transfers that are pending unpacking, and schedules Celery tasks to unpack and verify each package.

Parameters:
  • verbose (bool, optional) – If True, sets logging level to DEBUG. Default is False.

  • session (Session, optional) – An existing database session to use. If None, a new session will be created.

Return type:

None

Raises:

SQLAlchemyError – If there’s an issue with database operations.

ccat_data_transfer.archive_manager module

class ccat_data_transfer.archive_manager.LongTermArchiveTask

Bases: CCATEnhancedSQLAlchemyTask

Base class for long term archive tasks.

__init__()
get_retry_count(session, operation_id)

Get current retry count for this operation.

reset_state_on_failure(session, long_term_archive_transfer_id, exc)

Reset long term archive transfer state for retry.

mark_permanent_failure(session, long_term_archive_transfer_id, exc)

Mark long term archive transfer as permanently failed.

get_operation_info(args, kwargs)

Get additional context for long term archive tasks.

ignore_result = False

If enabled the worker won’t store task state and return values for this task. Defaults to the :setting:`task_ignore_result` setting.

priority = None

Default task priority.

rate_limit = None

None (no rate limit), ‘100/s’ (hundred tasks a second), ‘100/m’ (hundred tasks a minute),`’100/h’` (hundred tasks an hour)

Type:

Rate limit for this task type. Examples

reject_on_worker_lost = True

Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., :sig:`KILL`/:sig:`INT`, etc).

Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.

Warning: Enabling this can cause message loops; make sure you know what you’re doing.

request_stack = <celery.utils.threads._LocalStack object>

Task request stack, the current request will be the topmost.

serializer = 'json'

The name of a serializer that are registered with kombu.serialization.registry. Default is ‘json’.

store_errors_even_if_ignored = False

When enabled errors will be stored even if the task is otherwise configured to ignore results.

track_started = True

If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.

Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.

The application default can be overridden using the :setting:`task_track_started` setting.

typing = True

Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.

ccat_data_transfer.archive_manager.transfer_raw_data_packages_to_long_term_archive(verbose: bool = False, site_name: str | None = None) None

Schedule long term archive transfer tasks for pending raw data packages using the new DataLocation system.

Parameters:
  • verbose (bool) – If True, sets logging to DEBUG level. Defaults to False.

  • site_name (Optional[str]) – If provided, only schedules transfers for the specified site.

Raises:

SQLAlchemyError – If there’s an issue with database operations.

ccat_data_transfer.deletion_manager module

Deletion Manager for CCAT Data Transfer System

This module implements deletion logic for RawDataFiles, RawDataPackages, and DataTransferPackages across all DataLocations (SOURCE, BUFFER, LONG_TERM_ARCHIVE, PROCESSING) using the new Site/DataLocation architecture and automatic queue discovery system. It ensures that data is only deleted when safely archived in LTA, retention periods and disk thresholds are respected, and processing files are kept as long as needed by active staging jobs.

The deletion manager now supports bulk operations to efficiently handle large numbers of files and packages, reducing the overhead of individual task scheduling.

class ccat_data_transfer.deletion_manager.DeletionTask

Bases: CCATEnhancedSQLAlchemyTask

Base class for deletion tasks.

__init__()
get_retry_count(session, operation_id)

Get current retry count for this operation.

reset_state_on_failure(session, physical_copy_id, exc)

Reset deletion state for retry.

mark_permanent_failure(session, physical_copy_id, exc)

Mark deletion as permanently failed.

get_operation_info(args, kwargs)

Get additional context for deletion tasks.

ignore_result = False

If enabled the worker won’t store task state and return values for this task. Defaults to the :setting:`task_ignore_result` setting.

priority = None

Default task priority.

rate_limit = None

None (no rate limit), ‘100/s’ (hundred tasks a second), ‘100/m’ (hundred tasks a minute),`’100/h’` (hundred tasks an hour)

Type:

Rate limit for this task type. Examples

reject_on_worker_lost = True

Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., :sig:`KILL`/:sig:`INT`, etc).

Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.

Warning: Enabling this can cause message loops; make sure you know what you’re doing.

request_stack = <celery.utils.threads._LocalStack object>

Task request stack, the current request will be the topmost.

serializer = 'json'

The name of a serializer that are registered with kombu.serialization.registry. Default is ‘json’.

store_errors_even_if_ignored = False

When enabled errors will be stored even if the task is otherwise configured to ignore results.

track_started = True

If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.

Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.

The application default can be overridden using the :setting:`task_track_started` setting.

typing = True

Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.

ccat_data_transfer.deletion_manager.delete_data_packages(verbose=False)

Main entry point for deletion operations.

ccat_data_transfer.deletion_manager.delete_raw_data_packages_bulk(verbose=False)

Bulk deletion of raw data packages and their associated files from source locations.

This function finds raw data packages that have been fully archived in LTA and can be safely deleted from source locations. It schedules bulk deletion tasks for both the packages and their associated raw data files, taking into account that SOURCE and BUFFER locations can be on different computers.

ccat_data_transfer.deletion_manager.schedule_bulk_file_deletions(session: Session, packages: List[RawDataPackage], package_location: DataLocation)

Schedule bulk deletion of raw data files associated with packages.

This function handles the fact that SOURCE and BUFFER locations can be on different computers, so it schedules separate bulk deletion tasks for each unique source location where the files exist.

ccat_data_transfer.deletion_manager.find_deletable_raw_data_packages_by_location(session: Session) Dict[DataLocation, List[RawDataPackage]]

Find raw data packages that can be safely deleted, grouped by location.

Uses the new deletion condition functions to determine if packages can be deleted based on the corrected deletion logic.

Returns:

Dictionary mapping DataLocation to list of deletable RawDataPackage objects

ccat_data_transfer.deletion_manager.delete_data_transfer_packages(verbose=False)

Manage deletions across all archives based on archival status and transfer completion.

ccat_data_transfer.deletion_manager.get_physical_copy_context(session: Session, physical_copy_id: int) dict

Get extended context information about a physical copy and its related records.

Parameters:
  • session (Session) – The database session to use.

  • physical_copy_id (int) – The ID of the PhysicalCopy to get context for.

Returns:

Dictionary containing detailed information about the physical copy and related records.

Return type:

dict

ccat_data_transfer.deletion_manager.find_deletable_data_transfer_packages(session) list[tuple[DataTransferPackage, DataLocation]]

Find all DataTransferPackages that can be safely deleted from their respective DataLocations.

Uses the new deletion condition functions to determine if packages can be deleted based on the corrected deletion logic.

Returns:

List of tuples containing (DataTransferPackage, DataLocation) pairs that can be deleted.

ccat_data_transfer.deletion_manager.find_deletable_processing_raw_data_files(session) list[RawDataFilePhysicalCopy]

Find RawDataFilePhysicalCopy objects in PROCESSING locations that are not needed by any active StagingJob.

ccat_data_transfer.deletion_manager.delete_processing_raw_data_files(verbose=False)

Delete raw data files from processing locations if not needed by any active staging job.

ccat_data_transfer.deletion_manager.delete_staged_raw_data_files_from_processing(verbose=False)

Delete RawDataFiles from processing areas when their associated staging jobs are completed.

This function finds RawDataFiles in PROCESSING locations that were staged as part of completed staging jobs (jobs with active=False) and schedules them for bulk deletion, following the same pattern as SOURCE bulk deletion.

ccat_data_transfer.deletion_manager.find_deletable_staged_raw_data_files_by_location(session: Session) Dict[DataLocation, List[RawDataFilePhysicalCopy]]

Find RawDataFilePhysicalCopy objects in PROCESSING locations that can be deleted, grouped by location.

A file can be deleted if: 1. It’s in a PROCESSING location 2. It’s part of a RawDataPackage that has been staged (STAGED status) 3. All staging jobs for that package are completed (active=False)

Returns:

Dictionary mapping DataLocation to list of deletable RawDataFilePhysicalCopy objects

ccat_data_transfer.deletion_manager.get_disk_usage(location: DataLocation) float

Get the current disk usage percentage from Redis.

ccat_data_transfer.deletion_manager.find_completed_long_term_archive_transfers(session, raw_data_package_id, location_id=None)

Find completed long term archive transfers for a raw data package.

Parameters:
  • session (sqlalchemy.orm.Session) – Database session

  • raw_data_package_id (int) – ID of the raw data package

  • location_id (int, optional) – ID of the specific location to filter for

Returns:

List of completed LongTermArchiveTransfer objects

Return type:

list

ccat_data_transfer.deletion_manager.get_long_term_archive_transfer_status_counts(session, raw_data_package_id)

Get counts of long term archive transfers by status for a raw data package.

Parameters:
  • session (sqlalchemy.orm.Session) – Database session

  • raw_data_package_id (int) – ID of the raw data package

Returns:

Dictionary mapping status values to counts

Return type:

dict

ccat_data_transfer.deletion_manager.can_delete_raw_data_package_from_source_buffer(session: Session, raw_data_package: RawDataPackage, source_location: DataLocation) bool

Check if RawDataPackage can be deleted from SOURCE site.

Conditions: 1. Must be BUFFER location type 2. Must be SOURCE Site 2. Must exist in at least one LTA DataLocation (not just LTA site buffer)

ccat_data_transfer.deletion_manager.can_delete_raw_data_package_from_lta_buffer(session: Session, raw_data_package: RawDataPackage, lta_buffer_location: DataLocation) bool

Check if RawDataPackage can be deleted from LTA site buffer.

Conditions: 1. Must be BUFFER location at LTA site 2. Must exist in LTA DataLocation at same site

ccat_data_transfer.deletion_manager.can_delete_data_transfer_package_from_source_buffer(session: Session, package: DataTransferPackage, source_buffer: DataLocation) bool

Check if DataTransferPackage can be deleted from SOURCE site buffer.

Conditions: 1. Must be BUFFER location at SOURCE site 2. Must have completed DataTransfer with unpack_status=COMPLETED to LTA site 3. ALL transfers from this source buffer must be completed and unpacked

ccat_data_transfer.deletion_manager.can_delete_data_transfer_package_from_lta_buffer(session: Session, package: DataTransferPackage, lta_buffer: DataLocation) bool

Check if DataTransferPackage can be deleted from LTA site buffer.

Conditions: 1. Must be BUFFER location at LTA site 2. Must be synced to ALL other LTA site buffers (using round-robin logic)

ccat_data_transfer.deletion_manager.mark_raw_data_files_for_deletion(session: Session, raw_data_package: RawDataPackage, source_location: DataLocation) None

When RawDataPackage is deleted from SOURCE, mark associated RawDataFiles as DELETION_POSSIBLE. Uses bulk update to avoid looping through potentially massive PhysicalCopies.

ccat_data_transfer.deletion_manager.process_deletion_possible_raw_data_files(session: Session, location: DataLocation) None

Process RawDataFiles marked as DELETION_POSSIBLE based on retention and disk buffer logic.

ccat_data_transfer.deletion_manager.is_lta_site(site: Site) bool

Check if a site has LTA DataLocations.

ccat_data_transfer.deletion_manager.is_source_site(site: Site) bool

Check if a site has SOURCE DataLocations.

ccat_data_transfer.deletion_manager.get_buffer_status_for_location(location_name: str) dict

Get buffer status from Redis for a specific location.

ccat_data_transfer.deletion_manager.should_delete_based_on_buffer_status(location: DataLocation, buffer_status: dict) bool

Enhanced buffer status checking with location-specific logic.