了解如何在Novel Corona病毒数据集上的Spark中使用AWS Glue进行ETL操作
AWS Glue是一项完全托管的无服务器ETL服务,可用于准备和加载数据以进行数据分析。 该服务可用于对数据进行分类,清理,充实并在不同的数据存储之间可靠地移动。 在本文中,我将解释如何使用AWS Glue在Novel Corona病毒数据集的Spark中执行ETL操作。
本文将涵盖以下主题:
·glue成分。
· 完成教程以编写Glue spark Job。
· 从AWS S3提取数据。
· 使用Spark转换数据。
· 以实木复合地板格式将转换后的数据存储回S3。
glue成分
简而言之,AWS Glue具有以下重要组件:
· 数据源和数据目标:作为输入提供的数据存储(从中为ETL加载数据)称为数据源,而存储转换后的数据的数据存储为数据目标。
· 数据目录:数据目录是AWS Glue的中央元数据存储库,可在区域中的所有服务之间共享。 该目录包含表定义,作业定义和其他控制信息,以管理您的AWS Glue环境。
· 搜寻器和分类器:搜寻器是从数据存储区检索数据架构的程序。 搜寻器使用自定义或内置分类器来标识数据格式,并在目录中填充元数据表。
· 数据库和表:每次成功的搜寻器运行都会在数据目录中填充一个数据库表。 目录中的数据库是一组关联的表。 每个表仅具有数据的元数据信息,例如列名,数据类型定义,分区信息,并且实际数据保留在数据存储区中。 ETL作业运行中的源和目标使用数据库中的一个或多个表。
· 作业和触发器:执行ETL任务是实际的业务逻辑。 作业由转换脚本,数据源和数据目标组成。 我们可以在python或pyspark中定义Jobs。 作业运行由可以计划或由事件触发的触发器启动。
我留下了一些组件,因为它们不在本文的讨论范围之内。 有关AWS Glue的详细研究,您可以访问官方开发人员指南。
在AWS Glue上使用PySpark进行ETL
现在我们已经了解了Glue的不同组成部分,现在我们可以跳入如何在AWS中编写Glue Jobs并执行实际的提取,转换和加载(ETL)操作。
新型电晕病毒数据集:
该数据集是从Kaggle数据集获得的。 我使用的版本最近一次更新是在2020年5月2日。此数据集中的主文件为covid_19_data.csv。数据集的详细说明如下。
· Sno-序列号
· ObservationDate —以MM / DD / YYYY表示的观察日期
· 省/州-观测值的省或州(缺失时可以为空)
· 国家/地区—观察国家
· 上次更新-针对指定省或国家/地区的行更新时间,以UTC为单位。 (未标准化,因此请在使用前清洁)
· 已确认-截至该日为止已确认病例的累计数量
· 死亡人数-截至该日期为止的累计死亡人数
· 已追回-截至该日为止已追回的案件总数
1)设置我们的数据存储:
作为开发端到端ETL工作的第一步,我们将首先设置数据存储。 转到you s3控制台并在其中创建存储桶。 我们将使用以下分区方案将数据集文件上传到存储桶中:
s3://bucket-name/dataset/year=<year>/month=<month>/day=<day>/hour=<hour>/
现在,我们正在处理一个文件,因此您可以手动创建分区并上载文件,但是如果要处理大量文件,则可以使用我在上一篇文章中介绍的FTP文件提取代码为您完成此工作。 以这种方式对数据进行分区有助于在使用AWS Athena时进行查询优化。
2)创建AWS Glue角色
创建一个Glue角色,该角色将允许Glue访问不同的AWS资源,例如s3。 转到IAM控制台并添加一个新角色,并将AWSGlueServiceRole策略附加到该角色。 该策略包含访问Glue,CloudWatch,EC2,S3和IAM的权限。 有关如何为Glue设置IAM角色的更多详细信息,请考虑以下链接。
3)设置搜寻器到目录数据
执行以下步骤来添加搜寻器:
· 在左侧菜单上,单击数据库并添加数据库。
· 现在转到爬虫和一个新的爬虫
· 选择数据存储
· 提供s3存储桶路径
· 选择glue角色
· 将频率设置为按需运行
· 选择数据库
· 最后查看并单击完成
· 现在,您的搜寻器已创建。 单击" Run Crawler"对数据集进行分类
· 搜寻器可能需要一些时间来对数据进行分类。 成功运行后,必须在指定的数据库中创建表
4)为ETL工作添加glue作业
现在我们已经对数据集进行了分类,现在我们可以添加一个胶水作业,它将对数据集执行ETL工作。
· 在左侧菜单上,单击"作业"并添加一个新作业。 Glue可以自动生成可用于执行ETL操作的python或pyspark脚本。 但是,在我们的情况下,我们将提供一个新脚本。
· 如下设置作业属性
· 将以下内容保留为默认值
· 将最大容量设置为2,将作业超时设置为40分钟。 您设置的DPU数量(最大容量)越多,产生的成本就越高。
· 由于我们没有连接到任何RDBMS,因此无需设置任何连接。 单击"保存作业并编辑脚本"。
· 我们将有以下屏幕。 在下面的编辑器中从我的GitHub存储库中复制并粘贴代码,然后点击保存。 现在单击运行作业按钮。
· 根据工作,作业可能需要一些时间才能执行(在这种情况下,需要15到30分钟)。 到目前为止,Glue Jobs的冷启动时间至少为10分钟,此后开始执行作业。
· 如果作业执行成功,您将在胶粘作业中指定的目标存储段中以实木复合地板格式汇总结果。
了解Spark作业
如果执行了上述所有步骤,那么您应该通过AWS Glue成功执行了ETL作业。 在本节中,我将深入研究正在执行实际ETL操作的Spark代码。
glue和Spark导入和参数
在最顶部,我们有必要的Glue和Spark导入。
导入后,我们将设置一些参数。 这包括获取作业名称,设置spark和胶粘上下文,初始化作业以及定义目录数据库和表名称以及s3输出路径。
提取
ETL操作的"提取"部分除了连接到某个数据存储并从那里获取数据外什么也没有做。 代码的摘录部分执行以下操作:
· 使用胶水目录创建一个Glue DynamicFrame。 目录数据库和表名称已提供。
· 创建动态框架后,我们使用toDF()方法将其转换为Spark数据框架。 转换为spark数据框将使我们能够使用所有spark转换和动作。
转变
在ETL操作的"转换"部分中,我们对数据应用了不同的转换。 代码的转换部分执行以下操作:
· 首先,我们在spark中使用drop()方法删除"最后更新"列(无特殊原因)。
· 然后使用spark中的dropna()方法删除具有4个以上null字段的任何行。
· 然后,使用spark中的fillna()方法使用自定义值" na_province_state"填充"省/州"列中的缺失值。
· 接下来,我们对数据集执行汇总。 已执行3种不同的汇总。 我们检查一个国家/地区中哪个省/州的病例最多,死亡人数最多,恢复率最高。 这是通过使用groupBy()方法将记录按"省/州"和"国家/地区"列分组,使用max()方法与max(Confirmed),max(Deaths)和max(Recovered)列进行聚合,然后将其排序的 使用orderBy()方法降序。
· 最后,我们使用fromDF()方法将数据帧转换回Glue DynamicFrame,以将结果保存在S3中。 它使用三个参数:数据帧,粘合上下文和所得DynamicFrame的名称。
加载
在ETL操作的加载部分中,我们将转换后的数据存储到某些持久性存储中,例如s3。 代码的加载部分执行以下操作:
我们使用DynamicFrame()的from_options()方法将结果保存到s3。 此方法采用以下参数:
· frame:我们要编写的DynamicFrame。
· connection_type:在这种情况下为s3我们要写入的目标数据存储。
· connection_options:这里我们指定目标s3路径和数据格式(在这种情况下为镶木地板)以保存数据。
· Transformation_ctx:可选转换上下文。
所有3个汇总结果都以拼花格式保存在目标路径中。
摘要
在本文中,我们学习了如何使用AWS Glue在Spark中进行ETL操作。 我们学习了如何设置数据源和数据目标,如何创建搜寻器以对s3上的数据进行分类以及如何编写Glue Spark Job来执行提取,转换和加载(ETL)操作。
完整的spark代码可以在我的GitHub存储库中找到:
furqanshahid85-python / AWS-Glue-Pyspark-ETL-Job
感谢您阅读❤。
(本文翻译自Furqan Butt的文章《Extract, Transform, Load (ETL) — AWS Glue》,参考:https://towardsdatascience.com/extract-transform-load-etl-aws-glue-edd383218cfd)
还没有评论,来说两句吧...