Growing as a 'Data Engineer'

I like organizing things, and like it even more when others read them.

Spark) spark.app.id warning on spark-submit from JupyterHub

MightyTedKim 2022. 3. 24. 11:21

I get a warning when running spark-submit from JupyterHub. It can simply be handled and ignored, but I was curious, so I dug into it.

What is spark.app.id?

import socket

from pyspark.sql import SparkSession

# Spark-on-Kubernetes session launched from a JupyterHub notebook
spark = (
    SparkSession.builder.appName("hgkim-spark")
    .config("spark.kryoserializer.buffer.max", "1024m")
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .master("k8s://https://172.17.***.56:6443")
    .config("spark.kubernetes.container.image", "hgkim/library/spark-3.2.0-base:1.0.0-20220309")
    .config("spark.kubernetes.namespace", "spark")
    .config("spark.kubernetes.authenticate.driver.serviceAccountName", "jupyter")
    # fixed ports plus the pod IP so executors can reach the driver
    .config("spark.driver.port", "2222")
    .config("spark.driver.blockManager.port", "7777")
    .config("spark.driver.host", socket.gethostbyname(socket.gethostname()))
    .config("spark.driver.bindAddress", "0.0.0.0")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.instances", 5)
    .config("spark.executor.memory", "32g")
    .config("spark.executor.cores", 3)
    .config("spark.dynamicAllocation.enabled", "false")
    .getOrCreate()
)

# S3A (object storage) settings applied through the Hadoop configuration
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", "***-key")
hadoop_conf.set("fs.s3a.secret.key", "***-key")
hadoop_conf.set("fs.s3a.endpoint", "http://***.***.***.***:80")
hadoop_conf.set("com.amazonaws.services.s3.enableV4", "true")
hadoop_conf.set("fs.s3a.connection.ssl.enabled", "false")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

spark

df = spark.read.format("parquet").load("s3a://***.parquet")
df.count()
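As an aside, the same S3A settings can also be passed at build time with the `spark.hadoop.*` prefix (Spark copies those properties into the Hadoop configuration), which avoids reaching into the private `_jsc` handle. A minimal sketch, with the key names taken from the block above and the values as placeholders:

```python
# Sketch: any "spark.hadoop.<key>" property is forwarded into the Hadoop
# Configuration, so the _jsc.hadoopConfiguration().set(...) calls above
# can become ordinary builder .config(...) calls.
S3A_CONF = {
    "fs.s3a.access.key": "***-key",      # placeholder
    "fs.s3a.secret.key": "***-key",      # placeholder
    "fs.s3a.endpoint": "http://***:80",  # placeholder
    "fs.s3a.connection.ssl.enabled": "false",
    "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
}

def as_spark_hadoop_conf(hadoop_opts):
    """Prefix Hadoop keys so SparkSession.builder.config() can take them."""
    return {f"spark.hadoop.{k}": v for k, v in hadoop_opts.items()}

# builder = SparkSession.builder
# for key, value in as_spark_hadoop_conf(S3A_CONF).items():
#     builder = builder.config(key, value)
print(as_spark_hadoop_conf(S3A_CONF)["spark.hadoop.fs.s3a.endpoint"])
```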

The error below is what appears:

ExecutorPodsSnapshotsStoreImpl: Exception when notifying snapshot subscriber.
java.util.NoSuchElementException: spark.app.id

Full stack trace:

22/03/24 10:06:40 WARN ExecutorPodsSnapshotsStoreImpl: Exception when notifying snapshot subscriber.
java.util.NoSuchElementException: spark.app.id
at org.apache.spark.SparkConf.$anonfun$get$1(SparkConf.scala:245)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.SparkConf.get(SparkConf.scala:245)
at org.apache.spark.SparkConf.getAppId(SparkConf.scala:450)
at org.apache.spark.deploy.k8s.features.MountVolumesFeatureStep.$anonfun$constructVolumes$4(MountVolumesFeatureStep.scala:88)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.deploy.k8s.features.MountVolumesFeatureStep.constructVolumes(MountVolumesFeatureStep.scala:57)
at org.apache.spark.deploy.k8s.features.MountVolumesFeatureStep.configurePod(MountVolumesFeatureStep.scala:34)
at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder.$anonfun$buildFromFeatures$4(KubernetesExecutorBuilder.scala:64)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder.buildFromFeatures(KubernetesExecutorBuilder.scala:63)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$requestNewExecutors$1(ExecutorPodsAllocator.scala:391)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.requestNewExecutors(ExecutorPodsAllocator.scala:382)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$onNewSnapshots$36(ExecutorPodsAllocator.scala:346)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$onNewSnapshots$36$adapted(ExecutorPodsAllocator.scala:339)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.onNewSnapshots(ExecutorPodsAllocator.scala:339)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$3(ExecutorPodsAllocator.scala:117)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$3$adapted(ExecutorPodsAllocator.scala:117)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$SnapshotsSubscriber.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$SnapshotsSubscriber$$processSnapshotsInternal(ExecutorPodsSnapshotsStoreImpl.scala:138)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$SnapshotsSubscriber.processSnapshots(ExecutorPodsSnapshotsStoreImpl.scala:126)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.$anonfun$addSubscriber$1(ExecutorPodsSnapshotsStoreImpl.scala:81)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)

Googling confirms that it is a unique key, and that a NoSuchElementException is the expected behavior when it is not set:

  • getAppId gives the spark.app.id Spark property or reports NoSuchElementException if not set.
Printing the application ID shows an auto-generated value:

spark.sparkContext.applicationId # 'spark-application-1648084000316'

Apparently it is generated when the SparkContext comes up.
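Consistent with that, the numeric suffix of the generated id looks like a millisecond Unix timestamp (my observation from this run, not documented behavior); decoding it recovers the SparkContext start time, which matches the 10:06:40 KST timestamp in the WARN log above:

```python
from datetime import datetime, timezone

app_id = "spark-application-1648084000316"

# The suffix appears to be a millisecond epoch timestamp
# (an observation from this run, not documented behavior).
millis = int(app_id.rsplit("-", 1)[-1])
started = datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
print(started.isoformat())  # 2022-03-24T01:06:40.316000+00:00, i.e. 10:06:40 KST
```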

  • spark.master: Master URL
  • spark.app.id (default: TaskScheduler.applicationId()): Unique identifier of a Spark application that Spark uses to uniquely identify metric sources. Set when SparkContext is created (right after TaskScheduler is started, which actually gives the identifier).
  • spark.app.name: Application Name

Source:

https://wanghao989711.gitbooks.io/spark-2-translation/content/spark-SparkConf.html

 

Then can't I just set spark.app.id myself?

    .config("spark.app.id", "spark-hgkim") \

After setting it, the error no longer appears.

 

spark.sparkContext.applicationId # 'spark-hgkim'
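One caveat: the quote above says Spark uses spark.app.id to uniquely identify metric sources, so a hard-coded constant could collide if several sessions run at the same time. A sketch of keeping a predictable prefix while still making each session unique (the naming scheme here is my own, not a Spark convention):

```python
import time
import uuid

def make_app_id(prefix="spark-hgkim"):
    """Predictable prefix plus a unique suffix to avoid metric-source collisions."""
    return f"{prefix}-{int(time.time() * 1000)}-{uuid.uuid4().hex[:8]}"

app_id = make_app_id()
# e.g. "spark-hgkim-1648084000316-a1b2c3d4"; pass it to the builder with
# .config("spark.app.id", app_id)
print(app_id)
```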

Since the id is reportedly used only for REST API calls, I plan to pin it to a fixed value.
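For reference, the application id is the path component in Spark's monitoring REST API (served by the driver UI, port 4040 by default), which is presumably why a fixed id is convenient: a monitoring script can hard-code the URL instead of discovering the generated id first. A sketch that just builds the endpoint (the host name is a placeholder):

```python
def jobs_endpoint(driver_host, app_id, port=4040):
    """URL of the jobs listing in Spark's monitoring REST API for one application."""
    return f"http://{driver_host}:{port}/api/v1/applications/{app_id}/jobs"

url = jobs_endpoint("driver.example.internal", "spark-hgkim")  # placeholder host
print(url)  # http://driver.example.internal:4040/api/v1/applications/spark-hgkim/jobs
```

In practice you would fetch it with something like `requests.get(url).json()`.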

 

 
