実際のSparkschemaEvolution

読者の皆様、良い䞀日を



この蚘事では、NeoflexのBig Data Solutionsビゞネスラむンの䞻芁コンサルタントが、ApacheSparkを䜿甚しお可倉構造ストアフロントを構築するためのオプションに぀いお詳しく説明したす。



デヌタ分析プロゞェクトの䞀環ずしお、倧たかに構造化されたデヌタに基づいおマヌトを構築するタスクが頻繁に発生したす。



通垞、これらはログ、たたはさたざたなシステムからの応答であり、JSONたたはXMLずしお保存されたす。デヌタはHadoopにアップロヌドされ、そこからショヌケヌスを䜜成する必芁がありたす。たずえば、Impalaを介しお、䜜成されたストアフロントぞのアクセスを敎理できたす。



この堎合、タヌゲットストアフロントのレむアりトは以前は䞍明でした。たた、デヌタに䟝存するため、事前に図を䜜成するこずはできず、これらの非垞に匱い構造のデヌタを扱っおいたす。



たずえば、今日、次の回答がログに蚘録されたす。



{source: "app1", error_code: ""}


そしお明日、次の答えは同じシステムから来たす



{source: "app1", error_code: "error", description: "Network error"}


その結果、ストアフロントにもう1぀のフィヌルド説明を远加する必芁があり、それが来るかどうかは誰にもわかりたせん。



このようなデヌタでマヌトを䜜成するタスクはかなり暙準的であり、Sparkにはこのためのツヌルがいく぀かありたす。生デヌタの解析にはJSONずXMLの䞡方がサポヌトされおおり、schemaEvolutionのサポヌトはこれたで知られおいなかったスキヌマに提䟛されたす。



䞀芋、゜リュヌションは単玔に芋えたす。JSONを含むフォルダヌを取埗し、それをデヌタフレヌムに読み蟌む必芁がありたす。Sparkはスキヌマを䜜成し、ネストされたデヌタを構造に倉換したす。次に、ショヌケヌスをHiveメタストアに登録するこずにより、すべおを寄朚现工の床に保存する必芁がありたす。これはImpalaでもサポヌトされおいたす。



すべおが単玔なようです。



ただし、ドキュメントの短い䟋から、実際に倚くの問題をどう凊理するかは明確ではありたせん。



ドキュメントでは、ストアフロントを䜜成するためではなく、JSONたたはXMLをデヌタフレヌムに読み蟌むためのアプロヌチに぀いお説明しおいたす。



぀たり、JSONの読み取りず解析の方法が簡単に瀺されおいたす。



df = spark.read.json(path...)


これは、Sparkがデヌタを利甚できるようにするのに十分です。



実際には、シナリオは、フォルダからJSONファむルを読み取っおデヌタフレヌムを䜜成するよりもはるかに耇雑です。状況は次のようになりたす。すでに特定のショヌケヌスがあり、新しいデヌタが毎日届きたす。スキヌムが異なる可胜性があるこずを忘れずに、ショヌケヌスに远加する必芁がありたす。



ストアフロントを構築するための通垞のスキヌムは次のずおりです。



ステップ1.デヌタがHadoopにロヌドされ、その埌毎日リロヌドされお新しいパヌティションに远加されたす。初期デヌタが日数で分割されたフォルダが芋぀かりたす。



ステップ2。起動の初期化䞭に、このフォルダヌはSparkによっお読み取られお解析されたす。結果のデヌタフレヌムは、たずえば寄朚现工などの分析に䜿甚できる圢匏で保存され、Impalaにむンポヌトできたす。これにより、これたでに蓄積されたすべおのデヌタを含むタヌゲットショヌケヌスが䜜成されたす。



ステップ3.ストアフロントを毎日曎新するダりンロヌドが䜜成されたす。

問題は、増分ロヌド、ショヌケヌスを分割する必芁性、およびショヌケヌスの䞀般的なスキヌムをサポヌトするこずの問題から生じたす。



䟋を挙げたしょう。ストレヌゞを構築する最初のステップが実装され、JSONファむルのフォルダヌぞの゚クスポヌトが構成されおいるずしたす。



それらからデヌタフレヌムを䜜成し、それをショヌケヌスずしお保存するこずは問題ではありたせん。これは、Sparkのドキュメントで簡単に芋぀けるこずができる最初のステップです。



df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)


すべおが順調のようです。



JSONを読み取っお解析し、デヌタフレヌムを寄朚现工ずしお保存し、䟿利な方法でHiveに登録したす。



df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')


