さびたダミーのさまざまなデータタイプのデータセットでのPySparkMLでのモデル開発

PySpark MLで複数のデータタイプを操作する方法をすでに知っていますか?番号?その後、あなたは緊急に私たちを必要としています。



画像



こんにちは!興味深いものを1つ詳しく説明したいのですが、残念ながら、Sparkドキュメントのトピックではありません。さまざまなデータタイプ(文字列と数値)のデータセットでPySpark MLのモデルをトレーニングする方法は? Sparkの公式チュートリアルでは、1つのデータタイプの記号だけでなく、通常は1つの記号を使用する例が示されているため、この記事を書きたいと思ったのは、コードを含む必要な記事を探すために数日間インターネットを閲覧する必要があったためです。いくつかの列は、より異なるタイプのデータであり、ありません。しかし、データを処理するためのPySparkの機能を詳細に研究した結果、動作するコードを記述し、すべてがどのように発生するかを理解することができました。これを皆さんと共有したいと思います。だから全速力で先に、友達!



最初に、作業に必要なすべてのライブラリをインポートしてから、コードを詳細に分析して、自尊心のある「さびたティーポット」が、最近、すべてを理解できるようにします。



#  
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pyspark.sql.functions as sf
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
#other types of regression models
#     
#from pyspark.ml.regression import LinearRegression
#from pyspark.ml.regression import RandomForestRegressor
#from pyspark.ml.regression import GeneralizedLinearRegression
#from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator


次に、(ローカル)SparkコンテキストとSparkセッションを作成し、それを画面に表示してすべてが機能するかどうかを確認しましょう。Sparkセッションの作成は、Sparkでデータセットを操作するための開始点です。



#  
sc = SparkContext('local')
spark = SparkSession(sc)
spark






データを操作するためのツールがあります。それをロードしましょう。この記事では、Kaggleマシンラーニングコンペティションサイトから取得したデータセットを使用してい

ます。https

//www.kaggle.com/unitednations/international-greenhouse-gas-emissionsダウンロード後、.csv形式でpath_csvに保存され、次のオプションがあります。



  • ヘッダー:ファイルの最初の行がヘッダーの場合、「true」を入力します
  • 区切り文字:1行のデータを記号で区切る記号を付けます。多くの場合、「、」または「;」です。
  • inferSchema:trueの場合、PySparkは各列のタイプを自動的に検出します。そうでない場合は、自分で作成する必要があります。


#   .csv  path_csv
path_csv = 'greenhouse_gas_inventory_data_data.csv'
data = spark.read.format("csv")\
        .option("header", "true")\
        .option("delimiter", ",")\
        .option("inferSchema", "true")\
        .load(path_csv)


私たちが扱っているデータの種類をよりよく理解するために、それらの行のいくつかを見てみましょう。



#   
data.show()




データセットにある行数も見てみましょう。

#  
data.select('year').count()






最後に、データの種類を推測しましょう。覚えているように、オプション( "inferSchema"、 "true")を使用して自動的に決定するようにPySparkに依頼しました。



#     
data.printSchema()






それでは、メインディッシュに移りましょう。さまざまなデータタイプのいくつかの兆候を処理します。Sparkは、変換されたデータでモデルをトレーニングできます。予測された列はベクトルであり、特徴のある列もベクトルであるため、タスクが複雑になります...しかし、あきらめず、PySparkでモデルをトレーニングするには、パイプラインを使用します。パイプラインに特定のアクションプラン(変数ステージ):



  1. step label_stringIdx:予測する値データセットの列をSparkベクトル文字列に変換し、名前を変更して、パラメーターhandleInvalid = 'keep'でラベル付けします。これは、予測列がnullをサポートすることを意味します。
  2. step stringIndexer:文字列列をSparkカテゴリ文字列に変換します
  3. encoder: ()
  4. assembler: Spark, , VectorAssembler(), ( ) (assemblerInputs) «features»
  5. gbt: PySpark ML GBTRegressor,


#value -      - 
stages = []
label_stringIdx = StringIndexer(inputCol = 'value', outputCol = 'label', handleInvalid = 'keep')
stages += [label_stringIdx]

#depend on categorical columns: country and types of emission
#   :    
categoricalColumns = ['country_or_area', 'category']
for categoricalCol in categoricalColumns:
    #        
    stringIndexer = StringIndexer(inputCol = categoricalCol,
                                  outputCol = categoricalCol + 'Index',
                                  handleInvalid = 'keep')
    encoder = OneHotEncoder(inputCol=stringIndexer.getOutputCol(),
                            outputCol=categoricalCol + "classVec")
    stages += [stringIndexer, encoder]

#   : 
numericCols = ['year']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
#    - - 
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]


データセットをそれぞれ70%から30%のお気に入りの比率でトレーニングサンプルとテストサンプルに分割し、以前に1つのベクトル「機能」に結合された機能に基づいてラベルベクトルを予測する勾配回帰ブースティングツリー(GBTRegressor)を使用してモデルのトレーニングを開始しましょう。反復可能な制限maxIter = 10の場合:



#       (30% )
(trainingData, testData) = data.randomSplit([0.7, 0.3])

#  (   )
gbt = GBTRegressor(labelCol="label", featuresCol="features", maxIter=10)
stages += [gbt]

#   stages    
pipeline = Pipeline(stages=stages)


そして今、私たちはコンピュータに行動計画とトレーニングデータセットを送る必要があります:



#  
model = pipeline.fit(trainingData)

#     
predictions = model.transform(testData)


モデルを保存して、再トレーニングせずにいつでも使用できるようにします。



# 
pipeline.write().overwrite().save('model/gbtregr_model')


また、トレーニング済みモデルを予測に再び使用することにした場合は、次のように記述してください。



#     
load_model = pipeline.read().load('model/gbtregr_model')




そこで、Python言語でビッグデータを操作するためのツールであるPySparkで、さまざまなデータタイプのいくつかのフィーチャ列を操作する方法を確認しました。



これをモデルに適用する時が来ました...



All Articles