Commit 5b3c14d8 authored by 박준형

Upload New File

parent 2e92426f
package rtu.process

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructType, TimestampType}
import rtu.StatisticsProcess
import rtu.schema.RTUFileSchema
import scala.collection.mutable
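
/**
 * StatisticsPersonalGen: daily per-area / per-customer RTU generation statistics.
 * Reads one day of gzip-compressed RTU CSV data, computes generation deltas between
 * consecutive cumulative readings per device, aggregates them per 15-minute bucket,
 * dongcode and customer, and optionally writes the result to MongoDB.
 */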
class StatisticsPersonalGen extends StatisticsProcess {
  override var name: String = _
  override var isLocal: Boolean = _
  override var spark: SparkSession = _
  override var properties: mutable.Map[String, String] = _

  override def process(date: String): Unit = {
    val propSparkSqlShufflePartitions = properties.get("spark.sql.shuffle.partitions")
    val propPrintPartitionSize = properties.get("print.partition.size")

    // limit input file partitions to 256 MB
    spark.conf.set("spark.sql.files.maxPartitionBytes", 1024 * 1024 * 256)

    if (propSparkSqlShufflePartitions.isDefined) {
      val sparkSqlShufflePartitions: Int = propSparkSqlShufflePartitions.get.toInt
      println("Set spark.sql.shuffle.partitions : " + sparkSqlShufflePartitions)
      spark.conf.set("spark.sql.shuffle.partitions", sparkSqlShufflePartitions)
    }

    var isPrintPartitionSize: Boolean = false
    if (propPrintPartitionSize.isDefined) {
      isPrintPartitionSize = propPrintPartitionSize.get.equals("true")
    }
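
    // Daily input file: <file.path>/<yyyy>/<MM>/<yyyy-MM-dd>.csv.gz
    // RTU info file   : <file.path>/<info.file>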
    val dateArray = date.split("-")
    val yearVal = dateArray(0)
    val monthVal = dateArray(1)

    val fileName = properties.get("file.path").get + "/" + yearVal + "/" + monthVal + "/" + date + ".csv.gz"
    val rtuFileName = properties.get("file.path").get + "/" + properties.get("info.file").get
    println("fileName : " + fileName + ", rtuFileName : " + rtuFileName)

    val df = spark.read.option("header", "true")
      .option("io.compression.codecs", "nl.basjes.hadoop.io.compress.SplittableGzipCodec")
      .schema(RTUFileSchema.schema_data).csv(fileName)
    println("read csv")

    /**
     * Output per row: TIME (15-minute bucket), DONGCODE, CUSTOMER_ID,
     * GEN (average generation over the 15-minute bucket) and
     * AVG_MAX_GEN (daily average of the hourly maximum generation).
     */
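    // Keep only RTU devices (DEVICE_ID starting with 'v') reporting the generation
    // resource (RESOURCE_URI '/100/.../3') with a positive cumulative counter value.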
    val df_accumulated_power = df.selectExpr("DEVICE_ID",
        "RESOURCE_URI",
        "RESOURCE_VALUE",
        "IS_CHANGED",
        "EVENT_TYPE",
        "DATA_TIME",
        "DATE_FORMAT(DATA_TIME, 'yyyyMMdd') AS TIME_DAY",
        "DATE_FORMAT(DATA_TIME, 'yyyyMMddHH') AS TIME_HOUR",
        "CAST(DATE_FORMAT(DATA_TIME, 'mm') AS INT) AS MIN",
        "UPDATE_TIME",
        "SUBSTR(RESOURCE_URI, LENGTH(RESOURCE_URI), 1) AS TYPE",
        "RESOURCE_NAME",
        "CASE SUBSTR(DEVICE_ID, 0, 1) WHEN 'v' THEN 'RTU' ELSE 'OTHER' END AS DEVICE_TYPE",
        "REPLACE(SUBSTRING_INDEX(RESOURCE_URI, '/', 2), '/', '') AS TYPE_1",
        "SUBSTRING_INDEX(RESOURCE_URI, '/', -1) AS TYPE_2"
      ).filter("DEVICE_TYPE = 'RTU'")
      .filter("TYPE_1 = '100'")
      .filter("TYPE_2 = '3'")
      .filter("RESOURCE_VALUE > 0")

    // Bucket each record into a 15-minute slot: TIME_MIN = yyyyMMddHH + '00' / '15' / '30' / '45'
    val df_accumulated_power_withMin = df_accumulated_power.withColumn("TIME_MIN",
      when(col("MIN") < 15, expr("CONCAT(TIME_HOUR, '00')"))
        .when(col("MIN") < 30, expr("CONCAT(TIME_HOUR, '15')"))
        .when(col("MIN") < 45, expr("CONCAT(TIME_HOUR, '30')"))
        .otherwise(expr("CONCAT(TIME_HOUR, '45')"))
    )

    // Per-device window ordered by time bucket, used to look up the previous cumulative value
    val windowSpec = Window.partitionBy("DEVICE_ID").orderBy(asc("TIME_MIN"))
    val window_df_statistics = df_accumulated_power_withMin
      .withColumn("OLD_RESOURCE_VALUE", lag("RESOURCE_VALUE", 1, -1).over(windowSpec))

    // GEN = current cumulative value minus the previous one; the first reading per device
    // (lag default -1) and non-positive deltas are dropped
    val calculate_df_statistics = window_df_statistics
      .withColumn("GEN", when(col("OLD_RESOURCE_VALUE") < 0, -1).otherwise(col("RESOURCE_VALUE") - col("OLD_RESOURCE_VALUE")))
      // .withColumn("GEN", expr("CASE WHEN OLD_RESOURCE_VALUE < 0 THEN -1 ELSE RESOURCE_VALUE - OLD_RESOURCE_VALUE END"))
      .filter("GEN > 0")
    println("make data with window")

    // Read RTU device info (dongcode, address, customer per device); excludes dongcode '4113565500'
    // and keeps only rows with flg = 1
    val df_rtu_info = spark.read.option("header", "true")
      .option("io.compression.codecs", "nl.basjes.hadoop.io.compress.SplittableGzipCodec")
      .schema(RTUFileSchema.schema_info).csv(rtuFileName)
      .filter("dongcode != '4113565500'").filter("flg = 1")

    // Join the calculated data with the RTU info to attach a dongcode to each device
    val deviceStatisticsDF = calculate_df_statistics.alias("A").join(broadcast(df_rtu_info.alias("B")),
        col("A.DEVICE_ID") === col("B.v_device_id")
      ).selectExpr("A.TIME_MIN", "A.TIME_HOUR", "A.TIME_DAY", "A.DEVICE_ID", "A.GEN",
        "B.ADDRESS", "B.DONGCODE", "B.DONG", "B.CIDO", "B.SIGUGUN", "B.CUSTOMER_ID", "B.INSTALL_RTU_COUNT")
    deviceStatisticsDF.cache()
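    // deviceStatisticsDF is cached above because it feeds both the 15-minute
    // and the hourly aggregations that follow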

    // Average GEN per dongcode, 15-minute bucket and customer_id
    val areaStatisticsForMinDF = deviceStatisticsDF.groupBy("DONGCODE", "TIME_MIN", "CUSTOMER_ID")
      .agg(
        expr("MAX(TIME_DAY) AS TIME_DAY"),
        expr("MAX(GEN) AS MAX_GEN"),
        expr("format_number(AVG(GEN), '#.##') AS GEN")
      )

    // Maximum GEN per dongcode, hour and customer_id
    val areaStatisticsForHourDF = deviceStatisticsDF.groupBy("DONGCODE", "TIME_HOUR", "CUSTOMER_ID")
      .agg(
        expr("MAX(TIME_DAY) AS TIME_DAY"),
        expr("MAX(GEN) AS MAX_GEN")
      )

    // Daily average of the hourly maxima per dongcode and customer_id
    val maxValueDF = areaStatisticsForHourDF.groupBy("TIME_DAY", "DONGCODE", "CUSTOMER_ID").agg(
      expr("format_number(AVG(MAX_GEN), '#.##') AS AVG_MAX_GEN")
    )
    println("make maxValueDF")

    // Final data: one row per 15-minute bucket, dongcode and customer_id
    val statisticsDF = areaStatisticsForMinDF.alias("A").join(broadcast(maxValueDF.alias("B")),
        col("A.TIME_DAY") === col("B.TIME_DAY") &&
          col("A.DONGCODE") === col("B.DONGCODE") &&
          col("A.CUSTOMER_ID") === col("B.CUSTOMER_ID")
      ).selectExpr("A.TIME_MIN AS TIME", "A.DONGCODE", "A.CUSTOMER_ID", "A.GEN", "B.AVG_MAX_GEN AS AVG_MAX_GEN")
    println("join areaStatisticsDF and maxValueDF")
    // println(statisticsDF.count())  // too slow...
    // val coreCount = java.lang.Runtime.getRuntime.availableProcessors * (spark.sparkContext.statusTracker.getExecutorInfos.length - 1)
    // println("core : " + coreCount)
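
    // Writing to MongoDB is optional and controlled by the boolean "mongodb" property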
    val mongoDBVal = properties.get("mongodb").get
    if (mongoDBVal.toBoolean) {
      // val countForMongoDB = df_for_mongo.count()
      // println("Write to MongoDB : " + countForMongoDB)
      val startTime = System.currentTimeMillis()
      println("Write to MongoDB")
      /**
       * statisticsDF partition sizes: WrappedArray(79639, 87787, 87960, 79210, 87728, 87836, 79264, 79139, 79195, 131722)
       * default partitions : 10 : 225370 : 3.75 min
       * repartition        :  5 : 292513 : 4.8 min
       * repartition        : 10 : 240257 : 4 min
       * repartition        : 20 : 215967 : 3.6 min (10 tasks run, then another 10)
       */
      statisticsDF.repartition(10).write.format("mongo")
        .option("uri", properties.get("mongodb.uri").get)
        .option("user", properties.get("mongodb.id").get)
        .option("password", properties.get("mongodb.pass").get)
        .option("database", properties.get("mongodb.database").get)
        .option("collection", properties.get("mongodb.collection").get)
        .mode(properties.get("mongodb.mode").get)
        .save()
      val spendTime = System.currentTimeMillis() - startTime
      println("spendTime : " + spendTime)
    } else {
      println("No write to MongoDB")
    }
  }
}