Control IoT Devices Using Scala on Databricks (Based on ML Model Output)


A few weeks ago I gave a talk at AI Bootcamp here in Melbourne on how we can build a serverless solution on Azure that takes us one step closer to powering industrial machines with AI, using the same technology stack that is typically used to deliver IoT analytics use cases. I demoed a solution that received data from an IoT device, in this case a crane, compared the data with the predictions of a machine learning model that had already run and written its output to a repository, in this case a CSV file, and then decided whether any action needed to be taken on the machine, e.g. slowing the crane down if the wind picks up. My solution had 3 main components:

  1- IoT Hub: The gateway to the cloud, where IoT devices connect and send their data

  2- Databricks: The brain of the solution, where the data received from the IoT device is compared with what the ML algorithm has predicted to decide whether to take any action

  3- Azure Functions: A Java function was deployed to Azure Functions to call a direct method on my simulated crane and instruct it to slow down. Direct methods, or device methods, provide the ability to control the behaviour of IoT devices from the cloud.

Since then, I upgraded my solution by moving the part responsible for sending direct methods to IoT devices from Azure Functions into Databricks, as I promised at the end of my talk. Doing this has a few advantages:

  • The solution will be more efficient, since the data received from IoT devices makes one less hop before the command is sent back, and there are no delays caused by Azure Functions.
  • The solution will be simpler, since fewer components are deployed to get the job done. The fewer components and services in a solution, the lower the complexity of managing and running it.
  • The solution will be cheaper. We won’t be paying for Azure Functions calls, which could add up to a considerable amount of money when there are millions of devices connecting to the cloud.

This is what the solution I will go through in this post looks like:

I must mention here that I am not going to store the incoming sensor data as part of this blog post, but I would recommend doing so in Delta tables if you’re looking for a performant and modern storage solution.

Azure IoT Hub

IoT Hub is the gateway to our cloud-based solution. Devices connect to IoT Hub and start sending their data across. I explained what needs to be done to set up IoT Hub and register IoT devices with it in my previous post. I just upgraded “SimulatedDevice.java” to resemble my simulated crane and send additional metrics to the cloud: “device_id”, “temperature”, “humidity”, “height”, “device_time”, “latitude”, “longitude”, “wind_speed”, “load_weight” and “lift_angle”. Most of these metrics will be used in my solution in one way or another. A sample record sent by the simulated crane looks like this:

Machine Learning

Those who know me are aware that I’m not a data scientist of any sort. I understand and appreciate most of the machine learning algorithms and the beautiful mathematical formulas behind them, but what I’m more interested in is how we can enable using those models and apply them to our day to day lives to solve problems in form of industry use cases.

For the purpose of this blog post, I assumed that there was an ML model that had already run and made its predictions on how much the crane needs to slow down, based on what is happening in the field in terms of the metrics sent by sensors installed on the crane:

This is what the sample output from our assumed ML model looks like:

As an example to clarify what this output means, let’s look at the first row. It specifies that if the temperature is between 0 and 10 degrees Celsius, the wind speed is between 0 and 5 km/h, the load height is between 15 and 20 meters, the load weight is between 0 and 2 tons, and the load lift angle is between 0 and 5 degrees, then the crane needs to slow down by 5 percent. Let’s see how we can use this result set and take actions on our crane based on the data we receive in real time.
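To make the meaning of one row concrete, here is a minimal plain Scala sketch of the same rule, without Spark. The names Interval, SlowdownRule, Reading and RuleMatcher are my own and not part of the solution, and I treat the lower bound as inclusive and the upper bound as exclusive, which is the same convention the join later in this post uses.

```scala
// Hypothetical model of one row of the ML output; all names here are illustrative.
final case class Interval(min: Double, max: Double) {
  // Half-open interval: min is included, max is excluded.
  def contains(v: Double): Boolean = v >= min && v < max
}

final case class SlowdownRule(
  temperature: Interval,
  windSpeed: Interval,
  loadHeight: Interval,
  loadWeight: Interval,
  liftAngle: Interval,
  adjustmentPercent: Int
)

final case class Reading(
  temperature: Double,
  windSpeed: Double,
  height: Double,
  loadWeight: Double,
  liftAngle: Double
)

object RuleMatcher {
  // The first row described in the text: slow down by 5 percent.
  val firstRow: SlowdownRule = SlowdownRule(
    Interval(0.0, 10.0), Interval(0.0, 5.0), Interval(15.0, 20.0),
    Interval(0.0, 2.0), Interval(0.0, 5.0), 5)

  // Returns the adjustment of the first rule whose ranges all contain the reading.
  def adjustmentFor(reading: Reading, rules: Seq[SlowdownRule]): Option[Int] =
    rules.find { r =>
      r.temperature.contains(reading.temperature) &&
      r.windSpeed.contains(reading.windSpeed) &&
      r.loadHeight.contains(reading.height) &&
      r.loadWeight.contains(reading.loadWeight) &&
      r.liftAngle.contains(reading.liftAngle)
    }.map(_.adjustmentPercent)
}
```

A reading that falls outside every rule simply yields None, which corresponds to "no command is sent to the crane".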

Azure Key Vault

There are a couple of different connection strings that we need for our solution to work, such as “IoT Hub event hub-compatible” and “service policy” connection strings. When building production grade solutions, it’s important to store sensitive information such as connection strings in a secure way. I will show how we can use plain-text connection string as well as accessing one stored in Azure Key Vault in the following sections of this post.

Our solution needs a way of connecting to the IoT Hub to invoke direct methods on IoT devices. For that, we need to get the connection string associated with the service policy of the IoT Hub using the following Azure CLI command:

az iot hub show-connection-string --policy-name service --name <IOT_HUB_NAME> --output table

And store it in Azure Key Vault. So go ahead and create an Azure Key Vault and then:

  • Click on Secrets on the left pane and then click on Generate/Import
  • Select Manual for Upload options
  • Specify a Name such as <IOT_HUB_NAME>-service-policy-cnn-string
  • Paste the value you got from the above Azure CLI command in the Value text box

After completing the steps above and creating the secret, we will be able to use the service connection string associated with the IoT Hub in the next stages of our solution, which will be built in Databricks.

Databricks

In my opinion, the second most important factor separating Artificial Intelligence from other kinds of intelligent systems, after the complexity and type of algorithms, is the ability to process data and respond to events in real time. If the aim of AI is to replace most of human’s functionalities, the AI-powered systems must be able to mimic and replicate human brain’s ability to scan and act as events happen.

I am going to use Spark Streaming as the mechanism to process data in real time in this solution. The very first step is to set up Databricks; you can read how to do that in my previous blog post. Don’t forget to install the “azure-eventhubs-spark_2.11:2.3.6” library as instructed there. The code snippets you will see in the rest of this post are in Scala.

Load ML Model results

To be able to use the results of our ML model, we need to load them as a DataFrame. I have the file containing the sample output saved in Azure Blob Storage. What I’ll do first is mount that blob container in DBFS:

dbutils.fs.mount(
  source = "wasbs://<container-name>@<storage-account-name>.blob.core.windows.net/<directory-name(if any)>",
  mountPoint = "/mnt/ml-output",
  // The account key setting is keyed by the full blob endpoint of the storage account
  extraConfigs = Map("fs.azure.account.key.<storage-account-name>.blob.core.windows.net" -> "<Storage_Account_Key>"))

To get Storage_Account_Key, navigate to your storage account in Azure portal, click on Access Keys from left pane and copy the string under Key1 -> Key.

After completing above steps, we will be able to use the mount point in the rest of our notebook, which is much easier than having to refer to the storage blob every time. The next code snippet shows how we can create a Dataframe using the mount point we just created:

val CSV_FILE = "/mnt/ml-output/ml_results.csv"
val mlResultsDF = sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(CSV_FILE)

Executing the cell above in Databricks notebook creates a DataFrame containing the fields in the sample output file:

Connect to IoT Hub and read the stream

This step is explained in my previous blog post as well, so make sure you follow the steps in the “Connect to IoT Hub and read the stream” section. For reference, below is the Scala code you would need in the next cell of the Databricks notebook:

import org.apache.spark.eventhubs._
import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
import org.apache.spark.sql.functions.{ explode, split }

val connectionString = ConnectionStringBuilder("<Event hub connection string from Azure portal>")
  .setEventHubName("<Event Hub-Compatible Name>")
  .build
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)
  .setConsumerGroup("<Consumer Group>")

val sensorDataStream = spark.readStream
  .format("eventhubs")
  .options(eventHubsConf.toMap)
  .load()

Apply structure to incoming data

Finishing previous step gives us a DataFrame containing the data we receive from our connected crane. If we run a display command on the DataFrame, we see that the data we received does not really resemble what is sent by the simulator code. That’s because the data sent by our code actually goes into the first column, body, in binary format. We need to apply appropriate schema on top of that column to be able to work with the incoming data with structured mechanism, e.g. SQL.

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val schema = (new StructType)
  .add("device_id", StringType)
  .add("temperature", DoubleType)
  .add("humidity", DoubleType)
  .add("height", DoubleType)
  .add("device_time",
    MapType(
      StringType,
      new StructType()
        .add("year", IntegerType)
        .add("month", IntegerType)
        .add("day", IntegerType)
        .add("hour", IntegerType)
        .add("minute", IntegerType)
        .add("second", IntegerType)
        .add("nano", IntegerType)
    )
  )
  .add("latitude", DoubleType)
  .add("longitude", DoubleType)
  .add("wind_speed", DoubleType)
  .add("load_weight", DoubleType)
  .add("lift_angle", DoubleType)

val sensorDataDF = sensorDataStream
  .select(
    ($"enqueuedTime").as("Enqueued_Time"),
    ($"systemProperties.iothub-connection-device-id").as("Device_ID"),
    (from_json($"body".cast("string"), schema).as("telemetry_json")))
  .withColumn("eventTimeString", concat(
    $"telemetry_json.device_time.date.year".cast("string"), lit("-"),
    $"telemetry_json.device_time.date.month".cast("string"), lit("-"),
    $"telemetry_json.device_time.date.day".cast("string"), lit(" "),
    $"telemetry_json.device_time.time.hour".cast("string"), lit(":"),
    $"telemetry_json.device_time.time.minute".cast("string"), lit(":"),
    $"telemetry_json.device_time.time.second".cast("string")
  ))
  .withColumn("eventTime", to_timestamp($"eventTimeString"))
  .select("Device_ID", "telemetry_json.temperature", "telemetry_json.height", "telemetry_json.wind_speed",
    "telemetry_json.load_weight", "telemetry_json.lift_angle", "eventTimeString", "eventTime")
  .withWatermark("eventTime", "5 seconds")

The code above does the following:

  • Defines the schema matching the data we produce at source
  • Applies the schema on the incoming binary data
  • Extracts the fields in a structured format including the time the record was generated at source, eventTime
  • Uses the eventTime column to define watermarking to deal with late arriving records and drop data older than 5 seconds

The last point is important to notice. The solution we’re building here is to deal with changes in the environment where the crane is operating in, in real-time. This means that the solution should wait for the data sent by the crane only for a limited time, in this case 5 seconds. The idea is that the solution shouldn’t take actions based on old data, since a lot may change in the environment in 5 seconds. Therefore, it drops late records and will not consider them. Remember to change that based on the use case you’re working on.
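As far as I understand, Structured Streaming applies the watermark mainly inside stateful operators (aggregations, deduplication, stream-stream joins), so it may be worth making the "ignore stale readings" rule explicit as well. Below is a minimal plain Scala sketch of such a check; Freshness and isFresh are names I made up for illustration, and the 5 second limit is just the value used in this post.

```scala
import java.time.{Duration, Instant}

object Freshness {
  // Hypothetical helper: true when the event is no older than maxAge relative to `now`.
  def isFresh(eventTime: Instant, now: Instant, maxAge: Duration): Boolean =
    !eventTime.isBefore(now.minus(maxAge))
}
```

With a maximum age of 5 seconds, a reading stamped 4 seconds ago passes the check and one stamped 6 seconds ago does not. The same comparison could be expressed as a filter on the eventTime column.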

Running display on the resulting DataFrame we get following:

display(sensorDataDF)

Decide when to take action

Now that we have both the ML model results and IoT Device data in separate DataFrames, we are ready to code the logic that defines when our solution should send a command back to the device and command it to slow down. One point that is worth noticing here is that the mlResultsDF is a static DataFrame whereas sensorDataDF is a streaming one. You can check that by running:

println(sensorDataDF.isStreaming)

Let’s go through what needs to be done at this stage one more time: as the data streams in from the crane, we need to compare it with the result of the ML model and slow it down when the incoming data falls in the range defined by the ML model. This is easy to code: all we need to do is to join the 2 datasets and check for when this rule is met:

val joinedDF = sensorDataDF
  .join(mlResultsDF,
    $"temperature" >= $"Min_Temperature" && $"temperature" < $"Max_Temperature" &&
    $"wind_speed" >= $"Min_Wind_Speed" && $"wind_speed" < $"Max_Wind_Speed" &&
    $"load_weight" >= $"Min_Load_Weight" && $"load_weight" < $"Max_Load_Weight" &&
    $"height" >= $"Min_Load_Height" && $"height" < $"Max_Load_Height" &&
    $"lift_angle" >= $"Min_Lift_Angle" && $"lift_angle" < $"Max_Lift_Angle")

The result of running the above cell in the Databricks notebook is a streaming DataFrame containing the records our solution needs to act upon.

Connect to IoT Hub where devices send data to

Now that we have worked out when our solution should react to incoming data, we can move on to the next interesting part: defining how to take action on the device and what that action is. We already discussed that an action is taken on the crane by calling a direct method. This method instructs the crane to slow down by the percentage that is passed in as the parameter. If you look into the SimulatedDevice.java file in the azure-iot-samples-java GitHub repo, there is a switch statement in the “call” function of the “DirectMethodCallback” class which defines the code to be executed for each direct method called on the IoT device. I extended that to simulate the crane being slowed down, but you can work with the existing “SetTelemetryInterval” method.

So, what we need to do at this stage is connect from the Databricks notebook to the IoT Hub where the device is registered, and then invoke the direct method URL, which should be in the form of “https://{iot hub}/twins/{device id}/methods/?api-version=2018-06-30”.

To authenticate with IoT Hub, we need to create a SAS token in form of “SharedAccessSignature sig=<signature>&se=<expiryTime>&sr=<resourceURI>”. Remember the service level policy connection string we put in Azure Key Vault? We are going to use that to build the SAS Token. But first, we need to extract the secrets stored in Key Vault:

val vaultScope = "kv-scope-01"
var keys = dbutils.secrets.list(vaultScope)
val keysAndSecrets = collection.mutable.Map[String, String]()
for (x <- keys) {
  val scopeKey = x.toString().substring(x.toString().indexOf("(") + 1, x.toString().indexOf(")"))
  keysAndSecrets += (scopeKey -> dbutils.secrets.get(scope = vaultScope, key = scopeKey))
}

After running the code in above cell in Databricks notebook, we get a map of all the secrets stored in the Key Vault in form of (“Secret_Name” -> “Secret_Value”).

Next, we need a function to build the SAS token that will be accepted by IoT Hub when authenticating the requests. This function will do the following:

  • Uses the IoT Hub name to get the service policy connection string from the Map of secrets we built previously
  • Extracts components of the retrieved connection string to build host name, resource uri and shared access key
  • Computes a Hash-based Message Authentication Code (HMAC) by using the SHA256 hash function, from the shared access key in the connection string
  • Uses an implementation of “Message Authentication Code” (MAC) algorithm to create the signature for the SAS Token
  • And finally returns SharedAccessSignature

import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec
import java.net.URLEncoder
import java.nio.charset.StandardCharsets
import java.lang.System.currentTimeMillis

val iotHubName = "test-direct-method-scala-01"

object SASToken {
  def tokenBuilder(deviceName: String): String = {
    // Service policy connection string: HostName=...;SharedAccessKeyName=service;SharedAccessKey=...
    val iotHubConnectionString = keysAndSecrets(iotHubName + "-service-policy-cnn-string")
    val hostName = iotHubConnectionString.substring(0, iotHubConnectionString.indexOf(";"))
    val resourceUri = hostName.substring(hostName.indexOf("=") + 1, hostName.length)
    val targetUri = URLEncoder.encode(resourceUri, String.valueOf(StandardCharsets.UTF_8))
    // Assumes SharedAccessKey is the last component of the connection string
    val SharedAccessKey = iotHubConnectionString.substring(iotHubConnectionString.indexOf("SharedAccessKey=") + 16, iotHubConnectionString.length)

    // Expiry in seconds since the epoch (365 hours from now)
    val currentTime = currentTimeMillis()
    val expiresOnTime = (currentTime + (365*60*60*1000))/1000

    // Sign "<resource uri>\n<expiry>" with HMAC-SHA256 using the Base64-decoded key
    val toSign = targetUri + "\n" + expiresOnTime
    val keyBytes = java.util.Base64.getDecoder.decode(SharedAccessKey.getBytes("UTF-8"))
    val signingKey = new SecretKeySpec(keyBytes, "HmacSHA256")
    val mac = Mac.getInstance("HmacSHA256")
    mac.init(signingKey)
    val rawHmac = mac.doFinal(toSign.getBytes("UTF-8"))
    val signature = URLEncoder.encode(new String(java.util.Base64.getEncoder.encode(rawHmac), "UTF-8"), "UTF-8")

    // Returned as a complete HTTP header so it can be passed to curl with -H
    "Authorization: SharedAccessSignature sr=" + targetUri + "&sig=" + signature + "&se=" + expiresOnTime + "&skn=service"
  }
}

The code above is the part I am most proud of getting to work as part of this blog post. I had to go through literally several thousands of lines of Java code to figure out how Microsoft does it, and convert it to Scala. But please let me know if you can think of a better way of doing it.
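One possible tidy-up, offered as a sketch and not as the way Microsoft's SDK does it: split the connection string into a map first, so that the order of its components no longer matters, and pass the expiry in explicitly so the function is easy to test. SasSketch, parseConnectionString and buildToken are names I made up, and the sketch assumes the connection string contains HostName, SharedAccessKeyName and SharedAccessKey.

```scala
import java.net.URLEncoder
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec

object SasSketch {
  // Splits "HostName=...;SharedAccessKeyName=...;SharedAccessKey=..." into a map.
  // Only the first '=' of each part is a separator, because Base64 keys may end with '='.
  def parseConnectionString(cs: String): Map[String, String] =
    cs.split(";").filter(_.contains("=")).map { part =>
      val idx = part.indexOf('=')
      part.substring(0, idx) -> part.substring(idx + 1)
    }.toMap

  // Builds the SAS token value (without the "Authorization: " header name).
  def buildToken(cs: String, expiryEpochSeconds: Long): String = {
    val parts = parseConnectionString(cs)
    val encodedUri = URLEncoder.encode(parts("HostName"), UTF_8.name())
    val toSign = s"$encodedUri\n$expiryEpochSeconds"
    val mac = Mac.getInstance("HmacSHA256")
    mac.init(new SecretKeySpec(Base64.getDecoder.decode(parts("SharedAccessKey")), "HmacSHA256"))
    val signature = Base64.getEncoder.encodeToString(mac.doFinal(toSign.getBytes(UTF_8)))
    val encodedSig = URLEncoder.encode(signature, UTF_8.name())
    s"SharedAccessSignature sr=$encodedUri&sig=$encodedSig&se=$expiryEpochSeconds&skn=${parts("SharedAccessKeyName")}"
  }
}
```

In the notebook, the expiry could be computed as System.currentTimeMillis() / 1000 plus the desired lifetime in seconds before calling buildToken.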

Take action by invoking direct method on the device

All the work we did so far was to prepare for this moment: to be able to call a direct method on our simulated crane and slow it down based on what the ML algorithm dictates. And we need to be able to do so as records stream into our final DataFrame, joinedDF. Let’s define how we can do that.

We need to write a class that extends ForeachWriter. This class needs to implement 3 methods:

  • open: used when we need to open new connections, for example to a data store to write records to
  • process: the work to be done whenever a new record is added to the streaming DataFrame goes here
  • close: used to close the connection opened in the first method, if any

We don’t need to open and therefore close any connections, so let’s check the process method line by line:

  1. Extracts device name from incoming data. If you run a display on joinedDF, you’ll see that the very first column is the device name
  2. Builds “sharedAccessSignature” by calling SASToken.tokenBuilder and passing in the device name
  3. Builds “deviceDirectMethodUri” in ‘{iot hub}/twins/{device id}/methods/’ format
  4. Builds “cmdParams” to include the name of the method to be called, response timeout, and the payload. Payload is the adjustment percentage that will be sent to the crane
  5. Builds a curl command with the required parameters and SAS token
  6. Executes the curl command

import org.apache.spark.sql.{ForeachWriter, Row}
import sys.process._

class StreamProcessor() extends ForeachWriter[Row] {
  // Nothing to open here; returning true tells Spark to go ahead and process the rows
  def open(partitionId: Long, epochId: Long): Boolean = {
    println("Starting.. ")
    true
  }

  def process(row: Row): Unit = {
    // The device name is the first column; slice drops its first and last characters
    val deviceName = row(0).toString().slice(1, row(0).toString().length - 1)
    val sharedAccessSignature = SASToken.tokenBuilder(deviceName)
    val deviceDirectMethodUri = "https://test-direct-method-scala-01.azure-devices.net/twins/" + deviceName + "/methods?api-version=2018-06-30"
    // Column 18 of joinedDF holds the adjustment percentage, which is sent as the payload
    val cmdParams = """{"methodName": "setHeightIncrements","responseTimeoutInSeconds": 10,"payload":""" + row(18).toString() + "}"
    val cmd = Seq("curl", "-X", "POST", "-H", sharedAccessSignature, "-H", "Content-Type: application/json", "-d", cmdParams, deviceDirectMethodUri)
    // Run curl; its exit code is discarded
    cmd.!
  }

  def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull != null) {
      println(errorOrNull.toString())
    }
  }
}
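If you want to check the URL and the JSON body without touching a real device, the string building in steps 3 and 4 can be pulled out into a small pure function. This is only a sketch: DirectMethodRequest and build are names I made up, and the host and device names in the usage note are placeholders.

```scala
object DirectMethodRequest {
  // Hypothetical helper: returns the URL and the JSON body for a direct method call.
  def build(hubHost: String, deviceId: String, methodName: String,
            payload: Int, timeoutSeconds: Int = 10): (String, String) = {
    val url = s"https://$hubHost/twins/$deviceId/methods?api-version=2018-06-30"
    val body =
      s"""{"methodName": "$methodName", "responseTimeoutInSeconds": $timeoutSeconds, "payload": $payload}"""
    (url, body)
  }
}
```

For example, DirectMethodRequest.build("my-hub.azure-devices.net", "crane-01", "setHeightIncrements", 5) gives back the two strings that the curl command needs, so the process method only has to add the SAS token header.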

The very last step is to call the methods we defined in the StreamProcessor class above as the records stream in from our connected crane. This is done by calling foreach sink on writeStream:

val query =
joinedDF
.writeStream
.foreach(new StreamProcessor())
.start()

And we’re done. We have a solution that is able to control theoretically millions of IoT devices using only 3 services on Azure.

The next step to go from here would be to add security measures and mechanisms to our solution, as well as monitoring and alerting. Hopefully I’ll get time to do them soon.

Stream IoT sensor data from Azure IoT Hub into Databricks Delta Lake


IoT devices produce a lot of data very fast. Capturing data from all those devices, which could number in the millions, and managing it is the very first step in building a successful and effective IoT platform.

Like any other data solution, an IoT data platform could be built on-premise or on the cloud. I’m a huge fan of cloud-based solutions, especially PaaS offerings. After doing a little bit of research I decided to go with Azure, since it has the most comprehensive and easy to use set of service offerings when it comes to IoT and they are reasonably priced. In this post, I am going to show how to build the architecture displayed in the diagram below: connect your devices to Azure IoT Hub and then ingest records into Databricks Delta Lake as they stream in using Spark Streaming.

Solution Architecture

Setup Azure IoT Hub and Register a Device

The very first step is to set up Azure IoT Hub, register a device with it and test it by sending data across. This is very well explained by Microsoft here. Make sure you follow all the steps and you’re able to read the messages sent to IoT Hub at the end.

The only extra step we need to take is to add a new consumer group to the IoT Hub. Doing this means our Spark Streaming application will have its own offset, tracking where in the queue it has last read the records coming from devices. By assigning a unique consumer group to each application that subscribes to IoT Hub, we can send the records coming from IoT devices to multiple destinations, for example to store them in Blob storage, send them to Azure Stream Analytics for real-time analytics, and write them to a Delta table in Databricks Delta Lake.

Navigate to IoT Hub page on the Azure portal and select your hub. Click on Built-in endpoints and add a new Consumer Group:

Add Consumer Group to IoT Hub

Databricks: Unified Analytics Platform & Delta Lake

Moving on to the next layer in our architecture, we’re going to set up Databricks. Databricks offers a platform that unifies data engineering, data science and business logic. It is basically a PaaS offering for Spark on the cloud, which speeds up data exploration and preparation.

Why Delta?

Delta Lake is a storage layer invented by Databricks to bring ACID transactions to big data workloads. This is a response to a limitation of existing big data storage formats like Parquet: they are immutable. To update a record within a Parquet file, you need to re-write the whole file. With Delta, you can easily write update statements at record level. This is all we need to know about the Delta file format for the purpose of what we want to build here; you can read more about it here.

A very important result of this feature for IoT and streaming use cases is that we will be able to query the data as it arrives, instead of having to wait for a partition to be updated (re-written).

In this solution we will see how to set up Databricks, use Spark Streaming to subscribe to records coming in to Azure IoT Hub, and write them to a Delta table.

Setup Databricks

Navigate to the Azure Portal and click on Create a Resource -> Analytics -> Azure Databricks. This is where you create a workspace, which is where you can access all your Databricks assets. Fill in the form that opens and make sure you select Standard for the pricing tier. Then hit Create:

Create Databricks Workspace

When the workspace is created, go to the Azure Databricks Workspace resource page and click on Launch Workspace. You will be taken to your workspace. Create a new cluster with the same properties you see in the picture below. You can ask for bigger nodes or enable autoscaling, but it’s not needed for this tutorial:

Create Databricks Cluster

The next step is to create a notebook. Click on Home -> <Your Email Address> -> Create -> Notebook. Give it a name, select Scala as the default language of the notebook (you can change it later using %), and select the cluster where this notebook’s commands will run on.

Structured Streaming from IoT Hub

Create and install required Maven library

For our streaming solution to work, we need to install the “azure-eventhubs-spark_2.11:2.3.6” Maven library. The steps to do that are very easy:

Open the notebook and click on Workspace -> [User_Name] -> Create -> Library:

Select Maven and copy and paste “com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.6” into the Coordinates box, then click on Create:

After the library is created, you’ll be redirected to the page where you can install your library on the existing clusters. Select the cluster where you’ll be running your Spark streaming code and click Install:

Important: You should restart your cluster after the installation is complete for it to take effect.

Connect to IoT Hub and read the stream

import org.apache.spark.eventhubs._
import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
import org.apache.spark.sql.functions.{ explode, split }

// To connect to an Event Hub, EntityPath is required as part of the connection string.
// Here, we assume that the connection string from the Azure portal does not have the EntityPath part.
val connectionString = ConnectionStringBuilder("--IOT HUB CONNECTION STRING FROM AZURE PORTAL--")
  .setEventHubName("--IoT Hub Name--")
  .build
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)
  .setConsumerGroup("delta")
  
val eventhubs = spark.readStream
  .format("eventhubs")
  .options(eventHubsConf.toMap)
  .load()

The code snippet above first builds a connection string pointing to the IoT Hub we created before. The only extra steps you need to take are to get the connection string from the Azure portal and put it in ConnectionStringBuilder, and then change the name in .setEventHubName to the “<Event Hub-compatible name>” accordingly. Open the Azure portal and go to your IoT Hub’s page. Click on Built-in endpoints, copy what you see below and paste it into the code snippet in the notebook:

IoT Hub Endpoint Details

What we get after those commands are completed successfully is a DataFrame that has the following fields in it. The messages coming from our IoT device are in the “body” field:

To see what the incoming data from the IoT sensor looks like, just run:

display(eventhubs)

Extract device data and create a Spark SQL Table

The next step is to extract the device data coming in the body field of the DataFrame we built in the previous step, and build a DataFrame comprising the fields we want to store in our Delta Lake to do analytics on later:

import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions._
val schema = (new StructType)
    .add("temperature", DoubleType)
    .add("humidity", DoubleType)
val df = eventhubs.select(($"enqueuedTime").as("Enqueued_Time"),($"systemProperties.iothub-connection-device-id")
                  .as("Device_ID"),(from_json($"body".cast("string"), schema)
                  .as("telemetry_json"))).select("Enqueued_Time","Device_ID", "telemetry_json.*")

The resulting DataFrame looks like:

Now we can create a table from our DataFrame and start writing SQL commands on it:

df.createOrReplaceTempView("device_telemetry_data")

Create the final DataFrame and write stream to Delta table

We’re almost there. We have the data we receive from our IoT device in a Spark SQL table, which enables us to transform it easily with SQL commands.

Tables in a Big Data ecosystem are supposed to be partitioned. I mean they had better be, otherwise they’ll cause all sorts of problems. The reason I extracted Enqueued_Time from the incoming records was to be able to partition my table by date/hour. IoT devices produce a lot of data, and partitioning it by hour not only makes each partition reasonably sized, but also enables a certain type of analytics to be performed on the data, for example when companies need to predict the performance of their devices at different times of the day or night.

// A SQL string that spans several lines needs triple quotes in Scala
val finalDF = spark.sql("""Select Date(Enqueued_Time) Date_Enqueued
, Hour(Enqueued_Time) Hour_Enqueued, Enqueued_Time, Device_ID
, temperature AS Temperature, humidity as Humidity
from device_telemetry_data""")
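To make the partition columns concrete, here is a small plain Scala sketch of what the Date and Hour expressions compute for a single timestamp. PartitionKeys is a name I made up for illustration; in the actual solution Spark SQL does this work.

```scala
import java.sql.Timestamp
import java.time.LocalDate

object PartitionKeys {
  // Mirrors Date(Enqueued_Time) and Hour(Enqueued_Time) for one timestamp.
  def of(enqueuedTime: Timestamp): (LocalDate, Int) = {
    val ldt = enqueuedTime.toLocalDateTime
    (ldt.toLocalDate, ldt.getHour)
  }
}
```

Every record enqueued between 13:00 and 13:59 on the same day gets the same pair of values, so all of them land in the same partition of the table.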

The resulting DataFrame has the following schema:

The final step is to write the stream to a Delta table:

finalDF.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .format("delta")
  .partitionBy("Date_Enqueued", "Hour_Enqueued")
  .table("delta_telemetry_data")

Let’s check the options passed to writeStream:

  • outputMode: Specifies how the records of a streaming DataFrame are written to the streaming sink. There are 3 modes:
    • Append: Only the new records will be written to the sink
    • Complete: All records will be written to the sink every time there is an update
    • Update: Only the updated records will be written to the sink
  • option: checkpointLocation
    • This is needed to ensure fault-tolerance. Basically we specify a location to save all application progress information. This is especially important in case of a driver failure, read more here.
  • format: The output sink where the result will be written, obviously “delta”.
  • partitionBy: The column(s) by which we want our table to be partitioned. We decided to partition our table hourly as explained above, so we pass in date and hour.
  • table: The name of the table.

If all the steps above have worked, you should be able to query your table and see the records inserted into the Delta table by running the following command:

%sql
SELECT * FROM delta_telemetry_data

And we’re done! Now we have a table in our Delta Lake that holds our IoT devices data. You can take it from here and do ML on the data collected or mix it with other tables you might have in your Delta Lake. And definitely feel free to ask your questions below in comments section.

How to import spark.implicits._ in Spark 2.2: error “value toDS is not a member of org.apache.spark.rdd.RDD”

I wrote about how to import implicits in Spark 1.6 more than 2 years ago. But things have changed in Spark 2.2: the first thing you need to do when coding in Spark 2.2 is to set up a SparkSession object. SparkSession is the entry point to programming Spark with DataSet and DataFrame.

Like Spark 1.6, spark.implicits are required to be able to use Spark’s API for DataSets and DataFrames in version 2.2. And like version 1.6, an instance of SparkContext is needed in Spark 2.2 before being able to import spark.implicits. Since each instance of SparkSession comes with an instance of SparkContext associated with it, all you have to do is create a SparkSession object and you’re set.

I have seen other posts that mention bits and pieces of how to do it. Here I give you the full code that works just fine, and you can tweak it based on your requirements:

import org.apache.spark.sql._
import org.apache.log4j._

object sparkSQLWithCaseClass {

  // Assumes each line of people.csv is "ID,name,age"; the age column is needed by the query below
  case class Person(ID: Int, name: String, age: Int)

  def mapper(l: String): Person = {
    val fields = l.split(',')
    Person(fields(0).toInt, fields(1), fields(2).toInt)
  }

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession.builder.appName("Spark SQL").getOrCreate()

    val lines = spark.sparkContext.textFile("../../people.csv")
    val people = lines.map(mapper)

    // The implicits are imported from the SparkSession instance, before calling toDS
    import spark.implicits._
    val schemaPeople = people.toDS()
    schemaPeople.printSchema()
    schemaPeople.createOrReplaceTempView("people")

    val t = spark.sql("select * from people where age >= 13")
    val res = t.collect()
    res.foreach(println)

    spark.stop()
  }
}

Spark Error “java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE” in Spark 1.6

RDDs are the building blocks of Spark and what makes it so powerful: they are stored in memory for fast processing. RDDs are broken down into partitions (blocks) of data, each a logical piece of the distributed dataset.

The underlying abstraction for blocks in Spark is a ByteBuffer, which limits the size of the block to 2 GB.

In brief, this error means that the block size for the resulting RDD is larger than 2GB: https://issues.apache.org/jira/browse/SPARK-1476

One way to work around this issue is to increase the application’s parallelism. We can define the default number of partitions in RDDs returned by join and reduceByKey by adjusting

spark.default.parallelism

This configuration parameter defines how many blocks of data our dataset, in this case an RDD, is going to be divided into.

As you have probably realized by now, we need to set spark.default.parallelism to a higher value when processing large datasets. This way we can make sure the size of each data block does not exceed the 2GB limit.
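As a rough way to pick that higher value, the sketch below computes the smallest number of partitions that keeps the average block under a chosen size. PartitionSizing and minPartitions are hypothetical names, not part of Spark, and the calculation assumes data is spread evenly across partitions. A skewed key can still produce an oversized block, so in practice you would aim well below the limit:

```scala
object PartitionSizing {
  // A block backed by a ByteBuffer cannot hold more than Int.MaxValue bytes (about 2 GB).
  val MaxBlockBytes: Long = Int.MaxValue.toLong

  // Smallest partition count that keeps the *average* block at or under targetBlockBytes.
  def minPartitions(totalBytes: Long, targetBlockBytes: Long = MaxBlockBytes): Int = {
    require(totalBytes >= 0, "totalBytes must not be negative")
    require(targetBlockBytes > 0, "targetBlockBytes must be positive")
    // Ceiling division, with at least one partition.
    math.max(1L, (totalBytes + targetBlockBytes - 1) / targetBlockBytes).toInt
  }
}
```

For example, PartitionSizing.minPartitions(500L * 1024 * 1024 * 1024, 128L * 1024 * 1024) gives 4000 for 500 GB of data with a 128 MB target, and that number could then be used as the value of spark.default.parallelism.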

Spark Error CoarseGrainedExecutorBackend Driver disassociated! Shutting down: Spark Memory & memoryOverhead

Another common error we saw in YARN application logs was this:

17/08/31 15:58:07 WARN CoarseGrainedExecutorBackend: An unknown (datanode-022:43969) driver disconnected.

17/08/31 15:58:07 ERROR CoarseGrainedExecutorBackend: Driver 10.1.1.111:43969 disassociated! Shutting down.

Googling this error suggests increasing spark.yarn.driver.memoryOverhead or spark.yarn.executor.memoryOverhead or both. That has apparently worked for a lot of people. Or at least those who were smart enough to understand how these properties work.

What you need to consider here is that memoryOverhead is allocated out of the total amount of memory available to the driver or executor, which is controlled by spark.driver.memory and spark.executor.memory.

This means that if you’re increasing the executor’s or driver’s memoryOverhead, you should double check that there is enough memory allocated to the driver and executor. In our case, the user was allocating all the memory available to the driver as memoryOverhead, which meant there was none left for other driver operations:

spark-submit \
--queue default \
--verbose \
--master yarn-cluster \
--conf spark.shuffle.service.enabled=true \
--conf spark.shuffle.manager=sort \
--conf spark.executor.memory=8g \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=10 \
--conf spark.executor.cores=2 \
--conf spark.driver.memory=8g \
--conf spark.network.timeout=600s \
--conf spark.scheduler.executorTaskBlacklistTime=3600000 \
--conf spark.yarn.driver.memoryOverhead=8192 \
--conf spark.yarn.executor.memoryOverhead=8192 \

You can clearly see what I meant in the paragraph above. Instead of doing this, the user should have increased executor and driver memory in line with the increase in memory overhead:

spark-submit \
--queue default \
--verbose \
--master yarn-cluster \
--conf spark.shuffle.service.enabled=true \
--conf spark.shuffle.manager=sort \
--conf spark.executor.memory=16g \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=10 \
--conf spark.executor.cores=2 \
--conf spark.driver.memory=16g \
--conf spark.network.timeout=600s \
--conf spark.scheduler.executorTaskBlacklistTime=3600000 \
--conf spark.yarn.driver.memoryOverhead=8192 \
--conf spark.yarn.executor.memoryOverhead=8192 \
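To put numbers on the two submissions above, the sketch below computes how large memoryOverhead is relative to the configured memory. MemoryCheck, toMb and overheadRatio are made-up names for illustration, only the g and m suffixes are parsed, and the overhead is taken in MB as in the commands above:

```scala
object MemoryCheck {
  private val Size = """(\d+)([gm])""".r

  // Converts a Spark-style size such as "8g" or "8192m" into megabytes.
  def toMb(size: String): Long = size.trim.toLowerCase match {
    case Size(n, "g") => n.toLong * 1024
    case Size(n, "m") => n.toLong
    case other        => throw new IllegalArgumentException(s"unsupported size: $other")
  }

  // Ratio of memoryOverhead (in MB) to the configured driver or executor memory.
  def overheadRatio(memory: String, overheadMb: Long): Double =
    overheadMb.toDouble / toMb(memory)
}
```

For the first command MemoryCheck.overheadRatio("8g", 8192) is 1.0, meaning the overhead is as large as the memory setting itself. For the second command MemoryCheck.overheadRatio("16g", 8192) drops to 0.5.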

 

Spark Error: Failed to Send RPC to Datanode

This past week we had quite a few issues with users not being able to run Spark jobs in YARN cluster mode. In particular, a team that was on a tight schedule kept getting errors like this:

java.io.IOException: Failed to send RPC 8277242275361198650 to datanode-055: java.nio.channels.ClosedChannelException

Mostly accompanied by error messages like:

org.apache.spark.SparkException: Error sending message [message = Heartbeat(9,[Lscala.Tuple2;@e47ba81,BlockManagerId(9, datanode-50 , 43381))]

ERROR Executor: Exit as unable to send heartbeats to driver more than 60 times

These errors basically mean the connection between the Spark driver and the executors is broken, mainly because an executor has been killed. This can happen for a number of reasons:

1- We realized this happens a lot more often when our cluster is too busy and has hit maximum usage. In that situation executors are accepted onto datanodes, but they fail to acquire enough memory there and therefore get killed.

2- Metaspace attempts to grow beyond the executor (JVM) memory limits, resulting in loss of executors. The best way to stop this error from appearing is to set the properties below when launching spark-shell or submitting an application using spark-submit:

spark.driver.extraJavaOptions = -XX:ReservedCodeCacheSize=100M -XX:MaxMetaspaceSize=256m -XX:CompressedClassSpaceSize=256m

spark.executor.extraJavaOptions = -XX:ReservedCodeCacheSize=100M -XX:MaxMetaspaceSize=256m -XX:CompressedClassSpaceSize=256m

Please note that depending on your project and code, you may need to increase the values mentioned above.

3- The network is slow for whatever reason. In our case, this was caused by a change in DNS which resulted in caching being turned off. This case can be fixed by adjusting spark.executor.heartbeatInterval and spark.network.timeout. The default values for these 2 parameters are 10s and 120s. You can adjust them based on how your network behaves; the only point to consider is that the latter property, spark.network.timeout, should be greater than the first one.
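The constraint in the last point is easy to get wrong when both values are being tuned, so here is a minimal sketch of a check you could run on a pair of settings before submitting a job. HeartbeatCheck and its methods are hypothetical helpers, not a Spark API, and only values written in whole seconds (such as 10s) are accepted:

```scala
object HeartbeatCheck {
  private val Seconds = """(\d+)s""".r

  // Parses values such as "10s" or "600s" into a number of seconds.
  def seconds(value: String): Long = value.trim.toLowerCase match {
    case Seconds(n) => n.toLong
    case other      => throw new IllegalArgumentException(s"expected seconds, got: $other")
  }

  // True when spark.executor.heartbeatInterval is strictly smaller than spark.network.timeout.
  def isValid(heartbeatInterval: String, networkTimeout: String): Boolean =
    seconds(heartbeatInterval) < seconds(networkTimeout)
}
```

The defaults pass the check, since HeartbeatCheck.isValid("10s", "120s") is true, while a pair such as ("120s", "60s") is rejected.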

If none of the above helps your situation, then it is something you need to take to your cluster’s administrator. There could be something wrong with the datanodes the executors are sent to that the admins are not aware of.

Happy coding!

How to import org.apache.spark.sql.SQLContext.implicits in Spark 1.6: error “value toDF is not a member of org.apache.spark.rdd.RDD”

Note: If you’re using Spark 2.2, please read this post

I am doing a mini project for my company using Spark/Scala and have been stuck on the error mentioned in the title for a couple of days. Googling that error suggested importing org.apache.spark.sql.SQLContext.implicits, and that’s what I did:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext.implicits

import org.apache.spark.SparkConf
object TestSQLContext {
[…..]
def main(args:Array[String]) {
[…..]
}
}

And that was the start of the problem: my application started to give a new error:

object SQLContext is not a member of package org.apache.spark.sql
[error] Note: class SQLContext exists, but it has no companion object.

The problem is, none of those online posts mention that we need to create an instance of org.apache.spark.sql.SQLContext before being able to use its members and methods. This is the right way to do it:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.SparkConf

object Hi {

  case class DimC(ID: Int, Name: String, City: String, EffectiveFrom: Int, EffectiveTo: Int)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("LoadDW")
    val sc = new SparkContext(conf)

    // Create the SQLContext instance first, then import implicits from that instance
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val fDimCustomer = sc.textFile("DimCustomer.txt")

    // toDF is available on the RDD only because of the import above
    val dimCustomer1 = fDimCustomer
      .map(_.split(','))
      .map(r => DimC(r(0).toInt, r(1), r(2), r(3).toInt, r(4).toInt))
      .toDF()

    dimCustomer1.registerTempTable("Cust_1")

    val customers = sqlContext.sql("select * from Cust_1")
    customers.show()
  }
}

Hope this post helps, and please do not hesitate to ask your questions in the comments section.

Cheers.