Step 1: Create Cluster
Create a new cluster that meets your requirements. We support EMR versions emr-6.2.0 and later; you can choose any supported version. We recommend a cluster with at least 32 GB of memory and 4 cores.
Step 2: Install SDK jar
First, upload the distribution zip to an AWS S3 bucket. After your cluster has started and is in the waiting state, copy the distribution zip from S3 to the primary node of the cluster and extract it. Then copy all folders inside the zip to an HDFS path of your choice. You will need this path later to configure the Spark session.
Step 3: Create a notebook to perform the operations
Go to EMR Studio, create a notebook, and attach your cluster to it.
- First, configure the Spark session as shown below:
    %%configure -f
    {
      "conf": {
        "spark.jars": "<hdfs-path-to-extracted-zip>/spark3/driver/location-intelligence-bigdata-spark3drivers_2.12-5.2.1-all.jar",
        "spark.sql.legacy.allowUntypedScalaUDF": true
      }
    }
- Now check that you can import the SQLRegistrator class.
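    import com.precisely.bigdata.li.spark.api.udf.SQLRegistrator
    import org.apache.spark.sql.SparkSession

    // Reuse the notebook's Spark session, or create one if none exists.
    val sparkSession = SparkSession.builder()
      .appName("testApplication")
      .getOrCreate()

    // Register the SDK's custom functions with Spark SQL.
    SQLRegistrator.registerAll()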
This registers all the custom functions that we provide.
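To confirm that the registration took effect, one option is to list the session's functions through Spark's catalog API. The sketch below assumes the custom functions are registered as session functions whose names start with ST_ (as ST_Area and ST_Perimeter, used later in this guide, suggest). The prefix filter is only an illustration, not a documented contract of the SDK.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lower}

// Reuse the notebook's active session.
val session = SparkSession.builder().getOrCreate()

// Assumption: the spatial functions share the ST_ prefix. Spark may store
// function names in lower case, so the comparison is case-insensitive.
val spatialFunctions = session.catalog.listFunctions()
  .filter(lower(col("name")).startsWith("st_"))
  .select("name")

// Print up to 50 matching function names without truncating them.
spatialFunctions.show(50, truncate = false)
```

An empty result would suggest that either the jar was not picked up by the session or the functions use a different naming scheme.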
- Access the data from storage (for example, S3 or HDFS) and open it as a temporary view.
    val input_file_path = "<path_to_input_data>"
    val df = spark.read
      .format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(input_file_path)
    df.createOrReplaceTempView("quickGuideTable")
This uses Spark's read command to create a dataframe from a CSV file, which can then be used in other Spark operations. Set the CSV options as needed so that your data is read as accurately as possible. For more information about CSV options, see Spark CSV options.
The last line creates a temporary view from the dataframe, against which we are going to run SQL commands.
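When inferSchema guesses a column type incorrectly, or the extra pass over the file is too slow, an explicit schema is one alternative. The sketch below assumes the input has exactly the three columns that the query in this guide references (WKT, State_Name, and State), in that order, all read as strings. The column order and types are assumptions; adjust them to match your file.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Reuse the notebook's active session.
val session = SparkSession.builder().getOrCreate()

// Assumed layout: fields must be listed in the same order as the file's columns.
val inputSchema = StructType(Seq(
  StructField("WKT", StringType, nullable = true),
  StructField("State_Name", StringType, nullable = true),
  StructField("State", StringType, nullable = true)
))

val typedDf = session.read
  .format("csv")
  .option("header", "true")
  .option("mode", "FAILFAST") // fail on malformed rows instead of filling them with nulls
  .schema(inputSchema)        // replaces inferSchema, so no extra pass over the data
  .load("<path_to_input_data>")

typedDf.createOrReplaceTempView("quickGuideTable")
```

FAILFAST is a deliberately strict choice for a first run; the default PERMISSIVE mode keeps malformed rows and sets the unparsable fields to null.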
- Run spatial operations as SQL commands.
    val resultDf = spark.sql(
      """SELECT CONCAT(ROUND(ST_Area(ST_GeomFromWKT(WKT), 'sq mi', 'SPHERICAL'), 2), ' sq mi') AS area,
        |       ST_Perimeter(ST_GeomFromWKT(WKT), 'mi', 'SPHERICAL') AS perimeter,
        |       State_Name,
        |       State
        |FROM quickGuideTable
        |ORDER BY perimeter""".stripMargin)
First, we convert the geometry data from WKT format to a geometry using ST_GeomFromWKT(), and then run the ST_Area() and ST_Perimeter() functions to calculate the area and perimeter of each geometry. The dataframe 'resultDf' stores the result of the SQL query. For more information about spatial functions, see Spark SQL functions.
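Because Spark evaluates queries lazily, nothing is computed until an action runs. One way to check a query before writing any output is to print its schema and preview a few rows. The helper below is a hypothetical convenience function, not part of the SDK.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: print the column types, then the first few rows.
def preview(df: DataFrame, rows: Int = 5): Unit = {
  df.printSchema()
  df.show(rows, truncate = false) // truncate = false keeps long values fully visible
}
```

In the notebook, calling preview(resultDf) would trigger the query and display the area, perimeter, State_Name, and State columns for the first five rows.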
- Write the resulting dataframe to storage (for example, S3 or HDFS).
    import org.apache.spark.sql.SaveMode

    resultDf.write
      .mode(SaveMode.Overwrite)
      .option("header", "true")
      .option("delimiter", ",")
      .format("csv")
      .save("<output_path>/output_area_and_perimter")
This writes the output of the SQL query as CSV to the specified path with the specified options, replacing any output that already exists there.
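To confirm that the write succeeded, the output can be read back with the same CSV options. Note that Spark writes a directory of part files rather than a single CSV file, so the path to read is that directory. The helper below is a hypothetical illustration, not part of the SDK.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: read back a CSV output directory that was written with a header row.
def readBack(session: SparkSession, outputPath: String): DataFrame =
  session.read
    .format("csv")
    .option("header", "true")
    .option("delimiter", ",")
    .load(outputPath)
```

In the notebook, pass the session and the same path that was given to save, then call show() or count() on the returned dataframe to compare it with resultDf.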