AWS Glue Part 1: Discover and Catalogue Data Stored in S3

AWS Glue

Glue is a fully managed extract, transform, and load (ETL) service offered by Amazon Web Services. Glue discovers your data (stored in S3 or other databases) and stores the associated metadata (e.g. table definitions and schemas) in the Glue Data Catalog. Once catalogued, your data is immediately searchable, queryable, and available for ETL.

Once your ETL job is ready, you can schedule it to run on Glue’s fully managed, scale-out Apache Spark environment. It provides a flexible scheduler with dependency resolution, job monitoring, and alerting.

Glue provides out-of-the-box integration with Amazon Athena, Amazon EMR, Amazon Redshift Spectrum, and any Apache Hive Metastore-compatible application.

Discover Data Using Crawlers

AWS Glue is able to traverse data stores using Crawlers and populate data catalogues with one or more metadata tables. These tables can later be used by ETL jobs as sources or targets.

Below are the steps to add a crawler to analyse and catalogue data in an S3 bucket:

1. Sign in to the AWS Management Console and open the AWS Glue console. Choose the Crawlers tab.

2. Choose Add crawler; it’ll launch the Add crawler wizard. Follow the wizard:

a. Specify a name and description for your crawler.

b. Add a data store. Here you have the option to specify an S3 bucket or a JDBC connection. After selecting S3, choose “Specified path in my account” and click the folder icon next to “Include path” to select where the data to be crawled lives:

Crawler Add Data Source

c. You can add another data source in case you want to join data from two different places:

Crawler Add Another Datasource

d. Choose an IAM role that has permissions to work with Glue. This role should have full access to run Glue jobs, as well as access to the S3 buckets it reads data from and stores scripts in:

Crawler Choose IAM Role

e. Create a schedule for your crawler. You can have it run on demand or choose one of the options in the drop-down:

Crawler Schedule

f. The next step is to choose the location where the output from your crawler will be stored. This is a database in Athena, and you can prefix the names of the tables created by your crawler so they’re easily distinguishable from other tables in the database:

Crawler Configure Output

g. Review your crawler’s settings and click on Finish. You’ll be redirected to the main Crawlers page, where your crawler is listed.

h. Click on “Run it now?”:

Crawlers Main 2

When the crawler finishes running, go to the Athena console and check that your table is there:

Athena Source Table

Examine the table’s DDL: it’s an external table pointing to the location in S3 that your crawler “crawled”, and you can start writing queries against it. It’s the first table you created using Glue crawlers. First of many. 🙂
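If you prefer the command line, the same crawler can be kicked off and inspected with the AWS CLI. A minimal sketch, assuming a crawler named my-first-crawler writing to a database named my_glue_db (both names are placeholders for whatever you chose in the wizard):

aws glue start-crawler --name my-first-crawler

aws glue get-crawler --name my-first-crawler

aws glue get-tables --database-name my_glue_db

The last command lists the metadata tables the crawler created, which is a quick way to confirm the run worked before heading over to Athena.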

 

Airflow & Celery on Redis: when Airflow picks up old task instances

This is going to be a quick post on Airflow. We realized that in one of our environments, the Airflow scheduler was picking up old task instances that had already succeeded (whether marked as success or completed successfully on their own). You can verify this is actually your issue by SSHing into your Airflow workers and running:

ps -ef | grep airflow

Then check the DAG run IDs: most of them belong to old runs.

This happens when Celery’s backend, in our case Redis, holds old (or duplicate) keys for task runs. So the solution is to clear the Celery queue.
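Before flushing anything, you can peek at what Celery has left behind. Assuming a default setup where Celery’s Redis result backend stores task state under keys prefixed with celery-task-meta- (worth verifying against your Celery version), a quick check would be:

redis-cli --scan --pattern 'celery-task-meta-*' | head

With that confirmed, here are the steps to clear the queue when Celery runs on Redis: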

1- Stop Airflow Scheduler:

sudo initctl status airflow-scheduler

sudo initctl stop airflow-scheduler

2- Stop webserver:

sudo initctl status airflow-webserver

sudo initctl stop airflow-webserver

3- Stop Celery Flower:

cd /var/lib/airflow/bin

./airflow.sh flower status

./airflow.sh flower stop

4- Stop workers:

cd /var/lib/airflow/bin

./airflow.sh worker status

./airflow.sh worker stop

Now SSH into the server where Redis is running and type “redis-cli” and press Enter to get into the Redis CLI. Follow the steps below to flush the Redis database:

  1. INFO keyspace — list keyspaces (you should get only one result back)
  2. SELECT 0 — select the database
  3. CONFIG GET dir — get the location of the database file so you can take a backup
  4. Copy the database file (“xxxx.db”) from the location above to your home directory
  5. FLUSHDB — flush the database
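For reference, a sample session might look like this (the key count and directory below are illustrative, not what you should expect to see):

redis-cli

127.0.0.1:6379> INFO keyspace
# Keyspace
db0:keys=1432,expires=0,avg_ttl=0

127.0.0.1:6379> SELECT 0
OK

127.0.0.1:6379> CONFIG GET dir
1) "dir"
2) "/var/lib/redis"

127.0.0.1:6379> FLUSHDB
OK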

Now you can start all Airflow services:

1- Scheduler commands

sudo initctl start airflow-scheduler

sudo initctl status airflow-scheduler

2- Webserver commands

sudo initctl start airflow-webserver

sudo initctl status airflow-webserver

3- Flower commands

cd /var/lib/airflow/prd/bin

nohup ./airflow.sh flower start &

./airflow.sh flower status

4- Worker commands

cd /var/lib/airflow/prd/bin

nohup ./airflow.sh worker start &

./airflow.sh worker status

 

Go back to Airflow and validate all DAGs are starting and completing successfully.

And live happily ever after! 🙂

How to import spark.implicits._ in Spark 2.2: error “value toDS is not a member of org.apache.spark.rdd.RDD”

I wrote about how to import implicits in Spark 1.6 more than two years ago. But things have changed in Spark 2.2: the first thing you need to do when coding in Spark 2.2 is to set up a SparkSession object. SparkSession is the entry point for programming Spark with the Dataset and DataFrame APIs.

As in Spark 1.6, spark.implicits are required to be able to use Spark’s Dataset and DataFrame APIs in version 2.2. And as in 1.6, an instance of SparkContext is needed before you can import spark.implicits. Since each instance of SparkSession comes with an instance of SparkContext associated with it, all you have to do is create a SparkSession object and you’re set.

I have seen other posts that mention bits and pieces of how to do it. Here is the full code, which works just fine; you can tweak it based on your requirements:

import org.apache.spark.sql._
import org.apache.log4j._

object sparkSQLWithCaseClass {

  // The SQL query below filters on age, so the case class carries an age
  // field; people.csv is assumed to hold ID, name and age columns.
  case class Person(ID: Int, name: String, age: Int)

  def mapper(l: String): Person = {
    val fields = l.split(',')
    Person(fields(0).toInt, fields(1), fields(2).toInt)
  }

  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.ERROR)

    // SparkSession is the single entry point in 2.x; it wraps a SparkContext.
    val spark = SparkSession.builder.appName("Spark SQL").getOrCreate()

    val lines = spark.sparkContext.textFile("../../people.csv")
    val people = lines.map(mapper)

    // spark.implicits._ is a member of the SparkSession instance,
    // which is why it can only be imported after the session exists.
    import spark.implicits._
    val schemaPeople = people.toDS()

    schemaPeople.printSchema()
    schemaPeople.createOrReplaceTempView("people")

    val t = spark.sql("select * from people where age >= 13")
    val res = t.collect()
    res.foreach(println)

    spark.stop()
  }
}
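Assuming you package this into a jar (the jar path below is a placeholder for whatever your build produces), running it locally is as simple as:

spark-submit --class sparkSQLWithCaseClass --master "local[*]" target/scala-2.11/spark-sql-example.jar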

Spark Error “java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE” in Spark 1.6

RDDs are the building blocks of Spark and what makes it so powerful: they are stored in memory for fast processing. RDDs are broken down into partitions (blocks) of data, logical pieces of a distributed dataset.

The underlying abstraction for blocks in Spark is a ByteBuffer, which limits the size of the block to 2 GB.

In brief, this error means that the block size for the resulting RDD is larger than 2 GB: https://issues.apache.org/jira/browse/SPARK-1476

One way to work around this issue is to increase the application’s parallelism. We can define the default number of partitions in RDDs returned by join and reduceByKey by adjusting

spark.default.parallelism

What this configuration parameter does is define how many partitions (blocks) our dataset, in this case an RDD, is divided into.

As you have probably realized by now, we need to set spark.default.parallelism to a higher value when processing large datasets. This way we can make sure the size of the data blocks does not exceed the 2 GB limit.
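To make this concrete, here is a minimal Scala sketch of the idea. The input and output paths and the partition count of 2000 are assumptions for illustration; you would size the count so each partition stays well under 2 GB:

import org.apache.spark.{SparkConf, SparkContext}

object ParallelismTuning {
  def main(args: Array[String]): Unit = {
    // 2000 is illustrative; pick a value that keeps partitions well under 2 GB.
    val conf = new SparkConf()
      .setAppName("Parallelism tuning example")
      .set("spark.default.parallelism", "2000")
    val sc = new SparkContext(conf)

    val pairs = sc.textFile("hdfs:///path/to/large/input")
      .map(line => (line.split(",")(0), 1))

    // reduceByKey falls back to spark.default.parallelism when no partition
    // count is passed; it can also be overridden explicitly per operation:
    val counts = pairs.reduceByKey(_ + _, 2000)

    counts.saveAsTextFile("hdfs:///path/to/output")
    sc.stop()
  }
}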

Spark Error CoarseGrainedExecutorBackend Driver disassociated! Shutting down: Spark Memory & memoryOverhead

Another common error we saw in YARN application logs was this:

17/08/31 15:58:07 WARN CoarseGrainedExecutorBackend: An unknown (datanode-022:43969) driver disconnected.

17/08/31 15:58:07 ERROR CoarseGrainedExecutorBackend: Driver 10.1.1.111:43969 disassociated! Shutting down.

Googling this error suggests increasing spark.yarn.driver.memoryOverhead or spark.yarn.executor.memoryOverhead or both. That has apparently worked for a lot of people. Or at least for those who were smart enough to understand how these properties work.

What you need to consider here is that memoryOverhead is requested on top of the heap you set through spark.driver.memory and spark.executor.memory, and the two should stay in proportion: the overhead is meant to be a fraction of the heap, not a rival allocation of the same size.

What this means is that if you’re increasing the executor’s or driver’s memoryOverhead, double-check that the driver and executor heaps grow along with it. In our case, the user was requesting as much memoryOverhead as the entire driver and executor heaps, leaving the allocation completely out of proportion:

spark-submit \
--queue default \
--verbose \
--master yarn-cluster \
--conf spark.shuffle.service.enabled=true \
--conf spark.shuffle.manager=sort \
--conf spark.executor.memory=8g \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=10 \
--conf spark.executor.cores=2 \
--conf spark.driver.memory=8g \
--conf spark.network.timeout=600s \
--conf spark.scheduler.executorTaskBlacklistTime=3600000 \
--conf spark.yarn.driver.memoryOverhead=8192 \
--conf spark.yarn.executor.memoryOverhead=8192 \

You can clearly see what I meant in the paragraph above. Instead of doing this, the user should have increased executor and driver memory in line with the increase in memory overhead:

spark-submit \
--queue default \
--verbose \
--master yarn-cluster \
--conf spark.shuffle.service.enabled=true \
--conf spark.shuffle.manager=sort \
--conf spark.executor.memory=16g \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=10 \
--conf spark.executor.cores=2 \
--conf spark.driver.memory=16g \
--conf spark.network.timeout=600s \
--conf spark.scheduler.executorTaskBlacklistTime=3600000 \
--conf spark.yarn.driver.memoryOverhead=8192 \
--conf spark.yarn.executor.memoryOverhead=8192 \

 

Spark Error: Failed to Send RPC to Datanode

This past week we had quite a few issues with users not being able to run Spark jobs in YARN cluster mode. In particular, a team that was on a tight schedule kept getting errors like this:

java.io.IOException: Failed to send RPC 8277242275361198650 to datanode-055: java.nio.channels.ClosedChannelException

Mostly accompanied by error messages like:

org.apache.spark.SparkException: Error sending message [message = Heartbeat(9,[Lscala.Tuple2;@e47ba81,BlockManagerId(9, datanode-50 , 43381))]

ERROR Executor: Exit as unable to send heartbeats to driver more than 60 times

These errors basically mean the connection between the Spark driver and its executors is broken, mostly because an executor has been killed. This can happen for a number of reasons:

1- We realized this happens a lot more often when our cluster is too busy and has hit maximum usage. What it means is that executors are accepted onto DataNodes, but they fail to acquire enough memory there and therefore get killed.

2- Metaspace attempts to grow beyond the executor (JVM) memory limits, resulting in the loss of executors. The best way to stop this error from appearing is to set the properties below when launching spark-shell or submitting the application with spark-submit (see the spark-submit sketch after this list):

spark.driver.extraJavaOptions = -XX:ReservedCodeCacheSize=100M -XX:MaxMetaspaceSize=256m -XX:CompressedClassSpaceSize=256m

spark.executor.extraJavaOptions = -XX:ReservedCodeCacheSize=100M -XX:MaxMetaspaceSize=256m -XX:CompressedClassSpaceSize=256m

Please note that depending on your project and code, you may need to increase the values mentioned above.

3- The network is slow for whatever reason. In our case, this was caused by a change in DNS which resulted in caching being turned off. This case can be fixed by adjusting spark.executor.heartbeatInterval and spark.network.timeout, whose default values are 10s and 120s respectively. You can adjust these two values based on how your network behaves; the only point to consider is that the latter property, spark.network.timeout, should be greater than the former.
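Putting points 2 and 3 together, here is a hedged sketch of how those properties can be passed to spark-submit. The values are illustrative starting points rather than recommendations, and your application jar and its arguments would follow at the end:

spark-submit \
--conf "spark.driver.extraJavaOptions=-XX:ReservedCodeCacheSize=100M -XX:MaxMetaspaceSize=256m -XX:CompressedClassSpaceSize=256m" \
--conf "spark.executor.extraJavaOptions=-XX:ReservedCodeCacheSize=100M -XX:MaxMetaspaceSize=256m -XX:CompressedClassSpaceSize=256m" \
--conf spark.executor.heartbeatInterval=30s \
--conf spark.network.timeout=600s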

If none of what is mentioned above helps your situation, then it is something you need to take to your cluster’s administrator. There could be something wrong with the DataNodes where executors are sent that the admins are not aware of.

Happy coding!

YARN Capacity Scheduler: Queue Priority

Capacity Scheduler is designed to run Hadoop jobs in a shared, multi-tenant cluster in a friendly manner. Its main strength is that it guarantees specific capacity for given groups of users by supporting multiple queues and allowing users to submit their queries to their dedicated queues. Each queue is given a fraction of the total cluster capacity (RAM and CPU), and all jobs submitted to a queue have access to the capacity dedicated to that queue.

Queue priority in Capacity Scheduler is implemented by assigning higher or lower capacity to the queues that should have higher or lower priority. Another way of enforcing this arrangement is to set the maximum percentage of cluster resources each queue can use. So to give a queue lower priority, we limit the amount of resources it can use.
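As a hedged sketch of how that looks in configuration (the queue names and percentages below are made up for illustration), capacity and maximum capacity are set per queue in capacity-scheduler.xml:

<!-- capacity-scheduler.xml: queue "etl" gets 60% of the cluster and may
     elastically grow to at most 80%; "default" gets the remaining 40%. -->
<property>
  <name>yarn.scheduler.capacity.root.queues</name>
  <value>default,etl</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.etl.capacity</name>
  <value>60</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.etl.maximum-capacity</name>
  <value>80</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.default.capacity</name>
  <value>40</value>
</property>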

Doing so on the default queue is a bit tricky, as all jobs submitted to the platform go through the default queue and get their ApplicationMaster container created there. That is a very small container that controls the application’s execution and requests resources for the YARN job submitted to the cluster. Having said that, we can use another setting in our platform that allows assigning higher priority to more important applications: mapred.capacity-scheduler.queue.<queue-name>.supports-priority.
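For the property named above, a sketch of how it would be set (the queue name etl is a placeholder, and this knob belongs to the older MRv1-style capacity scheduler configuration):

<property>
  <name>mapred.capacity-scheduler.queue.etl.supports-priority</name>
  <value>true</value>
</property>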

To see the settings of each queue in the cluster, navigate to the ResourceManager’s web UI and click Scheduler in the left menu. Then click the arrow on the left-hand side of each queue to expand its settings. The two most important settings to check are Absolute Capacity (the queue’s capacity as a percentage) and Absolute Max Capacity (the maximum queue capacity as a float percentage, which limits the elasticity for applications in the queue):

C Scheduler