Commit 2e92426f authored by 박준형's avatar 박준형

Upload New File

parent 94698dff
package rtu.process
import address.AddressSearch
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.{Partition, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row, SparkSession}
import org.apache.spark.sql.functions.{asc, broadcast, col, concat_ws, desc, expr, lag, udf, when}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType, TimestampType}
import rtu.StatisticsProcess
import rtu.schema.RTUFileSchema
import scala.collection.{Iterator, mutable}
class StatisticsAreaDeviceGen extends StatisticsProcess {
override var name: String = _
override var isLocal : Boolean = _
override var spark: SparkSession = _
override var properties: mutable.Map[String, String] = _
override def process(date: String): Unit = {
/**
*
* 1. 피크 발전 시간(허용 용량의 N%이상 발전되는 경우)
a. 계절/지역에 따른 일/월 평균 발전량 자료 제공
b. 계절/지역/설비용량에 따라 허용 용량 70%~90% 사이로 제공
2. 평균 피크 발전 용량/시간
* time(15min intervals >> 00, 15, 30, 45), dongcode, gen(avg for 15min), avg(max(gen) for hoyr) for day
*
* use data : RTU history data file
* RTU Info data file
* */
val propSparkSqlshufflePartitions = properties.get("spark.sql.shuffle.partitions")
val propPrintlnPartitionSize = properties.get("print.partition.size")
if (!propSparkSqlshufflePartitions.isEmpty) {
val sparkSqlshufflePartitions: Int = propSparkSqlshufflePartitions.get.toInt
println("Set spark.sql.shuffle.partitions : " + sparkSqlshufflePartitions)
spark.conf.set("spark.sql.shuffle.partitions", sparkSqlshufflePartitions)
}
var isPrintPartitionSize: Boolean = false
if (!propPrintlnPartitionSize.isEmpty) {
isPrintPartitionSize = propPrintlnPartitionSize.get.equals("true")
}
val dataArray = date.split("-")
val yearVal = dataArray.apply(0)
val monthVal = dataArray.apply(1)
val fileName = properties.get("file.path").get + "/" + yearVal + "/" + monthVal + "/" + date + ".csv.gz"
val rtufileName = properties.get("file.path").get + "/" + properties.get("info.file").get
println("fileName : " + fileName + ", rtufileName : " + rtufileName)
spark.conf.set("spark.sql.files.maxPartitionBytes", 1024 * (1024 * 256))
val df = spark.read.option("header", "true")
.option("io.compression.codecs", "nl.basjes.hadoop.io.compress.SplittableGzipCodec")
.schema(RTUFileSchema.schema_data).csv(fileName)
println("read csv")
/**
+-----------+------------------------------+------------+-------------------+--------------+----------+----------+-------------------+-------------------+-------------------+
|ID |DEVICE_ID |RESOURCE_URI|RESOURCE_NAME |RESOURCE_VALUE|IS_CHANGED|EVENT_TYPE|CREATE_TIME |UPDATE_TIME |DATA_TIME |
+-----------+------------------------------+------------+-------------------+--------------+----------+----------+-------------------+-------------------+-------------------+
|37606397725|SICOMS-SCMQW1_H20-3C71BF38129D|/100/0/0 |총누적 (사용)전력량|714923 |0 |1 |2022-05-30 00:00:00|2022-05-30 00:00:00|2022-05-30 00:00:00|
|37606397726|SICOMS-SCMQW1_H20-3C71BF381580|/100/0/0 |총누적 (사용)전력량|5973612 |0 |1 |2022-05-30 00:00:00|2022-05-30 00:00:00|2022-05-30 00:00:00|
|37606397727|SICOMS-SCMQW1_H20-A4CF12F5666C|/100/0/0 |총누적 (사용)전력량|7577493 |0 |1 |2022-05-30 00:00:00|2022-05-30 00:00:00|2022-05-30 00:00:00|
|37606397737|SICOMS-SCMQW1-3C71BF3814F6 |/100/0/0 |총누적 (사용)전력량|1728773 |0 |1 |2022-05-30 00:00:00|2022-05-30 00:00:00|2022-05-30 00:00:00|
|37606397738|SICOMS-SCMQW1-3C71BF3819FB |/100/0/0 |총누적 (사용)전력량|914628 |0 |1 |2022-05-30 00:00:00|2022-05-30 00:00:00|2022-05-30 00:00:00|
|37606397739|SICOMS-SCMQW1_H20-A4CF12F56526|/100/0/0 |총누적 (사용)전력량|8159684 |0 |1 |2022-05-30 00:00:00|2022-05-30 00:00:00|2022-05-30 00:00:00|
|37606397740|SICOMS-SCMQW1_H20-A4CF12F55E54|/100/0/0 |총누적 (사용)전력량|3198706 |0 |1 |2022-05-30 00:00:00|2022-05-30 00:00:00|2022-05-30 00:00:00|
|37606397753|SICOMS-SCMQW1_H20-A4CF12F55AF6|/100/0/0 |총누적 (사용)전력량|8207157 |0 |1 |2022-05-30 00:00:00|2022-05-30 00:00:00|2022-05-30 00:00:00|
|37606397765|SICOMS-SCMQW1-3C71BF38186E |/100/0/0 |총누적 (사용)전력량|56876 |0 |1 |2022-05-30 00:00:00|2022-05-30 00:00:00|2022-05-30 00:00:00|
|37606397766|SICOMS-SCMQW1_H20-A4CF12F56C12|/100/0/0 |총누적 (사용)전력량|5131775 |0 |1 |2022-05-30 00:00:00|2022-05-30 00:00:00|2022-05-30 00:00:00|
+-----------+------------------------------+------------+-------------------+--------------+----------+----------+-------------------+-------------------+-------------------+
* */
val df_accumulated_power = df.selectExpr("DEVICE_ID",
"RESOURCE_URI",
"RESOURCE_VALUE",
"IS_CHANGED",
"EVENT_TYPE",
"DATA_TIME",
"DATE_FORMAT(DATA_TIME, 'yyyyMMdd') AS TIME_DAY",
"DATE_FORMAT(DATA_TIME, 'yyyyMMddHH') AS TIME_HOUR",
"CAST(DATE_FORMAT(DATA_TIME, 'mm') AS INT) AS MIN",
"UPDATE_TIME",
"SUBSTR(RESOURCE_URI, LENGTH(RESOURCE_URI), LENGTH(RESOURCE_URI)) as TYPE",
"RESOURCE_NAME",
"CASE SUBSTR(DEVICE_ID, 0, 1) WHEN 'v' THEN 'RTU' ELSE 'OTHER' END AS DEVICE_TYPE",
"REPLACE(SUBSTRING_INDEX(RESOURCE_URI, '/', 2),'/', '') AS TYPE_1",
"SUBSTRING_INDEX(RESOURCE_URI, '/', -1) AS TYPE_2"
).filter("DEVICE_TYPE = 'RTU'")
.filter("TYPE_1 = '100'")
.filter("TYPE_2 = '3'")
.filter("RESOURCE_VALUE > 0")
/**
누적 발전량과 RTU 기기만 필터링, 하단에서 사용하기 위해 날짜값과 분값을 분리
분값은 int로 치환
+----------------------------------+------------+--------------+----------+----------+-------------------+--------+----------+---+-------------------+----+-----------------+-----------+------+------+
|DEVICE_ID |RESOURCE_URI|RESOURCE_VALUE|IS_CHANGED|EVENT_TYPE|DATA_TIME |TIME_DAY|TIME_HOUR |MIN|UPDATE_TIME |TYPE|RESOURCE_NAME |DEVICE_TYPE|TYPE_1|TYPE_2|
+----------------------------------+------------+--------------+----------+----------+-------------------+--------+----------+---+-------------------+----+-----------------+-----------+------+------+
|v-836afa5cd5b92e902e154fb224a34573|/100/3/3 |7 |1 |1 |2022-05-30 00:09:00|20220530|2022053000|9 |2022-05-30 00:09:12|3 |당일누적 발전시간|RTU |100 |3 |
|v-836afa5cd5b92e902e154fb224a34573|/100/3/3 |21 |1 |1 |2022-05-30 00:26:00|20220530|2022053000|26 |2022-05-30 00:26:01|3 |당일누적 발전시간|RTU |100 |3 |
|v-836afa5cd5b92e902e154fb224a34573|/100/3/3 |37 |1 |1 |2022-05-30 00:43:00|20220530|2022053000|43 |2022-05-30 00:43:16|3 |당일누적 발전시간|RTU |100 |3 |
|v-836afa5cd5b92e902e154fb224a34573|/100/3/3 |51 |1 |1 |2022-05-30 00:59:00|20220530|2022053000|59 |2022-05-30 00:59:28|3 |당일누적 발전시간|RTU |100 |3 |
|v-836afa5cd5b92e902e154fb224a34573|/100/3/3 |67 |1 |1 |2022-05-30 01:16:00|20220530|2022053001|16 |2022-05-30 01:16:33|3 |당일누적 발전시간|RTU |100 |3 |
|v-836afa5cd5b92e902e154fb224a34573|/100/3/3 |79 |1 |1 |2022-05-30 01:32:00|20220530|2022053001|32 |2022-05-30 01:32:32|3 |당일누적 발전시간|RTU |100 |3 |
|v-836afa5cd5b92e902e154fb224a34573|/100/3/3 |93 |1 |1 |2022-05-30 01:48:00|20220530|2022053001|48 |2022-05-30 01:48:47|3 |당일누적 발전시간|RTU |100 |3 |
|v-836afa5cd5b92e902e154fb224a34573|/100/3/3 |108 |1 |1 |2022-05-30 02:06:00|20220530|2022053002|6 |2022-05-30 02:05:59|3 |당일누적 발전시간|RTU |100 |3 |
|v-836afa5cd5b92e902e154fb224a34573|/100/3/3 |123 |1 |1 |2022-05-30 02:23:00|20220530|2022053002|23 |2022-05-30 02:23:09|3 |당일누적 발전시간|RTU |100 |3 |
|v-836afa5cd5b92e902e154fb224a34573|/100/3/3 |138 |1 |1 |2022-05-30 02:40:00|20220530|2022053002|40 |2022-05-30 02:40:21|3 |당일누적 발전시간|RTU |100 |3 |
+----------------------------------+------------+--------------+----------+----------+-------------------+--------+----------+---+-------------------+----+-----------------+-----------+------+------+
* */
// make time value
val df_accumulated_power_withMin = df_accumulated_power.withColumn("TIME_MIN",
when(col("MIN") < 15, expr("CONCAT(TIME_HOUR, '00')"))
.when(col("MIN") < 30, expr("CONCAT(TIME_HOUR, '15')"))
.when(col("MIN") < 45, expr("CONCAT(TIME_HOUR, '30')"))
.otherwise(expr("CONCAT(TIME_HOUR, '45')"))
)
/**
시간별 정리
+----------------------------------+------------+--------------+----------+----------+-------------------+--------+----------+---+-------------------+----+-----------------+-----------+------+------+------------+
|DEVICE_ID |RESOURCE_URI|RESOURCE_VALUE|IS_CHANGED|EVENT_TYPE|DATA_TIME |TIME_DAY|TIME_HOUR |MIN|UPDATE_TIME |TYPE|RESOURCE_NAME |DEVICE_TYPE|TYPE_1|TYPE_2|TIME_MIN |
+----------------------------------+------------+--------------+----------+----------+-------------------+--------+----------+---+-------------------+----+-----------------+-----------+------+------+------------+
|v-836afa5cd5b92e902e154fb224a34573|/100/3/3 |7 |1 |1 |2022-05-30 00:09:00|20220530|2022053000|9 |2022-05-30 00:09:12|3 |당일누적 발전시간|RTU |100 |3 |202205300000|
|v-836afa5cd5b92e902e154fb224a34573|/100/3/3 |21 |1 |1 |2022-05-30 00:26:00|20220530|2022053000|26 |2022-05-30 00:26:01|3 |당일누적 발전시간|RTU |100 |3 |202205300015|
|v-836afa5cd5b92e902e154fb224a34573|/100/3/3 |37 |1 |1 |2022-05-30 00:43:00|20220530|2022053000|43 |2022-05-30 00:43:16|3 |당일누적 발전시간|RTU |100 |3 |202205300030|
|v-836afa5cd5b92e902e154fb224a34573|/100/3/3 |51 |1 |1 |2022-05-30 00:59:00|20220530|2022053000|59 |2022-05-30 00:59:28|3 |당일누적 발전시간|RTU |100 |3 |202205300045|
|v-836afa5cd5b92e902e154fb224a34573|/100/3/3 |67 |1 |1 |2022-05-30 01:16:00|20220530|2022053001|16 |2022-05-30 01:16:33|3 |당일누적 발전시간|RTU |100 |3 |202205300115|
|v-836afa5cd5b92e902e154fb224a34573|/100/3/3 |79 |1 |1 |2022-05-30 01:32:00|20220530|2022053001|32 |2022-05-30 01:32:32|3 |당일누적 발전시간|RTU |100 |3 |202205300130|
|v-836afa5cd5b92e902e154fb224a34573|/100/3/3 |93 |1 |1 |2022-05-30 01:48:00|20220530|2022053001|48 |2022-05-30 01:48:47|3 |당일누적 발전시간|RTU |100 |3 |202205300145|
|v-836afa5cd5b92e902e154fb224a34573|/100/3/3 |108 |1 |1 |2022-05-30 02:06:00|20220530|2022053002|6 |2022-05-30 02:05:59|3 |당일누적 발전시간|RTU |100 |3 |202205300200|
|v-836afa5cd5b92e902e154fb224a34573|/100/3/3 |123 |1 |1 |2022-05-30 02:23:00|20220530|2022053002|23 |2022-05-30 02:23:09|3 |당일누적 발전시간|RTU |100 |3 |202205300215|
|v-836afa5cd5b92e902e154fb224a34573|/100/3/3 |138 |1 |1 |2022-05-30 02:40:00|20220530|2022053002|40 |2022-05-30 02:40:21|3 |당일누적 발전시간|RTU |100 |3 |202205300230|
+----------------------------------+------------+--------------+----------+----------+-------------------+--------+----------+---+-------------------+----+-----------------+-----------+------+------+------------+
* */
// make window
val windowSpec = Window.partitionBy("DEVICE_ID").orderBy(asc("TIME_MIN"))
val windos_df_statistics = df_accumulated_power_withMin
.withColumn("OLD_RESOURCE_VALUE",
lag("RESOURCE_VALUE", 1, -1).over(windowSpec))
/**
window를 통한 바로 전 시간대의 데이터 추출
+----------------------------------+------------+--------------+----------+----------+-------------------+--------+----------+---+-------------------+----+-----------------+-----------+------+------+------------+------------------+
|DEVICE_ID |RESOURCE_URI|RESOURCE_VALUE|IS_CHANGED|EVENT_TYPE|DATA_TIME |TIME_DAY|TIME_HOUR |MIN|UPDATE_TIME |TYPE|RESOURCE_NAME |DEVICE_TYPE|TYPE_1|TYPE_2|TIME_MIN |OLD_RESOURCE_VALUE|
+----------------------------------+------------+--------------+----------+----------+-------------------+--------+----------+---+-------------------+----+-----------------+-----------+------+------+------------+------------------+
|v-003ee658badd816dca4eea634e6ab8f7|/100/1/3 |2 |1 |1 |2022-05-30 05:33:00|20220530|2022053005|33 |2022-05-30 05:33:29|3 |당일누적 발전시간|RTU |100 |3 |202205300530|-1 |
|v-003ee658badd816dca4eea634e6ab8f7|/100/1/3 |14 |1 |1 |2022-05-30 05:48:00|20220530|2022053005|48 |2022-05-30 05:48:40|3 |당일누적 발전시간|RTU |100 |3 |202205300545|2 |
|v-003ee658badd816dca4eea634e6ab8f7|/100/1/3 |29 |1 |1 |2022-05-30 06:03:00|20220530|2022053006|3 |2022-05-30 06:03:48|3 |당일누적 발전시간|RTU |100 |3 |202205300600|14 |
|v-003ee658badd816dca4eea634e6ab8f7|/100/1/3 |45 |1 |1 |2022-05-30 06:18:00|20220530|2022053006|18 |2022-05-30 06:18:59|3 |당일누적 발전시간|RTU |100 |3 |202205300615|29 |
|v-003ee658badd816dca4eea634e6ab8f7|/100/1/3 |60 |1 |1 |2022-05-30 06:34:00|20220530|2022053006|34 |2022-05-30 06:34:13|3 |당일누적 발전시간|RTU |100 |3 |202205300630|45 |
|v-003ee658badd816dca4eea634e6ab8f7|/100/1/3 |75 |1 |1 |2022-05-30 06:49:00|20220530|2022053006|49 |2022-05-30 06:49:25|3 |당일누적 발전시간|RTU |100 |3 |202205300645|60 |
|v-003ee658badd816dca4eea634e6ab8f7|/100/1/3 |90 |1 |1 |2022-05-30 07:04:00|20220530|2022053007|4 |2022-05-30 07:04:31|3 |당일누적 발전시간|RTU |100 |3 |202205300700|75 |
|v-003ee658badd816dca4eea634e6ab8f7|/100/1/3 |105 |1 |1 |2022-05-30 07:19:00|20220530|2022053007|19 |2022-05-30 07:19:33|3 |당일누적 발전시간|RTU |100 |3 |202205300715|90 |
|v-003ee658badd816dca4eea634e6ab8f7|/100/1/3 |120 |1 |1 |2022-05-30 07:34:00|20220530|2022053007|34 |2022-05-30 07:34:49|3 |당일누적 발전시간|RTU |100 |3 |202205300730|105 |
|v-003ee658badd816dca4eea634e6ab8f7|/100/1/3 |136 |1 |1 |2022-05-30 07:49:00|20220530|2022053007|49 |2022-05-30 07:49:57|3 |당일누적 발전시간|RTU |100 |3 |202205300745|120 |
+----------------------------------+------------+--------------+----------+----------+-------------------+--------+----------+---+-------------------+----+-----------------+-----------+------+------+------------+------------------+
* */
// calculate value with old_value in before data
val calculate_df_statistics = windos_df_statistics
.withColumn("GEN", when(col("OLD_RESOURCE_VALUE") < 0, -1).otherwise(col("RESOURCE_VALUE") - col("OLD_RESOURCE_VALUE")))
// .withColumn("GEN", expr("CASE WHEN OLD_RESOURCE_VALUE < 0 THEN -1 ELSE RESOURCE_VALUE - OLD_RESOURCE_VALUE END"))
.filter("GEN > 0")
println("make data with window")
/**
현시간과 전 시간의 누적 사용량을 통한 사용량 계산
+----------------------------------+------------+--------------+----------+----------+-------------------+--------+----------+---+-------------------+----+-----------------+-----------+------+------+------------+------------------+---+
|DEVICE_ID |RESOURCE_URI|RESOURCE_VALUE|IS_CHANGED|EVENT_TYPE|DATA_TIME |TIME_DAY|TIME_HOUR |MIN|UPDATE_TIME |TYPE|RESOURCE_NAME |DEVICE_TYPE|TYPE_1|TYPE_2|TIME_MIN |OLD_RESOURCE_VALUE|GEN|
+----------------------------------+------------+--------------+----------+----------+-------------------+--------+----------+---+-------------------+----+-----------------+-----------+------+------+------------+------------------+---+
|v-003ee658badd816dca4eea634e6ab8f7|/100/1/3 |14 |1 |1 |2022-05-30 05:48:00|20220530|2022053005|48 |2022-05-30 05:48:40|3 |당일누적 발전시간|RTU |100 |3 |202205300545|2 |12 |
|v-003ee658badd816dca4eea634e6ab8f7|/100/1/3 |29 |1 |1 |2022-05-30 06:03:00|20220530|2022053006|3 |2022-05-30 06:03:48|3 |당일누적 발전시간|RTU |100 |3 |202205300600|14 |15 |
|v-003ee658badd816dca4eea634e6ab8f7|/100/1/3 |45 |1 |1 |2022-05-30 06:18:00|20220530|2022053006|18 |2022-05-30 06:18:59|3 |당일누적 발전시간|RTU |100 |3 |202205300615|29 |16 |
|v-003ee658badd816dca4eea634e6ab8f7|/100/1/3 |60 |1 |1 |2022-05-30 06:34:00|20220530|2022053006|34 |2022-05-30 06:34:13|3 |당일누적 발전시간|RTU |100 |3 |202205300630|45 |15 |
|v-003ee658badd816dca4eea634e6ab8f7|/100/1/3 |75 |1 |1 |2022-05-30 06:49:00|20220530|2022053006|49 |2022-05-30 06:49:25|3 |당일누적 발전시간|RTU |100 |3 |202205300645|60 |15 |
|v-003ee658badd816dca4eea634e6ab8f7|/100/1/3 |90 |1 |1 |2022-05-30 07:04:00|20220530|2022053007|4 |2022-05-30 07:04:31|3 |당일누적 발전시간|RTU |100 |3 |202205300700|75 |15 |
|v-003ee658badd816dca4eea634e6ab8f7|/100/1/3 |105 |1 |1 |2022-05-30 07:19:00|20220530|2022053007|19 |2022-05-30 07:19:33|3 |당일누적 발전시간|RTU |100 |3 |202205300715|90 |15 |
|v-003ee658badd816dca4eea634e6ab8f7|/100/1/3 |120 |1 |1 |2022-05-30 07:34:00|20220530|2022053007|34 |2022-05-30 07:34:49|3 |당일누적 발전시간|RTU |100 |3 |202205300730|105 |15 |
|v-003ee658badd816dca4eea634e6ab8f7|/100/1/3 |136 |1 |1 |2022-05-30 07:49:00|20220530|2022053007|49 |2022-05-30 07:49:57|3 |당일누적 발전시간|RTU |100 |3 |202205300745|120 |16 |
|v-003ee658badd816dca4eea634e6ab8f7|/100/1/3 |151 |1 |1 |2022-05-30 08:05:00|20220530|2022053008|5 |2022-05-30 08:05:13|3 |당일누적 발전시간|RTU |100 |3 |202205300800|136 |15 |
+----------------------------------+------------+--------------+----------+----------+-------------------+--------+----------+---+-------------------+----+-----------------+-----------+------+------+------------+------------------+---+
* */
// read RTU info (dongcode per device)
val df_rtu_info = spark.read.option("header", "true")
.option("io.compression.codecs", "nl.basjes.hadoop.io.compress.SplittableGzipCodec")
.schema(RTUFileSchema.schema_info).csv(rtufileName)
.filter("dongcode != '4113565500'").filter("flg = 1")
/**
주소 정보가 헤리트 주소 (가계통)이거나 가계통 (flg=-1)인 경우 제외
+--------------------+-----------+-------------------------------------+------+--------------------------+-------------------------+--------------+---------------------+-----------+--------------+----+----------------------------------+----------------------------------+----------------------------------------+----------+-------+----------+----------+---+-----------+-----------------+
|company |name |location |energy|panel_model |panel_manufacturer |inverter_model|inverter_manufacturer|panel_count|inverter_count|gen |device_id |v_device_id |address |cido |sigugun|dongcode |dong |flg|customer_id|install_rtu_count|
+--------------------+-----------+-------------------------------------+------+--------------------------+-------------------------+--------------+---------------------+-----------+--------------+----+----------------------------------+----------------------------------+----------------------------------------+----------+-------+----------+----------+---+-----------+-----------------+
|건물지원 |케이티무역A|경충대로 187-26 |태양광|Q.PEAK DUO XL-G9.3 KC1 460|한화솔루션 ㈜ |DSP-3315K-J |(주)다쓰테크 |21 |1 |9.66|JHENERGY-HC1000NW4-359369080245240|v-511f3647bb332cc6e97e68d927f46081|경기도 광주시 곤지암읍 경충대로 187-26 |경기도 |광주시 |4161025900|곤지암읍 |1 |48 |1 |
|건물지원 |케이티무역B|경충대로 187-26 |태양광|Q.PEAK DUO XL-G9.3 KC1 460|한화솔루션 ㈜ |DSP-3315K-J |(주)다쓰테크 |21 |1 |9.66|JHENERGY-HC1000NW4-359369080310549|v-9b8b76f4643d6afda9be23d061b66e2d|경기도 광주시 곤지암읍 경충대로 187-26 |경기도 |광주시 |4161025900|곤지암읍 |1 |49 |1 |
|(주)제이에이치에너지|박상규 |인천시 남동구 논현동 592-33 |태양광|HiS-S300RG-FEHA |현대중공업 그린에너지(주)|ESP3K5-KRB |동양이엔피(주) |8 |1 |2.4 |JHENERGY-HC1000NW4-359369080298124|v-e2678a75084d018d1bd6cd59153d4202|인천광역시 남동구 논고개로176번길 41 |인천광역시|남동구 |2820069000|논현1동 |1 |51 |1 |
|(주)제이에이치에너지|김은태 |인천시 남동구 논현동 607-2 |태양광|HiS-S300RG-FEHA |현대중공업 그린에너지(주)|ESP3K5-KRB |동양이엔피(주) |8 |1 |2.4 |JHENERGY-HC1000NW4-359369080284967|v-a2cc2af013dca55e2c2d1623927be074|인천광역시 남동구 남동대로 419 |인천광역시|남동구 |2820066000|남촌도림동|1 |50 |1 |
|(주)제이에이치에너지|김은경 |인천시 남동구 논현동 760-5 |태양광|HiS-S300RG-FEHA |현대중공업 그린에너지(주)|ESP3K5-KRB |동양이엔피(주) |8 |1 |2.4 |JHENERGY-HC1000N4-359369080999374 |v-76dc8beba9b59c7fcf41be1ae5b989c6|인천광역시 남동구 에코중앙로139번길 9-17|인천광역시|남동구 |2820071000|논현고잔동|1 |52 |1 |
|(주)제이에이치에너지|김규주 |인천시 남동구 논현동 583-4 |태양광|HiS-S300RG-FEHA |현대중공업 그린에너지(주)|ESP3K5-KRB |동양이엔피(주) |8 |1 |2.4 |JHENERGY-HC1000N4-359369081058055 |v-8cba27f65dca8a59b775e059d0c3ebfe|인천광역시 남동구 은봉로225번길 13-9 |인천광역시|남동구 |2820070000|논현2동 |1 |53 |1 |
|(주)제이에이치에너지|김희주 |인천시 남동구 에코중앙로 139번길 9-23|태양광|HiS-S300RG-FEHA |현대중공업 그린에너지(주)|ESP3K5-KRB |동양이엔피(주) |8 |1 |2.4 |JHENERGY-HC1000NW4-359369080299171|v-bce98398c16529c4a56a50895f5a3128|인천광역시 남동구 에코중앙로139번길 9-23|인천광역시|남동구 |2820071000|논현고잔동|1 |54 |1 |
|(주)제이에이치에너지|박천도 |인천시 남동구 구월동 1100-5 |태양광|HiS-S300RG-FEHA |현대중공업 그린에너지(주)|ESP3K5-KRB |동양이엔피(주) |8 |1 |2.4 |JHENERGY-HC1000NW4-359369080273457|v-e3cf4472b7690d50dbf64389446f2d4f|인천광역시 남동구 문화서로89번길 32 |인천광역시|남동구 |2820052100|구월3동 |1 |57 |1 |
|(주)제이에이치에너지|최정용 |인천시 남동구 논현동 595-1 |태양광|HiS-S300RG-FEHA |현대중공업 그린에너지(주)|ESP3K5-KRB |동양이엔피(주) |8 |1 |2.4 |JHENERGY-HC1000NW4-359369080310630|v-0b5cb587637f4e2dbf26c8a4a0b2326f|인천광역시 남동구 은봉로351번길 3 |인천광역시|남동구 |2820069000|논현1동 |1 |55 |1 |
|(주)제이에이치에너지|이송묵 |인천시 남동구 논현동 771-6 |태양광|HiS-S300RG-FEHA |현대중공업 그린에너지(주)|ESP3K5-KRB |동양이엔피(주) |8 |1 |2.4 |JHENERGY-HC1000NW4-359369080281260|v-ee1de3ac013af198ac5dbf852039ab66|인천광역시 남동구 평온로 11 |인천광역시|남동구 |2820055000|간석3동 |1 |58 |1 |
+--------------------+-----------+-------------------------------------+------+--------------------------+-------------------------+--------------+---------------------+-----------+--------------+----+----------------------------------+----------------------------------+----------------------------------------+----------+-------+----------+----------+---+-----------+-----------------+
* */
// join calculrated data with rtu_info to make dongcode per device..
val deviceStatisticsDF = calculate_df_statistics.alias("A").join(broadcast(df_rtu_info.alias("B")),
col("A.DEVICE_ID") === col("B.v_device_id")
).selectExpr("A.TIME_MIN", "A.TIME_HOUR", "A.TIME_DAY", "A.DEVICE_ID", "A.GEN",
"B.ADDRESS", "B.DONGCODE", "B.DONG", "B.CIDO", "B.SIGUGUN")
/**
디바이스 별 동코드 join
+------------+----------+--------+----------------------------------+---+----------------------------------+----------+------+--------+-------+
|TIME_MIN |TIME_HOUR |TIME_DAY|DEVICE_ID |GEN|ADDRESS |DONGCODE |DONG |CIDO |SIGUGUN|
+------------+----------+--------+----------------------------------+---+----------------------------------+----------+------+--------+-------+
|202205300545|2022053005|20220530|v-003ee658badd816dca4eea634e6ab8f7|12 |경상북도 영덕군 영해면 원당길 51-3|4777036000|영해면|경상북도|영덕군 |
|202205300600|2022053006|20220530|v-003ee658badd816dca4eea634e6ab8f7|15 |경상북도 영덕군 영해면 원당길 51-3|4777036000|영해면|경상북도|영덕군 |
|202205300615|2022053006|20220530|v-003ee658badd816dca4eea634e6ab8f7|16 |경상북도 영덕군 영해면 원당길 51-3|4777036000|영해면|경상북도|영덕군 |
|202205300630|2022053006|20220530|v-003ee658badd816dca4eea634e6ab8f7|15 |경상북도 영덕군 영해면 원당길 51-3|4777036000|영해면|경상북도|영덕군 |
|202205300645|2022053006|20220530|v-003ee658badd816dca4eea634e6ab8f7|15 |경상북도 영덕군 영해면 원당길 51-3|4777036000|영해면|경상북도|영덕군 |
|202205300700|2022053007|20220530|v-003ee658badd816dca4eea634e6ab8f7|15 |경상북도 영덕군 영해면 원당길 51-3|4777036000|영해면|경상북도|영덕군 |
|202205300715|2022053007|20220530|v-003ee658badd816dca4eea634e6ab8f7|15 |경상북도 영덕군 영해면 원당길 51-3|4777036000|영해면|경상북도|영덕군 |
|202205300730|2022053007|20220530|v-003ee658badd816dca4eea634e6ab8f7|15 |경상북도 영덕군 영해면 원당길 51-3|4777036000|영해면|경상북도|영덕군 |
|202205300745|2022053007|20220530|v-003ee658badd816dca4eea634e6ab8f7|16 |경상북도 영덕군 영해면 원당길 51-3|4777036000|영해면|경상북도|영덕군 |
|202205300800|2022053008|20220530|v-003ee658badd816dca4eea634e6ab8f7|15 |경상북도 영덕군 영해면 원당길 51-3|4777036000|영해면|경상북도|영덕군 |
+------------+----------+--------+----------------------------------+---+----------------------------------+----------+------+--------+-------+
* */
/**
* 하단에서 동코드와 시간(min) 별 최대값, 평균
* 동코드와 시간(hour)별 최대값
* 을 구하기 때문에 cache...
*
* + df - df --+
* df - df - df -| | - df
* + df ------+
* */
println("deviceStatisticsDF cache..")
deviceStatisticsDF.cache()
// make avg value per dongcode and time
val areaStatisticsForMinDF = deviceStatisticsDF.groupBy("DONGCODE", "TIME_MIN")
.agg(
expr("MAX(TIME_DAY) AS TIME_DAY"),
expr("MAX(GEN) AS MAX_GEN"),
expr("format_number(AVG(GEN), '#.##') AS GEN")
)
println("make min data")
/**
동코드,시간(min)별 max, avg, 시간(hour)
+----------+------------+--------+-------+---+
|DONGCODE |TIME_MIN |TIME_DAY|MAX_GEN|GEN|
+----------+------------+--------+-------+---+
|1111061500|202205300815|20220530|17 |17 |
|1111061500|202205301315|20220530|15 |15 |
|1111061500|202205301445|20220530|17 |17 |
|1111061500|202205301745|20220530|16 |16 |
|1111061500|202205301800|20220530|16 |16 |
|1114057000|202205301030|20220530|15 |15 |
|1114057000|202205301230|20220530|17 |17 |
|1114057000|202205301445|20220530|18 |18 |
|1114058000|202205300830|20220530|17 |17 |
|1114058000|202205301200|20220530|16 |16 |
+----------+------------+--------+-------+---+
* */
// make avg value of max value per dongcode and hour
val areaStatisticsForHourDF = deviceStatisticsDF.groupBy("DONGCODE", "TIME_HOUR", "DEVICE_ID")
.agg(
expr("MAX(TIME_DAY) AS TIME_DAY"),
expr("MAX(GEN) AS MAX_GEN")
)
println("make hour data")
/**
동코드, 디바이스별 시간당 최대값
+----------+----------+----------------------------------+--------+-------+
|DONGCODE |TIME_HOUR |DEVICE_ID |TIME_DAY|MAX_GEN|
+----------+----------+----------------------------------+--------+-------+
|1154568000|2022053006|v-3d594fc0399ea87ce09310b1f5a7cbdb|20220530|16 |
|1154568000|2022053007|v-3d594fc0399ea87ce09310b1f5a7cbdb|20220530|17 |
|1154568000|2022053008|v-3d594fc0399ea87ce09310b1f5a7cbdb|20220530|17 |
|1154568000|2022053009|v-3d594fc0399ea87ce09310b1f5a7cbdb|20220530|16 |
|1154568000|2022053010|v-3d594fc0399ea87ce09310b1f5a7cbdb|20220530|17 |
|1154568000|2022053011|v-3d594fc0399ea87ce09310b1f5a7cbdb|20220530|17 |
|1154568000|2022053012|v-3d594fc0399ea87ce09310b1f5a7cbdb|20220530|16 |
|1154568000|2022053013|v-3d594fc0399ea87ce09310b1f5a7cbdb|20220530|17 |
|1154568000|2022053014|v-3d594fc0399ea87ce09310b1f5a7cbdb|20220530|17 |
|1154568000|2022053015|v-3d594fc0399ea87ce09310b1f5a7cbdb|20220530|16 |
+----------+----------+----------------------------------+--------+-------+
* */
// make avg of max-gen
val maxValueDF = areaStatisticsForHourDF.groupBy("TIME_DAY", "DONGCODE").agg(
expr("format_number(AVG(MAX_GEN), '#.##') as AVG_MAX_GEN")
)
/**
동코드별 시간당 최대 생산량의 일 평균
+--------+----------+-----------+
|TIME_DAY|DONGCODE |AVG_MAX_GEN|
+--------+----------+-----------+
|20220530|2914082400|16.26 |
|20220530|2914078000|18.37 |
|20220530|4825063000|17.7 |
|20220530|4615059000|17.71 |
|20220530|2914075100|16.9 |
|20220530|3120057000|20.93 |
|20220530|4888025000|17.5 |
|20220530|4183034000|31.66 |
|20220530|4888039000|16.84 |
|20220530|2915570000|17.96 |
+--------+----------+-----------+
* */
println("make maxValueDF")
// make final data
val statisticsDF = areaStatisticsForMinDF.alias("A").join(broadcast(maxValueDF.alias("B")),
col("A.TIME_DAY") === col("B.TIME_DAY") &&
col("A.DONGCODE") === col("B.DONGCODE")
).selectExpr("A.TIME_MIN AS TIME", "A.DONGCODE", "A.GEN", "B.AVG_MAX_GEN AS AVG_MAX_GEN")
/**
동코드,시간(min)별 max, avg, 시간(hour)와 동코드별 시간당 최대 생산량의 일 평균 join
시간(min), 동코드 별 평균 생산량과 시간당 최대생산량의 일평균
+------------+----------+---+-----------+
|TIME |DONGCODE |GEN|AVG_MAX_GEN|
+------------+----------+---+-----------+
|202205300815|1111061500|17 |16.38 |
|202205301315|1111061500|15 |16.38 |
|202205301445|1111061500|17 |16.38 |
|202205301745|1111061500|16 |16.38 |
|202205301800|1111061500|16 |16.38 |
|202205301030|1114057000|15 |16.25 |
|202205301230|1114057000|17 |16.25 |
|202205301445|1114057000|18 |16.25 |
|202205300830|1114058000|17 |17.33 |
|202205301200|1114058000|16 |17.33 |
+------------+----------+---+-----------+
* */
statisticsDF.show(10, false )
println("join areaStatisticsDF and maxValueDF")
// println(statisticsDF.count())
// too slow...
// val coreCount = java.lang.Runtime.getRuntime.availableProcessors * (spark.sparkContext.statusTracker.getExecutorInfos.length -1)
// println("core : " + coreCount)
val mongoDBVal = properties.get("mongodb").get
if (mongoDBVal.toBoolean) {
// val countForMongoDB = df_for_mongo.count();
// println("Write to MongoDB : " + countForMongoDB)
/**
data count : 963,465 for device
data count : 263,920 for area..
* */
println("Write to MongoDB")
statisticsDF.write.format("mongo")
.option("uri", properties.get("mongodb.uri").get)
.option("user", properties.get("mongodb.id").get)
.option("password", properties.get("mongodb.pass").get)
.option("database", properties.get("mongodb.database").get)
.option("collection", properties.get("mongodb.collection").get)
.mode(properties.get("mongodb.mode").get)
.save()
}
else {
println("No write to MongoDB")
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment