PySpark: Random Forest Classification Example

I. Random Forest

A random forest trains many decision trees on bootstrapped samples with random subsets of the features and combines their votes, which makes it more robust to overfitting than a single tree. The example below trains one with Spark MLlib on the scikit-learn breast cancer dataset.

  1. Random Forest Example

    """
    Random Forest Classifier Example.
    """
    import os
    import time
    
    import findspark
    import pandas as pd
    import pyspark.pandas as pyd
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import RandomForestClassifier
    from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.ml.linalg import Vectors
    from pyspark.sql import SparkSession
    from sklearn.datasets import load_breast_cancer
    
    if __name__ == "__main__":
        findspark.init()
        os.environ["PYARROW_IGNORE_TIMEZONE"] = '1'
        spark = SparkSession 
            .builder 
            .appName("随机森林针对乳腺癌数据分析") 
            .master("local[*]") 
            .getOrCreate()
    
        # 加载数据
        cancer = load_breast_cancer()
        data = pd.DataFrame(data=cancer.data, columns=cancer.feature_names)
        data["label"] = cancer.target
        data = pyd.from_pandas(data)
        data = data.to_spark(index_col='index')
    
        # 合并特征
        data = data.rdd.map(lambda x: (Vectors.dense(x[1:-1]), x[-1])).toDF(["features", "label"])
        data.show()
    
        # 索引标签,将元数据添加到标签列。
        # 适合整个数据集以包含索引中的所有标签。
        labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
    
        # 自动识别分类特征,并对其进行索引。
        # 设置 maxCategories,使具有 > 4 个不同值的特征被视为连续的。
        featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures").fit(data)
    
        # 将数据分成训练集和测试集(30% 用于测试)
        (trainingData, testData) = data.randomSplit([0.7, 0.3])
    
        # 训练一个随机森林模型。
        rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", seed=60, maxDepth=16,
                                    numTrees=60)
    
        # 将索引标签转换回原始标签。
        labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                                       labels=labelIndexer.labels)
    
        # 在管道中链接索引和随机森林
        pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
    
        # 训练模型。 运行索引器。
        model = pipeline.fit(trainingData)
    
        # 做预测
        predictions = model.transform(testData)
    
        # 选择要显示的示例行
        predictions.select("predictedLabel", "label", "features").show(5)
    
        # 选择(预测,真实标签)并计算测试误差
        evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction",
                                                      metricName="accuracy")
    
        accuracy = evaluator.evaluate(predictions)
        print("Test Right = {}".format(accuracy * 100))
    
        rfModel = model.stages[2]
        print(rfModel)  # summary only
        time.sleep(3000)
        spark.stop()
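
    As a follow-up, the fitted forest exposes per-feature importance weights through `featureImportances`, a standard attribute of Spark's RandomForestClassificationModel. A minimal sketch, assuming it runs before `spark.stop()` in the example above:

    # Sketch: map the forest's feature importances back to the original
    # breast-cancer feature names and print the ten strongest features.
    importances = rfModel.featureImportances.toArray()  # one weight per input feature
    ranked = sorted(zip(cancer.feature_names, importances), key=lambda kv: kv[1], reverse=True)
    for name, weight in ranked[:10]:
        print("{:<25s} {:.4f}".format(name, weight))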
    
  2. Random Forest Project

    1. main

      """
          main
      """
      import os
      from time import time
      
      import pandas as pd
      import findspark
      from pyspark.ml.feature import VectorAssembler
      from pyspark.ml.linalg import Vectors
      from pyspark.sql import SparkSession
      import pyspark.pandas as pyd
      import DownloadData
      from SeaBornPlot import Seaborn_Plot
      from ConcatData import concat_data
      from DataPreprocess import time_preprocess
      import ExtractTrainData, ExtractTestData, PlotPredict, ExtractFullTrainData
      import TrainingAndPredict
      
      if __name__ == '__main__':
          findspark.init()
          spark = SparkSession 
              .builder 
              .appName("基于pyspark的PDM项目") 
              .master("local[*]") 
              .getOrCreate()
          os.environ["PYARROW_IGNORE_TIMEZONE"] = '1'
      
          # # 1. 下载数据
          # # DownloadData.download_data()
          # # exit(0)
          #
          # # 2. 合并训练数据
          # train_act_val, train_alarm, test_act_val, test_alarm = concat_data()
          #
          # # 3.预处理,按照时间排序
          # train_act_val, train_alarm, test_act_val, test_alarm = time_preprocess(
          #     train_act_val, train_alarm, test_act_val, test_alarm)
          #
          # # 4. 绘图查看特征
          # # seaborn = Seaborn_Plot()
          # # seaborn.plot_seaborn(train_act_val, train_alarm)
          #
          # # 5. 提取训练数据
          # """先分flag,后提取特征,速度慢,但可针对每个flag进行更细致的操作"""
          # train_data = ExtractTrainData.extract_train_data(train_act_val, train_alarm)
          #
          # """先提取特征,后flag,速度快,但数据处理相对粗糙,如少于window滑窗的单个flag较多,会损失较大信息"""
          # # train_data = ExtractFullTrainData.extract_train_data(train_act_val, train_alarm)
          #
          # # 6. 提取测试集
          # test_data = ExtractTestData.extract_test_all_data(test_act_val)
      
          # TODO:从HDFS文件系统读取CSV数据
          # TODO 大数据
          train_data = pyd.read_csv("/export/data_pdm/train_data.csv")
          test_data = pyd.read_csv("/export/data_pdm/test_data.csv")
          test_data_yuan = pd.read_csv("/export/data_pdm/test_data_small.csv")
          test_alarm = pd.read_csv("/export/data_pdm/test_alarm.csv")
      
          # TODO 小数据
          # train_data = pyd.read_csv("/export/data_pdm/train_data_small.csv")
          # test_data = pyd.read_csv("/export/data_pdm/test_data_small.csv")
          # test_data_yuan = pd.read_csv("/export/data_pdm/test_data_small.csv")
          # test_alarm = pd.read_csv("/export/data_pdm/test_alarm.csv")
      
          # TODO 数据处理
          train_data = train_data.drop(["_c0"], axis=1)
          test_data = test_data.drop(["_c0"], axis=1)
          test_alarm = test_alarm.drop(["Unnamed: 0"], axis=1)
      
          # TODO:数据并行处理
          # 处理训练集数据
          train_data = train_data.iloc[:, 2:]
          train_data = train_data.to_spark(index_col='index')
          train_data = train_data.rdd.map(lambda x: (Vectors.dense(x[1:-1]), x[-1])).toDF(["features", "label"])
      
          # 处理测试集数据
          test_data = test_data.iloc[:, 2:]
          test_data = test_data.to_spark(index_col='index')
          test_data = test_data.rdd.map(lambda x: (Vectors.dense(x[1:]), x[0])).toDF(["features", "index"])
      
          # 7. 训练并预测数据集
          ts = time()
          data_predict = TrainingAndPredict.training_and_predict(train_data, test_data)
          te = time()
          print("spark总耗时:", te - ts)
          exit(0)
      
          # 8. 实时绘图显示
          # TODO:预测表与真实值的结合
          test_data_yuan["errno"] = 0
          test_data_yuan['label'] = data_predict
          test_data_yuan['alarmtime'] = test_data_yuan['acttime']  # 将act_time时间转为alarmtime,好进行对比
          data_predict = test_data_yuan[["flag", "alarmtime", "errno", "label"]].copy()
          data_predict.alarmtime = data_predict.alarmtime.astype('datetime64[ns]')
          test_alarm.alarmtime = test_alarm.alarmtime.astype('datetime64[ns]')
      
          # 实时绘图显示
          PlotPredict.plot_predict(data_predict, test_alarm)
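
      The HDFS step above is marked TODO and currently reads from the local filesystem. A minimal sketch of what the HDFS variant could look like, assuming a hypothetical namenode address and that the same CSV files have been uploaded there; `pyspark.pandas.read_csv` accepts `hdfs://` URIs because it delegates to Spark's CSV reader:

      # Hypothetical HDFS read: the namenode host/port and paths are placeholders.
      hdfs_root = "hdfs://namenode:9000/data_pdm"  # assumed cluster address
      train_data = pyd.read_csv(hdfs_root + "/train_data.csv")
      test_data = pyd.read_csv(hdfs_root + "/test_data.csv")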
      
    2. TrainingAndPredict.py

      from time import time
      from pyspark.ml.classification import RandomForestClassifier
      from Parameters import Parameters

      # Hyperparameters: the test set must use the same feature parameters as the training set
      select_features = Parameters.train_features.get('select_features')
      random = Parameters.test_features.get('random')
      n_estimators = Parameters.test_features.get('n_estimators')


      def training_and_predict(data, test_data):
          # Build a random forest model
          # (numTrees is Spark's equivalent of scikit-learn's n_estimators)
          rfc = RandomForestClassifier(labelCol="label", featuresCol="features", seed=random,
                                       numTrees=n_estimators)

          # Fit the model
          ts = time()
          rfc = rfc.fit(data)
          te = time()
          print("Spark training time:", te - ts)

          # Predict on the test set
          ts = time()
          predictions = rfc.transform(test_data)
          te = time()
          print("Spark prediction time:", te - ts)

          # Return the probability of the positive class
          # (index 1 of each row's probability vector)
          probability = predictions.select("probability")
          probability = probability.toPandas()
          probability = probability.probability.apply(lambda x: x[1])

          return probability
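
      The `Parameters` module imported at the top is not shown in the article. A hypothetical reconstruction, inferred only from the three keys the code reads; the concrete values are assumptions, not the project's real settings:

      # Hypothetical Parameters.py, inferred from the keys used above.
      class Parameters:
          train_features = {
              'select_features': None,  # feature subset used at training time (unknown)
          }
          test_features = {
              'random': 60,        # random seed for the forest (value assumed)
              'n_estimators': 60,  # number of trees (value assumed)
          }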
      