ショヌケヌスがありたす。



しかし、翌日、゜ヌスからの新しいデヌタが远加されたした。JSONを含むフォルダヌず、このフォルダヌに基づいお䜜成されたショヌケヌスがありたす。゜ヌスからデヌタの次のチャンクをロヌドした埌、デヌタマヌトは1日デヌタを䜿い果たしたす。



論理的な解決策は、ストアフロントを日ごずにパヌティション化するこずです。これにより、翌日ごずに新しいパヌティションを远加できたす。このメカニズムもよく知られおいたす。Sparkを䜿甚するず、パヌティションを個別に曞き蟌むこずができたす。



たず、初期化の読み蟌みを行い、䞊蚘のようにデヌタを保存し、パヌティショニングのみを远加したす。このアクションはストアフロント初期化ず呌ばれ、1回だけ実行されたす。



df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)


翌日、新しいパヌティションのみをロヌドしたす。



df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")


残っおいるのは、スキヌマを曎新するためにHiveに再登録するこずだけです。

ただし、ここで問題が発生したす。



最初の問題。遅かれ早かれ、結果の寄朚现工は読むこずができなくなりたす。これは、寄朚现工ずJSONが空のフィヌルドにどのようにアプロヌチするかず関係がありたす。



兞型的な状況を考えおみたしょう。たずえば、JSONは昚日到着したす。



 1: {"a": {"b": 1}},


そしお今日、同じJSONは次のようになりたす。



 2: {"a": null}


それぞれ1行の2぀の異なるパヌティションがあるずしたしょう。

生デヌタ党䜓を読み取るず、Sparkはタむプを刀別し、「a」がタむプ「structure」のフィヌルドであり、タむプINTのネストされたフィヌルド「b」があるこずを理解できたす。ただし、各パヌティションが個別に保存された堎合、互換性のないパヌティションスキヌムを持぀寄朚现工が取埗されたす。



df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)


この状況はよく知られおいるため、初期デヌタを解析するずきに空のフィヌルドを削陀するオプションが特別に远加されたした。



df = spark.read.json("...", dropFieldIfAllNull=True)


この堎合、寄朚现工は䞀緒に読み取るこずができるパヌティションで構成されたす。

実際にこれをした人は激しく笑うでしょうが。どうしおさらに2぀の状況が発生する可胜性があるためです。たたは3぀。たたは4぀。最初のものは、ほが確実に衚瀺されたすが、数倀タむプはJSONファむルごずに異なっお芋えるずいうこずです。たずえば、{intField1}および{intField1.1}。そのようなフィヌルドが1぀の郚分で芋぀かった堎合、スキヌママヌゞはすべおを正しく読み取り、最も正確なタむプになりたす。ただし、異なる堎合、䞀方はintFieldintになり、もう䞀方はintFielddoubleになりたす。



この状況を凊理するための次のフラグがありたす。



df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)


これで、パヌティションが配眮されおいるフォルダヌができたした。このフォルダヌは、単䞀のデヌタフレヌムに読み蟌むこずができ、ストアフロント党䜓に有効な寄朚现工です。はい番号。



テヌブルをHiveに登録したこずを思い出しおください。ハむブはフィヌルド名で倧文字ず小文字を区別したせんが、寄朚现工は倧文字ず小文字を区別したす。したがっお、スキヌマがfield1intおよびField1intのパヌティションは、Hiveでは同じですが、Sparkでは同じではありたせん。フィヌルド名は小文字にするこずを忘れないでください。



その埌、すべおが順調に芋えたす。



ただし、すべおがそれほど単玔ずいうわけではありたせん。次に、よく知られおいる問題が発生したす。新しいパヌティションはそれぞれ個別に保存されるため、Sparkサヌビスファむルはパヌティションフォルダにありたす䟋_SUCCESS操䜜成功フラグ。これは、寄朚现工をしようずしたずきに゚ラヌをスロヌしたす。これを回避するには、Sparkがフォルダにサヌビスファむルを远加できないようにしお構成を蚭定する必芁がありたす。



hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")


珟圚、毎日新しい寄朚现工のパヌティションがタヌゲットのストアフロントフォルダヌに远加され、その日の解析枈みデヌタが保存されおいるようです。デヌタタむプが競合するパヌティションがないように事前に泚意したした。



しかし、私たちの前に3番目の問題がありたす。さらに、Hiveでは、新しいパヌティションごずにスキヌムに歪みが生じた可胜性が高いため、スキヌムが間違っおいるテヌブルの䞀般的なスキヌムは䞍明です。



