Advanced data loading - Latest

Property Graph Getting Started Guide

Product: Property Graph (Enrich Addresses > World Addresses)
Version: Latest
Language: English
Copyright: 2024
First published: 2021
Last updated: 2024-06-25

A Kotlin program that downloads the source files and manages its own connection to a PostgreSQL server can automate the data-loading process and achieve optimal performance.

First, create functions for downloading files and establishing a connection to the database:

val files: List<File> = getFilesFromS3()
val connection = createDbConnection()
connection.autoCommit = false
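The helpers getFilesFromS3() and createDbConnection() are not shown here. A minimal sketch of the connection helper, assuming the PostgreSQL JDBC driver is on the classpath; the host, database, and credential defaults are hypothetical placeholders:

```kotlin
import java.sql.Connection
import java.sql.DriverManager

// Builds a standard PostgreSQL JDBC URL from its parts.
fun buildJdbcUrl(host: String, port: Int, database: String): String =
    "jdbc:postgresql://$host:$port/$database"

// Hypothetical defaults; replace with your own server, database, and credentials.
fun createDbConnection(
    host: String = "localhost",
    port: Int = 5432,
    database: String = "property_graph",
    user: String = "loader",
    password: String = "secret"
): Connection =
    DriverManager.getConnection(buildJdbcUrl(host, port, database), user, password)
```

Disabling auto-commit on the returned connection, as above, lets the loader commit each COPY batch explicitly.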

Next, implement methods for processing the files, ideally with ZipInputStream, which allows a remote archive to be directly processed and loaded into the database:

val logger = Logger.getLogger("FileProcessor")
val rowsPerTempFile = 100000

val copySql = """
    COPY my_schema.property_graph_202301_classx (
        class, primary_key, secondary_key, type, state
    ) FROM STDIN WITH (FORMAT CSV, NULL ' ')
""".trimIndent()
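The COPY statement assumes the target table already exists. A sketch of matching DDL, generated and executed from Kotlin; the TEXT column types are an assumption, and the real types should come from the product's data dictionary:

```kotlin
// Hypothetical DDL matching the COPY column list; TEXT types are a guess.
fun createTableSql(table: String): String = """
    CREATE TABLE IF NOT EXISTS $table (
        class TEXT,
        primary_key TEXT,
        secondary_key TEXT,
        type TEXT,
        state TEXT
    )
""".trimIndent()

// Runs the DDL over an existing JDBC connection before loading begins.
fun ensureTable(connection: java.sql.Connection, table: String) {
    connection.createStatement().use { it.execute(createTableSql(table)) }
}
```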

Sample implementation:

import java.io.*
import java.nio.file.Files
import java.sql.Connection
import java.util.logging.Logger
import java.util.zip.ZipInputStream
import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection

fun processFiles(
    files: List<File>?,
    copySql: String,
    connection: Connection,
    rowsPerTempFile: Int
) {
    files?.forEach { file ->
        logger.info("Processing: $file")
        processFile(file, copySql, connection, rowsPerTempFile)
        logger.info("Finished processing: $file")
    }
    connection.close()
}
fun processFile(
    file: File,
    copySql: String,
    connection: Connection,
    rowsPerTempFile: Int
) {
    ZipInputStream(file.inputStream()).use { zipStream ->
        // Position the stream at the first (and assumed only) archive entry.
        zipStream.nextEntry
        InputStreamReader(zipStream).use { reader ->
            BufferedReader(reader).use { bufferedReader ->
                var rowCount = 0
                var tempFile: File? = null
                var writer: BufferedWriter? = null
                bufferedReader.forEachLine { line ->
                    if (rowCount % rowsPerTempFile == 0) {
                        closeTempFileAndLoadData(copySql, tempFile, writer, connection)
                        tempFile = Files.createTempFile("temp", ".csv").toFile()
                        writer = tempFile!!.bufferedWriter()
                    }
                    writer?.write("$line\n")
                    rowCount++
                }
                closeTempFileAndLoadData(copySql, tempFile, writer, connection)
            }
        }
    }
}
fun closeTempFileAndLoadData(
    copySql: String,
    tempFile: File?,
    writer: BufferedWriter?,
    connection: Connection
) {
    writer?.close()
    tempFile?.let { temp ->
        FileInputStream(temp).use { inputStream ->
            val copyManager = CopyManager(
                connection.unwrap(BaseConnection::class.java)
            )
            copyManager.copyIn(copySql, inputStream)
        }
        connection.commit()
        logger.info("Finished processing chunk of up to $rowsPerTempFile rows")
        temp.delete()
    }
}
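Because each temp file holds at most rowsPerTempFile rows and is loaded and committed as its own COPY batch, n input rows produce ceil(n / rowsPerTempFile) batches. A small helper, not part of the guide's code, illustrating the arithmetic:

```kotlin
// Number of COPY batches the loader issues for a given row count,
// mirroring the modulo check in the file-processing loop.
fun batchCount(totalRows: Int, rowsPerTempFile: Int): Int {
    require(rowsPerTempFile > 0) { "chunk size must be positive" }
    return if (totalRows == 0) 0
           else (totalRows + rowsPerTempFile - 1) / rowsPerTempFile
}
```

For example, 250,000 rows at the default chunk size of 100,000 produce three COPY batches, the last holding 50,000 rows. Tuning rowsPerTempFile trades commit frequency (and restart granularity) against per-batch overhead.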