Failure Recovery System
=======================

The CCAT Data Transfer system implements a robust two-tier failure recovery
mechanism to handle the various types of failures that can occur during data
transfer operations.

Overview
--------

The system consists of two complementary recovery mechanisms:

1. Immediate Task Recovery (Celery-based)
2. Stalled Task Recovery (Monitor-based)

This dual approach ensures that both expected failures (handled by Celery) and
unexpected interruptions (handled by the monitor) can be properly managed and
recovered from.

Immediate Task Recovery
-----------------------

The immediate recovery system is built on top of Celery's task failure
handling mechanism and provides:

- Custom error classification
- Automatic retry logic
- Operation-specific recovery procedures
- Structured error logging
- Administrator notifications

Error Classification
~~~~~~~~~~~~~~~~~~~~

The system defines a hierarchy of custom exceptions in ``exceptions.py``:

- ``CCATDataOperationError``: Base exception for all operations
- Operation-specific errors (e.g., ``PackageError``, ``TransferError``)
- Retryable vs. non-retryable error classification

Each error can specify:

- Whether it is retryable
- The maximum number of retries allowed
- Operation-specific context

Recovery Implementation
~~~~~~~~~~~~~~~~~~~~~~~

Each pipeline component implements custom recovery logic through:

- ``reset_state_on_failure``: Handles retryable errors
- ``mark_permanent_failure``: Handles non-retryable errors

Example::

    def reset_state_on_failure(self, session, operation_id, exc):
        """Reset operation state for retry."""
        data_transfer = session.query(models.DataTransfer).get(operation_id)
        if data_transfer:
            data_transfer.status = models.Status.PENDING
            data_transfer.retry_count += 1

Stalled Task Recovery
---------------------

The stalled task recovery system operates independently of Celery and handles
cases where tasks are interrupted unexpectedly.

Components
~~~~~~~~~~

1. Task Monitor Service

   - Monitors task heartbeats
   - Detects stalled tasks
   - Initiates recovery procedures

2. Recovery Service Runner

   - Runs as a standalone process
   - Manages the monitoring loop
   - Handles the service lifecycle

Recovery Process
~~~~~~~~~~~~~~~~

1. Task State Tracking

   - Tasks send periodic heartbeats to Redis
   - The monitor service checks for stale heartbeats
   - Stalled tasks are identified based on a timeout

2. Recovery Actions

   - Operation-specific recovery handlers
   - State reset to allow retry
   - Notification of administrators

Example recovery handler::

    def _recover_package(self, session, package_id):
        """Reset a data transfer package's state."""
        package = session.query(models.DataTransferPackage).get(package_id)
        if package:
            package.status = models.Status.PENDING

Notification System
-------------------

Both recovery systems integrate with a notification service that:

- Sends alerts for critical errors
- Reports recovery attempts
- Provides detailed error context
- Supports different notification levels (ERROR, WARNING, INFO)

Configuration
-------------

The recovery system can be configured through:

- Maximum retry counts
- Heartbeat timeouts
- Notification recipients
- Recovery intervals

Example configuration::

    TASK_RECOVERY = {
        'heartbeat_timeout': 3600,  # 1 hour
        'loop_interval': 300,       # 5 minutes
    }
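
The sketch below shows one way these settings could drive the stalled-task
check described earlier: each task periodically records a heartbeat timestamp
in Redis, and the monitor flags any operation whose last heartbeat is older
than ``heartbeat_timeout``. This is a minimal illustration rather than the
actual CCAT implementation; the ``task_heartbeat:<id>`` key scheme and the
``list_active_operations`` and ``recover_operation`` hooks are assumed names::

    import time

    import redis

    # Assumed settings; mirrors the TASK_RECOVERY example above.
    TASK_RECOVERY = {
        'heartbeat_timeout': 3600,  # 1 hour
        'loop_interval': 300,       # 5 minutes
    }

    client = redis.Redis()


    def send_heartbeat(operation_id):
        """Task side: record the current time under this operation's key."""
        client.set(f"task_heartbeat:{operation_id}", time.time())


    def check_stalled(operation_ids, recover_operation):
        """Monitor side: flag operations whose heartbeat is older than the timeout."""
        now = time.time()
        for operation_id in operation_ids:
            raw = client.get(f"task_heartbeat:{operation_id}")
            if raw is None or now - float(raw) > TASK_RECOVERY['heartbeat_timeout']:
                # No heartbeat, or the last one is too old: treat the task as stalled.
                recover_operation(operation_id)


    def run_monitor(list_active_operations, recover_operation):
        """Monitoring loop: sleep between passes using the configured interval."""
        while True:
            check_stalled(list_active_operations(), recover_operation)
            time.sleep(TASK_RECOVERY['loop_interval'])

In a setup like this, a task would typically call ``send_heartbeat`` at natural
checkpoints (for example, after each file or chunk), so a crash between
checkpoints is detected within roughly one ``heartbeat_timeout`` window.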

Best Practices
--------------

1. Error Classification (see the sketch after this list)

   - Use appropriate error types
   - Set the correct retryability
   - Include relevant context

2. Recovery Implementation

   - Implement both recovery methods
   - Handle database state properly
   - Log recovery actions

3. Monitoring

   - Monitor recovery success rates
   - Track retry counts
   - Review notification patterns

4. Maintenance

   - Regularly review error patterns
   - Update recovery strategies
   - Adjust timeouts as needed
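
For the first practice above, a minimal sketch of the kind of exception
hierarchy that ``exceptions.py`` defines might look like the following. The
``retryable`` and ``max_retries`` attribute names, the ``context`` handling,
and the ``PermanentTransferError`` subclass are illustrative assumptions, not
the real definitions::

    class CCATDataOperationError(Exception):
        """Base exception for all data transfer operations."""

        #: Whether the failed operation may be retried automatically.
        retryable = True
        #: Maximum number of automatic retries allowed for this error type.
        max_retries = 3

        def __init__(self, message, **context):
            super().__init__(message)
            # Operation-specific context, surfaced in logs and notifications.
            self.context = context


    class PackageError(CCATDataOperationError):
        """Raised when building or validating a data transfer package fails."""


    class TransferError(CCATDataOperationError):
        """Raised when moving data to its destination fails."""


    class PermanentTransferError(TransferError):
        """Hypothetical non-retryable variant: retrying cannot succeed."""

        retryable = False

With attributes like these, a Celery failure handler can inspect
``exc.retryable`` to choose between ``reset_state_on_failure`` and
``mark_permanent_failure``, and compare the task's retry count against
``exc.max_retries`` before scheduling another attempt.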