Spark) spark.app.id warning when running spark_submit from JupyterHub
When I run spark_submit from JupyterHub, a warning shows up. It could simply be handled and ignored, but I got curious and looked into it.
What is spark.app.id?
import os, posixpath, socket
import pyspark
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder.appName("hgkim-spark")
    .config("spark.kryoserializer.buffer.max", "1024m")
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .master("k8s://https://172.17.***.56:6443")
    .config("spark.kubernetes.container.image", "hgkim/library/spark-3.2.0-base:1.0.0-20220309")
    .config("spark.kubernetes.namespace", "spark")
    .config("spark.kubernetes.authenticate.driver.serviceAccountName", "jupyter")
    .config("spark.driver.port", "2222")
    .config("spark.driver.blockManager.port", "7777")
    .config("spark.driver.host", socket.gethostbyname(socket.gethostname()))
    .config("spark.driver.bindAddress", "0.0.0.0")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.instances", 5)
    .config("spark.executor.memory", "32g")
    .config("spark.executor.cores", 3)
    .config("spark.dynamicAllocation.enabled", "false")
    .getOrCreate()
)
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.access.key', "***-key")
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.secret.key', "***-key")
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.endpoint', "http://***.***.***.***:80")
spark.sparkContext._jsc.hadoopConfiguration().set('com.amazonaws.services.s3.enableV4', "true")
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.connection.ssl.enabled', "false")
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.impl', "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.aws.credentials.provider', "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
spark
df = spark.read.format("parquet").load("s3a://***.parquet")
df.count()
Below is the error:
22/03/24 10:06:40 WARN ExecutorPodsSnapshotsStoreImpl: Exception when notifying snapshot subscriber.
java.util.NoSuchElementException: spark.app.id
at org.apache.spark.SparkConf.$anonfun$get$1(SparkConf.scala:245)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.SparkConf.get(SparkConf.scala:245)
at org.apache.spark.SparkConf.getAppId(SparkConf.scala:450)
at org.apache.spark.deploy.k8s.features.MountVolumesFeatureStep.$anonfun$constructVolumes$4(MountVolumesFeatureStep.scala:88)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.deploy.k8s.features.MountVolumesFeatureStep.constructVolumes(MountVolumesFeatureStep.scala:57)
at org.apache.spark.deploy.k8s.features.MountVolumesFeatureStep.configurePod(MountVolumesFeatureStep.scala:34)
at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder.$anonfun$buildFromFeatures$4(KubernetesExecutorBuilder.scala:64)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder.buildFromFeatures(KubernetesExecutorBuilder.scala:63)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$requestNewExecutors$1(ExecutorPodsAllocator.scala:391)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.requestNewExecutors(ExecutorPodsAllocator.scala:382)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$onNewSnapshots$36(ExecutorPodsAllocator.scala:346)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$onNewSnapshots$36$adapted(ExecutorPodsAllocator.scala:339)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.onNewSnapshots(ExecutorPodsAllocator.scala:339)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$3(ExecutorPodsAllocator.scala:117)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$3$adapted(ExecutorPodsAllocator.scala:117)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$SnapshotsSubscriber.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$SnapshotsSubscriber$$processSnapshotsInternal(ExecutorPodsSnapshotsStoreImpl.scala:138)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$SnapshotsSubscriber.processSnapshots(ExecutorPodsSnapshotsStoreImpl.scala:126)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.$anonfun$addSubscriber$1(ExecutorPodsSnapshotsStoreImpl.scala:81)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Googling around, it turns out this is a unique key, and getting a NoSuchElementException when it isn't set is expected behavior.
- getAppId gives spark.app.id Spark property or reports NoSuchElementException if not set.
If you print the applicationId, an auto-generated value comes out.
spark.sparkContext.applicationId # 'spark-application-1648084000316'
It is said to be generated when the SparkContext is created.
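A quick way to check this from the notebook (a minimal sketch; the "<not set>" fallback string is just my placeholder, while spark.app.id itself is the real config key):
sc = spark.sparkContext
# applicationId is assigned when the SparkContext starts
print(sc.applicationId)                               # e.g. 'spark-application-1648084000316'
# getConf() returns a copy of the driver conf; get() falls back to the second
# argument when the key is absent
print(sc.getConf().get("spark.app.id", "<not set>"))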
Spark Property | Default Value | Description
spark.master | | Master URL
spark.app.id | TaskScheduler.applicationId() | Unique identifier of a Spark application that Spark uses to uniquely identify metric sources. Set when SparkContext is created (right after TaskScheduler is started that actually gives the identifier).
spark.app.name | | Application Name
Source:
https://wanghao989711.gitbooks.io/spark-2-translation/content/spark-SparkConf.html
So can't we just set spark.app.id ourselves?
.config("spark.app.id", "spark-hgkim") \
After adding it, the error no longer appears.
spark.sparkContext.applicationId # 'spark-hgkim'
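For reference, a minimal sketch of where the option sits in the builder chain (same placeholders as the session above; the other options are omitted for brevity):
from pyspark.sql import SparkSession

# Sketch only: same builder as above, with spark.app.id pinned explicitly so it is
# already in the conf when the Kubernetes executor-pod feature steps look it up.
spark = (
    SparkSession.builder.appName("hgkim-spark")
    .config("spark.app.id", "spark-hgkim")  # fixed application id
    .master("k8s://https://172.17.***.56:6443")
    .getOrCreate()
)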
Since it's said to be used only for REST API calls, I plan to use a fixed value.
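A side note of my own (not from the source above): if several sessions could end up with the same fixed id, one sketch is to keep the fixed prefix but append something per session:
import getpass
import time

from pyspark.sql import SparkSession

# Hypothetical naming scheme: fixed prefix plus user and start time,
# e.g. 'spark-hgkim-1648084000'
app_id = f"spark-{getpass.getuser()}-{int(time.time())}"
builder = SparkSession.builder.config("spark.app.id", app_id)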
The end.