设为首页 - 加入收藏 ASP站长网(Aspzz.Cn)- 科技、建站、经验、云计算、5G、大数据,站长网!
热搜: 创业者 数据 手机
当前位置: 首页 > 大数据 > 正文

什么是ETL?一文掌握ETL设计过程

发布时间:2022-10-22 09:21 所属栏目:125 来源:互联网
导读:ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。 更关键的是,数据来源的业务系统也是在不断地更新维护中的,任何一个变更都会对下游的数据分析程序产生巨大的影
  ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。
  更关键的是,数据来源的业务系统也是在不断地更新维护中的,任何一个变更都会对下游的数据分析程序产生巨大的影响。因此,有了ETL过程作为一个缓冲区,当上游的业务系统变更时,只需要对ETL过程进行相应变更,下游的数据分析就能够比较稳定,从而降低系统维护成本。
 
  ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。
 
  1、数据清洗
  首先进行数据清洗,对原始数据中的错误予以纠正,或者对缺失数据进行补填。譬如,现在要建设一个增值税发票的数据中台。这时,系统从许多不同的来源采集与增值税发票相关的数据。当收集完这些原始数据以后,进行数据清洗工作。增值税发票的数据结构如下图所示。
 
  图片
 
  增值税发票的数据结构图
 
  在正常的增值税发票的数据结构中,每张进项发票都应当有至少一条发票明细。然而,可能由于采集的数据不一致,发票与明细经常不是同时到来,可能相差几天,造成用发票分析的数据与用发票明细分析的数据不一致。这时,必须要先补填一个发票明细,虽然商品名称与数量不知道,但至少要保证发票明细的金额之和要等于发票金额,才不至于影响后续的分析质量。至于商品名称,可以暂时补填一个“未知商品”。这样,当该发票真正的发票明细到来时,再覆盖原有补填的明细。
 
  此外,原本每张发票都应当有购方纳税人与销方纳税人,然而由于纳税人信息的基础数据来源于不同的系统,可能造成该发票的纳税人信息不在纳税人信息表中的情况。这时,必须要补填一条纳税人信息,使得发票表与纳税人能够对应上,不会造成数据无法关联而缺失数据。
 
  同理,每个纳税人都应当有各自的税务机关、地域和行业,这些信息都可能缺失。对于税务机关和地域,可以通过纳税人社会信用代码中的内容进行推测。但是,行业信息是无法推测的。即使无法推测,也不能将其置为null,而是填一个默认值X99999,对应到行业表中的“未知行业”。
 
  数据清洗的过程通过SparkSQL来实现。通过SparkSQL从原始表中查询数据,然后经过以下处理过程,最终写入ETL临时表中:
 
  复制
  1/**
   2
   3 * @author fangang
   4
   5 */
   6
   7object ZzsfpJx {
   8
   9  def main(args: Array[String]): Unit = {
  10
  11    val task = LogUtils.start("zzsfpJxQd")
  12
  13    try {
  14
  15      val spark = SparkUtils.init("zzsfpJx")
  16
  17      val ETLFPMAPNUM = PropertyFile.getProperty("ETLFPMAPNUM").toInt
  18
  19      spark.udf.register("getJxfpId", (fpdm:String, fphm:String, kprq:String) =>
  20      if(null==kprq) fpdm+"X"+fphm+"X" else fpdm+"X"+fphm+"X"+kprq)
  21      UdfRegister.fillNsr(spark)
  22      UdfRegister.fillSwjg(spark)
  23      UdfRegister.cutSL(spark)
  24
  25
  26      val result = spark.sql("SELECT getJxfpId(D.FPDM,D.FPHM,D.KPRQ) JXFP_ID,D.FPDM,D.
  27      FPHM,'YB' FP_LB,D.JE JE,cast(cutSL(D.SE/D.JE) as double) SL,D.
  28      SE SE,fillNsr(D.XFSBH) XF_NSRSBH, D.XFMC XF_NSRMC, fillNsr(D.GFSBH) GF_NSRSBH,
  29      D.GFMC GF_NSRMC,D.KPRQ, D.KPRQ RZSJ, D.XF_QXSWJG_DM SWJG_DM,
  30      from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss') CZSJ,NSR.SWJG_KEY GF_SWJG_DM,
  31      getSwjg(D.XF_QXSWJG_DM,fillNsr(D.XFSBH)) XF_SWJG_DM, case trim(D.fpzt_dm)
  32      when '0' then 'N' when '1' then 'N' else 'Y' end ZFBZ,'' SKM,'' SHRSBH,'
  33     ' SHRMC,'' FHRSBH,'' FHRMC,'' QYD,'' SKPH, D.JSHJ,'' CZCH,'' CCDW,''
  34     YSHWXX,D.BZ,D.tspz_dm as TSPZBZ,CASE WHEN
  35     length(trim(D.zfrq))>15 THEN D.zfrq ELSE NULL END ZFSJ FROM dzdz.DZDZ_FPXX_ZZSFP
  36     D JOIN DW.DW_DM_NSR NSR ON D.GFSBH = NSR.NSR_KEY and
  37     NSR.WDBZ='1'").repartition(ETLFPMAPNUM)
  38
  39     DataFrameUtils.saveAppend(result, "etl", "etl_jxfp")
  40
  41     LogUtils.end(task)
  42    } catch { case ex:Exception  => LogUtils.error(task, ex) }
  43  }
  44}
  45
  
  在以上SparkSQL程序中,首先从原始数据dzdz.DZDZ_FPXX_ZZSFP中查询数据,通过公用方法UdfRegister.fillNsr(spark)与UdfRegister.fillSwjg(spark)对纳税人与税务机关进行补填,保证发票在与纳税人信息、税务机关信息关联时不会因为数据为null而造成数据缺失。最终,将结果数据写入etl_jxfp的临时表中。
 
  此外,在处理发票明细时加入了这样一段语句:
 
  复制
  1val result1 = spark.sql("SELECT getJxfpqdId(R.FPDM,R.FPHM,R.KPRQ,'00','1') JXFPQD_ID,
   2
   3getJxfpId(R.FPDM,R.FPHM,R.KPRQ) JXFP_ID ,1.0 HH,'YB' FP_LB,
   4
   5'无商品明细' WP_MC,'' WP_DW,'' WP_XH,1.0 WP_SL,R.JE DJ,R.JE, cast(cutSL(R.SL) as double) SL,R.SE,R.RZSJ, from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss') CZSJ,R.KPRQ,'00' QDBZ,'' SKPH,'' SFZHM,'' CD,'' HGZS,'' JKZMSH,'' SJDH,'' FDJHM,
   6
   7'' CJHM,'' DH,'' ZH,'' KHYH,'' DW,'' XCRS,0.0 JSHJ,'9999999999999999999' spbm "+s"FROM dzdz.DZDZ_HWXX_ZZSFP D
   8
   9RIGHT JOIN etl.ETL_JXFP R ON (D.FPDM = R.FPDM AND D.FPHM = R.FPHM)
  10
  11WHERE (D.FPDM is null or D.FPHM is null) and R.FP_LB='YB' ").repartition(ETLFPMAPNUM)
  12
  13DataFrameUtils.saveAppend(result1, "etl", "etl_jxfp_qd")
  14
  
  通过该语句在发票明细中加入了名为“无商品明细”的记录,保证发票明细、发票的金额与税额没有缺失,保障后续数据分析的准确性。
 
  2.数据转换
  以上一系列的数据清洗,可以有效杜绝因为缺失数据或关联不上造成的数据分析质量问题。接着,就是数据转换与集成。
 
  数据中台的数据来源于不同的业务系统,因此数据格式、计算口径都可能存在差异。当把它们都抽取到数据中台以后,应当将其转换成统一口径,并规范计算口径。譬如,如何识别代开发票,不同的系统有不同的判断逻辑,但经过数据转换以后,可以在表中增加一个“是否代开发票”字段,这样后续的分析业务就不必再去判断了,直接看该字段即可。此外,同样是税务机关代码,有的系统是9位,有的系统是11位,应该将它们都统一成11位。以上这些工作就是数据转换。
 
  3.数据集成
  清洗和转换工作完成以后,将相同或者相似的数据都集成在一起。譬如,从各个不同路径采集的纳税人信息,包括纳税人的基础信息、认证信息、核定信息、资格信息,都集成到了纳税人表中;从各个不同路径采集的各种不同的增值税发票,如增值税专票、增值税普票、机动车统一销售发票、电子发票等类型的发票,都统一集成到发票信息表中。它们都来源于不同的业务系统,字段与类型都各不相同。因此,在集成的过程中,需要进行转换或补填,彼此格式一致,并最终存入同一张表中。譬如,其他发票都有发票明细,但机动车统一销售发票没有,因此需要给它补填一条发票明细,商品就是那辆汽车,金额与税额都是那张发票的金额与税额。
 
  在具体设计实现上,就是为每一种发票都编写一个发票与发票明细的SparkSQL程序。它们分别从各自的原始数据中获取,但经过一个SQL语句的转换,最终都存入名为etl_jxfp与etl_jxfp_qd的发票与发票明细临时表中。

(编辑:ASP站长网)

    网友评论
    推荐文章
      热点阅读