ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • SPARK ElasticSearch 연동
    프로그래밍/spark 2018. 1. 2. 18:27
    728x90


    SPARK ElasticSearch 연동

    Spark의 장점은 다른 Database와 연동이라고 볼수 있을 것이다.

    다른 DataBase의 데이터를 Spark로 가져와 빠르게 분석 할 수 있다.

    ElasticSearch과의 연동이 필요해서 관련된 글이다.

    zepplien notebook으로 연동한 것이다.

    ElasticSearch에서 데이터 가져오기

    //elastic search의 데이터를 가져와 DataFrame 형태로 만듬
    //elastic search의 array타입은 string으로 넘어오는 듯
    import org.apache.spark.sql.SparkSession
    import org.elasticsearch.spark._
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext    
    import org.apache.spark.SparkContext._
    import java.util._
    
    val spark = SparkSession.builder().getOrCreate()
    
    val conf = new SparkConf().setAppName("elastic-test")
    conf.set("spark.driver.allowMultipleContexts", "true")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes.discovery", "true")
    conf.set("es.nodes", "ip:port") // 각자 사용하는 ES 주소를 적는다.
    
    val elssc = new SparkContext(conf)  
    
    val RDD = elssc.esRDD("index/doc")
    
    val dff = RDD
    
    case class ADPOI(adpoi: String, admgu : String, kind_sum:String,sum_rsc:String, level_uv:String, first_pid:String, cneterx:String, centery:String,sum_uv:String,fpid_x:String, fpid_y:String, polylist:String, pidlist:Array[String])
    
    case class ADPOI(adpoi: String, pidlist:String)
    
    def printRow(adpid:String, gMap:scala.collection.Map[String, AnyRef]) : (String, String) = {
        // println(gMap)
        val pid_list =  gMap.get("pid_list")                                            //ADPOI - PID LIST
        val admgu = gMap.get("admgu").toArray                              //ADPOI - ADMCODE_GU
        val m_admgu = admgu(0).toString
        // val kind_sum = gMap.get("kind_sum").toArray           //ADPOI - INCLUDE KIND SUM
        // val m_kind_sum = kind_sum(0).toString
        // val sum_rsc =gMap.get("sum_rsc").toArray              //ADPOI - RSC
        // val m_sum_rsc = sum_rsc(0).toString
        // val level_uv = gMap.get("level_uv").toArray           //ADPOI - UV
        // val m_level_uv = level_uv(0).toString
        // val first_pid = gMap.get("first_pid").toArray         //ADPOI - 대표 POI
        // val m_first_pid = first_pid(0).toString
        // val center_x = gMap.get("center_pt_x").toArray        //ADPOI - ADPOI X
        // val m_center_x = center_x(0).toString
        // val center_y = gMap.get("center_pt_y").toArray        //ADPOI - ADPOI Y
        // val m_center_y = center_y(0).toString
        // val sum_uv = gMap.get("sum_uv").toArray                  //ADPOI - SUM UV
        // val m_sum_uv = sum_uv(0).toString
        // val fpid_dpx = gMap.get("fpid_dpx").toArray              //ADPOI - FIRST PID DPX
        // val m_fpid_dpx = fpid_dpx(0).toString
        // val fpid_dpy = gMap.get("fpid_dpy").toArray              //ADPOI - FIRST PID DPY
        // val m_fpid_dpy = fpid_dpy(0).toString
        // val polylist = gMap.get("polylist").toArray           //ADPOI - POLYGON LIST
        // val m_polylist = polylist(0).toString
        val pidarray = pid_list.toArray
        val m_Pid = pidarray(0).toString.replace("Buffer","").replace("(","").replace(")","").split(", ",-1)
        
        var result_list = ""
        for(mPID <- m_Pid) {
            result_list += mPID + "_"
        }
        
        result_list = result_list.substring(0, result_list.length-1)
        
        (adpid,result_list)
    }
    //collect()를 하지 않으면 seriallize 에러가 발생한다.
    val m_dff = dff.collect().map(s=>printRow(s._1, s._2)).map(s=>ADPOI(s._1,s._2))
    
    val adpoi_df = spark.createDataFrame(m_dff)

    최종적으로 데이터 프레임 형식으로 만들어 분석에 용이하게 했다.

    다음은 저장

    여러가지 방식이 인터넷상에 존재하는데 현재 버젼에 가장 호환이 잘되는 방식은 아래 방식인거 같다.

    dataset을 바로 저장하는 방식은 타입 문제 뿐만아니라 localhost로 접근하는데

    접근 설정을 제어하는 방법을 아직 찾지 못했다... 찾으면 편할텐데...

    곧 수정 될 수도 있다. 눈 깜짝 할 사이에...

    결국은 Dataset데이터를 RDD형태로 변경하는 방식이 필요하다.

    import org.apache.spark.sql.SparkSession
    import org.elasticsearch.spark.sql._
    import org.elasticsearch.spark._
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext    
    import org.apache.spark.SparkContext._
    import org.apache.spark.sql.SQLContext
    import org.elasticsearch.spark.rdd.EsSpark 
    
    val adidDS = spark.sql("select adid, ukey,last_dt from logtext.adid_match limit 10")
    val adidDF = adidDS.as[(String, String,String)].rdd
    
    case class ADID(adid:String, ukey:String, lastdt:String)
    
    // val rdd = sc.makeRDD(Seq(numbers,airports))
    val adidDF2 = adidDF.map(s=>ADID(s._1,s._2,s._3))
    
    EsSpark.saveToEs(adidDF2,"spark/docs", Map("es.nodes"->"ip:port"))


    728x90

    '프로그래밍 > spark' 카테고리의 다른 글

    Spark Serialized Task is Too Large Error  (0) 2018.01.02
    SPARK와 HBASE 연동  (3) 2018.01.02
Designed by Tistory.