Airflow & Celery on Redis: when Airflow picks up old task instances

This is going to be a quick post on Airflow. We noticed that in one of our environments the Airflow scheduler was picking up old task instances that had already succeeded (whether marked as success or completed successfully). You can verify that this is your issue by SSHing into your Airflow workers and running:

ps -ef | grep airflow

Then check the DAG run IDs: if most of them belong to old runs, you are hitting this issue.
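If the process list is long, a small helper can group the running tasks by execution date so the old runs stand out. This is only a sketch: summarize_airflow_runs is a hypothetical helper name, and it assumes the workers launch tasks with the Airflow 1.x style command line "airflow run <dag_id> <task_id> <execution_date> ...".

```shell
# Hypothetical helper: group running Airflow task processes by DAG and execution date.
# Assumes command lines of the form: airflow run <dag_id> <task_id> <execution_date> ...
summarize_airflow_runs() {
    # Reads one process command line per input line on stdin and
    # prints "<count> <dag_id> <execution_date>", most frequent first.
    awk '{
        for (i = 1; i <= NF - 4; i++) {
            if ($i ~ /(^|\/)airflow$/ && $(i + 1) == "run") {
                print $(i + 2), $(i + 4)
                break
            }
        }
    }' | sort | uniq -c | sort -rn
}
```

On a worker you would feed it the process list, for example: ps -eo args | summarize_airflow_runs. Execution dates far in the past point to the stale task instances described above.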

This happens when Celery’s backend, in our case Redis, holds old (or duplicate) keys for task runs. The solution is to clear the Celery queue. Here are the steps to do that when Celery runs on Redis:

1- Stop Airflow Scheduler:

sudo initctl status airflow-scheduler

sudo initctl stop airflow-scheduler

2- Stop webserver:

sudo initctl status airflow-webserver

sudo initctl stop airflow-webserver

3- Stop Celery Flower:

cd /var/lib/airflow/bin

./ flower status

./ flower stop

4- Stop workers:

cd /var/lib/airflow/bin

./ worker status

./ worker stop

Now SSH into the server where Redis is running, type “redis-cli”, and press Enter to open the Redis CLI. Follow the steps below to flush the Redis DB:

  1. INFO keyspace — List keyspaces

    a. You should get only 1 result back

  2. SELECT 0 — Select Database
  3. config get dir —  Get database file location to take backup
  4. Copy file “xxxx.db” from above location to your home directory
  5. FLUSHDB — Flush database
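The same steps can also be run non-interactively. The sketch below is one way to do it, not a tested procedure for your environment: backup_and_flush_redis, REDIS_CLI and BACKUP_DIR are hypothetical names introduced here, and it assumes Redis listens on the default host and port and that the Celery data lives in database 0.

```shell
# Sketch: back up the Redis dump file, then flush database 0.
# REDIS_CLI and BACKUP_DIR are hypothetical knobs, not part of the original steps.
REDIS_CLI="${REDIS_CLI:-redis-cli}"
BACKUP_DIR="${BACKUP_DIR:-$HOME}"

backup_and_flush_redis() {
    # Show which databases hold keys (you should see only db0).
    $REDIS_CLI INFO keyspace

    # Outside the interactive prompt, CONFIG GET prints the parameter
    # name on line 1 and its value on line 2.
    dir=$($REDIS_CLI CONFIG GET dir | sed -n 2p)
    file=$($REDIS_CLI CONFIG GET dbfilename | sed -n 2p)

    # Copy the dump file before deleting anything; stop if the copy fails.
    cp "$dir/$file" "$BACKUP_DIR/" || return 1

    # Flush only database 0.
    $REDIS_CLI -n 0 FLUSHDB
}
```

Nothing happens until you call backup_and_flush_redis. Because the function returns before FLUSHDB if the copy fails, you never flush without a backup in place.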

Now you can start all Airflow services:

1- Scheduler commands

sudo initctl start airflow-scheduler

sudo initctl status airflow-scheduler

2- Webserver commands

sudo initctl start airflow-webserver

sudo initctl status airflow-webserver

3- Flower commands

cd /var/lib/airflow/prd/bin

nohup ./ flower start &

./ flower status

4- Worker commands

cd /var/lib/airflow/prd/bin

nohup ./ worker start &

./ worker status


Go back to Airflow and validate all DAGs are starting and completing successfully.

And be happy ever after! 🙂

YARN Capacity Scheduler: Queue Priority

Capacity Scheduler is designed to run Hadoop jobs in a shared, multi-tenant cluster in a friendly manner. Its main strength is that it guarantees specific capacity for a certain group of users by supporting multiple queues and allowing users to submit their jobs to their dedicated queues. Each queue is given a fraction of the total cluster capacity (RAM and CPU), and all jobs submitted to a queue have access to the capacity dedicated to that queue.

Queue priority in Capacity Scheduler is implemented by assigning higher or lower capacity to the queues that should have higher or lower priority. Another way to enforce this arrangement is to set the maximum percentage of cluster resources each queue can use. Therefore, to assign a lower priority to a queue, we should limit the amount of resources it can use.
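To make the arithmetic concrete, here is a rough sketch with made-up numbers. It assumes a top-level queue, so its percentages apply directly to the whole cluster, and a cluster with 100 GB (102400 MB) of memory; queue_limits is a hypothetical helper, not a YARN command.

```shell
# Hypothetical helper: queue_limits <cluster_mb> <capacity_pct> <max_capacity_pct>
# Prints the memory a top-level queue is guaranteed and the ceiling it can grow to.
queue_limits() {
    awk -v total="$1" -v cap="$2" -v max="$3" 'BEGIN {
        printf "guaranteed_mb=%d ceiling_mb=%d\n", total * cap / 100, total * max / 100
    }'
}

# A low-priority queue: 20% guaranteed, allowed to grow to at most 40%.
queue_limits 102400 20 40
# prints: guaranteed_mb=20480 ceiling_mb=40960
```

Lowering the second percentage is what keeps a low-priority queue from borrowing idle capacity from the other queues.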

Doing so on the default queue is a bit tricky, as all the jobs submitted to the platform go through the default queue and get their Application Master container created there. This is a very small container that controls application execution and requests resources for a YARN job submitted to the cluster. Having said that, we can use another setting in our platform that allows assigning higher priority to more important applications: mapred.capacity-scheduler.queue.<queue-name>.supports-priority.

To see the settings of each queue in the cluster, navigate to the ResourceManager web UI and click Scheduler in the left menu. Then click the arrow to the left of each queue to expand its settings. The two most important settings to check are Absolute Capacity (the queue capacity as a percentage) and Absolute Max Capacity (the maximum queue capacity as a percentage, expressed as a float; this limits the elasticity for applications in the queue):

[Screenshot: Capacity Scheduler]