订阅
纠错
加入自媒体

基于Spark的数据分析实践

2019-06-19 09:55
EAWorld
关注

SparkSQL Flow 支持的Sourse

支持从 Hive 获得数据;

支持文件:JSON,TextFile(CSV),ParquetFile,AvroFile

支持RDBMS数据库:PostgreSQL, MySQL,Oracle

支持 NOSQL 数据库:Hbase,MongoDB

SparkSQL Flow TextFile Source

textfile 为读取文本文件,把文本文件每行按照 delimiter 指定的字符进行切分,切分不够的列使用 null 填充。

<source type="textfile" table_name="et_rel_pty_cong"              fields="cust_id,name1,gender1,age1:int"               delimiter=","              path="file:///Users/zhenqin/software/hive/user.txt"/>

可左右滑动查看代码

Tablename 为该文件映射的数据表名,可理解为数据的视图;

Fields 为切分后的字段,使用逗号分隔,字段后可紧跟该字段的类型,使用冒号分隔;

Delimiter 为每行的分隔符;

Path 用于指定文件地址,可以是文件,也可是文件夹;

Path 指定地址需要使用协议,如:file:// 、 hdfs://,否则跟 core-site.xml 配置密切相关;

SparkSQL Flow DB Source

<source type="mysql" table_name="et_rel_pty_cong"                table="user"                url="jdbc:mysql://localhost:3306/tdb?characterEncoding=UTF-8"                driver="com.mysql.jdbc.Driver"                user="root" password="123456"/>

可左右滑动查看代码

RDBMS 是从数据库使用 JDBC读取 数据集。支持 type 为:db、mysql、oracle、postgres、mssql;

tablename 为该数据表的抽象 table 名称(视图);

url、driver、user,password 为数据库 JDBC 驱动信息,为必须字段;

SparkSQL 会加载该表的全表数据,无法使用 where 条件。


SparkSQL Flow Transformer

<transform type="sql" table_name="cust_id_agmt_id_t" cached="true">            SELECT c_phone,c_type,c_num, CONCAT_VAL(cust_id) as cust_ids            FROM user_concat_testx            group by c_phone,c_type,c_num</transform>

可左右滑动查看代码

Transform 支持 cached 属性,默认为 false;如果设置为 true,相当于把该结果缓存到内存中,缓存到内存中的数据在后续其它 Transform 中使用能提高计算效率。但是需使用大量内存,开发者需要评估该数据集能否放到内存中,防止出现 OutofMemory 的异常。

SparkSQL Flow Targets

SparkSQL Flow Targets 支持输出数据到一个或者多个目标。这些目标,基本覆盖了 Source 包含的外部系统。下面以 Hive 举例说明:

<target type="hive" table_name="cust_id_agmt_id_t"  savemode=”append”target_table_name="cust_id_agmt_id_h"/>

可左右滑动查看代码

table_name 为 source 或者 Transform 定义的表名称;

target_table_name 为 hive 中的表结果,Hive 表可不存在也可存在,sparksql 会根据 DataFrame 的数据类型自动创建表;

savemode 默认为 overwrite 覆盖写入,当写入目标已存在时删除源表再写入;支持 append 模式, 可增量写入。

Target 有一个特殊的 show 类型的 target。用于直接在控制台输出一个 DataFrame 的结果到控制台(print),该 target 用于开发和测试。

<target type="show" table_name="cust_id_agmt_id_t" rows=”10000”/>

可左右滑动查看代码

Rows 用于控制输出多少行数据。

SparkSQL Around

After 用于 Flow 在运行结束后执行的一个环绕,用于记录日志和写入状态。类似 Java 的 try {} finally{ round.execute() }

多个 round 一定会执行,round 异常不会导致任务失败。

<prepare>        <round type="mysql"               sql="insert into cpic_task_history(id, task_type, catalog_model, start_time, retry_count, final_status, created_at)               values(${uuid}, ${task.type}, ${catalog.model}, ${starttime}, 0, ${status}, now())"               url="${jdbc.url}" .../></prepare><after>        <round type="mysql"               sql="update cpic_task_history set               end_time = ${endtime}, final_status = ${status}, error_text = ${error} where id = ${uuid}"               url="${jdbc.url}”…/></after>

可左右滑动查看代码

Prepare round 和 after round 配合使用可用于记录 SparkSQL Flow 任务的运行日志。

SparkSQL Around可使用的变量

SparkSQL Around的执行效果

Prepare round 可做插入(insert)动作,after round 可做更新 (update)动作,相当于在数据库表中从执行开始到结束有了完整的日志记录。SparkSQL Flow 会保证round 一定能被执行,而且 round 的执行不影响任务的状态。

SparkSQL Flow 提交

bin/spark-submit --master yarn-client --driver-memory 1G --num-executors 10 --executor-memory 2G --jars /lib/jsoup-1.11.3.jarlib/jsqlparser-0.9.6.jar,/lib/mysql-connector-java-5.1.46.jar --conf spark.yarn.jars=hdfs:///lib/spark2/*.jar --queue default --name FlowTest etl-flow-0.2.0.jar -f hive-flow-test.xml

可左右滑动查看代码

接收必须的参数 –f,可选的参数为支持 Kerberos 认证的租户名称principal,和其认证需要的密钥文件。

usage: spark-submit --jars etl-flow.jar --class                    com.yiidata.etl.flow.source.FlowRunner -f,--xml-file <arg>     Flow XML File Path    --keytabFile <arg>   keytab File Path(Huawei)    --krb5File <arg>     krb5 File Path(Huawei)    --principal <arg>    principal for hadoop(Huawei)

可左右滑动查看代码

SparkSQL Execution Plan

每个Spark Flow 任务本质上是一连串的 SparkSQL 操作,在 SparkUI SQL tab 里可以看到 flow 中重要的数据表操作。

regiserDataFrameAsTable 是每个 source 和 Transform 的数据在 SparkSQL 中的数据视图,每个视图都会在 SparkContex 中注册一次。

<上一页  1  2  3  4  下一页>  余下全文
声明: 本文由入驻维科号的作者撰写,观点仅代表作者本人,不代表OFweek立场。如有侵权或其他问题,请联系举报。

发表评论

0条评论,0人参与

请输入评论内容...

请输入评论/评论长度6~500个字

您提交的评论过于频繁,请输入验证码继续

暂无评论

暂无评论

    人工智能 猎头职位 更多
    扫码关注公众号
    OFweek人工智能网
    获取更多精彩内容
    文章纠错
    x
    *文字标题:
    *纠错内容:
    联系邮箱:
    *验 证 码:

    粤公网安备 44030502002758号