最佳实践 | RDS & POLARDB归档到X-Pack Spark计算

2019 年 11 月 5 日 阿里巴巴数据库技术

1

业务背景

部分RDS和POLARDB For MySQL的用户曾遇到如下场景:当一张表的数据达到几千万时,你查询一次所花的时间会变多。

这时候采取水平分表的策略,水平拆分是将同一个表的数据进行分块保存到不同的数据库中,这些数据库中的表结构完全相同。

本文将介绍如何把这些水平分表的表归档到X-Pack Spark数仓,做统一的大数据计算。

X-Pack Spark服务通过外部计算资源的方式,为Redis、Cassandra、MongoDB、HBase、RDS存储服务提供复杂分析、流式处理及入库、机器学习的能力,从而更好的解决用户数据处理相关场景问题。


2

RDS & POLARDB

分表归档到X-Pack Spark步骤


一键关联POLARDB到Spark集群
一键关联主要是做好spark访问RDS & POLARDB的准备工作。

POLARDB表存储

在database ‘test1’中每5分钟生成一张表,这里假设为表 'test1'、'test2'、'test2'、... 

具体的建表语句如下:

*请左右滑动阅览

 CREATE TABLE `test1` ( `a` int(11NOT NULL,
                        `b` time DEFAULT NULL,          
               `c` double DEFAULT NULL,
                         PRIMARY KEY (`a`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8

归档到Spark的调试

x-pack spark提供交互式查询模式支持直接在控制台提交sql、python脚本、scala code来调试。

1、首先创建一个交互式查询的session,在其中添加mysql-connector的jar包。

*请左右滑动阅览

 wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_connectors/mysql-connector-java-5.1.34.jar


2、创建交互式查询

以pyspark为例,下面是具体归档demo的代码:

*请左右滑动阅览

spark.sql("drop table sparktest").show()
# 创建一张spark表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的5分钟的一张polardb表达的数据。字段和polardb里面的类型一致
spark.sql("
CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string"
      "
USING parquet PARTITIONED BY (dt ,hh ,mm )").show()

#本例子在polardb里面创建了databse test1,具有三张表test1 ,test2,test3,这里遍历这三张表,每个表存储spark的一个5min的分区
# CREATE TABLE `test1` (
#     `a` int(11) NOT NULL,
#                     `b` time DEFAULT NULL,
#                                      `c` double DEFAULT NULL,
#                                                         PRIMARY KEY (`a`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8
for num in range(1, 4): 
    #构造polardb的表名
    dbtable = "
test1." + "test" + str(num)
    #spark外表关联polardb对应的表
    externalPolarDBTableNow = spark.read \
        .format("
jdbc") \
        .option("
driver", "com.mysql.jdbc.Driver") \
        .option("
url", "jdbc:mysql://pc-xxx.mysql.polardb.rds.aliyuncs.com:3306") \
        .option("
dbtable", dbtable) \
        .option("
user", "name") \
        .option("
password", "xxx*") \
        .load().registerTempTable("
polardbTableTemp")
    #生成本次polardb表数据要写入的spark表的分区信息
    (dtValue, hhValue, mmValue) = ("
20191015", "13", str(05 * num))
    #执行导数据sql 
    spark.sql("
insert into sparktest partition(dt= %s ,hh= %s , mm=%s )  "
          "
select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show()
    #删除临时的spark映射polardb表的catalog
    spark.catalog.dropTempView("
polardbTableTemp")
    #查看下分区以及统计下数据,主要用来做测试验证,实际运行过程可以删除
    spark.sql("
show partitions sparktest").show(1000, False)
    spark.sql("
select count(*) from sparktest").show()



归档作业上生产

交互式查询定位为临时查询及调试,生产的作业还是建议使用spark作业的方式运行,使用文档参考。这里以pyspark作业为例:

/polardb/polardbArchiving.py 内容如下:

*请左右滑动阅览

# -*- coding: UTF-8 -*-

from __future__ import print_function

import sys
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("PolardbArchiving") \
        .enableHiveSupport() \
        .getOrCreate()

    spark.sql("drop table sparktest").show()
    # 创建一张spark表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的5分钟的一张polardb表达的数据。字段和polardb里面的类型一致
    spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) "
          "USING parquet PARTITIONED BY (dt ,hh ,mm )").show()

    #本例子在polardb里面创建了databse test1,具有三张表test1 ,test2,test3,这里遍历这三张表,每个表存储spark的一个5min的分区
    # CREATE TABLE `test1` (
    #     `a` int(11) NOT NULL,
    #      `b` time DEFAULT NULL,
    #      `c` double DEFAULT NULL,
    #       PRIMARY KEY (`a`)
    # ) ENGINE=InnoDB DEFAULT CHARSET=utf8
    for num in range(14):
        #构造polardb的表名
        dbtable = "test1." + "test" + str(num)
        #spark外表关联polardb对应的表
        externalPolarDBTableNow = spark.read \
            .format("jdbc") \
            .option("driver""com.mysql.jdbc.Driver") \
            .option("url""jdbc:mysql://pc-.mysql.polardb.rds.aliyuncs.com:3306") \
            .option("dbtable", dbtable) \
            .option("user""ma,e") \
            .option("password""xxx*") \
            .load().registerTempTable("polardbTableTemp")
        #生成本次polardb表数据要写入的spark表的分区信息
        (dtValue, hhValue, mmValue) = ("20191015""13", str(05 * num))
        #执行导数据sql
        spark.sql("insert into sparktest partition(dt= %s ,hh= %s , mm=%s )  "
              "select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show()
        #删除临时的spark映射polardb表的catalog
        spark.catalog.dropTempView("polardbTableTemp")
        #查看下分区以及统计下数据,主要用来做测试验证,实际运行过程可以删除
        spark.sql("show partitions sparktest").show(1000False)
        spark.sql("select count(*) from sparktest").show()
    spark.stop()

扫描下方 ⬇️二维码

了解关于X-Pack Spark计算服务的更多信息



双十一还不知道买什么?
阿里云数据库双11 爆款直降
这份购物清单 ⬇️给你拿去!  

没有套路,买就是划算
“爱码士”们赶紧买起来吧 !
点击 阅读原文”  直达会场

阿里巴巴数据库技术

微信:alibabadba



分享数据库前沿 

解构实战干货

长按二维码关注

登录查看更多
0

相关内容

Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。
Python分布式计算,171页pdf,Distributed Computing with Python
专知会员服务
107+阅读 · 2020年5月3日
【2020新书】Kafka实战:Kafka in Action,209页pdf
专知会员服务
67+阅读 · 2020年3月9日
【新书】Java企业微服务,Enterprise Java Microservices,272页pdf
TensorFlow Lite指南实战《TensorFlow Lite A primer》,附48页PPT
专知会员服务
69+阅读 · 2020年1月17日
【干货】大数据入门指南:Hadoop、Hive、Spark、 Storm等
专知会员服务
95+阅读 · 2019年12月4日
MIT新书《强化学习与最优控制》
专知会员服务
275+阅读 · 2019年10月9日
滴滴离线索引快速构建FastIndex架构实践
InfoQ
21+阅读 · 2020年3月19日
社区分享 | Spark 玩转 TensorFlow 2.0
TensorFlow
15+阅读 · 2020年3月18日
携程用ClickHouse轻松玩转每天十亿级数据更新
DBAplus社群
11+阅读 · 2019年8月6日
Flink 靠什么征服饿了么工程师?
阿里技术
6+阅读 · 2018年8月13日
开发、调试计算机视觉代码有哪些技巧?
AI研习社
3+阅读 · 2018年7月9日
基于 Storm 的实时数据处理方案
开源中国
4+阅读 · 2018年3月15日
Spark机器学习:矩阵及推荐算法
LibRec智能推荐
16+阅读 · 2017年8月3日
Spark App自动化分析和故障诊断
CSDN大数据
7+阅读 · 2017年6月22日
Arxiv
5+阅读 · 2020年3月26日
Advances in Online Audio-Visual Meeting Transcription
Arxiv
4+阅读 · 2019年12月10日
Arxiv
4+阅读 · 2019年1月14日
Accelerated Methods for Deep Reinforcement Learning
Arxiv
6+阅读 · 2019年1月10日
Arxiv
3+阅读 · 2018年1月31日
VIP会员
相关资讯
滴滴离线索引快速构建FastIndex架构实践
InfoQ
21+阅读 · 2020年3月19日
社区分享 | Spark 玩转 TensorFlow 2.0
TensorFlow
15+阅读 · 2020年3月18日
携程用ClickHouse轻松玩转每天十亿级数据更新
DBAplus社群
11+阅读 · 2019年8月6日
Flink 靠什么征服饿了么工程师?
阿里技术
6+阅读 · 2018年8月13日
开发、调试计算机视觉代码有哪些技巧?
AI研习社
3+阅读 · 2018年7月9日
基于 Storm 的实时数据处理方案
开源中国
4+阅读 · 2018年3月15日
Spark机器学习:矩阵及推荐算法
LibRec智能推荐
16+阅读 · 2017年8月3日
Spark App自动化分析和故障诊断
CSDN大数据
7+阅读 · 2017年6月22日
Top
微信扫码咨询专知VIP会员