Failure Recovery System

The CCAT Data Transfer system implements a two-tier failure recovery mechanism to handle the failures that can occur during data transfer operations.

Overview

The system consists of two complementary recovery mechanisms:

  1. Immediate Task Recovery (Celery-based)

  2. Stalled Task Recovery (Monitor-based)

This dual approach ensures that both expected failures (handled by Celery) and unexpected interruptions (handled by the monitor) can be properly managed and recovered from.

Immediate Task Recovery

The immediate recovery system is built on top of Celery’s task failure handling mechanism and provides the following (a minimal wiring sketch appears after the list):

  • Custom error classification

  • Automatic retry logic

  • Operation-specific recovery procedures

  • Structured error logging

  • Administrator notifications
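
These pieces come together in the task definitions themselves. The sketch below is illustrative only: the application name, base task class, and stand-in exception are assumptions about how the hooks could be wired, not the project's actual code.

# Illustrative wiring sketch; the names below are assumptions, not project code.
from celery import Celery, Task

app = Celery("ccat_data_transfer")  # hypothetical application name


class TransferError(Exception):
    """Stand-in for one of the project's retryable errors."""


class RecoveryTask(Task):
    """Base task whose failure hook hands off to permanent-failure handling."""

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Celery calls this once retries are exhausted or the error is
        # non-retryable; a real implementation would call
        # mark_permanent_failure() and notify administrators here.
        print(f"task {task_id} failed permanently: {exc}")


@app.task(bind=True, base=RecoveryTask, max_retries=3)
def transfer_package(self, operation_id):
    try:
        ...  # perform the transfer for operation_id
    except TransferError as exc:
        # A real task would call reset_state_on_failure() first, then ask
        # Celery to schedule another attempt.
        raise self.retry(exc=exc, countdown=60)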

Error Classification

The system defines a hierarchy of custom exceptions in exceptions.py:

  • CCATDataOperationError: Base exception for all operations

  • Operation-specific errors (e.g., PackageError, TransferError)

  • Retryable vs. non-retryable error classification

Each error can specify the following (see the sketch below):

  • Whether it is retryable

  • Maximum number of retries allowed

  • Operation-specific context
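
The exact contents of exceptions.py are not reproduced here; the following sketch only illustrates one way such a hierarchy could expose the retryability flag, retry limit, and context described above (attribute names are assumptions):

# Sketch of the kind of hierarchy described above; attribute names are illustrative.
class CCATDataOperationError(Exception):
    """Base exception for all CCAT data transfer operations."""

    retryable = False   # non-retryable unless a subclass says otherwise
    max_retries = 0

    def __init__(self, message, context=None):
        super().__init__(message)
        self.context = context or {}  # operation-specific context


class TransferError(CCATDataOperationError):
    """Transient transfer failure that is safe to retry."""

    retryable = True
    max_retries = 3


class PackageError(CCATDataOperationError):
    """Packaging failure that requires manual intervention."""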

Recovery Implementation

Each pipeline component implements custom recovery logic through:

  • reset_state_on_failure: Handles retryable errors

  • mark_permanent_failure: Handles non-retryable errors

Example:

def reset_state_on_failure(self, session, operation_id, exc):
    """Reset operation state for retry."""
    data_transfer = session.query(models.DataTransfer).get(operation_id)
    if data_transfer:
        # Return the operation to PENDING so it is picked up again, and
        # count this attempt against the retry limit.
        data_transfer.status = models.Status.PENDING
        data_transfer.retry_count += 1
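
A corresponding sketch for the non-retryable path might look like the following; the FAILED status value and error_message column are assumptions, not confirmed model fields:

def mark_permanent_failure(self, session, operation_id, exc):
    """Mark an operation as permanently failed (illustrative sketch)."""
    data_transfer = session.query(models.DataTransfer).get(operation_id)
    if data_transfer:
        # Assumed status value and column; record why the operation failed.
        data_transfer.status = models.Status.FAILED
        data_transfer.error_message = str(exc)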

Stalled Task Recovery

The stalled task recovery system operates independently of Celery and handles cases where tasks are interrupted unexpectedly.

Components

  1. Task Monitor Service
     • Monitors task heartbeats
     • Detects stalled tasks
     • Initiates recovery procedures

  2. Recovery Service Runner (a stripped-down loop is sketched below)
     • Runs as a standalone process
     • Manages the monitoring loop
     • Handles service lifecycle
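
A stripped-down version of the runner's loop might look like this; the monitor object and its check_for_stalled_tasks() method are assumed names, not the actual service interface:

# Minimal sketch of a standalone runner loop with a clean shutdown path.
import signal
import time

RUNNING = True


def _stop(signum, frame):
    global RUNNING
    RUNNING = False


def run_recovery_service(monitor, loop_interval=300):
    """Run the monitoring loop until the process is asked to stop."""
    signal.signal(signal.SIGTERM, _stop)
    signal.signal(signal.SIGINT, _stop)
    while RUNNING:
        monitor.check_for_stalled_tasks()  # assumed method name
        time.sleep(loop_interval)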

Recovery Process

  1. Task State Tracking (see the heartbeat sketch after this list)
     • Tasks send periodic heartbeats to Redis
     • The monitor service checks for stale heartbeats
     • Stalled tasks are identified based on a timeout

  2. Recovery Actions
     • Operation-specific recovery handlers
     • State reset to allow retry
     • Notification of administrators
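
One way heartbeats and the stale-heartbeat check could be implemented with Redis is sketched below; the key layout, value encoding, and timeout are assumptions, not the project's actual scheme:

# Illustrative heartbeat scheme; key names and timeouts are assumptions.
import time

import redis

r = redis.Redis()
HEARTBEAT_TIMEOUT = 3600  # seconds without a heartbeat before a task counts as stalled


def send_heartbeat(task_id):
    """Called periodically by a running task."""
    r.set(f"ccat:heartbeat:{task_id}", time.time())


def find_stalled_tasks():
    """Called by the monitor service on each loop iteration."""
    stalled = []
    now = time.time()
    for key in r.scan_iter("ccat:heartbeat:*"):
        last_beat = float(r.get(key))
        if now - last_beat > HEARTBEAT_TIMEOUT:
            stalled.append(key.decode().rsplit(":", 1)[-1])
    return stalled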

Example Recovery Handler:

def _recover_package(self, session, package_id):
    """Reset a data transfer package's state."""
    package = session.query(models.DataTransferPackage).get(package_id)
    if package:
        # Return the package to PENDING so the pipeline can retry it.
        package.status = models.Status.PENDING

Notification System

Both recovery systems integrate with a notification service that does the following (a conceptual call is sketched after the list):

  • Sends alerts for critical errors

  • Reports recovery attempts

  • Provides detailed error context

  • Supports different notification levels (ERROR, WARNING, INFO)
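
The notification service's interface is not documented in this section; conceptually, a call might look like the sketch below, where the function name and payload fields are assumptions:

# Conceptual sketch only; the real service's module and signature may differ.
from enum import Enum


class Level(Enum):
    ERROR = "ERROR"
    WARNING = "WARNING"
    INFO = "INFO"


def notify_admins(level, subject, context):
    """Send an alert with structured error context to the configured recipients."""
    message = {"level": level.value, "subject": subject, "context": context}
    print(message)  # a real service would dispatch email, chat, etc.


# Example: report a recovery attempt for a stalled transfer.
notify_admins(Level.WARNING, "Recovered stalled transfer",
              {"operation_id": 42, "retry_count": 1})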

Configuration

The recovery system can be configured through:

  • Maximum retry counts

  • Heartbeat timeouts

  • Notification recipients

  • Recovery intervals

Example Configuration:

TASK_RECOVERY = {
    'heartbeat_timeout': 3600,  # 1 hour
    'loop_interval': 300,       # 5 minutes
}
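
The remaining options listed above could be expressed with additional keys along the following lines; these key names are illustrative assumptions, not the actual settings:

TASK_RECOVERY = {
    'heartbeat_timeout': 3600,                        # 1 hour
    'loop_interval': 300,                             # 5 minutes
    'max_retries': 3,                                 # assumed key name
    'notification_recipients': ['ops@example.org'],   # assumed key name
}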

Best Practices

  1. Error Classification
     • Use appropriate error types
     • Set correct retryability
     • Include relevant context

  2. Recovery Implementation
     • Implement both recovery methods
     • Handle database state properly
     • Log recovery actions

  3. Monitoring
     • Monitor recovery success rates
     • Track retry counts
     • Review notification patterns

  4. Maintenance
     • Review error patterns regularly
     • Update recovery strategies
     • Adjust timeouts as needed