Icednut's Note

스파크 강의 노트 Day 3

2017-05-29

Spark Intro

  • Zeppelin > Notebook > 입력 패널 첫번쨰 줄에 %를 입력 후 사용할 언어 입력 (%는 인터프리터 지정을 의미)
  • 실제 업무에서는 DataFrames를 많이씀. RDD를 할 때는 DataSets를 쓰는게 속도 향상을 볼 수 있음.
  • DataFrames를 쓰면 Python으로 포팅하기도 쉬움. (DataFrames 추천)

Spark DataFrames

  • RDD와 DataFrame의 차이점? Catelog optimizer의 유무 (DataFrames에만 있음. RDD는 디시리얼라이제이션 때문에 Heap 사용량이 많아 성능 저하.)
  • DataFrames는 테이블 형태로 데이터를 한정했기 때문에 SQL에 국한된 로직만 작성할 수 있는 단점이 있다. (DataFrames를 보완하기 위해 Datasets가 나왔음)
    • ex)
      1
      2
      dataFrames.filter($"weight" < 60) // $"weight"와 같이 컬럼을 지정하여 필터링 할 수 있다.
      // 컬럼($)으로 쓰면 해당 컬럼에 대한 오퍼레이션을 더 추가할 수 있다. (ex: cast)
  • DataFrames를 쓰면 많이 쓰는 메소드: show, printSchema
    • ex)
      1
      2
      3
      4
      val peopleRDD = spark.sparkContext.makeRDD("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
      val people = spark.read.json(peopleRDD)
      people.show()
    • ex)
      1
      2
      3
      4
      val wikiDF = spark.read.json("/sparklab/dataset/wikiticker-2015-09-12-sampled.json.gz")
      wikiDF.show()
      wikiDF.printSchema()
  • DataFrames에서 explain 메소드를 쓰면 RDBMS와 같이 쿼리 수행 계획을 볼 수 있다.
    • ex)
      1
      wikiDF.select($"page", $"added" + 1).explain(true)
  • Shuffle Read, Shuffle Write를 염두해두자. (엄청크면 뭔가 문제가 있다는 신호. 조인을 잘못걸경우 커짐. shuffle은 노드간에 데이터를 파티셔닝 및 섞는 것을 의미)
  • explain 볼 때 밑에서부터 위로 읽어나가자.
  • JSON을 RDD로 만든 뒤 DataFrame로 변환해서 출력해보면 알아서 컬럼 타입을 지정해준다.
    • ex)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      val cityRDD = spark.sparkContext.makeRDD(
      """{"cityName":"Seoul","countryName":"Republic of Korea","gdp":1321200}""" ::
      """{"cityName":"Tokyo","countryName":"Japan","gdp":4412600}""" ::
      """{"cityName":"Moscow","countryName":"Russia","gdp":113240}""" ::
      """{"cityName":"London","countryName":"United Kingdom","gdp":2760960}""" :: Nil)
      val cityDF = spark.read.json(cityRDD)
      cityDF.printSchema()
      cityDF.show
  • agg는 org.apache.spark.sql.functions를 참조
  • DataFrame는 압축된 데이터 바이너리인 텅스텐으로 메모리에 저장됨
  • z는 제플린 컨텍스트라고 해서 제플린 관련 설정을 바꿀 때 사용
    • ex)
      1
      2
      val shufflePartitions = z.input("spark.sql.shuffle.partition", "200")
      .asInstanceOf[String].toInt
  • 일반적인 경우는 파티션의 크기를 10MB로 잡자.
    • CPU가 많이 먹으면 파티션의 크기를 조정해야됨
  • parquet로 쓰면 컬럼 기반으로 데이터 구조가 잡혀있기 때문에 spark에서 읽을 때 오버헤드가 줄어든다.
    • parquet를 쓰면 SQL From 절에 바로 써서 조회할 수 있다.
    • parquet를 쓰면 컬럼 기반으로 조회(column pruning)를 해서 분석 프로세스를 더 빨리 진행할 수 있다. (FileScan step에서 이미 필터링을 진행)
    • ex)
      1
      val sqlDF = spark.sql("SELECT countryName, cityName, sum(gdp) FROM parquet.`/sparklab/tmp-output/wiki_gdp` WHERE gdp > 1321200 GROUP BY countryName, cityName")

UDF (User Defined Function)

  • 사용자가 만든 함수. 조인을 할 때 udf를 잘못 쓰면 옵티마이저가 안먹힐 수도 있다.
    • ex)
      1
      2
      3
      4
      5
      6
      import org.apache.spark.sql.functions._
      val coder: (Int => String) = (added: Int) => if (added > 10) "frequent" else "rare"
      val sqlfunc = udf(coder)
      wikiDF.withColumn("Frequency", sqlfunc(col("added"))).select($"page", $"added", $"frequency").show()
  • SQL로 만든것을 Spark DataFrame 메소드로 바꿔보기 연습
    • ex)
      1
      2
      3
      4
      5
      6
      7
      8
      wikiDF
      .select($"countryName", $"cityName", $"comment", $"added")
      .groupBy($"countryName", $"cityName")
      .agg(avg($"added").alias("avg_added"), max($"added").alias("max_added"))
      .orderBy($"avg_added".desc)
      .limit(10)
      .show
      // select 컬럼에 내가 보고자 하는 컬럼을 추가해도 groupBy에 지정 안한 컬럼은 결과로 나오지 않음

Join

Pivoting

  • 데이터를 Row로 쫙 늘리는 것?

Wikipedia 연습

1
2
3
4
5
6
pageCountsDF.withColumn("masked_project", regexp_replace(regexp_replace(regexp_replace($"project", "[0-9]+", "9"), "[A-Z]+", "A"), "[a-z]+", "a"))
.filter($"masked_project" === "a-a" or $"masked_project" === "a-a-a" or $"masked_project" === "a-a-a.a") // "masked_project = 'a-a' or masked_project = 'a-a-a'" 이렇게도 쓸 수 있음
.groupBy($"masked_project")
.count
.orderBy($"count".desc)
.show(100)

Wikipedia 실습

  • spark.catalog.listTables().show(false)
    • 메소드의 파라미터를 false로 주면 안짤리고 다 나옴

SKT에서 Spark 활용 사례

  • Flume, Kafka로 데이터를 메세지큐에 담음 -> Batch 처리 or Streaming 처리에 Spark 사용 > 저장소에 저장 (Cache, HDFS)
  • 카프카는 최소 3대 이상으로 구축되어야 함.
  • 카프카도 토픽에 데이터가 너무 많이 들어오면 repartition을 진행해야 한다.
  • Streaming에서는 Transaction 처리가 힘듬.
  • Data를 Write하는 것도 고려해야됨. > 대량으로 write하기 위해서는 MQ를 중간에 사용하여 저장 처리 > 그러나 중복이 발생할 수 있다?
  • Catalog API가 뭘까?
  • DataSet의 장점: Strong Type (Untyped API, Type API 모두 제공함)
  • Shared Variable 사용 방법: Accumulator, Broadcast Variables
  • Whole Stage CodeGen (실행 시점에 필요한 데이터만 필터링해서 데이터를 올림, Tungsten vetorization)

Structured Streaming

  • Streaming 처리를 할 때 unbounded table 형태로 데이터 전체를 사용할 수 있음 (structured stream outputMode: Append, Complete, Update 때문)
  • 10분 단위로 트리거링해서 Result Table 생성할 수 있음
  • Structured Stream은 아직 알파버전
Tags: Spark