Spark SQL을 이용하여 avro 파일과 parquet 파일 다루기

준비

Scala & Spark SQL에서 avro, parquet 파일을 읽고 쓰는 것은 어떻게 하는지 그리고 간단한 예제를 통해 실습한 내용을 정리한다. (avro와 parquet에 대한 설명은 여기서는 생략)
먼저 아래 내용들을 통해 실습 환경을 셋팅하자.

  1. CDH 설치

    • Google에서 Cloudera CDH 검색
    • CDH > quickstart Virtual Box 버전을 다운로드
    • 압축 해제 후 Virtual Box에서 실행
  2. sqoop을 통한 avro 파일로 hdfs에 import 작업 진행

    • hdfs의 적당한 곳에 디렉토리를 생성한다.

      1
      $ hdfs dfs -mkdir /user/cloudera/test_avro_warehouse
    • sqoop을 통해 MySQL 데이터를 HDFS로 import 한다. (파일 포맷은 avro. 압축 형태는 snappy)

      1
      2
      3
      4
      5
      6
      7
      8
      $ sqoop import-all-tables \
      -m 1 \
      --connect jdbc:mysql://quickstart:3306/retail_db \
      --username=retail_dba \
      --password=cloudera \
      --as-avrodatafile \
      --compression-codec=snappy \
      --warehouse-dir=/user/cloudera/test_avro_warehouse
  3. 이번에는 parquet 파일로 hdfs에 import 진행

    • sqoop을 통해 MySQL 데이터를 HDFS로 import 한다. (파일 포맷은 parquet. 압축 형태는 snappy)
      1
      2
      3
      4
      5
      6
      7
      8
      $ sqoop import-all-tables \
      -m 1 \
      --connect jdbc:mysql://quickstart:3306/retail_db \
      --username=retail_dba \
      --password=cloudera \
      --as-parquetfile \
      --compression-codec=snappy \
      --warehouse-dir=/user/cloudera/test_avro_warehouse
  4. spark-shell 실행

    1
    $ spark-shell

Avro

Spark에서 avro 파일을 읽고 쓰려면 avro 관련 라이브러리를 import 해야 된다.

1
import com.databricks.spark.avro._

그 다음 sqlContext.read.avro(“…”) 혹은 sqlContext().read.format(“com.databricsk.spark.avro”).load(“…”)을 통해 파일을 읽는다.

1
2
import com.databricks.spark.avro._
val df = sqlContext.read.avro("input dir")

1
2
import com.databricks.spark.avro._
val df = sqlContext.read.format("com.databricks.spark.avro").load("input dir")
연습1. hdfs://quickstart/user/cloudera/test_avro_warehouse/orders에서 order_status가 COMPLETE 면서 customer_id 별로 주문을 몇건씩 했는지 살펴본 뒤 결과는 parquet로 저장 해보자. (parquet 압축은 gzip)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import com.databricks.spark.avro._
import org.apache.spark.sql._

val ordersDf = sqlContext.read.avro("/user/cloudera/test_avro_warehouse/orders")
val countRdd = ordersDf.filter("order_status = 'COMPLETE'").map(row => (row.getAs[Integer]("order_customer_id"), 1)).reduceByKey(_ + _).map(pair => Row(pair._1, pair._2))

sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")

val schema = StructType(
StructField("customer_id", IntegerType, false) ::
StructField("count", IntegerType, false) :: Nil
)
val countDf = sqlContext.createDataFrame(countRdd, schema)
countDf.write.parquet("/user/cloudera/test_parquet_warehouse/orders_count")
연습2. json 파일을 읽어서 HDFS에 avro 파일로 저장해보자. (with snappy compression)
1
2
3
4
5
6
7
8
9
10
import com.databricks.spark.avro._

val personJsonDf = sqlContext.read.json("/user/cloudera/test_json_warehouse")

sqlContext.setConf("spark.sql.avro.compression.codec", "snappy")
personJsonDf.write.avro("/user/cloudera/test_avro_warehouse/person")

val personAvroDf = sqlContext.read.avro("/user/cloudera/test_avro_warehouse/person")
personAvroDf.printSchema()
personAvroDf.show(3)

Parquet

Parquet 파일을 다룰 때는 따로 import 해줘야할 라이브러리가 없다. 그냥 sqlContext.read.parquet("inpurt file")을 통해 parquet 파일을 읽으면 된다.

연습3. hdfs://quickstart/user/cloudera/test_parquet_warehouse/orders을 읽어서 HDFS에 avro 파일로 저장해보자.
1
2
3
4
5
6
import com.databricks.spark.avro._

val ordersCountDf = sqlContext.read.parquet("/user/cloudera/test_parquet_warehouse/orders_count")

sqlContext.setConf("spark.sql.avro.compression.codec", "snappy")
ordersCountDf.write.avro("/user/cloudera/test_avro_warehouse/orders_count")

참고 자료