python Logocelery

Celery is an open-source, distributed task queue written in Python. It is designed to process large amounts of messages (tasks) asynchronously and out-of-band from the main application flow. By offloading long-running, computationally intensive, or I/O-bound operations to Celery, a primary application (e.g., a web server) can remain responsive and not block waiting for these tasks to complete.

Core Concepts and Components:
1. Celery Client (Producer): This is the part of your main application (e.g., a Django view, a Flask endpoint, or a standalone script) that calls or "sends" tasks to Celery for execution. When a task is called using methods like `.delay()` or `.apply_async()`, it's essentially packaged and sent to the broker.
2. Celery Broker (Message Transport): A crucial middleware that receives tasks from the client and delivers them to workers. The broker ensures that task messages are reliably queued, stored, and then dispatched. Popular choices for brokers include RabbitMQ (a robust message broker) and Redis (a high-performance in-memory data store, often used as a simple broker).
3. Celery Worker: A separate, long-running process that continuously monitors the broker for new task messages. When a worker receives a task, it retrieves the task's details, executes the corresponding Python function (the task itself), and processes any results or exceptions.
4. Celery Backend (Result Store - Optional): Used to store the results, status, or state of tasks after they have been executed by a worker. This allows the client application to later retrieve the outcome of a task (e.g., whether it succeeded, failed, and its return value). Common backends include Redis, databases (SQLAlchemy, Django ORM), Memcached, or even a filesystem.

How Celery Works:
- An application (the client) calls a Celery task function (e.g., `my_task.delay(arg1, arg2)`).
- Celery serializes the task's name, arguments, and other metadata into a message and sends it to the configured broker.
- The broker stores this message in a queue, waiting for an available worker.
- A Celery worker process, which is constantly polling the broker, picks up the task message from the queue.
- The worker deserializes the message and executes the Python function associated with the task.
- (Optionally) If a backend is configured, the worker updates the task's status (e.g., 'PENDING', 'STARTED', 'SUCCESS', 'FAILURE') and stores its return value or any exceptions in the backend.
- The client application can later use the task's ID (returned by `.delay()` or `.apply_async()`) to query the backend and check the task's status or retrieve its result.

Common Use Cases:
- Sending Emails: Dispatching welcome emails, password reset links, or notification emails in the background.
- Image/Video Processing: Resizing, watermarking, compressing, or transcoding media files after user uploads.
- Report Generation: Creating complex PDFs or CSV reports that take significant time to compile.
- Data Imports/Exports: Processing large datasets asynchronously to avoid timeouts.
- Scheduled Tasks: Using `Celery Beat`, tasks can be scheduled to run at specific intervals (like cron jobs).
- Third-Party API Integrations: Making requests to external services that might be slow or have rate limits.

Benefits:
- Asynchronous Processing: Improves application responsiveness and user experience by offloading long-running operations.
- Scalability: Easily scale task processing by adding more workers as demand grows.
- Reliability: Provides mechanisms for retries, message acknowledgments, and graceful handling of task failures.
- Decoupling: Separates background tasks from the main application logic, leading to better modularity and easier maintenance.
- Concurrency: Workers can handle multiple tasks concurrently using multiprocessing or eventlets/gevent.

Example Code

python
 1. Installation:
    Ensure you have Python installed.
    Open your terminal or command prompt and install Celery and Redis client:
    pip install celery redis

 2. Start a Redis server:
    Celery needs a broker (and optionally a backend) to function. Redis is a common choice.
    If you don't have Redis installed, refer to its official documentation.
    Once installed, start the Redis server (usually by typing 'redis-server' in a terminal).
    If Redis is running on a different host or port, adjust the URLs below.

 --- File: tasks.py ---
 This file defines our Celery application instance and the tasks it will execute.

from celery import Celery
import time
import os

 Configure Celery with a name, broker URL, and backend URL.
 We're using environment variables for flexibility, falling back to localhost Redis.
app = Celery(
    'my_celery_app',
    broker=os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
    backend=os.environ.get('CELERY_BACKEND_URL', 'redis://localhost:6379/0')
)

 Optional: Configure the timezone for task scheduling if needed.
app.conf.timezone = 'Europe/Istanbul'
app.conf.enable_utc = True  It's good practice to use UTC for task times

 Define a simple task to add two numbers. This simulates a computation.
