如何让pyspark使用ipython
vim ./bin/pyspark ,在最上面加一行
export PYSPARK_DRIVER_PYTHON=ipython
如何在pyspark中连接cassandra
在启动pyspark时,添加 –packages 参数
./bin/pyspark --packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0
然后写python代码从cassandra中读取数据:
spark.conf.set("spark.cassandra.connection.host", "172.31.0.21") data_frame = spark.read.format("org.apache.spark.sql.cassandra")\ .options(table="service_name_index", keyspace="jaeger").load() data_frame.filter("start_time>1529459493693772").show() data_frame.rdd.map(lambda r: (r.service_name, 1))\ .reduceByKey(lambda x, y: x+y).collect()
如何打开pyspark即启用jupyter notebook
vim ~/.bash_profile,添加环境变量
export SPARK_HOME="/home/kyle/spark-2.3.0-bin-hadoop2.7" export PYSPARK_DRIVER_PYTHON=jupyter export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
或者,在启动pypark的命令行中设置环境变量
PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook" ./bin/pyspark \ --packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0
节点资源不足报错
如果出现这样的报警日志,则有可能是worker节点的cpu、内存等资源不够
Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
解决办法是,可以给worker更多的内存,如果实在没有内存可用,那么可以让spark限制一下内存的使用,在spark-submit的时候添加参数:
~/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --master spark://172.31.0.53:7077 \ --executor-memory 512M \ --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 \ spark_job_kafka_streaming.py
driver与worker节点不能通讯
如果是以client,而不是cluser模式提交我们的程序(spark-submit 的–deploy-mode 参数),那么我们程序所在的driver节点,和spark的worker节点之间需要网络通讯,如果相应的ip设置不正确,会导致不能正常执行任务。
- 启动master节点时,要设置SPARK_LOCAL_IP环境变量,以及用-h参数指定绑定的ip。
SPARK_LOCAL_IP=172.31.0.53 ./sbin/start-master.sh -h 172.31.0.53
- 启动slave节点时,要设置SPARK_LOCAL_IP环境变量,并指定master时使用正确ip。
SPARK_LOCAL_IP=172.31.0.21 ./sbin/start-slave.sh spark://172.31.0.53:7077
- 在spark-submit的时候,要指定SPARK_LOCAL_HOSTNAME环境变量,设置为正确的ip。
- 在woker与driver节点,检查一下master、slave等节点的网络端口,看能不能正常连接。
使用python虚拟环境
在driver节点,通过spark-submit提交py脚本时,如果我们开发时是用了virtualenv创建了虚拟环境,那么提交spark job时,也需要使用同一个env环境,可以通过环境变量来设置
PYSPARK_DRIVER_PYTHON=/home/ops/spark/env/bin/python
打包python依赖
如果我们的python程序依赖了很多第三方模块,在虚拟环境中通过pip装了很多模块,则在提交spark任务的时候,需要将这些依赖包提交给spark worker。
首先把依赖打包到一个zip文件中:
pip install -t dependencies -r requirements.txt cd dependencies zip -r ../dependencies.zip .
然后在spark-submit时添加参数
spark-submit --py-files dependencies.zip spark_job.py
部署spark的机器不能连接外网
运行spark-submit时,spark会从网上下载一些依赖jar包,比如你指定了cassandra或kafka的模块时,如果我们的server没有访问公网的权限,这时候命令就被卡在一个地方很久。
后面我观察日志发现了一个jar包文件夹,~/.ivy2 ,下面有jars和cache两个文件夹,于是我把自己开发机中这个文件夹的所有文件,copy到部署的机器上去,这时候spark会从cache文件夹中查找,找到了相应的文件,就不会再从互联网下载jar包了,解决了这个问题。
运行 spark 脚本时 google.protobuf 包找不到
运行spark任务时,在worker节点会报错:
ImportError: No module named google.protobuf
虽然我们通过指定 –py-files dependencies.zip 打包了所需的依赖包,但始终还是找不到包 google.protobuf。
后来查了下,原来 google.protobuf 这个包比较特殊,它的路径是通过 site-packages/protobuf-*.pth 文件配置的,site-packages/google 文件夹并不是个普通的python包,里面没有 __init__.py。
所以只有python进程加载 protobuf-*.pth 配置文件后,才能找到包 google.protobuf,而加载这个文件是在python进程启动时。
总之,dependencies.zip 这种方式,无法解法 pth 文件的包路径问题,最终还是在 worker 节点机器上,通过 pip 全局安装了 protobuf 包。