https://docs.delta.io/latest/quick-start.html
I heard Delta Lake gives you a way to upsert parquet data, so I followed along with the quickstart.
pip install pyspark delta-spark
* In Jupyter you need `import os` so you can set the environment variables.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from delta.tables import DeltaTable
import shutil
import os
os.environ['JAVA_HOME'] = '/home/java/jdk1.8.0_301'
#os.environ['SPARK_HOME'] = '/home/bigdata/hgkim/spark-3.1.2-bin-hadoop3.2'
#os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages io.delta:delta-core_2.12:1.0.0 --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog pyspark-shell'
spark = SparkSession.builder \
    .appName("quickstart") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
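# A hedged alternative sketch (not in the original post): the delta-spark pip
# package also ships a configure_spark_with_delta_pip() helper that injects the
# matching delta-core jar for you, so the Maven coordinates don't have to be
# hard-coded in spark.jars.packages:
#
# from delta import configure_spark_with_delta_pip
# builder = SparkSession.builder \
#     .appName("quickstart") \
#     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
#     .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
# spark = configure_spark_with_delta_pip(builder).getOrCreate()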
def getDf(spark, resultList):
    # Build a schema from the first dict's keys; every column is StringType,
    # so Spark stringifies the values
    schema = []
    for i in resultList[0].keys():
        schema.append(StructField(i, StringType(), True))
    structed_schema = StructType(schema)
    rdd = spark.sparkContext.parallelize(resultList)
    df = spark.createDataFrame(rdd, structed_schema)
    return df
print("############# Init : Delete parquet ###############")
try:
    # Clear any previous runs
    shutil.rmtree("./tmp2/delta_sample_data")
except FileNotFoundError:
    pass
print("############# Init : save as parquet ###############")
init_df = getDf(spark, [
    {'name': 'ted', 'email': 'aa@naver.com', 'age': 20, 'action': 'raw'},
    {'name': 'john', 'email': 'bb@naver.com', 'age': 21, 'action': 'raw'}
])
init_df.write.format("delta").save("./tmp2/delta_sample_data")
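# Side note: a Delta table on disk is just parquet part files plus a
# _delta_log/ directory of JSON commit files, one commit per write or merge.
# You can peek at it with:
#
# print(os.listdir("./tmp2/delta_sample_data"))
# print(os.listdir("./tmp2/delta_sample_data/_delta_log"))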
print("############ Check : Raw Data from parquet ###############")
spark.read.format("delta").load("./tmp2/delta_sample_data").show()
print("########### Load : Data for Upsert #############")
updates_df = getDf(spark, [
    {'name': 'ted', 'email': 'aa@naver.com', 'age': 21, 'action': 'edit'},
    {'name': 'david', 'email': 'cc@naver.com', 'age': 22, 'action': 'add'}
])
updates_df.show()
print("########### Action : Upserting #############")
# "s" is the existing Delta table (merge target), "t" the incoming updates (source)
deltaTable = DeltaTable.forPath(spark, "./tmp2/delta_sample_data")
deltaTable.alias("s") \
    .merge(updates_df.alias("t"), "s.name = t.name and s.email = t.email") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
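# A hedged alternative sketch: for column-level control instead of
# update-all/insert-all, the merge builder also accepts explicit column maps
# (column names follow this example's schema):
#
# deltaTable.alias("s") \
#     .merge(updates_df.alias("t"), "s.name = t.name and s.email = t.email") \
#     .whenMatchedUpdate(set={"age": "t.age", "action": "t.action"}) \
#     .whenNotMatchedInsertAll() \
#     .execute()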
print("############ Check : Raw Data from parquet ###############")
spark.read.format("delta").load("./tmp2/delta_sample_data").show()
----------------------------
############# Init : Delete parquet ###############
############# Init : save as parquet ###############
############ Check : Raw Data from parquet ###############
+----+------------+---+------+
|name| email|age|action|
+----+------------+---+------+
|john|bb@naver.com| 21| raw|
| ted|aa@naver.com| 20| raw|
+----+------------+---+------+
########### Load : Data for Upsert #############
+-----+------------+---+------+
| name| email|age|action|
+-----+------------+---+------+
| ted|aa@naver.com| 21| edit|
|david|cc@naver.com| 22| add|
+-----+------------+---+------+
########### Action : Upserting #############
############ Check : Raw Data from parquet ###############
+-----+------------+---+------+
| name| email|age|action|
+-----+------------+---+------+
|david|cc@naver.com| 22| add|
| john|bb@naver.com| 21| raw|
| ted|aa@naver.com| 21| edit|
+-----+------------+---+------+
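Because every write and merge is recorded as a commit in `_delta_log`, you can also look at the table as it was before the upsert. A minimal sketch using the same path as above (`history()` and the `versionAsOf` read option are standard Delta Lake APIs):

deltaTable.history().show()  # one row per commit: version 0 = initial write, version 1 = merge
spark.read.format("delta").option("versionAsOf", 0).load("./tmp2/delta_sample_data").show()  # pre-merge rows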
Things to watch out for
ModuleNotFoundError: No module named 'delta.tables'
- Make sure `pip install delta-spark` has been run.
- Check that the jar from `--packages io.delta:delta-core_2.12:1.0.0` was actually resolved -> if you're offline, drop the jar into the cache folder that shows up in the logs (or hand it to Spark directly; see the sketch below).
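If you're fully offline, another option is to give Spark the downloaded jar directly instead of resolving it from Maven. A sketch, not from the original post; the jar path is a placeholder for wherever you saved it:

# "/path/to/delta-core_2.12-1.0.0.jar" is a placeholder; point it at your local jar
spark = SparkSession.builder \
    .appName("quickstart") \
    .config("spark.jars", "/path/to/delta-core_2.12-1.0.0.jar") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()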
Exception: Java gateway process exited before sending the driver its port number
- This means Spark can't find Java; in Jupyter, try `!echo $JAVA_HOME` to confirm it's set (or check it from Python, see below), then set it like this:
- os.environ['JAVA_HOME'] = '/home/java/jdk1.8.0_301'
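In a notebook you can also sanity-check it from Python; a quick sketch:

import os
print(os.environ.get('JAVA_HOME'))  # should print your JDK path, not None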
References
- https://github.com/prasannakumar2012/spark_experiments/blob/master/examples/Delta_Lake.ipynb