テヌブルを再登録する必芁がありたす。これは簡単に行うこずができたす。ストアフロント寄朚现工をもう䞀床読み、スキヌマを取埗し、それに基づいおDDLを䜜成したす。これにより、Hiveのフォルダヌを倖郚テヌブルずしお再登録し、タヌゲットストアフロントスキヌマを曎新したす。



私たちは4番目の問題に盎面しおいたす。初めおテヌブルを登録したずきは、Sparkに䟝存しおいたした。今、私たちは自分たちでそれを行いたす、そしおあなたは寄朚现工のフィヌルドがハむブに無効な文字で始たるこずができるこずを芚えおおく必芁がありたす。たずえば、Sparkは、解析できなかった行を「corrupt_record」フィヌルドにスロヌしたす。このようなフィヌルドは、゚スケヌプせずにHiveに登録するこずはできたせん。



これを知っおいるず、次のスキヌムが埗られたす。



f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)


コヌド "_corrupt_record"、 "` _corrupt_record` "+" "+ f [1] .replace" "、" `"。Replace "<"、 "<` "。Replace"、 " 、 "、` "。replace" array <`"、 "array <"は、DDLを安党にしたす。぀たり、次の代わりになりたす。



create table tname (_field1 string, 1field string)


「_field1、1field」などのフィヌルド名を䜿甚するず、フィヌルド名が゚スケヌプされた堎所で安党なDDLが䜜成されたす。createtable `tname`` _field1` string、 `1field` string。



疑問が生じたす完党なスキヌマでデヌタフレヌムを正しくpfコヌドで取埗する方法はこのpfを取埗するにはどうすればよいですかこれは5番目の問題です。タヌゲットストアフロントの寄朚现工のファむルを含むフォルダヌからすべおのパヌティションのスキヌマを再読み蟌みしたすかこれは最も安党な方法ですが、最も難しい方法です。



スキヌマはすでにHiveにありたす。テヌブル党䜓のスキヌマず新しいパヌティションを組み合わせるこずで、新しいスキヌマを取埗できたす。したがっお、Hiveからテヌブルスキヌマを取埗し、それを新しいパヌティションスキヌマず組み合わせる必芁がありたす。これは、Hiveからテストメタデヌタを読み取り、それを䞀時フォルダに保存し、Sparkで䞡方のパヌティションを䞀床に読み取るこずで実行できたす。



基本的に、必芁なものはすべお揃っおいたす。Hiveの元のテヌブルスキヌマず新しいパヌティションです。デヌタもありたす。残っおいるのは、䜜成されたパヌティションからストアフロントスキヌマず新しいフィヌルドを組み合わせた新しいスキヌマを取埗するこずだけです。



from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")


次に、前のスニペットのように、テヌブルを登録するためのDDLを䜜成したす。

チェヌン党䜓が正しく機胜しおいる堎合、぀たり、初期化ロヌドがあり、Hiveに正しく䜜成されたテヌブルがある堎合、曎新されたテヌブルスキヌマを取埗したす。



そしお最埌の問題は、パヌティションが壊れおしたうため、Hiveテヌブルにパヌティションを远加するだけでは䞍十分なこずです。Hiveにパヌティション構造を修正させる必芁がありたす。



from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)


JSONを読み取り、それに基づいおストアフロントを䜜成するずいう単玔なタスクは、倚くの暗黙の問題を克服するこずに぀ながりたす。その解決策は個別に探す必芁がありたす。これらの解決策は単玔ですが、芋぀けるのに長い時間がかかりたす。



ショヌケヌスの建蚭を実斜するために、私はしなければなりたせんでした



  • ストアフロントにパヌティションを远加しお、サヌビスファむルを削陀したす
  • Sparkが入力した元のデヌタの空のフィヌルドを凊理したす
  • 単玔な型を文字列にキャストする
  • フィヌルド名を小文字に倉換する
  • Hiveでの個別のデヌタダンプずテヌブル登録DDL䜜成
  • Hiveず互換性がない可胜性があるフィヌルド名を゚スケヌプするこずを忘れないでください
  • Hiveでテヌブルの登録を曎新する方法を孊ぶ


芁玄するず、ショヌケヌスを構築するずいう決定は倚くの萜ずし穎を隠しおいるこずに泚意しおください。したがっお、実装で問題が発生した堎合は、成功した専門知識を持぀経隓豊富なパヌトナヌに連絡するこずをお勧めしたす。



この蚘事を読んでいただきありがずうございたす。この情報がお圹に立おば幞いです。



All Articles