YARN Capacity Scheduler: Queue Priority

Capacity Scheduler is designed to run Hadoop jobs in a shared, multi-tenant cluster in a friendly manner. Its main strength is that it guarantees capacity for a given group of users by supporting multiple queues and letting users submit their jobs to their dedicated queues. Each queue is given a fraction of the total cluster capacity (RAM and CPU), and all jobs submitted to a queue have access to the capacity dedicated to that queue.

Queue priority in Capacity Scheduler is implemented by assigning higher or lower capacity to the queues that should have higher or lower priority. Another way to enforce this arrangement is to set the maximum percentage of cluster resources each queue can use. Therefore, to assign lower priority to a queue, we should limit the amount of resources it can use.
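To make the two knobs concrete, here is a minimal sketch in Scala that prints the capacity-scheduler.xml properties for a set of queues. The property names (yarn.scheduler.capacity.root.queues, .capacity and .maximum-capacity) are the standard Capacity Scheduler ones; the queue names "etl" and "adhoc" and all percentages are made up for illustration and are not the settings of the cluster described here.

```scala
// Hypothetical queue layout: names and percentages are illustrative only.
final case class QueueSpec(name: String, capacityPct: Int, maxCapacityPct: Int)

object CapacitySchedulerProps {
  private val Prefix = "yarn.scheduler.capacity.root"

  /** Builds the capacity-scheduler.xml key/value pairs for queues directly under root. */
  def toProperties(queues: Seq[QueueSpec]): Seq[(String, String)] = {
    // Capacities of sibling queues must add up to 100 percent.
    require(queues.map(_.capacityPct).sum == 100, "sibling capacities must sum to 100")
    // A queue's elastic ceiling cannot be lower than its guaranteed share.
    require(queues.forall(q => q.capacityPct <= q.maxCapacityPct),
      "maximum-capacity must be at least capacity")

    val queueList = (s"$Prefix.queues", queues.map(_.name).mkString(","))
    val perQueue = queues.flatMap { q =>
      Seq(
        (s"$Prefix.${q.name}.capacity", q.capacityPct.toString),
        (s"$Prefix.${q.name}.maximum-capacity", q.maxCapacityPct.toString)
      )
    }
    queueList +: perQueue
  }

  def main(args: Array[String]): Unit = {
    // A low-priority default queue: small guaranteed share and a low ceiling.
    val queues = Seq(
      QueueSpec("default", 20, 30),
      QueueSpec("etl", 60, 100),
      QueueSpec("adhoc", 20, 40)
    )
    toProperties(queues).foreach { case (key, value) => println(s"$key=$value") }
  }
}
```

In this sketch the default queue is the low-priority one: it is guaranteed 20% of the cluster and can never grow beyond 30%, while the hypothetical etl queue can borrow up to the whole cluster when it is idle.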

Doing so on the default queue is a bit tricky, as all the jobs submitted to the platform go through the default queue and get their Application Master container created there. This is a very small container that controls application execution and requests resources for the YARN job submitted to the cluster. Having said that, we can use another setting on our platform that allows assigning higher priority to more important applications: mapred.capacity-scheduler.queue.<queue-name>.supports-priority.

To see the settings of each queue in the cluster, navigate to the resource manager’s web UI and click Scheduler in the left menu. Then click the arrow on the left-hand side of each queue to expand its settings. The two most important settings to check are Absolute Capacity (the queue's capacity as a percentage) and Absolute Max Capacity (the maximum queue capacity as a percentage, expressed as a float; this limits the elasticity for applications in the queue):

[Image: Capacity Scheduler]

Hive Performance Tuning

If you have been working in Big Data, you have definitely heard of Hive. Apache Hive is the data warehouse infrastructure built on top of Hadoop. Last week I gave one of our clients a presentation on Apache Hive, with a few tips on how to best use it, and I would like to share it with you here. It is designed to help developers and analysts write better queries and get results faster from Hive.

To best understand how Hive works, we need to picture it as a file system. Data in each table is divided into partitions based on the partitioning strategy, and each partition is stored as a directory of physical files in HDFS. Files are replicated as many times as HDFS’s replication factor dictates, which is usually 3.


Use Hive Partitions

As you have probably guessed, partitioning can be used to limit the files our query scans to complete its job, instead of having to go through each and every file. Using partitions is also more efficient than creating and then using indexes, as it physically limits the set of files each query scans.
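The effect is easy to picture with a small in-memory sketch in Scala. It is only an analogy, not how Hive is implemented: it mimics the <table directory>/<partition column>=<value>/<file> layout Hive uses for partitioned tables and then keeps only the files whose partition matches the filter. The table path, the dates and the file counts are invented for the example.

```scala
object PartitionPruningSketch {
  /** One data file and the partition value of the directory it lives in. */
  final case class DataFile(partitionValue: String, path: String)

  /** Builds a fake listing of a partitioned table: tableDir/column=value/NNNNNN_0. */
  def layout(tableDir: String,
             partitionColumn: String,
             filesPerPartition: Map[String, Int]): Seq[DataFile] =
    for {
      (value, count) <- filesPerPartition.toSeq.sortBy(_._1)
      i <- 0 until count
      fileName = "%06d_0".format(i)
    } yield DataFile(value, s"$tableDir/$partitionColumn=$value/$fileName")

  /** Keeps only the files in partitions accepted by the predicate. */
  def prune(files: Seq[DataFile], wanted: String => Boolean): Seq[DataFile] =
    files.filter(f => wanted(f.partitionValue))

  def main(args: Array[String]): Unit = {
    // Hypothetical table with three daily partitions.
    val files = layout(
      "/warehouse/sales",
      "src_date",
      Map("2017-01-01" -> 4, "2017-01-02" -> 4, "2017-01-03" -> 4)
    )
    val toScan = prune(files, _ == "2017-01-02")
    println(s"Without a partition filter: ${files.size} files to scan")
    println(s"With src_date = '2017-01-02': ${toScan.size} files to scan")
    toScan.foreach(f => println(f.path))
  }
}
```

The files in the other partition directories are never opened, which is where the saving comes from.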

The question for a developer or analyst would then be “How can I know what partitions exist for a table?” SHOW PARTITIONS is the command for this. It can be used in Hue, Aginity, Hive CLI, or any other tool you use to query Hive tables. Partitions can be used in a query’s where clause, the same way we filter the query on any other column. You can see this, along with the difference that using a partition in a query makes, below:
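As a rough illustration, the statements involved look like the ones this small Scala helper prints. The table name "sales" is hypothetical; src_date is the partition column used in the query plan example in this post. The generated text can be pasted into Hue or Hive CLI as is.

```scala
object PartitionQueries {
  /** Lists the partitions of a table, one line per partition (e.g. src_date=2017-01-02). */
  def showPartitions(table: String): String =
    s"SHOW PARTITIONS $table"

  /** Counts rows in a single partition: only that partition's files are read. */
  def countForDate(table: String, srcDate: String): String =
    s"SELECT COUNT(*) FROM $table WHERE src_date = '$srcDate'"

  /** Counts rows with no partition filter: every partition is read. */
  def countAll(table: String): String =
    s"SELECT COUNT(*) FROM $table"

  def main(args: Array[String]): Unit = {
    val table = "sales" // hypothetical table name
    println(showPartitions(table))
    println(countForDate(table, "2017-01-02"))
    println(countAll(table))
  }
}
```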


Check Query Plan

Like any other database, Hive provides a way to see the query plan it will use to execute a query. A query plan is the set of steps and commands the DB engine takes to execute the query and produce the result. You can put EXPLAIN at the beginning of your query to see the query plan, or use the Explain button in Hue to do the same:


I used the query we looked at previously to explain a couple of points on what to look for when checking query plans:

  1. TableScan: As its name implies, this step reads (scans) the tables. In this example, we can clearly see that our filter on src_date is applied to the table scan. And since this column is our partition column, Hive knows exactly which files it should open and read to get the result it is looking for.
  2. Filter placement: Check the step at which the filter on the partition column is applied. The sooner this filter is applied, the fewer records are passed to the downstream steps of query execution and the more efficient the query is.


Hive and Joins

When joining two tables, apply as many filters as possible on the bigger table in the join itself, instead of in the where clause. This limits the number of records being joined to the smaller table, so there are fewer records to work on or filter later.
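The idea can be sketched with plain Scala collections. This is an in-memory analogy with made-up data and inner-join semantics, not Hive itself: both versions return the same rows, but the second one feeds far fewer rows of the bigger table into the join.

```scala
object FilterBeforeJoinSketch {
  final case class Order(customerId: Int, srcDate: String, amount: Double)
  final case class Customer(id: Int, name: String)

  /** Joins every order first and filters afterwards.
    * Returns the result and the number of big-table rows that entered the join. */
  def joinThenFilter(orders: Seq[Order], customers: Seq[Customer], date: String): (Seq[(String, Double)], Int) = {
    val nameById = customers.map(c => c.id -> c.name).toMap
    val joined = for {
      o <- orders
      name <- nameById.get(o.customerId).toSeq
    } yield (o, name)
    val result = joined.filter(_._1.srcDate == date).map { case (o, name) => (name, o.amount) }
    (result, orders.size)
  }

  /** Filters the big table first, then joins only the surviving rows. */
  def filterThenJoin(orders: Seq[Order], customers: Seq[Customer], date: String): (Seq[(String, Double)], Int) = {
    val nameById = customers.map(c => c.id -> c.name).toMap
    val filtered = orders.filter(_.srcDate == date)
    val result = for {
      o <- filtered
      name <- nameById.get(o.customerId).toSeq
    } yield (name, o.amount)
    (result, filtered.size)
  }

  def main(args: Array[String]): Unit = {
    val customers = Seq(Customer(1, "Ann"), Customer(2, "Bob"))
    val orders = Seq(
      Order(1, "2017-01-01", 10.0),
      Order(2, "2017-01-01", 20.0),
      Order(1, "2017-01-02", 5.0),
      Order(3, "2017-01-02", 7.0)
    )
    val (lateResult, lateRows) = joinThenFilter(orders, customers, "2017-01-02")
    val (earlyResult, earlyRows) = filterThenJoin(orders, customers, "2017-01-02")
    println(s"join then filter: $lateRows rows joined, result = $lateResult")
    println(s"filter then join: $earlyRows rows joined, result = $earlyResult")
  }
}
```

With an outer join the two forms are not always interchangeable, so compare the results before moving a filter.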


Using all these techniques will help you get faster results from Hive, but nothing is more important than writing a good query. A few weeks ago I was contacted by one of the analysts working for my client, who complained that his query took more than 12 hours to run. At the very first glance I realized he was scanning one of the biggest tables we have 5 times, with no filter on the partition column. After spending a good hour on it and rewriting the query, we managed to get the same job done in about 25 minutes.

So, please write good queries. And always filter on the partition column. Good luck!

Hadoop Error org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block

Like almost all Mondays, today was a very challenging one. The first thing I noticed was that our primary namenode had run into some issues over the weekend and gone down, which meant the standby namenode, namenode-02, was active. I checked namenode-01 and made sure it was okay before making it active again. The next thing I was made aware of when I arrived at the office was that a very critical set of our ETL jobs had been failing for over 12 hours.

As anyone would do with failed jobs, the first thing I did was look into the logs for those jobs. All of them had failed with this error:

org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block

It is not hard to guess that HDFS is complaining about not being able to find some blocks of data it needs. So I navigated to Ambari’s HDFS page, but no missing blocks were being reported.

Therefore we can conclude that the data blocks are there, but for some reason the namenode is not able to access them when jobs are submitted to the cluster.

The second thing I noticed was that after the primary namenode was made active again, jobs started working fine and completing successfully. That hinted that something was wrong with namenode-02. So I opened the web UIs of our two namenodes:


Namenode-02 Before

There it is! I know we have 33 datanodes in our cluster, but namenode-02 shows only 30. So I restarted the node manager on the datanodes that were not listed for namenode-02 and refreshed the page:

Namenode-02 After

Now all the datanodes are recognized by both namenodes and everyone lives happily ever after!

Note that you may check the namenodes’ web UIs and not see any missing datanodes. Even so, restarting the node managers on all datanodes should resolve the issue.

How to import org.apache.spark.sql.SQLContext.implicits in Spark 1.6: error “value toDF is not a member of org.apache.spark.rdd.RDD”

Note: If you’re using Spark 2.2, please read this post

I am doing a mini project for my company using Spark/Scala and was stuck on the error mentioned in the title for a couple of days. Googling the error suggested importing org.apache.spark.sql.SQLContext.implicits, and that’s what I did:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext.implicits

import org.apache.spark.SparkConf
object TestSQLContext {
def main(args:Array[String]) {

And that was the start of the problem: my application started to give a new error:

object SQLContext is not a member of package org.apache.spark.sql
[error] Note: class SQLContext exists, but it has no companion object.

The problem is that none of those online posts mention that we need to create an instance of org.apache.spark.sql.SQLContext before we can use its members and methods: implicits belongs to an SQLContext instance, not to a companion object. This is the right way to do it:

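import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.SparkConf

object Hi {

  // Defined outside main so that Spark can derive the schema for toDF
  case class DimC(ID: Int, Name: String, City: String, EffectiveFrom: Int, EffectiveTo: Int)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("LoadDW")
    val sc = new SparkContext(conf)
    // implicits is a member of an SQLContext instance, so create the instance first
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val fDimCustomer = sc.textFile("DimCustomer.txt")

    // Parse each comma-separated line into a DimC and convert the RDD to a DataFrame
    val dimCustomer1 = fDimCustomer.map(_.split(',')).map(r => DimC(r(0).toInt, r(1), r(2), r(3).toInt, r(4).toInt)).toDF()

    // Register the DataFrame under the table name the query below expects
    dimCustomer1.registerTempTable("Cust_1")

    val customers = sqlContext.sql("select * from Cust_1")
  }
}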


I hope this post helps. Please do not hesitate to ask your questions in the comments section.