Running Spark SQL functions on AWS EMR

Location Intelligence SDK for Big Data Guide


Step 1: Create Cluster

Create a new cluster as per your requirements. The SDK supports EMR versions emr-6.2.0 and later; you can choose any of the supported versions. We recommend a cluster with a minimum of 32 GB of memory and 4 cores.

Step 2: Install SDK jar

First, upload the distribution zip to an AWS S3 bucket. After your cluster has started and is in the Waiting state, copy the distribution zip from S3 to the primary node of the cluster and extract it. Then copy all the folders inside the zip to an HDFS path of your choice; you will need this path later to configure your Spark session.

Step 3: Create a notebook to perform the operations

Go to EMR Studio, create a notebook, and attach your cluster to the notebook.

  • Configure the Spark session first, as shown below:
    %%configure -f
    {
      "conf": {
        "spark.jars": "<hdfs-path-to-extracted-zip>/spark3/driver/location-intelligence-bigdata-spark3drivers_2.12-5.2.1-all.jar",
        "spark.sql.legacy.allowUntypedScalaUDF": "true"
      }
    }
  • Now verify that you can import the SQLRegistrator class and register the SDK functions.
    import com.precisely.bigdata.li.spark.api.udf.SQLRegistrator
    import org.apache.spark.sql.SparkSession

    // Get or create the Spark session, then register the SDK's custom Spark SQL functions.
    val sparkSession = SparkSession.builder()
      .appName("testApplication")
      .getOrCreate()
    SQLRegistrator.registerAll()

    This registers all the custom Spark SQL functions that the SDK offers; a quick sanity-check query is sketched after this list.

  • Access the data from Amazon S3 (or HDFS) and register it as a temporary view.
    val input_file_path = "<path_to_input_data>"
    val df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(input_file_path)
    df.createOrReplaceTempView("quickGuideTable")

    This uses Spark's read API to create a DataFrame from the CSV file, which can then be used for any Spark operation. Adjust the CSV options as needed to read your data as accurately as possible. For more information about CSV options, see the Spark CSV options documentation.

    The last line creates a temporary view from the DataFrame, against which we will run SQL commands.

  • Run spatial operations as SQL queries.

    val resultDf = spark.sql(
      """SELECT CONCAT(ROUND(ST_Area(ST_GeomFromWKT(WKT), 'sq mi', 'SPHERICAL'), 2), ' sq mi') AS area,
        |       ST_Perimeter(ST_GeomFromWKT(WKT), 'mi', 'SPHERICAL') AS perimeter,
        |       State_Name, State
        |FROM quickGuideTable
        |ORDER BY perimeter""".stripMargin)

    First, we convert the geometry data from WKT to a geometry object using ST_GeomFromWKT(), and then run the ST_Area() and ST_Perimeter() functions to calculate the area and perimeter of each geometry. The DataFrame resultDf stores the result of the SQL query. For more information about spatial functions, see Spark SQL functions.

  • Write the resultant DataFrame to Amazon S3.
    import org.apache.spark.sql.SaveMode

    resultDf.write.mode(SaveMode.Overwrite).option("header", "true")
      .option("delimiter", ",").format("csv")
      .save("s3://<output_bucket_path>/output_area_and_perimeter")

    This writes (or overwrites) the output of the SQL query as CSV files at the specified path, using the specified options; you can verify the written output as sketched below.
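
As an optional follow-up to the write step above, the sketch below previews the result and reads the written CSV back to confirm it contains the expected columns. It is a minimal sketch that assumes the same placeholder output path passed to save() and standard Spark CSV read options; substitute your actual path.

    // Preview a few result rows (optional).
    resultDf.show(5, truncate = false)

    // Read the written CSV back to confirm the expected columns are present.
    // The path is a placeholder; use the same path you passed to save().
    val verifyDf = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("s3://<output_bucket_path>/output_area_and_perimeter")
    verifyDf.printSchema()
    verifyDf.show(5, truncate = false)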
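
Separately, if the spatial functions do not resolve when you run SQL, a minimal sanity check is to call one of the registered functions against a literal WKT geometry immediately after SQLRegistrator.registerAll() (second bullet above). The polygon and column alias below are arbitrary examples, not values from your data.

    // A minimal sanity check: if registration succeeded, the SDK's spatial functions
    // resolve inside Spark SQL; otherwise Spark raises an "Undefined function" analysis error.
    val checkDf = spark.sql(
      """SELECT ST_Area(ST_GeomFromWKT('POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))'),
        |               'sq mi', 'SPHERICAL') AS area_sq_mi""".stripMargin)
    checkDf.show()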