To automate data loading with good performance, you can write a Kotlin program that downloads source files and manages its own connection to a PostgreSQL server.
First, create functions for downloading files and establishing a connection to the database:
// Fetch the archives to load and open the database connection up front.
// NOTE(review): `files` is declared as ZipInputStream here, but `processfiles`
// below expects List<File>? — one of the two types is wrong; confirm what
// getFilesFromS3() actually returns.
val files: ZipInputStream = getFilesFromS3()
val connection = createDbConnection()
// Disable auto-commit so each COPY chunk is committed explicitly as a unit.
connection.autoCommit = false
Next, implement methods for processing the files. Using ZipInputStream is ideal here, because it lets a remote archive be streamed and loaded into the database directly, without first extracting it to disk:
// Shared logger for all file-processing steps.
val logger = Logger.getLogger("FileProcessor")

// Number of CSV rows buffered into each temp file before it is COPYed
// into PostgreSQL; tune this to balance memory use against commit frequency.
val rowsPerTempFile = 100000

// COPY ... FROM STDIN statement used by the PostgreSQL CopyManager.
// A single-space field is treated as NULL (NULL ' ').
val copySql = """
COPY my_schema.property_graph_202301_classx (
class, primary_key, secondary_key, type, state
) FROM STDIN WITH (FORMAT CSV, NULL ' ')
""".trimIndent()
Sample implementation:
/**
 * Loads each downloaded file into the database, then closes the connection.
 *
 * @param files files to process; a null list is treated as "nothing to do".
 * @param copySql the COPY ... FROM STDIN statement used for bulk loading.
 * @param connection open JDBC connection (closed by this function on exit).
 * @param rowsPerTempFile rows buffered per temp file before each COPY.
 */
fun processfiles(
    files: List<File>?,
    copySql: String,
    connection: Connection,
    rowsPerTempFile: Int,
) {
    try {
        files?.forEach { file ->
            logger.info("Processing: $file")
            // Original called processFile, but the function is named processfile.
            processfile(file, copySql, connection, rowsPerTempFile)
            logger.info("Finished processing: $file")
        }
    } finally {
        // Close even when a file fails mid-run, so the connection never leaks.
        connection.close()
    }
}
/**
 * Streams one zipped file into the database in chunks of [rowsPerTempFile] rows.
 *
 * The zip stream is read line by line; every [rowsPerTempFile] lines the current
 * temp CSV file is handed to [closeTempFileAndLoadData] for a COPY, and a fresh
 * temp file is started. A final call flushes the last partial chunk.
 *
 * @param file local zip archive containing the CSV data.
 * @param copySql the COPY ... FROM STDIN statement used for bulk loading.
 * @param connection open JDBC connection; committed per chunk by the loader.
 * @param rowsPerTempFile rows buffered per temp file before each COPY.
 */
fun processfile(
    file: File,
    copySql: String,
    connection: Connection,
    rowsPerTempFile: Int,
) {
    ZipInputStream(file.inputStream()).use { zipStream ->
        // A ZipInputStream yields no bytes until it is positioned on an entry;
        // only the first entry of the archive is processed here.
        zipStream.nextEntry
        zipStream.bufferedReader().use { bufferedReader ->
            var rowCount = 0
            var tempFile: File? = null
            var writer: BufferedWriter? = null
            try {
                bufferedReader.forEachLine { line ->
                    // Start a new chunk every rowsPerTempFile lines. On the very
                    // first line, tempFile/writer are still null, so the flush
                    // call below is a no-op.
                    if (rowCount % rowsPerTempFile == 0) {
                        closeTempFileAndLoadData(copySql, tempFile, writer, connection)
                        tempFile = Files.createTempFile("temp", ".csv").toFile()
                        writer = tempFile!!.bufferedWriter()
                    }
                    writer?.write("$line\n")
                    rowCount++
                }
                // Flush the final (possibly partial) chunk.
                closeTempFileAndLoadData(copySql, tempFile, writer, connection)
            } finally {
                // If forEachLine threw, make sure the current writer is released;
                // closing an already-closed BufferedWriter is a no-op.
                writer?.close()
            }
        }
    }
}
/**
 * Flushes the current chunk's writer, COPYes the temp file into PostgreSQL,
 * commits, and deletes the temp file.
 *
 * Both [tempFile] and [writer] may be null (the very first call in a chunked
 * loop), in which case this is a no-op.
 *
 * @param copySql the COPY ... FROM STDIN statement to execute.
 * @param tempFile CSV file holding the chunk, or null if no chunk exists yet.
 * @param writer writer over [tempFile]; closed here to flush buffered rows.
 * @param connection open JDBC connection with autoCommit disabled.
 */
fun closeTempFileAndLoadData(
    copySql: String,
    tempFile: File?,
    writer: BufferedWriter?,
    connection: Connection,
) {
    // Close first so buffered rows are flushed to disk before COPY reads them.
    writer?.close()
    tempFile?.let { temp ->
        FileInputStream(temp).use { inputStream ->
            // CopyManager needs the driver-level connection, hence unwrap
            // (the original `unwrp` typo would not compile).
            val copyManager = CopyManager(
                connection.unwrap(BaseConnection::class.java)
            )
            copyManager.copyIn(copySql, inputStream)
        }
        // Commit each chunk individually (autoCommit is disabled by the caller).
        connection.commit()
        // The original message referenced rowsPerTempFile, which is not in scope here.
        logger.info("Finished loading chunk from ${temp.name}")
        temp.delete()
    }
}