@app.task
def add(x, y):
    print(f"[Worker] Executing 'add' task: {x} + {y}...")
    time.sleep(5)   Simulate a long-running operation (e.g., complex calculation)
    result = x + y
    print(f"[Worker] 'add' task completed. Result: {result}")
    return result

 Define a task to simulate sending an email, demonstrating retries.
 'bind=True' allows the task to access its own instance ('self'), useful for retries.
 'default_retry_delay' is in seconds.
 'max_retries' limits how many times a task will be retried.
@app.task(bind=True, default_retry_delay=10, max_retries=3)
def send_email(self, to_email, subject, body):
    try:
        print(f"[Worker] Sending email to '{to_email}' with subject '{subject}'...")
         Simulate a transient network error for demonstration purposes
        if to_email == "fail@example.com" and self.request.retries < 2:
            print(f"[Worker] Simulating failure for {to_email}. Retries: {self.request.retries}")
            raise ConnectionError("SMTP server temporarily unavailable!")

        time.sleep(3)  Simulate the actual network delay for sending email
        print(f"[Worker] Email sent successfully to '{to_email}'!")
        return True
    except ConnectionError as e:
         Log the retry attempt and re-queue the task
        print(f"[Worker] Email sending failed for '{to_email}': {e}. Retrying... (Attempt {self.request.retries + 1}/{self.max_retries})")
         self.retry() re-queues the task for a future execution
        raise self.retry(exc=e)  Pass the exception to the retry
    except Exception as e:
         Handle other unexpected errors, log them, and potentially not retry
        print(f"[Worker] An unexpected error occurred for '{to_email}': {e}")
        raise  Re-raise for Celery to mark task as failed


 --- File: caller.py ---
 This file demonstrates how a client application would call Celery tasks.

 To run this example:
 1. Ensure Redis server is running.
 2. Open a new terminal and start the Celery worker (from the same directory as tasks.py):
    celery -A tasks worker --loglevel=info
    (If you are on Windows, you might need to use `celery -A tasks worker -P solo --loglevel=info`
     or use WSL/Docker for better multiprocessing support.)
 3. Open another terminal and run this caller script:
    python caller.py

from tasks import add, send_email
import time

print("--- Client Application Started ---")

print("\n[Client] Calling 'add' task asynchronously...")
 .delay() is a convenient shortcut for .apply_async()
 It sends the task to the broker and immediately returns an AsyncResult object.
add_task_result = add.delay(10, 20)
print(f"[Client] Task ID for 'add': {add_task_result.id}")

print("\n[Client] Calling 'send_email' task (success case)...")
email_success_task_result = send_email.delay("recipient@example.com", "Welcome!", "Thanks for signing up.")
print(f"[Client] Task ID for 'send_email' (success): {email_success_task_result.id}")

print("\n[Client] Calling 'send_email' task (fail and retry case)...")
email_fail_task_result = send_email.delay("fail@example.com", "Important Alert!", "Your account needs attention.")
print(f"[Client] Task ID for 'send_email' (fail): {email_fail_task_result.id}")

print("\n[Client] Main application continues processing immediately while tasks run in background...")
 Simulate other non-blocking work in your main application
time.sleep(2)  Give some time for tasks to be sent to broker and workers to start processing

print("\n[Client] Checking on task results (this part might block or poll for results)...")
 Using .get() is blocking and waits for the task to complete.
 In a real-world web application, you might poll for status via an API endpoint
 or use websockets to notify the user when a task is done.

try:
     We block for up to 10 seconds to get the result of the 'add' task.
     If the worker doesn't finish or fails within this time, a TimeoutError is raised.
    add_result_value = add_task_result.get(timeout=10)
    print(f"[Client] Result of 'add' task (ID: {add_task_result.id}): {add_result_value}")
except Exception as e:
    print(f"[Client] Could not get 'add' task result or it timed out: {e}")

 For email tasks, we often just fire-and-forget, but we can still check their state.
print(f"[Client] Current state of 'send_email' (success) (ID: {email_success_task_result.id}): {email_success_task_result.state}")
print(f"[Client] Current state of 'send_email' (fail) (ID: {email_fail_task_result.id}): {email_fail_task_result.state}")

 You can also use .ready() to check if a task is done without blocking
if email_success_task_result.ready():
    print(f"[Client] 'send_email' (success) task is ready. Final state: {email_success_task_result.state}. Result: {email_success_task_result.get()}")
else:
    print(f"[Client] 'send_email' (success) task is not yet ready.")

print("\n--- Client Application Finished ---")