Failure Recovery System
The CCAT Data Transfer system implements a robust two-tier failure recovery mechanism to handle various types of failures that can occur during data transfer operations.
Overview
The system consists of two complementary recovery mechanisms:
Immediate Task Recovery (Celery-based)
Stalled Task Recovery (Monitor-based)
This dual approach ensures that both expected failures (handled by Celery) and unexpected interruptions (handled by the monitor) can be properly managed and recovered from.
Immediate Task Recovery
The immediate recovery system is built on top of Celery’s task failure handling mechanism and provides:
Custom error classification
Automatic retry logic
Operation-specific recovery procedures
Structured error logging
Administrator notifications
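For example, the automatic retry logic can lean directly on Celery's built-in retry support. The sketch below is illustrative only; the task name, exception class, and retry settings are assumptions rather than the project's actual definitions:

from celery import Celery

app = Celery('ccat_data_transfer')  # hypothetical application name

class RetryableTransferFailure(Exception):
    """Stand-in for a retryable error from the hierarchy described below."""

def perform_transfer(package_id):
    """Placeholder for the real transfer step."""

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def transfer_package(self, package_id):
    """Hypothetical task showing Celery-driven retries."""
    try:
        perform_transfer(package_id)
    except RetryableTransferFailure as exc:
        # Re-queue the task; Celery stops retrying after max_retries.
        raise self.retry(exc=exc)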
Error Classification
The system defines a hierarchy of custom exceptions in exceptions.py:
CCATDataOperationError: Base exception for all operations
Operation-specific errors (e.g., PackageError, TransferError)
Retryable vs. non-retryable error classification
Each error can specify whether it is retryable, the maximum number of retries allowed, and operation-specific context.
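A minimal sketch of such a hierarchy is shown below; the attribute names (retryable, max_retries, context) and the constructor signature are assumptions based on this description, not the actual contents of exceptions.py:

class CCATDataOperationError(Exception):
    """Base exception for data transfer operations (sketch)."""

    def __init__(self, message, retryable=True, max_retries=3, **context):
        super().__init__(message)
        self.retryable = retryable      # should the task be retried at all?
        self.max_retries = max_retries  # cap on automatic retries
        self.context = context          # operation-specific details for logging

class PackageError(CCATDataOperationError):
    """Raised when building or validating a data transfer package fails."""

class TransferError(CCATDataOperationError):
    """Raised when moving data to its destination fails."""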
Recovery Implementation
Each pipeline component implements custom recovery logic through:
reset_state_on_failure: Handles retryable errors
mark_permanent_failure: Handles non-retryable errors
Example:
def reset_state_on_failure(self, session, operation_id, exc):
    """Reset operation state for retry."""
    data_transfer = session.query(models.DataTransfer).get(operation_id)
    if data_transfer:
        # Put the operation back into the queueable state and count the attempt.
        data_transfer.status = models.Status.PENDING
        data_transfer.retry_count += 1
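The mark_permanent_failure counterpart might look like the following sketch; the FAILED status value and the error_message column are assumptions about the models module:

def mark_permanent_failure(self, session, operation_id, exc):
    """Mark the operation as permanently failed instead of retrying."""
    data_transfer = session.query(models.DataTransfer).get(operation_id)
    if data_transfer:
        # Record the terminal state and keep the error for later review.
        data_transfer.status = models.Status.FAILED
        data_transfer.error_message = str(exc)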
Stalled Task Recovery
The stalled task recovery system operates independently of Celery and handles cases where tasks are interrupted unexpectedly.
Components
Task Monitor Service: monitors task heartbeats, detects stalled tasks, and initiates recovery procedures.
Recovery Service Runner: runs as a standalone process, manages the monitoring loop, and handles the service lifecycle.
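In outline, the runner could be a loop like the sketch below; the check_stalled_tasks method name is a placeholder, and the real service also handles startup, shutdown, and error isolation:

import time

def run_recovery_service(monitor, loop_interval=300):
    """Standalone monitoring loop (sketch; the real runner also manages lifecycle)."""
    while True:
        # Find tasks whose heartbeats have gone stale and hand each one
        # to its operation-specific recovery handler.
        monitor.check_stalled_tasks()
        time.sleep(loop_interval)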
Recovery Process
Task State Tracking: tasks send periodic heartbeats to Redis, the monitor service checks for stale heartbeats, and stalled tasks are identified based on a timeout; a heartbeat sketch follows the recovery handler example below.
Recovery Actions: operation-specific recovery handlers reset state to allow a retry and notify administrators.
Example Recovery Handler:
def _recover_package(self, session, package_id):
    """Reset a data transfer package's state."""
    package = session.query(models.DataTransferPackage).get(package_id)
    if package:
        # Return the package to PENDING so the pipeline can pick it up again.
        package.status = models.Status.PENDING
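The heartbeat side of the tracking described above might look like the following sketch, using expiring Redis keys; the key naming scheme and TTL handling are assumptions, not the actual implementation:

import time
import redis

r = redis.Redis()  # connection settings omitted

def send_heartbeat(task_id, ttl=3600):
    """Write a heartbeat key that expires after the configured timeout."""
    # If the task dies, the key simply expires and the monitor sees it as stale.
    r.setex(f'ccat:heartbeat:{task_id}', ttl, time.time())

def is_stalled(task_id):
    """A task counts as stalled once its heartbeat key has expired."""
    return not r.exists(f'ccat:heartbeat:{task_id}')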
Notification System
Both recovery systems integrate with a notification service that:
Sends alerts for critical errors
Reports recovery attempts
Provides detailed error context
Supports different notification levels (ERROR, WARNING, INFO)
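A notification call might look like the sketch below, which routes alerts through Python's logging module; the function name, signature, and delivery mechanism are assumptions about the notification service:

import logging

logger = logging.getLogger('ccat.notifications')

def notify(level, subject, operation_id=None, error=None):
    """Emit a recovery notification (sketch; real delivery might be email or chat)."""
    context = {'operation_id': operation_id, 'error': str(error) if error else None}
    # Map the notification level name (ERROR, WARNING, INFO) onto logging levels.
    logger.log(getattr(logging, level), '%s | context=%s', subject, context)

# e.g. notify('WARNING', 'Retrying stalled package', operation_id=42)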
Configuration
The recovery system can be configured through:
Maximum retry counts
Heartbeat timeouts
Notification recipients
Recovery intervals
Example Configuration:
TASK_RECOVERY = {
    'heartbeat_timeout': 3600,  # 1 hour
    'loop_interval': 300,       # 5 minutes
}
Best Practices
Error Classification: use appropriate error types, set retryability correctly, and include relevant context.
Recovery Implementation: implement both recovery methods, handle database state properly, and log recovery actions.
Monitoring: track recovery success rates and retry counts, and review notification patterns.
Maintenance: regularly review error patterns, update recovery strategies, and adjust timeouts as needed.