
Python) parquet upsert with delta table

MightyTedKim 2021. 10. 19. 08:43

https://docs.delta.io/latest/quick-start.html

 


I heard there is a way to upsert parquet data with Delta Lake, so I followed along.

 

pip install pyspark delta-spark

* In Jupyter, you need `import os` to set the environment variables.

from pyspark import SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from delta.tables import DeltaTable
import shutil

import os

os.environ['JAVA_HOME'] = '/home/java/jdk1.8.0_301'
#os.environ['SPARK_HOME'] = '/home/bigdata/hgkim/spark-3.1.2-bin-hadoop3.2'
#os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages io.delta:delta-core_2.12:1.0.0  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog pyspark-shell'

spark = SparkSession.builder \
    .appName("quickstart") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

def getDf(spark, resultList):
    # Build a schema from the keys of the first dict; every column is treated as a string
    schema = []
    for i in resultList[0].keys():
        schema.append(StructField(i, StringType(), True))
    structed_schema = StructType(schema)

    # Parallelize the list of dicts and apply the schema
    rdd = spark.sparkContext.parallelize(resultList)

    df = spark.createDataFrame(rdd, structed_schema)
    return df


print("############# Init : Delete parquet ###############")
try:
    # Clear any previous runs
    shutil.rmtree("./tmp2/delta_sample_data")
except:
    pass

print("############# Init : save as parquet ###############")
init_df = getDf(spark, 
                [
                 {'name': 'ted', 'email': 'aa@naver.com', 'age': 20,  "action": "raw" },
                 {'name': 'john', 'email': 'bb@naver.com', 'age': 21,  "action": "raw"}
                ])
init_df.write.format("delta").save("./tmp2/delta_sample_data")

print("############ Check : Raw Data from parquet ###############")
spark.read.format("delta").load("./tmp2/delta_sample_data").show()

print("########### Load : Data for Upsert #############")
updates_df  = getDf(spark,
                [
                 {'name': 'ted', 'email': 'aa@naver.com', 'age': 21, "action": "edit" },
                 {'name': 'david', 'email': 'cc@naver.com', 'age': 22, "action": "add" }
                ])
updates_df.show()
print("########### Action : Upserting #############")
deltaTable = DeltaTable.forPath(spark, "./tmp2/delta_sample_data")
# "s" is the existing Delta table (merge target), "t" is the incoming updates (merge source)
deltaTable.alias("s") \
  .merge(updates_df.alias("t"), "s.name = t.name and s.email = t.email") \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()

print("############ Check : Raw Data from parquet ###############")
spark.read.format("delta").load("./tmp2/delta_sample_data").show()

----------------------------
############# Init : Delete parquet ###############
############# Init : save as parquet ###############
############ Check : Raw Data from parquet ###############
+----+------------+---+------+
|name|       email|age|action|
+----+------------+---+------+
|john|bb@naver.com| 21|   raw|
| ted|aa@naver.com| 20|   raw|
+----+------------+---+------+

########### Load : Data for Upsert #############
+-----+------------+---+------+
| name|       email|age|action|
+-----+------------+---+------+
|  ted|aa@naver.com| 21|  edit|
|david|cc@naver.com| 22|   add|
+-----+------------+---+------+

########### Action : Upserting #############
############ Check : Raw Data from parquet ###############
+-----+------------+---+------+
| name|       email|age|action|
+-----+------------+---+------+
|david|cc@naver.com| 22|   add|
| john|bb@naver.com| 21|   raw|
|  ted|aa@naver.com| 21|  edit|
+-----+------------+---+------+
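
As an extra check, the table history should now show the MERGE on top of the initial write, and you can time-travel back to the pre-merge version. A minimal sketch, assuming the same `./tmp2/delta_sample_data` path and `spark` session used above:

from delta.tables import DeltaTable

# Reopen the Delta table written above
deltaTable = DeltaTable.forPath(spark, "./tmp2/delta_sample_data")

# Every write/merge is recorded as a new table version; the latest operation should be MERGE
deltaTable.history().select("version", "operation", "timestamp").show(truncate=False)

# Time travel: read the table as it was before the upsert (version 0 = the initial write)
spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("./tmp2/delta_sample_data") \
    .show()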

 

Things to watch out for

 

No Module Named 'delta.tables'

- Check that `pip install delta-spark` has actually been run

- Check the jar pulled in by `--packages io.delta:delta-core_2.12:1.0.0` -> if the machine is offline, drop the jar into the cache folder shown in the logs
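
For reference, a rough offline variant of the session setup: instead of pulling the package with `spark.jars.packages`, point Spark straight at a delta-core jar you copied onto the machine yourself. Just a sketch; the jar path below is a made-up example, so use wherever you actually put the jar.

from pyspark.sql import SparkSession

# Offline variant: reference a locally copied jar instead of downloading via spark.jars.packages
# (the path below is only an example; replace it with the real location of the jar)
spark = SparkSession.builder \
    .appName("quickstart-offline") \
    .config("spark.jars", "/path/to/delta-core_2.12-1.0.0.jar") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()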

 

Exception: Java gateway process exited before sending the driver its port number

- This means Spark cannot find Java, so try `!echo $JAVA_HOME` in a Jupyter cell first

- os.environ['JAVA_HOME'] = '/home/java/jdk1.8.0_301'
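
A quick sanity check for this one: confirm the notebook process can actually see Java before building the SparkSession. Small sketch only; the JDK path is the one used in this post, so adjust it to your own.

import os
import subprocess

# If this prints the fallback message, set JAVA_HOME before creating the SparkSession
print(os.environ.get("JAVA_HOME", "JAVA_HOME is not set"))

# Optionally confirm the java binary itself runs (it prints the version info to stderr)
subprocess.run(["/home/java/jdk1.8.0_301/bin/java", "-version"])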

 

 

References

 - https://github.com/prasannakumar2012/spark_experiments/blob/master/examples/Delta_Lake.ipynb

