Spark开发部署遇到的问题汇总

如何让pyspark使用ipython

vim ./bin/pyspark ,在最上面加一行

export PYSPARK_DRIVER_PYTHON=ipython

如何在pyspark中连接cassandra

在启动pyspark时,添加 –packages 参数

./bin/pyspark  --packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0

然后写python代码从cassandra中读取数据:

spark.conf.set("spark.cassandra.connection.host", "172.31.0.21")
data_frame = spark.read.format("org.apache.spark.sql.cassandra")\
            .options(table="service_name_index", keyspace="jaeger").load()
data_frame.filter("start_time>1529459493693772").show()
data_frame.rdd.map(lambda r: (r.service_name, 1))\
                 .reduceByKey(lambda x, y: x+y).collect()

如何打开pyspark即启用jupyter notebook

vim ~/.bash_profile,添加环境变量

export SPARK_HOME="/home/kyle/spark-2.3.0-bin-hadoop2.7"
export PYSPARK_DRIVER_PYTHON=jupyter 
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"

或者,在启动pypark的命令行中设置环境变量

PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook" ./bin/pyspark \
--packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0

节点资源不足报错

如果出现这样的报警日志,则有可能是worker节点的cpu、内存等资源不够

Initial job has not accepted any resources; 
check your cluster UI to ensure that workers are registered 
and have sufficient resources

解决办法是,可以给worker更多的内存,如果实在没有内存可用,那么可以让spark限制一下内存的使用,在spark-submit的时候添加参数:

~/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --master spark://172.31.0.53:7077 \
--executor-memory 512M  \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 \
spark_job_kafka_streaming.py

driver与worker节点不能通讯

如果是以client,而不是cluser模式提交我们的程序(spark-submit 的–deploy-mode 参数),那么我们程序所在的driver节点,和spark的worker节点之间需要网络通讯,如果相应的ip设置不正确,会导致不能正常执行任务。

  • 启动master节点时,要设置SPARK_LOCAL_IP环境变量,以及用-h参数指定绑定的ip。
  SPARK_LOCAL_IP=172.31.0.53 ./sbin/start-master.sh -h 172.31.0.53
  • 启动slave节点时,要设置SPARK_LOCAL_IP环境变量,并指定master时使用正确ip。
  SPARK_LOCAL_IP=172.31.0.21 ./sbin/start-slave.sh spark://172.31.0.53:7077
  • 在spark-submit的时候,要指定SPARK_LOCAL_HOSTNAME环境变量,设置为正确的ip。
  • 在woker与driver节点,检查一下master、slave等节点的网络端口,看能不能正常连接。

使用python虚拟环境

在driver节点,通过spark-submit提交py脚本时,如果我们开发时是用了virtualenv创建了虚拟环境,那么提交spark job时,也需要使用同一个env环境,可以通过环境变量来设置

PYSPARK_DRIVER_PYTHON=/home/ops/spark/env/bin/python 

打包python依赖

如果我们的python程序依赖了很多第三方模块,在虚拟环境中通过pip装了很多模块,则在提交spark任务的时候,需要将这些依赖包提交给spark worker。

首先把依赖打包到一个zip文件中:

pip install -t dependencies -r requirements.txt
cd dependencies
zip -r ../dependencies.zip .

然后在spark-submit时添加参数

spark-submit --py-files dependencies.zip spark_job.py

部署spark的机器不能连接外网

运行spark-submit时,spark会从网上下载一些依赖jar包,比如你指定了cassandra或kafka的模块时,如果我们的server没有访问公网的权限,这时候命令就被卡在一个地方很久。

后面我观察日志发现了一个jar包文件夹,~/.ivy2 ,下面有jars和cache两个文件夹,于是我把自己开发机中这个文件夹的所有文件,copy到部署的机器上去,这时候spark会从cache文件夹中查找,找到了相应的文件,就不会再从互联网下载jar包了,解决了这个问题。

运行 spark 脚本时 google.protobuf 包找不到

运行spark任务时,在worker节点会报错:

ImportError: No module named google.protobuf

虽然我们通过指定 –py-files dependencies.zip 打包了所需的依赖包,但始终还是找不到包 google.protobuf。

后来查了下,原来 google.protobuf 这个包比较特殊,它的路径是通过 site-packages/protobuf-*.pth 文件配置的,site-packages/google 文件夹并不是个普通的python包,里面没有 __init__.py。

所以只有python进程加载 protobuf-*.pth 配置文件后,才能找到包 google.protobuf,而加载这个文件是在python进程启动时。

总之,dependencies.zip 这种方式,无法解法 pth 文件的包路径问题,最终还是在 worker 节点机器上,通过 pip 全局安装了 protobuf 包。