-
随机森林案例
""" Random Forest Classifier Example. """ import os import time import findspark import pandas as pd import pyspark.pandas as pyd from pyspark.ml import Pipeline from pyspark.ml.classification import RandomForestClassifier from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer from pyspark.ml.evaluation import MulticlassClassificationEvaluator from pyspark.ml.linalg import Vectors from pyspark.sql import SparkSession from sklearn.datasets import load_breast_cancer if __name__ == "__main__": findspark.init() os.environ["PYARROW_IGNORE_TIMEZONE"] = '1' spark = SparkSession .builder .appName("随机森林针对乳腺癌数据分析") .master("local[*]") .getOrCreate() # 加载数据 cancer = load_breast_cancer() data = pd.DataFrame(data=cancer.data, columns=cancer.feature_names) data["label"] = cancer.target data = pyd.from_pandas(data) data = data.to_spark(index_col='index') # 合并特征 data = data.rdd.map(lambda x: (Vectors.dense(x[1:-1]), x[-1])).toDF(["features", "label"]) data.show() # 索引标签,将元数据添加到标签列。 # 适合整个数据集以包含索引中的所有标签。 labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data) # 自动识别分类特征,并对其进行索引。 # 设置 maxCategories,使具有 > 4 个不同值的特征被视为连续的。 featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures").fit(data) # 将数据分成训练集和测试集(30% 用于测试) (trainingData, testData) = data.randomSplit([0.7, 0.3]) # 训练一个随机森林模型。 rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", seed=60, maxDepth=16, numTrees=60) # 将索引标签转换回原始标签。 labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels) # 在管道中链接索引和随机森林 pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter]) # 训练模型。 运行索引器。 model = pipeline.fit(trainingData) # 做预测 predictions = model.transform(testData) # 选择要显示的示例行 predictions.select("predictedLabel", "label", "features").show(5) # 选择(预测,真实标签)并计算测试误差 evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", 
metricName="accuracy") accuracy = evaluator.evaluate(predictions) print("Test Right = {}".format(accuracy * 100)) rfModel = model.stages[2] print(rfModel) # summary only time.sleep(3000) spark.stop()
-
随机森林项目
-
main
""" main """ import os from time import time import pandas as pd import findspark from pyspark.ml.feature import VectorAssembler from pyspark.ml.linalg import Vectors from pyspark.sql import SparkSession import pyspark.pandas as pyd import DownloadData from SeaBornPlot import Seaborn_Plot from ConcatData import concat_data from DataPreprocess import time_preprocess import ExtractTrainData, ExtractTestData, PlotPredict, ExtractFullTrainData import TrainingAndPredict if __name__ == '__main__': findspark.init() spark = SparkSession .builder .appName("基于pyspark的PDM项目") .master("local[*]") .getOrCreate() os.environ["PYARROW_IGNORE_TIMEZONE"] = '1' # # 1. 下载数据 # # DownloadData.download_data() # # exit(0) # # # 2. 合并训练数据 # train_act_val, train_alarm, test_act_val, test_alarm = concat_data() # # # 3.预处理,按照时间排序 # train_act_val, train_alarm, test_act_val, test_alarm = time_preprocess( # train_act_val, train_alarm, test_act_val, test_alarm) # # # 4. 绘图查看特征 # # seaborn = Seaborn_Plot() # # seaborn.plot_seaborn(train_act_val, train_alarm) # # # 5. 提取训练数据 # """先分flag,后提取特征,速度慢,但可针对每个flag进行更细致的操作""" # train_data = ExtractTrainData.extract_train_data(train_act_val, train_alarm) # # """先提取特征,后flag,速度快,但数据处理相对粗糙,如少于window滑窗的单个flag较多,会损失较大信息""" # # train_data = ExtractFullTrainData.extract_train_data(train_act_val, train_alarm) # # # 6. 
提取测试集 # test_data = ExtractTestData.extract_test_all_data(test_act_val) # TODO:从HDFS文件系统读取CSV数据 # TODO 大数据 train_data = pyd.read_csv("/export/data_pdm/train_data.csv") test_data = pyd.read_csv("/export/data_pdm/test_data.csv") test_data_yuan = pd.read_csv("/export/data_pdm/test_data_small.csv") test_alarm = pd.read_csv("/export/data_pdm/test_alarm.csv") # TODO 小数据 # train_data = pyd.read_csv("/export/data_pdm/train_data_small.csv") # test_data = pyd.read_csv("/export/data_pdm/test_data_small.csv") # test_data_yuan = pd.read_csv("/export/data_pdm/test_data_small.csv") # test_alarm = pd.read_csv("/export/data_pdm/test_alarm.csv") # TODO 数据处理 train_data = train_data.drop(["_c0"], axis=1) test_data = test_data.drop(["_c0"], axis=1) test_alarm = test_alarm.drop(["Unnamed: 0"], axis=1) # TODO:数据并行处理 # 处理训练集数据 train_data = train_data.iloc[:, 2:] train_data = train_data.to_spark(index_col='index') train_data = train_data.rdd.map(lambda x: (Vectors.dense(x[1:-1]), x[-1])).toDF(["features", "label"]) # 处理测试集数据 test_data = test_data.iloc[:, 2:] test_data = test_data.to_spark(index_col='index') test_data = test_data.rdd.map(lambda x: (Vectors.dense(x[1:]), x[0])).toDF(["features", "index"]) # 7. 训练并预测数据集 ts = time() data_predict = TrainingAndPredict.training_and_predict(train_data, test_data) te = time() print("spark总耗时:", te - ts) exit(0) # 8. 实时绘图显示 # TODO:预测表与真实值的结合 test_data_yuan["errno"] = 0 test_data_yuan['label'] = data_predict test_data_yuan['alarmtime'] = test_data_yuan['acttime'] # 将act_time时间转为alarmtime,好进行对比 data_predict = test_data_yuan[["flag", "alarmtime", "errno", "label"]].copy() data_predict.alarmtime = data_predict.alarmtime.astype('datetime64[ns]') test_alarm.alarmtime = test_alarm.alarmtime.astype('datetime64[ns]') # 实时绘图显示 PlotPredict.plot_predict(data_predict, test_alarm)
-
training.py
from time import time from pyspark.ml.classification import RandomForestClassifier from Parameters import Parameters # 超参数 测试集必须取与训练集相同的特征参数 select_features = Parameters.train_features.get('select_features') random = Parameters.test_features.get('random') n_estimators = Parameters.test_features.get('n_estimators') def training_and_predict(data, test_data): # 训练一个随机森林模型。 rfc = RandomForestClassifier(labelCol="label", featuresCol="features", seed=random) # 训练模型。 运行索引器。 ts = time() rfc = rfc.fit(data) te = time() print("spark训练时间:", te - ts) # 预测概率 ts = time() predictions = rfc.transform(test_data) te = time() print("spark预测时间", te - ts) # 返回概率 probability = predictions.select("probability") probability = probability.toPandas() probability = probability.probability.apply(lambda x: x[1]) return probability
-