한줄요약: cluster로 thrift 를 올리는 정식 방벙은 아직 안나와서, 추후에 trino로 옮겨야할 것 같다.
개요
- 주제: Spark thrift server를 K8S cluster에 구현해함
- 문제: Cluster로 Thrift Server를 실행하는 방법이 없음
- 해결: 'mykidong' 블로그를 통해 예시를 찾음
이 포스팅은 mkidong님의 예시를 각색해서, 오프라인 온프렘 k8s cluster에 적용한 내용이에요
먼저, Spark Thrift Server 가 무엇인지 간단히 설명하고
예제와 함께, 참고한 블로그와 다른점을 기술하도록 할게요 ㅎ
thrift server란?
spark 에 sql을 날릴 수 있는 Thrift Server 라는게 있어요
daemon 처럼 jdbc 를 날릴 수 있는 환경을 구성해주는거죠
beeline을 이용하거나, dbeaver와 같은 툴을 이용해 조회할 수 있답니다.
+ local에서 실행하는 방법
로컬에서 thrift를 실행하는건 엄청 쉬워요.
기본 spark.tar 에 들어가 있거든요
출처 : https://spark.apache.org/docs/latest/sql-distributed-sql-engine.html
./sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=<listening-port> \
--hiveconf hive.server2.thrift.bind.host=<listening-host> \
--master <master-uri>
그런데 갑자기 큰 부하가 걸리면 아무리 설정값을 바꿔봐도 죽는다는 문제가 있어요ㅜ
maxresultsize를 늘리고 memory를 32G를 주고 shuffle을 키고 그래도..
그래서 kubernetes의 POD를 이용해 올리는 방법을 생각했어요.
참고: https://moons08.github.io/programming/thrift_idl_rpc/
K8S cluster에 thrift server 실행하기
그런데 thrift server를 k8s에 띄우려하니까 문제가 발생했어요.
어떻게 cluster로 실행하지? 아무리 찾아봐도 local에서 실행하는 방법 뿐이 없는데 ㅜ
https://itnext.io/hive-on-spark-in-kubernetes-115c8e9fa5c1
딱 필요한 기능을 설명한 블로그가 있어서 따라해봣어요. 블로그와 차이점이 있는 부분은 따로 표시했습니다 :)
총 3가지 준비가 필요해요
- 준비
- hive-metastore, mysql (Pod)
- thrift-custom jar 준비 (Custom)
- Docker Image 빌드
- 실행
- init용 pod 띄우기
- init용 pod에서 명령어 날리기
- 결과
- spark_submit -> Driver -> Executor 1, 2, 3
- spark_submit -> Driver -> Executor 1, 2, 3
hive-metastore, mysql (Pod)
hive-metastore와 mysql이 있어야지 thrift server를 돌릴 수 있어요
먼저 hive-metastore, mysql을 띄워줘요.
kidong 선생님의 블로그 글을 따라하면 쉽게 띄울 수 있어요
https://itnext.io/hive-on-spark-in-kubernetes-115c8e9fa5c1
$k get all -n thrift NAME READY STATUS RESTARTS AGE pod/metastore-b84879c9c-6r97b 1/1 Running 2 (4d23h ago) 16d pod/mysql-deployment-967bcd458-bhx45 1/1 Running 1 (5d13h ago) 16d $k get svc -n thrift NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE metastore ClusterIP 10.***.***.35 <none> 9083/TCP 18d mysql-service ClusterIP 10.***.***.238 <none> 3306/TCP 18d |
thrift-custom jar 준비 (Custom)
여기가 핵심인데, 사용자 정의 jar를 만드는거에요
examples/spark-thrift-server에 가서 mvn 빌드를 해줘요
mvn -e -DskipTests=true clean install shade:shade;
[여기가 핵심]
thrift server를 cluster에서 실행하기 위해서는 class를 한번 감싸주는 작업이에요
package io.mykidong.hive;
public class SparkThriftServerRunner {
public static void main(String[] args) {
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(args);
}
}
dirver안에서 class를 호출하거든요
그래서 아래처럼 --class 코드가 추가되요(자세한거는 뒤에서 설명)
opt/spark/bin/spark_submit
(...)
--class io.mykidong.hive.SparkThriftServerRunner \
(...)
local:///opt/spark/jars/spark-thrift-server-1.0.0-SNAPSHOT-spark-job.jar
mvn build는 똑같이하면 spark-thrift-server-1.0.0-SNAPSHOT-spark-job.jar 가 나와요
여기까지 완료가 되면, hive-metastore/mysql 이 pod로 떠있고 jar 파일도 준비가 되어 있을거에요
+ 참고한 블로그와 다른점
pom.xml : 주석(aws-java-sdk-s3:1.11.375, hadoop-aws:3.2.0, delta)
- kidong 선생님의 예시에는 s3, delta 관련 dependency가 pom.xml에 잇었는데 주석을 했어요.
- 인터넷이 안되는 곳에서 진행하다보니까. --packages 로 라이브로리를 가져오지 못하고 라이브러리 정리가 안되서 dependency가 꼬이더라고요. (.m2폴더에 넣어놨는데 왜 다운로드를 못하는지 이해가 가지 않아요ㅜㅜ)
Docker Image 빌드
방금 만든 jar를 spark 이미지와 함께 빌드해줘요
이 부분은 블로그에 없는데, 내부망에서 제가 오류가 나서 적용한 우회법이에요
아까 pom.xml에서 s3, delta 관련 라이브러리를 뺏는데, delta는 안쓸거니까 s3관련된것만 copy 했어요
Dockerfile은 아래와 같아요
FROM project-private/test/mykidong/spark:v3.0.0 USER root # spark-thrift-server-1.0.0-SNAPSHOT-spark-job.jar COPY ./jars/spark-thrift-server-1.0.0-SNAPSHOT-spark-job.jar \ /opt/spark/jars/spark-thrift-server-1.0.0-SNAPSHOT-spark-job.jar # s3 library COPY ./jars/aws-java-sdk-s3-1.11.375.jar /opt/spark/jars/aws-java-sdk-s3-1.11.375.jar COPY ./jars/hadoop-aws-3.2.0.jar /opt/spark/jars/hadoop-aws-3.2.0.jar |
빌드하고 푸시해줘요
docker push project-private/test/spark-thrift-mykidong:v3.0.0 docker build -t project-private/test/spark-thrift-mykidong:v3.0.0 . |
빌드해주면 준비가 끝이 납니다.
실행
이 부분은 지금 어떻게 관리를 할지 고민하고 있어요.
sparkapplication으로 띄우는게 제일 이쁜거 같은데, 굳이 그래야하나..
deployment로 하면 안되나 라는 생각을 하고 있어요
init용 pod 띄우기
spark-submit만 하면되는데, 저 같은 경우는 로컬에 spark가 없어서 pod로 띄웟어요
그리고 master api에 ip 가 허용이되지 않아서, host 명으로 임시 등록했어요. (이 부분은 공부 더 하고 보충할게요)
$ k run -it temp-spark --image=project-private/test/spark-thrift-mykidong:v3.0.0 -n thrift -- /bin/sh # echo "172.241.xx.xx temp_master" >> /etc/hosts |
init용 pod에서 명령어 날리기
k exec -it -temp-spark -n thrift -- /bin/sh # spark-submit \ /opt/spark/bin/spark-submit \ --master k8s://https://temp_master:6443 \ --deploy-mode cluster \ --name spark-thrift-server \ --class io.hgkim.hive.SparkThriftServerRunner \ --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint \ --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.subPath=checkpoint \ --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false \ --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-driver-pvc \ --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint \ --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.subPath=checkpoint \ --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false \ --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-exec-pvc \ --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.spark-local-dir-localdirpvc.mount.path=/localdir \ --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.spark-local-dir-localdirpvc.mount.readOnly=false \ --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.spark-local-dir-localdirpvc.options.claimName=spark-driver-localdir-pvc \ --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-localdirpvc.mount.path=/localdir \ --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-localdirpvc.mount.readOnly=false \ --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-localdirpvc.options.claimName=spark-exec-localdir-pvc \ --conf spark.kubernetes.container.image.pullPolicy=Always \ --conf spark.hadoop.hive.metastore.client.connect.retry.delay=5 \ --conf spark.hadoop.hive.metastore.client.socket.timeout=1800 \ --conf spark.hadoop.hive.server2.enable.doAs=false \ --conf spark.hadoop.hive.server2.thrift.http.port=10002 \ --conf spark.hadoop.hive.server2.thrift.port=10016 \ --conf spark.hadoop.hive.server2.transport.mode=binary \ --conf spark.hadoop.hive.execution.engine=spark \ --conf spark.hadoop.metastore.catalog.default=spark \ --conf spark.hadoop.fs.s3a.connection.ssl.enabled=true \ --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \ --conf spark.hadoop.fs.s3a.fast.upload=true \ --conf spark.hadoop.fs.s3a.path.style.access=true \ --verbose \ # 아래 부분이 블로그와 다른 부분 --conf spark.kubernetes.namespace=thrift \ --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \ --conf spark.kubernetes.container.image=project-private/test/spark-thrift-mykidong:v3.0.0 \ --conf spark.hadoop.hive.metastore.uris=thrift://10.***.210.**:9083 \ --conf spark.kubernetes.file.upload.path=s3a://test/spark/thrift/extra-jars \ --conf spark.sql.warehouse.dir=s3a://test/spark/thrift/warehouse \ --conf spark.hadoop.fs.defaultFS=s3a://test/spark/thrift \ --conf spark.hadoop.fs.s3a.endpoint=http://10.***.***.***:80 \ #object storage or s3 --conf spark.hadoop.fs.s3a.access.key=test-key \ --conf spark.hadoop.fs.s3a.secret.key=test-key \ --conf spark.executor.instances=6 \ --conf spark.executor.memory=18G \ --conf spark.executor.cores=3 \ --conf spark.driver.memory=32G \ --conf spark.driver.cores=4 \ --conf spark.driver.maxResultSize=32G \ --conf spark.eventLog.enabled=true \ --conf spark.eventLog.dir=s3a://test/spark/spark-hs \ --conf spark.shuffle.compress=true \ --conf spark.rdd.compress=true \ local:////opt/spark/jars/spark-thrift-server-1.0.0-SNAPSHOT-spark-job.jar |
그러면 결과가 아래처럼 나와요
$k get all -n thrift NAME READY STATUS RESTARTS AGE pod/metastore-b84879c9c-6r97b 1/1 Running 2 (4d23h ago) 16d pod/mysql-deployment-967bcd458-bhx45 1/1 Running 1 (5d13h ago) 16d # init용 pod pod/temp-spark 1/1 Running 2 (5d18h ago) 12d # driver pod/spark-thrift-server-e6e3f87dc6e2c9f4-driver 1/1 Running 0 2d18h # Executor (instance) pod/spark-thrift-server-466d8b7dc6e2e61d-exec-1 1/1 Running 0 2d18h pod/spark-thrift-server-466d8b7dc6e2e61d-exec-2 1/1 Running 0 2d18h pod/spark-thrift-server-466d8b7dc6e2e61d-exec-3 1/1 Running 0 2d18h pod/spark-thrift-server-466d8b7dc6e2e61d-exec-4 1/1 Running 0 2d18h pod/spark-thrift-server-466d8b7dc6e2e61d-exec-5 1/1 Running 0 2d18h pod/spark-thrift-server-466d8b7dc6e2e61d-exec-6 1/1 Running 0 2d18h |
블로그와 다른점
1. --packages 뺌
외부망이 되지 않는 환경이어서 --packages를 사용하면 오류가 나더라고요
.ivy, mvn에 넣어도 안되고, 프록시 설정을 추가해도 안되고..
그래서 외부 repo를 찍고 오는 --package 를 주석했어요.
주석한 부분: --packages com.amazonaws:aws-java-sdk-s3:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0
시행착오
일단 실행은 되는데, 눈물 났어요.. 그래서 징징거리려고 시행착오를 적어봐요
(해결) aws 관련 라이브러리 없다고 에러 로그가 쌓임
1. 문제 : aws 관련 jar 가 있는데, 없다고 에러를 뱉음
2. 해결: pom.xml에서 관련 jar 주석하고 Dockerfile에 aws 관련 jar copy함
3. 설명 :
- 구글링해보니, aws 관련 jar가 2개가 있으면 없다는 에러를 뱉는다고 함.
- '--packages '를 이용하면 dependency를 관리해준다고 함
- 내부망 때문인지 --packages 오류가 생겨서 주석한 것이 문제라고 파악
관련 로그
21/11/25 23:50:06 INFO MemoryStore: Block broadcast_9 stored as values in memory (estimated size 12.3 KiB, free 997.8 MiB)
21/11/25 23:50:06 INFO FileScanRDD: Reading File path: s3a://test/hgkim/part-00000-f3f023db-0f62-4825-b4ac-d412f2a756bd.c000.snappy.parquet, range: 0-8470, partition values: [empty row]
21/11/25 23:50:06 INFO TorrentBroadcast: Started reading broadcast variable 8 with 1 pieces (estimated total size 4.0 MiB)
21/11/25 23:50:06 INFO MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 30.5 KiB, free 997.8 MiB)
21/11/25 23:50:06 INFO TorrentBroadcast: Reading broadcast variable 8 took 12 ms
21/11/25 23:50:06 INFO MemoryStore: Block broadcast_8 stored as values in memory (estimated size 453.9 KiB, free 997.3 MiB)
21/11/25 23:50:06 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 14)
java.lang.NoSuchMethodError: com.amazonaws.http.HttpResponse.getHttpRequest()Lorg/apache/http/client/methods/HttpRequestBase;
at com.amazonaws.services.s3.internal.S3ObjectResponseHandler.handle(S3ObjectResponseHandler.java:57)
at com.amazonaws.services.s3.internal.S3ObjectResponseHandler.handle(S3ObjectResponseHandler.java:29)
at com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1555)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1272)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4368)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4315)
at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1416)
at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$reopen$0(S3AInputStream.java:183)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:182)
at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:328)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$2(Invoker.java:195)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:193)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:215)
at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:321)
at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:354)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at org.apache.parquet.io.DelegatingSeekableInputStream.read(DelegatingSeekableInputStream.java:61)
at org.apache.parquet.bytes.BytesUtils.readIntLittleEndian(BytesUtils.java:80)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:520)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:505)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:499)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:448)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.footerFileMetaData$lzycompute$1(ParquetFileFormat.scala:272)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.footerFileMetaData$1(ParquetFileFormat.scala:271)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:275)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:116)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:169)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:490)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
(해결 중)
1. 문제: pod로 실행하다보니, OOM나서 죽으면 답이 없음(미해결)
2.설명:
- 일단 driver 메모리를 32G로 늘리니까 죽지 않음. 16G 였을 때는 OOM남
- spark application이나 eployment로 만들어야할 것같음
- memory를 32G 로 늘리고, Instance를 늘려도 무지막지한 SQL에 죽음ㅜ
$ k get pod -n thrift NAME READY STATUS RESTARTS AGE metastore-b84879c9c-6r97b 1/1 Running 2 (5d ago) 16d mysql-deployment-967bcd458-bhx45 1/1 Running 1 (5d14h ago) 16d spark-thrift-server-17d9ef7dc637cce0-driver 0/1 OOMKilled 0 2d22h spark-thrift-server-21db897dc637eded-exec-1 0/1 Error 0 2d22h spark-thrift-server-21db897dc637eded-exec-2 0/1 Error 0 2d22h spark-thrift-server-21db897dc637eded-exec-3 0/1 Error 0 2d22h spark-thrift-server-21db897dc637eded-exec-4 0/1 Error 0 2d22h spark-thrift-server-21db897dc637eded-exec-5 0/1 Error 0 2d22h spark-thrift-server-21db897dc637eded-exec-6 0/1 Error 0 2d22h |
(해결 중) spark history server에 pvc 연결이 안됨.
1. 문제: object storage pv연결이 안되서 object storage(s3)를 연결함
2. 설명:
- spark history server에는 driver 수동 종료시에만 값이 뜸
- 실시간이나 OOM나면 history server 사용 못함
- k8s 1.21에서는 되는 것 확인함
- k8s 1.22에서는 안됨,
* spark operator 버전 문제인거같음
* 1.22용으로 업그레이드되기를 기다려야하나 고민 중
spark history server 관련해서는 정보가 많이 없어서 다른 검색엔진도 찾아봐야할 것 같아요. (ex. Trino)
참조
https://itnext.io/hive-on-spark-in-kubernetes-115c8e9fa5c1
정보가 없던 가뭄 속에서 단비 같은 블로그를 찾았는데, 한국분이셨어요. 감사합니다ㅎㅎ :)
'Data > Spark' 카테고리의 다른 글
Spark) k8s,jupyterhub에서 sparkUI 사용하기 (0) | 2022.03.24 |
---|---|
Spark) spark_submit시 spark.app.id warning_ jupyterhub (0) | 2022.03.24 |
Spark) spark volume data spill 이슈_spark-local-dir (0) | 2022.03.21 |
Spark) Thrift serverHive-Metastore OOM 해결_메모리 추가할당 (0) | 2022.01.10 |
Spark) Spark Base Image Docker Build(VM, 내부망) (0) | 2021.11.11 |