Problem:
Code: fault_detect.py
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext


def detect(new_values, fault_state):
    # State per key: 0.0 = normal, 1.0 = reading below 1.0 seen, 2.0 = fault.
    for value in new_values:
        if value < 1.0:
            fault_state = 1.0
        elif value > 100.0 and fault_state == 1.0:
            # A reading above 100.0 right after one below 1.0 marks a fault.
            fault_state = 2.0
            return fault_state
        else:
            fault_state = 0.0
    return fault_state


if __name__ == "__main__":
    if len(sys.argv) != 3:
        # Expected arguments: <hostname> <port>
        sys.exit(-1)

    sc = SparkContext(appName="FaultDetect")
    sc.setLogLevel("ERROR")
    ssc = StreamingContext(sc, 1)  # 1-second batches
    # updateStateByKey requires a checkpoint directory
    ssc.checkpoint("file:///root/spark/stateful")

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    # Each input line is "<id> <value>"
    counts = (
        lines.map(lambda line: line.split())
        .map(lambda id_amount: (id_amount[0], float(id_amount[1])))
        .updateStateByKey(detect)
    )
    result = counts.filter(lambda x: x[1] == 2.0).map(
        lambda x: x[0] + " is detected as fault"
    )
    result.pprint()

    ssc.start()
    ssc.awaitTermination()
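To check the state transitions without starting a cluster, the update function can be run in plain Python. This is only a sketch: it assumes the final `return fault_state` in `detect` sits outside the loop, and `run_batches` and the sample readings are made up for illustration. It imitates how updateStateByKey passes the previous state of one key (None at first) into each batch.

```python
def detect(new_values, fault_state):
    """Same update rule as in fault_detect.py (final return assumed outside the loop)."""
    for value in new_values:
        if value < 1.0:
            fault_state = 1.0
        elif value > 100.0 and fault_state == 1.0:
            fault_state = 2.0
            return fault_state
        else:
            fault_state = 0.0
    return fault_state


def run_batches(batches):
    """Hypothetical helper: feed successive batches for one key through detect."""
    state = None  # updateStateByKey passes None when a key has no previous state
    history = []
    for batch in batches:
        state = detect(batch, state)
        history.append(state)
    return history


# Made-up readings for a single sensor, one inner list per 1-second batch.
print(run_batches([[50.0], [0.5], [150.0]]))  # low reading, then high reading
print(run_batches([[0.5], [50.0], [150.0]]))  # a normal reading in between
print(run_batches([[0.5, 150.0]]))            # both readings in the same batch
```

Only the first and third sequences end in state 2.0, so only they would pass the `x[1]==2.0` filter. In the second, the normal reading resets the state to 0.0 before the high reading arrives.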
Set up the cluster:
In the spark-master window:
Open a new window for spark-worker1:
Run:
Note: the port number at the end of the command (9001 here) must be the same as the one used on the master!
spark-submit --master spark://spark-master:7077 fault_detect.py spark-master 9001
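The job reads whitespace-separated `id value` lines from the host and port given as its two arguments, so something has to be listening on that port before the job connects. The notes do not show how the data is fed. The following is one possible test feeder, not the original setup: `format_record`, `serve_records`, and the `dev1` readings are hypothetical, and it uses only the standard `socket` module.

```python
import socket
import time


def format_record(device_id, value):
    """Build one input line in the "id value" form that the job splits on whitespace."""
    return f"{device_id} {value}\n"


def serve_records(records, host="0.0.0.0", port=9001, delay=1.0):
    """Wait for one client (the Spark receiver) and send it the records line by line."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(1)
        conn, _ = server.accept()  # blocks until the streaming job connects
        with conn:
            for device_id, value in records:
                conn.sendall(format_record(device_id, value).encode("utf-8"))
                time.sleep(delay)  # spread records over separate 1-second batches
```

For example, calling `serve_records([("dev1", 0.5), ("dev1", 150.0)])` on spark-master and then submitting the job should send a low reading followed by a high one for `dev1`. The port passed to `serve_records` has to be the same 9001 that ends the spark-submit command.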