ãã®èšäºã§ã¯ãNeoflexã®Big Data Solutionsããžãã¹ã©ã€ã³ã®äž»èŠã³ã³ãµã«ã¿ã³ãããApacheSparkã䜿çšããŠå¯å€æ§é ã¹ãã¢ããã³ããæ§ç¯ããããã®ãªãã·ã§ã³ã«ã€ããŠè©³ãã説æããŸãã
ããŒã¿åæãããžã§ã¯ãã®äžç°ãšããŠã倧ãŸãã«æ§é åãããããŒã¿ã«åºã¥ããŠããŒããæ§ç¯ããã¿ã¹ã¯ãé »ç¹ã«çºçããŸãã
éåžžããããã¯ãã°ããŸãã¯ããŸããŸãªã·ã¹ãã ããã®å¿çã§ãããJSONãŸãã¯XMLãšããŠä¿åãããŸããããŒã¿ã¯Hadoopã«ã¢ããããŒããããããããã·ã§ãŒã±ãŒã¹ãäœæããå¿ èŠããããŸããããšãã°ãImpalaãä»ããŠãäœæãããã¹ãã¢ããã³ããžã®ã¢ã¯ã»ã¹ãæŽçã§ããŸãã
ãã®å Žåãã¿ãŒã²ããã¹ãã¢ããã³ãã®ã¬ã€ã¢ãŠãã¯ä»¥åã¯äžæã§ããããŸããããŒã¿ã«äŸåãããããäºåã«å³ãäœæããããšã¯ã§ããããããã®éåžžã«åŒ±ãæ§é ã®ããŒã¿ãæ±ã£ãŠããŸãã
ããšãã°ã仿¥ã次ã®åçããã°ã«èšé²ãããŸãã
{source: "app1", error_code: ""}
ãããŠææ¥ã次ã®çãã¯åãã·ã¹ãã ããæ¥ãŸãïŒ
{source: "app1", error_code: "error", description: "Network error"}
ãã®çµæãã¹ãã¢ããã³ãã«ãã1ã€ã®ãã£ãŒã«ãïŒèª¬æïŒã远å ããå¿ èŠãããããããæ¥ããã©ããã¯èª°ã«ãããããŸããã
ãã®ãããªããŒã¿ã§ããŒããäœæããã¿ã¹ã¯ã¯ããªãæšæºçã§ãããSparkã«ã¯ãã®ããã®ããŒã«ãããã€ããããŸããçããŒã¿ã®è§£æã«ã¯JSONãšXMLã®äž¡æ¹ããµããŒããããŠãããschemaEvolutionã®ãµããŒãã¯ãããŸã§ç¥ãããŠããªãã£ãã¹ããŒãã«æäŸãããŸãã
äžèŠããœãªã¥ãŒã·ã§ã³ã¯åçŽã«èŠããŸããJSONãå«ããã©ã«ããŒãååŸãããããããŒã¿ãã¬ãŒã ã«èªã¿èŸŒãå¿ èŠããããŸããSparkã¯ã¹ããŒããäœæãããã¹ããããããŒã¿ãæ§é ã«å€æããŸããæ¬¡ã«ãã·ã§ãŒã±ãŒã¹ãHiveã¡ã¿ã¹ãã¢ã«ç»é²ããããšã«ããããã¹ãŠã坿šçް工ã®åºã«ä¿åããå¿ èŠããããŸããããã¯Impalaã§ããµããŒããããŠããŸãã
ãã¹ãŠãåçŽãªããã§ãã
ãã ããããã¥ã¡ã³ãã®çãäŸãããå®éã«å€ãã®åé¡ãã©ãåŠçãããã¯æç¢ºã§ã¯ãããŸããã
ããã¥ã¡ã³ãã§ã¯ãã¹ãã¢ããã³ããäœæããããã§ã¯ãªããJSONãŸãã¯XMLãããŒã¿ãã¬ãŒã ã«èªã¿èŸŒãããã®ã¢ãããŒãã«ã€ããŠèª¬æããŠããŸãã
ã€ãŸããJSONã®èªã¿åããšè§£æã®æ¹æ³ãç°¡åã«ç€ºãããŠããŸãã
df = spark.read.json(path...)
ããã¯ãSparkãããŒã¿ãå©çšã§ããããã«ããã®ã«ååã§ãã
å®éã«ã¯ãã·ããªãªã¯ããã©ã«ãããJSONãã¡ã€ã«ãèªã¿åã£ãŠããŒã¿ãã¬ãŒã ãäœæãããããã¯ããã«è€éã§ããç¶æ³ã¯æ¬¡ã®ããã«ãªããŸãããã§ã«ç¹å®ã®ã·ã§ãŒã±ãŒã¹ããããæ°ããããŒã¿ãæ¯æ¥å±ããŸããã¹ããŒã ãç°ãªãå¯èœæ§ãããããšãå¿ããã«ãã·ã§ãŒã±ãŒã¹ã«è¿œå ããå¿ èŠããããŸãã
ã¹ãã¢ããã³ããæ§ç¯ããããã®éåžžã®ã¹ããŒã ã¯æ¬¡ã®ãšããã§ãã
ã¹ããã1.ããŒã¿ãHadoopã«ããŒãããããã®åŸæ¯æ¥ãªããŒããããŠæ°ããããŒãã£ã·ã§ã³ã«è¿œå ãããŸããåæããŒã¿ãæ¥æ°ã§åå²ããããã©ã«ããèŠã€ãããŸãã
ã¹ããã2ãèµ·åã®åæåäžã«ããã®ãã©ã«ããŒã¯Sparkã«ãã£ãŠèªã¿åãããŠè§£æãããŸããçµæã®ããŒã¿ãã¬ãŒã ã¯ãããšãã°å¯æšçް工ãªã©ã®åæã«äœ¿çšã§ãã圢åŒã§ä¿åãããImpalaã«ã€ã³ããŒãã§ããŸããããã«ããããããŸã§ã«èç©ããããã¹ãŠã®ããŒã¿ãå«ãã¿ãŒã²ããã·ã§ãŒã±ãŒã¹ãäœæãããŸãã
ã¹ããã3.ã¹ãã¢ããã³ããæ¯æ¥æŽæ°ããããŠã³ããŒããäœæãããŸãã
åé¡ã¯ãå¢åããŒããã·ã§ãŒã±ãŒã¹ãåå²ããå¿ èŠæ§ãããã³ã·ã§ãŒã±ãŒã¹ã®äžè¬çãªã¹ããŒã ããµããŒãããããšã®åé¡ããçããŸãã
äŸãæããŸããããã¹ãã¬ãŒãžãæ§ç¯ããæåã®ã¹ããããå®è£ ãããJSONãã¡ã€ã«ã®ãã©ã«ããŒãžã®ãšã¯ã¹ããŒããæ§æãããŠãããšããŸãã
ãããããããŒã¿ãã¬ãŒã ãäœæãããããã·ã§ãŒã±ãŒã¹ãšããŠä¿åããããšã¯åé¡ã§ã¯ãããŸãããããã¯ãSparkã®ããã¥ã¡ã³ãã§ç°¡åã«èŠã€ããããšãã§ããæåã®ã¹ãããã§ãã
df = spark.read.option("mergeSchema", True).json(".../*")
df.printSchema()
root
 |-- a: long (nullable = true)
 |-- b: string (nullable = true)
 |-- c: struct (nullable = true)
 |    |-- d: long (nullable = true)
ãã¹ãŠãé 調ã®ããã§ãã
JSONãèªã¿åã£ãŠè§£æããããŒã¿ãã¬ãŒã ã坿šçް工ãšããŠä¿åãã䟿å©ãªæ¹æ³ã§Hiveã«ç»é²ããŸãã
df.write.format("parquet").option('path','<External Table Path>').saveAsTable('<Table Name>')
ã·ã§ãŒã±ãŒã¹ããããŸãã
ããããç¿æ¥ããœãŒã¹ããã®æ°ããããŒã¿ã远å ãããŸãããJSONãå«ããã©ã«ããŒãšããã®ãã©ã«ããŒã«åºã¥ããŠäœæãããã·ã§ãŒã±ãŒã¹ããããŸãããœãŒã¹ããããŒã¿ã®æ¬¡ã®ãã£ã³ã¯ãããŒãããåŸãããŒã¿ããŒãã¯1æ¥ããŒã¿ã䜿ãæãããŸãã
è«ççãªè§£æ±ºçã¯ãã¹ãã¢ããã³ããæ¥ããšã«ããŒãã£ã·ã§ã³åããããšã§ããããã«ãããç¿æ¥ããšã«æ°ããããŒãã£ã·ã§ã³ã远å ã§ããŸãããã®ã¡ã«ããºã ãããç¥ãããŠããŸããSparkã䜿çšãããšãããŒãã£ã·ã§ã³ãåå¥ã«æžã蟌ãããšãã§ããŸãã
ãŸããåæåã®èªã¿èŸŒã¿ãè¡ããäžèšã®ããã«ããŒã¿ãä¿åããããŒãã£ã·ã§ãã³ã°ã®ã¿ã远å ããŸãããã®ã¢ã¯ã·ã§ã³ã¯ã¹ãã¢ããã³ãåæåãšåŒã°ãã1åã ãå®è¡ãããŸãã
df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)
ç¿æ¥ãæ°ããããŒãã£ã·ã§ã³ã®ã¿ãããŒãããŸãã
df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")
æ®ã£ãŠããã®ã¯ãã¹ããŒããæŽæ°ããããã«Hiveã«åç»é²ããããšã ãã§ãã
ãã ããããã§åé¡ãçºçããŸãã
æåã®åé¡ãé ããæ©ãããçµæã®å¯æšçް工ã¯èªãããšãã§ããªããªããŸããããã¯ã坿šçް工ãšJSONã空ã®ãã£ãŒã«ãã«ã©ã®ããã«ã¢ãããŒãããããšé¢ä¿ããããŸãã
å žåçãªç¶æ³ãèããŠã¿ãŸããããããšãã°ãJSONã¯æšæ¥å°çããŸãã
1: {"a": {"b": 1}}
ãããŠä»æ¥ãåãJSONã¯æ¬¡ã®ããã«ãªããŸãã
2: {"a": null}
ãããã1è¡ã®2ã€ã®ç°ãªãããŒãã£ã·ã§ã³ããããšããŸãããã
çããŒã¿å šäœãèªã¿åããšãSparkã¯ã¿ã€ããå€å¥ãããaããã¿ã€ããstructureãã®ãã£ãŒã«ãã§ãããã¿ã€ãINTã®ãã¹ãããããã£ãŒã«ããbããããããšãçè§£ã§ããŸãããã ããåããŒãã£ã·ã§ã³ãåå¥ã«ä¿åãããå Žåãäºææ§ã®ãªãããŒãã£ã·ã§ã³ã¹ããŒã ãæã€å¯æšçް工ãååŸãããŸãã
df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)
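The root of the conflict is visible even with Python's json module: the same field parses to a structure one day and to nothing the next, so no common type can be inferred from either partition alone:

```python
import json

# The two one-line partitions from the example above.
day1 = json.loads('{"a": {"b": 1}}')
day2 = json.loads('{"a": null}')

# Parsed separately, "a" has no common type: a structure vs. an empty value.
print(type(day1["a"]).__name__)  # dict
print(type(day2["a"]).__name__)  # NoneType
```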
ãã®ç¶æ³ã¯ããç¥ãããŠãããããåæããŒã¿ãè§£æãããšãã«ç©ºã®ãã£ãŒã«ããåé€ãããªãã·ã§ã³ãç¹å¥ã«è¿œå ãããŸããã
df = spark.read.json("...", dropFieldIfAllNull=True)
ãã®å Žåã坿šçް工ã¯äžç·ã«èªã¿åãããšãã§ããããŒãã£ã·ã§ã³ã§æ§æãããŸãã
å®éã«ãããããäººã¯æ¿ããç¬ãã§ãããããã©ãããŠïŒããã«2ã€ã®ç¶æ³ãçºçããå¯èœæ§ãããããã§ãããŸãã¯3ã€ããŸãã¯4ã€ãæåã®ãã®ã¯ãã»ãŒç¢ºå®ã«è¡šç€ºãããŸãããæ°å€ã¿ã€ãã¯JSONãã¡ã€ã«ããšã«ç°ãªã£ãŠèŠãããšããããšã§ããããšãã°ã{intFieldïŒ1}ããã³{intFieldïŒ1.1}ããã®ãããªãã£ãŒã«ãã1ã€ã®éšåã§èŠã€ãã£ãå Žåãã¹ããŒãããŒãžã¯ãã¹ãŠãæ£ããèªã¿åããæãæ£ç¢ºãªã¿ã€ãã«ãªããŸãããã ããç°ãªãå Žåãäžæ¹ã¯intFieldïŒintã«ãªããããäžæ¹ã¯intFieldïŒdoubleã«ãªããŸãã
ãã®ç¶æ³ãåŠçããããã®æ¬¡ã®ãã©ã°ããããŸãã
df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)
ããã§ãããŒãã£ã·ã§ã³ãé 眮ãããŠãããã©ã«ããŒãã§ããŸããããã®ãã©ã«ããŒã¯ãåäžã®ããŒã¿ãã¬ãŒã ã«èªã¿èŸŒãããšãã§ããã¹ãã¢ããã³ãå šäœã«æå¹ãªå¯æšçް工ã§ããã¯ãïŒçªå·ã
ããŒãã«ãHiveã«ç»é²ããããšãæãåºããŠãã ããããã€ãã¯ãã£ãŒã«ãåã§å€§æåãšå°æåãåºå¥ããŸãããã坿šçް工ã¯å€§æåãšå°æåãåºå¥ããŸãããããã£ãŠãã¹ããŒããfield1ïŒintããã³Field1ïŒintã®ããŒãã£ã·ã§ã³ã¯ãHiveã§ã¯åãã§ãããSparkã§ã¯åãã§ã¯ãããŸããããã£ãŒã«ãåã¯å°æåã«ããããšãå¿ããªãã§ãã ããã
ãã®åŸããã¹ãŠãé 調ã«èŠããŸãã
ãã ãããã¹ãŠãããã»ã©åçŽãšããããã§ã¯ãããŸãããæ¬¡ã«ãããç¥ãããŠããåé¡ãçºçããŸããæ°ããããŒãã£ã·ã§ã³ã¯ããããåå¥ã«ä¿åããããããSparkãµãŒãã¹ãã¡ã€ã«ã¯ããŒãã£ã·ã§ã³ãã©ã«ãã«ãããŸãïŒäŸïŒ_SUCCESSæäœæåãã©ã°ïŒãããã¯ã坿šçް工ãããããšãããšãã«ãšã©ãŒãã¹ããŒããŸãããããåé¿ããã«ã¯ãSparkããã©ã«ãã«ãµãŒãã¹ãã¡ã€ã«ã远å ã§ããªãããã«ããŠæ§æãèšå®ããå¿ èŠããããŸãã
hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
çŸåšãæ¯æ¥æ°ãã坿šçް工ã®ããŒãã£ã·ã§ã³ãã¿ãŒã²ããã®ã¹ãã¢ããã³ããã©ã«ããŒã«è¿œå ããããã®æ¥ã®è§£ææžã¿ããŒã¿ãä¿åãããŠããããã§ããããŒã¿ã¿ã€ããç«¶åããããŒãã£ã·ã§ã³ããªãããã«äºåã«æ³šæããŸããã
ããããç§ãã¡ã®åã«3çªç®ã®åé¡ããããŸããããã«ãHiveã§ã¯ãæ°ããããŒãã£ã·ã§ã³ããšã«ã¹ããŒã ã«æªã¿ãçããå¯èœæ§ãé«ããããã¹ããŒã ãééã£ãŠããããŒãã«ã®äžè¬çãªã¹ããŒã ã¯äžæã§ãã
ããŒãã«ãåç»é²ããå¿ èŠããããŸããããã¯ç°¡åã«è¡ãããšãã§ããŸããã¹ãã¢ããã³ã坿šçް工ãããäžåºŠèªã¿ãã¹ããŒããååŸããããã«åºã¥ããŠDDLãäœæããŸããããã«ãããHiveã®ãã©ã«ããŒãå€éšããŒãã«ãšããŠåç»é²ããã¿ãŒã²ããã¹ãã¢ããã³ãã¹ããŒããæŽæ°ããŸãã
ç§ãã¡ã¯4çªç®ã®åé¡ã«çŽé¢ããŠããŸããåããŠããŒãã«ãç»é²ãããšãã¯ãSparkã«äŸåããŠããŸãããä»ãç§ãã¡ã¯èªåãã¡ã§ãããè¡ããŸãããããŠããªãã¯å¯æšçް工ã®ãã£ãŒã«ãããã€ãã«ç¡å¹ãªæåã§å§ãŸãããšãã§ããããšãèŠããŠããå¿ èŠããããŸããããšãã°ãSparkã¯ãè§£æã§ããªãã£ãè¡ããcorrupt_recordããã£ãŒã«ãã«ã¹ããŒããŸãããã®ãããªãã£ãŒã«ãã¯ããšã¹ã±ãŒãããã«Hiveã«ç»é²ããããšã¯ã§ããŸããã
ãããç¥ã£ãŠãããšã次ã®ã¹ããŒã ãåŸãããŸãã
f_def = ""
for f in pf.dtypes:
    if f[0] != "date_load":
        f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<")
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)
ã³ãŒãïŒ "_corrupt_record"ã "` _corrupt_record` "ïŒ+" "+ f [1] .replaceïŒ"ïŒ "ã" `ïŒ"ïŒãReplaceïŒ "<"ã "<` "ïŒãReplaceïŒ"ã " ã "ã` "ïŒãreplaceïŒ" array <`"ã "array <"ïŒã¯ãDDLãå®å šã«ããŸããã€ãŸããæ¬¡ã®ä»£ããã«ãªããŸãã
create table tname (_field1 string, 1field string)
ã_field1ã1fieldããªã©ã®ãã£ãŒã«ãåã䜿çšãããšããã£ãŒã«ãåããšã¹ã±ãŒããããå Žæã§å®å šãªDDLãäœæãããŸããcreatetable `tname`ïŒ` _field1` stringã `1field` stringïŒã
çåãçããŸãïŒå®å šãªã¹ããŒãã§ããŒã¿ãã¬ãŒã ãæ£ããïŒpfã³ãŒãã§ïŒååŸããæ¹æ³ã¯ïŒãã®pfãååŸããã«ã¯ã©ãããã°ããã§ããïŒããã¯5çªç®ã®åé¡ã§ããã¿ãŒã²ããã¹ãã¢ããã³ãã®å¯æšçް工ã®ãã¡ã€ã«ãå«ããã©ã«ããŒãããã¹ãŠã®ããŒãã£ã·ã§ã³ã®ã¹ããŒããåèªã¿èŸŒã¿ããŸããïŒããã¯æãå®å šãªæ¹æ³ã§ãããæãé£ããæ¹æ³ã§ãã
ã¹ããŒãã¯ãã§ã«Hiveã«ãããŸããããŒãã«å šäœã®ã¹ããŒããšæ°ããããŒãã£ã·ã§ã³ãçµã¿åãããããšã§ãæ°ããã¹ããŒããååŸã§ããŸãããããã£ãŠãHiveããããŒãã«ã¹ããŒããååŸãããããæ°ããããŒãã£ã·ã§ã³ã¹ããŒããšçµã¿åãããå¿ èŠããããŸããããã¯ãHiveãããã¹ãã¡ã¿ããŒã¿ãèªã¿åãããããäžæãã©ã«ãã«ä¿åããSparkã§äž¡æ¹ã®ããŒãã£ã·ã§ã³ãäžåºŠã«èªã¿åãããšã§å®è¡ã§ããŸãã
åºæ¬çã«ãå¿ èŠãªãã®ã¯ãã¹ãŠæã£ãŠããŸããHiveã®å ã®ããŒãã«ã¹ããŒããšæ°ããããŒãã£ã·ã§ã³ã§ããããŒã¿ããããŸããæ®ã£ãŠããã®ã¯ãäœæãããããŒãã£ã·ã§ã³ããã¹ãã¢ããã³ãã¹ããŒããšæ°ãããã£ãŒã«ããçµã¿åãããæ°ããã¹ããŒããååŸããããšã ãã§ãã
from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")
次ã«ãåã®ã¹ããããã®ããã«ãããŒãã«ãç»é²ããããã®DDLãäœæããŸãã
ãã§ãŒã³å šäœãæ£ããæ©èœããŠããå Žåãã€ãŸããåæåããŒãããããHiveã«æ£ããäœæãããããŒãã«ãããå ŽåãæŽæ°ãããããŒãã«ã¹ããŒããååŸããŸãã
ãããŠæåŸã®åé¡ã¯ãããŒãã£ã·ã§ã³ãå£ããŠããŸããããHiveããŒãã«ã«ããŒãã£ã·ã§ã³ã远å ããã ãã§ã¯äžååãªããšã§ããHiveã«ããŒãã£ã·ã§ã³æ§é ãä¿®æ£ãããå¿ èŠããããŸãã
from pyspark.sql import HiveContext
hc = HiveContext(spark)
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)
JSONãèªã¿åããããã«åºã¥ããŠã¹ãã¢ããã³ããäœæãããšããåçŽãªã¿ã¹ã¯ã¯ãå€ãã®æé»ã®åé¡ãå æããããšã«ã€ãªãããŸãããã®è§£æ±ºçã¯åå¥ã«æ¢ãå¿ èŠããããŸãããããã®è§£æ±ºçã¯åçŽã§ãããèŠã€ããã®ã«é·ãæéãããããŸãã
ã·ã§ãŒã±ãŒã¹ã®å»ºèšã宿œããããã«ãç§ã¯ããªããã°ãªããŸããã§ããïŒ
- ã¹ãã¢ããã³ãã«ããŒãã£ã·ã§ã³ã远å ããŠããµãŒãã¹ãã¡ã€ã«ãåé€ããŸã
- Sparkãå ¥åããå ã®ããŒã¿ã®ç©ºã®ãã£ãŒã«ããåŠçããŸã
- åçŽãªåãæååã«ãã£ã¹ããã
- ãã£ãŒã«ãåãå°æåã«å€æãã
- Hiveã§ã®åå¥ã®ããŒã¿ãã³ããšããŒãã«ç»é²ïŒDDLäœæïŒ
- Hiveãšäºææ§ããªãå¯èœæ§ããããã£ãŒã«ãåããšã¹ã±ãŒãããããšãå¿ããªãã§ãã ãã
- Hiveã§ããŒãã«ã®ç»é²ãæŽæ°ããæ¹æ³ãåŠã¶
èŠçŽãããšãã·ã§ãŒã±ãŒã¹ãæ§ç¯ãããšããæ±ºå®ã¯å€ãã®èœãšã穎ãé ããŠããããšã«æ³šæããŠãã ããããããã£ãŠãå®è£ ã§åé¡ãçºçããå Žåã¯ãæåããå°éç¥èãæã€çµéšè±å¯ãªããŒãããŒã«é£çµ¡ããããšããå§ãããŸãã
ãã®èšäºãèªãã§ããã ãããããšãããããŸãããã®æ å ±ãã圹ã«ç«ãŠã°å¹žãã§ãã