大数据:数据同步

概述

  • 数据同步场景类型:
  1. 主数据库与备份数据库之间的数据备份;
  2. 主系统与子系统间的数据更新;
  3. 属于同类型不同集群数据库间的数据同步;
  4. 不同区域、不同数据库类型间的数据传输交换;
  • 大数据系统:数据从业务系统同步进入数据仓库、数据从数据仓库同步进入数据服务或者数据应用。(本篇主讲数据从业务系统同步进入数据仓库)

一、数据同步基础

  • 源业务系统的数据类型:
  1. 源于关系型数据库的结构化数据(如MySQL、Oracle、DB2、SQL Server等);
  2.  源于非关系型数据库的非结构化数据(如OceanBase、HBase、MongoDB等,数据主要存储在数据库表中);
  3. 源于文件系统的结构化或非结构化数据(如阿里云对象存储 OSS、文件存储 NAS等,数据通常以文件形式进行存储);
  • 数据同步需要针对不同的数据类型及业务场景选择不同的同步方式:直连同步、数据文件同步、数据库日志解析同步。

1、直连同步

  • 直连同步:通过定义好的规范接口 API 和基于动态链接库的方式直接连接业务库。(如 ODBC/JDBC 等规定了统一规范的标准接口,不同数据库基于这套标准接口提供规范的驱动,致辞完全相同的函数调用和 SQL 实现)
  • 大数据:数据同步
  1. 优点:配置简单,实现容易,比较适合操作型业务系统的数据同步。
  2.  缺点:对源系统的性能影响较大,当执行大批量数据同步是会降低甚至拖垮业务系统的性能。
  •  如果业务库采取主备策略,则可以从备库抽取数据,避免对业务系统产生性能影响;但是当数据量较大时,采取此种抽取方式性能较差,不太适合从业务系统到数据仓库系统的同步。

2、数据文件同步

  • 通过约定好的文件编码、大小、格式等,直接从源系统生成数据的文本文件,由专门的文件服务器,如 FTP 服务器传输到目标系统后,加载到目标数据库系统中。
  1. 当数据源包含多个异构的数据库系统(如 MySQL、Oracle、SQL Server、DB2等)时,用这种方式比较简单、使用。
  2. 互联网的日志类数据,通常是以文本文件形式存在的,也适合使用数据文件同步方式。
  3. 大数据:数据同步
  •  有通过文件服务器上传、下载可能会造成丢包或错误,为了确保数据文件同步的完整性,通常除了上传数据文件本身以外,还会上传一个校验文件,该校验文件记录了数据文件的数据量以及文件大小等校验信息,以供下游目标系统验证数据同步的准确性。
  • 从源系统生成数据文件的过程中,可以增加压缩和加密功能,传输到目标系统以后,再对数据进行解压缩和解密,这样可以大大提高文件的传输效率和安全性。

3、数据库日志解析同步

  •  数据库日志解析同步:通过解析日志文件获取发生变更的数据,从未满足增量数据同步的需求。(日志文件信息足够丰富,数据格式稳定)
  • 目前广泛应用于从业务系统到数据仓库的增量数据同步应用中。
  • 以 Oracle 为例:
  1. 通过源系统的进程,读取归档日志文件用以收集变化的数据信息;
  2. 判断日志中的变更是否属于被收集对象,将其解析到目标文件中;(这种读操作是在操作系统层面完成的,不需要通过数据库,不会给源系统带来性能影响。)
  3. 通过网络协议,实现源系统和目标系统间的数据文件传输。(相关进程可以确保数据文件的正确接收和网络数据包的正确顺序,并提供网络传输冗余,确保数据文件的完整性;)
  4. 数据被传输到目标系统后,可通过数据加载模块完成数据的导入,从而实现数据从源系统到目标系统的同步;
  • 大数据:数据同步
  • 优点:性能好,效率高,实现了实时与准实时同步的能力,延迟可以控制在毫秒级别,并且对业务系统的性能影响比较小。
  • 问题/缺点:
  1. 数据延迟。(例:业务系统做批量补录可能会使数据更新量超出系统处理峰值,导致数据延迟。)
  2. 投入较大。(需要在源数据库与目标数据库之间部署一个系统实施抽取数据。)
  3. 数据漂移和遗漏。(数据漂移,一般是对增量表而言的,通常指该表的同一个业务日期数据中包含前一天或后一天凌晨附近的数据或者丢失当天的变更数据。)
  • 由于数据库日志抽取一般是获取所有的数据记录的变更(增、删、该),落地到目标表时需要根据主键去重按照日志事件倒排序获取最后状态的变化情况。(对于删除数据这种变更,针对不同的业务场景可以采用一些不同的落地手法。)
  • 实例:

  • 下图为源业务系统中某表变更日志流水表。其含义:存在5条变更日志,其中主键为 1 的记录有 3 条变更日志,主键为 2 的记录有 2 条变更日志。

  • 大数据:数据同步
  • 针对删除数据这种变更,主要有三种方式:假设根据主键去重,按照流水倒序,获取记录最后状态生成的表为 delta 表;
  1. 第一种方式:不过滤删除流水,生成下图 delta 表。(不管是否是删除操作,都获取统一主键最后变更的那条流水。)大数据:数据同步
  2.  第二种方式:过滤最后一条删除流水,生成下图 delta 表。(如果统一主键最后变更的那条流水是删除操作,就获取倒数第二条流水。)大数据:数据同步
  3. 第三种方式:过滤删除流水和之前的流水,生成下图 delta 表。(如果在同一主键变更的过程中有删除操作,则根据操作时间将该删除操作做对应的流水和之前的流水都过滤掉)大数据:数据同步
  • 对于采用哪种方式处理删除数据,要看前端是如何删除数据的。
  • 前端业务系统删除数据的方式一般有两种:正常业务数据删除、手工批量删除。
  1. 手工批量删除:通常针对类似的场景,业务系统只做逻辑删除,不做物理删除,DBA 定期将部分历史数据直接删除或者备份到备份库。
  2. 一般情况下,可以采用不过滤的方式来处理,下游通过是否删除巨佬的标识来判断记录是否有效。如果明确业务数据不存在业务上的删除,但是存在批量删除或备份数据删除,则可以采用只过滤最后一条删除流水的方式,通过状态字段来标识删除记录是否有效。

二、阿里数据仓库的同步方式

  •  数据仓库的特性之一是集成,将不同的数据来源、不同形式的数据整合在一起,所以从不同业务系统将各类数据源同步到数据仓库是一切的开始。
  • 阿里数据仓库的数据同步的特点:
  1. 数据来源的多样性。(除了结构化的数据,还有大量非结构化数据,特别是日志数据,这类数据通常直接以文本形式记录在文件系统中,对于数据的分析、统计、挖掘等各类数据应用有极大的价值。)
  2. 数据量巨大。(目前大型互联网企业的大数据系统每条同步的数据量达到 PB 级别(1 PB = 1024 TB),而阿里的大数据系统 MaxCompute 的数据存储达到 EB 级别(1 EB = 1024 PB),每天需要同步的数据量达到 PB 级)
  • 针对不同的数据源类型和数据应用的时效性要求,采用不同的同步方式。

1、批量数据同步

  • 对于离线类型的数据仓库应用,需要将不同的数据源批量同步到数据仓库,以及将经过数据仓库处理的结果数据定时同步到业务系统。
  • 数据仓库系统是集成了各类数据源的地方,数据类型是统一的。
  • 要实现各类数据库系统与数据仓库系统之间的批量双向数据同步,需要先将数据转换成中间状态,统一数据格式。
  • 由于各数据库系统的数据都是结构化的,均支持标准的 SQL 语言查询,所以所有的数据类型都可以转换成字符串类型。因此,通过将各类数据库系统的数据类型统一转换为字符串类型的方式,实现数据格式的统一。
  • 阿里集团的 DataX :一个能满足多方向高自由度的异构数据交换服务产品。
  1. DataX 通过插件的形式提供支持,将数据从数据源读出并转换为中间状态,同时维护好数据的传输、缓存等工作。
  2. 数据在 DataX 中以中间状态存在,并在目标数据系统中将中间状态的数据转换为对应的数据格式后写入。
  3.  DataX 通过分布式模式,批量同步处理数据。
  4. 大数据:数据同步
  • DataX 采用 Framework + Plugin 的开放式框架实现,Framework 处理缓冲、流程控制、并发、上下文加载等高速数据减缓的大部分技术问题,并提供鉴定的接口与插件介入。
  • 数据传输在单进程(单机模式)/多进程(分布式模式)下完成,传输过程全内存操作,不读写磁盘,也没有进程间通信,实现了在异构数据库或文件系统之间的高速数据交换。
  • 大数据:数据同步
  1.  Job:数据同步作业。
  2. Spliter:作业切分模块。(将一个大任务分解成多个可以并发行的小任务)
  3. Sub-Job:数据同步作业切分后的小任务,或称之为 Task。
  4.  Reader:数据读入模块。(负责运行切分后的小任务,将数据从源系统装载到 DataX)
  5.  Channel:Reader 与 Writer 通过 Channel 交换数据。
  6.  Writer:数据写入模块。(负责将数据从 DataX 导入到目标数据系统)

2、实时数据同步

  • 天猫“双 11” 的数据大屏为例:对所产生的交易数据需要实时汇总,实现秒级的数据刷新。
  1. 方法:通过解析 MySQL 的binlog 日志(相当于 Oracle 的归档日志)来实时获得增量的数据更新,并通过消息订阅模式来实现数据的实时同步。
  2. 具体操作:建立数据交易中心,通过专门的模块从每台服务器源源不断地读取日志数据,或者解析业务数据库系统的 binlog 或归档日志,将增量数据以数据流的方式不断同步到日志交换中心,然后通知所有订阅了这些数据的数据仓库系统来获取。
  • 阿里集团的 Time Tunnel(TT)系统就是这样的实时数据传输平台:是一种基于生产者、消费者和 Topic 消息标识的消息中间件,将消息数据持久化到 HBase 的高可用、分布式数据交互系统。
  • 大数据:数据同步
  1. 生产者(Client):消息数据的产生端,向 Time Tunnel 集群发送消息数据。
  2. 消费者(Client):消息数据的接收端,从 Time Tunnel 集群中获取数据进行业务处理。
  3. Topic:消息类型的标识,如淘宝 acookie 日志的 Topic 为 tao_acookie,生成 Client 和消费 Client 均需要知道对应的Topic 名字。
  4.  Broker 模块:负责处理客户端手法消息数据的请求,然后往 HBase 取发数据。

三、数据同步遇到的问题与解决方案

1、分库分表处理

  • 问题及要求:业务不断增长,数据量飞速增加,需要系统具备灵活的扩展能力和高并发大数据量的处理能力;
  • 解决方案:数据库系统采用分布式分库分表方案。(见下图)
  • 大数据:数据同步
  • 例:阿里集团的 TDDL(Taobao Distributed Data Layer)就是这样一个分布式数据库的访问引擎:通过建立中间状态的逻辑表来整合统一分库分表访问。
  1. TDDL 是在持久层框架之下、JDBC 驱动之上的中间件,与 JDBC 规范保持一致,有效解决了分库分表的规则引擎问题,实现了 SQL 解析、规则计算、表名替换、选择执行单元并合并结果集的功能,同时解决了数据库表的读写分离、高性能主备切换的问题,实现了数据库配置信息的统一管理。
  2. 大数据:数据同步

2、高效同步和批量同步

  • 数据同步的方法通常是:先创建目标表,再通过同步工具的填写数据库连接、表、字段等各种配置信息后测试完成数据同步。
  • 问题:
  1. 业务发展,数据量增大,传统方法完成工作的工作量增大,而且,相似并重复的操作降低开发人员的工作热情;
  2. 数据仓库的数据源种类特别丰富,遇到不同类型的数据同步,开发人员需要去了解其特色配置;
  3. 部分真正的数据需求方,如 Java 开发和业务运营,由于存在相关数据同步的专业技能部门,往往需要将需求提交给数据开发方来完成,额外增加了沟通和流程成本;
  • 解决方案以阿里集团为例:数据仓库研发了 OneClick 产品;
  1. 对不同数据源的数据同步配置透明化,可以通过库名和表名唯一定位,通过 IDB 接口获取源数据信息自动生成配合信息;
  2. 简化了数据同步操作步骤,实现了与数据同步相关的建表、配置任务、发布、测试操作一键化处理,并且封装成 Web 接口进一步达到批量化的效果;
  3. 降低了数据同步的技能门槛,让数据需求方更加方便地获取和使用数据。
  • OneClick:实现了数据的一键化和批量化同步,一键完成 DDL 和 DML 的生成、数据的冒烟测试以及在生成环境中测试等。
  • IDB:阿里集团用于统一管理 MySQL、OceanBase、PostgreSQL、Oracle、SQL Server等关系型数据库的平台,是一种集数据管理、结构管理、诊断优化、实时监控和系统管理与一体的数据管理服务。(在对集团数据库表统一管服务过程中,IDB 产出了数据库、表、字段各个级别元数据信息,并提供了元数据接口服务)

3、增量与全量同步的合并

  • 问题:传统数据同步方式为周期全量数据同步,但随着业务发展数据量的急剧增加,周期全量同步的效率太低了。
  • 解决方案:每个周期只同步增量数据,然后与上一个同步周期获取的全量数据进行合并,获取最新版本的全量数据。
  • 传统数据整合方案:merge 方式(update + insert );
  • 当前大数据平台不支持 update 操作,而采用:全外连接(full outer join) + 数据全覆盖重新加载(insert overwrite);(即如日调度,则将当天的增量数据和前一天的全量数据做全外连接,重新加载最新的全量数据)
  • 如果担心数据更新错误:每条保持一个最新的全量版本,保留较短的事件周期。(另外,当业务系统的表有物理删除数据的操作,而数据仓库需要保留所有历史数据时,也可以选择这种方式,在数据仓库中永久保留最新的全量数据快照。)
  • 例:淘宝订单表
  • 大数据:数据同步

4、同步性能的处理

  • 数据同步任务是针对不同数据看系统之间的数据同步问题而创建的一些列周期调度的任务。在代行的数据调度工作台上,每条会运行大量的数据同步任务。针对数据同步任务,一般首先需要设定首轮同步的线程数,然后运行同步任务。这样的数据同步模式存在以下几个问题:
  1. 有些数据同步任务的总线程数达不到用户设置的首轮同步的线程数时,如果同步控制器将这些同步县城分发到 CPU 比较繁忙的机器上,将导致这些同步任务的平均同步速度非常低,数据同步速度非常慢;
  2. 用户不清楚该如何设置首轮同步的线程数,基本都会设置成一个固定的值,导致同步任务因得不到合理的 CPU 资源而影响同步效率;
  3. 不同的数据同步任务的重要程度是不一样的,但是同步控制器平等对待接收到的同步线程,导致重要的同步线程因得不到 CPU 资源而无法同步;
  • 上述三种情况可能会导致同步任务不稳定。
  • 阿里集团的解决思路:通过目标数据库的元数据估算同步任务的总线程数,以及通过系统预先定义的期望同步速度估算首轮同步的线程数,同时通过数据同步任务的业务优先级决定同步线程的优先级,最终提升同步任务的执行效率和稳定性。
  • 具体实现步骤:
  1. 用户创建数据同步任务,并提交该同步任务;
  2. 根据系统提前获知及设定的数据,估算该同步任务需要同步的数据量、平均同步速度、首轮运行期望的线程数、需要同步的总线程数;
  3. 根据需要同步的总线程数将待同步的数据拆分成等数据量的数据块,一个线程处理一个数据块,并将该任务对应的所有线程提交至同步控制器;
  4. 同步控制器判断需要同步的总线程数是否大于首轮运行期望的线程数,若大于,则跳转至(5);若不大于,则跳转至(6);
  5. 同步控制器采用多机多线程的数据同步模式,准备该任务的第一轮线程的调度,优先发送等待时间最长、优先级最高且同一任务的线程;
  6. 同步控制器准备一定数量(期望首轮线程数 - 总线程数)的虚拟线程,采用单机多线程的数据同步模式,准备该任务响应实体线程和虚拟线程的调度,优先发送等待时间最长、优先级最高且单机 CPU 剩余资源可以支持首轮所有线程数且同一任务的线程,如果没有满足条件的机器,则选择 CPU 剩余资源最多的机器进行首轮发送。
  7. 数据任务同步开始,并等待完成;
  8. 数据任务同步结束。

5、数据漂移的处理

  • ODS:通常把从源系统同步进入数据仓库的第一层数据成为 ODS 或者 staging 层数据。
  •  数据漂移:指 ODS 表的同一个业务日期数据中包含前一天或后一天凌晨附近的数据,或者丢失当天的变更数据。
  • 由于 ODS 需要承接面向历史的细节数据查询需求,这就需要物理落地到数据仓库的 ODS 表按时间段来切分进行分区存储;(通常的做法是按某些时间戳字段来切分,而实际上往往由于时间戳字段的准确性问题导致发生数据漂移)
  • 问题:
  • 通常,时间戳分为四类(根据其中的某一个字段来切分 ODS 表,导致产生数据漂移):
  1. 数据库表中用来表示数据记录更新时间的时间戳字段(假设这类字段叫 modified_time);
  2. 数据库日志中用来表示数据记录更新时间的时间戳字段(假设这类字段叫 log_time);
  3. 数据库表中用来记录具体业务过程发生时间的时间戳字段(假设这类字段叫 proc_time);
  4. 标识数据记录被抽取到时间的时间戳字段(假设这类字段叫 extract_time);
  • 理论上以上几个时间应该是一致的,但实际生产中,这几个时间往往会出现差异,可能的原因有一下几点:
  1. 由于数据抽取是需要时间的,extract_time 往往会晚于前三个时间;
  2. 前台业务系统手工订正数据时未更新 modified_time;
  3. 由于网络或者系统压力问题,log_time 或者 modified_time 会晚于 proc_time;
  • 数据漂移场景:
  1. 根据 extract_time 来获取数据(ODS 数据)。(数据漂移问题最明显)
  2. 根据 modified_time 限制。(最常见,但是往往会发生不更新 modified_time 而导致的数据遗漏,或者凌晨事件产生的数据记录漂移到前一天。)
  3. 根据 log_time 限制。(由于网络或者系统压力问题,log_time 会晚于 proc_time,从而导致凌晨时间产生的数据记录漂移到后一天。)(例:天猫“双11”大促期间,凌晨时间产生的数据非常大,用户支付需要调用多个接口,导致 log_time 晚于实际的支付时间)
  4. 根据 proc_time 限制。(得到的 ODS 表只是包含一个业务过程所产生的记录,会遗漏很多其他过程的变化记录,违背了 ODS 和业务系统保持一致的设计原则)
  • 两种解决方法:
  • (1)多获取后一天的数据
  1. ODS 每个时间分区中,向前、向后多冗余一些数据,保障数据智慧多不会少,而具体的数据切分让下游根据自身不同的业务场景用不同的业务时间 proc_time 来限制。
  2. 缺点:产生数据误差。(例:一个订单是当天支付的,但是第二条凌晨申请退款关闭了该订单,那么这条记录的订单状态会被更新,下游在统计支付订单状态时会出现错误)
  • (2)通过多个时间戳字段限制时间来获取相对准确的数据
  1. 首先,根据 log_time 分别冗余前一天最后 15 分钟的数据和后一天凌晨开始 15 分钟的数据,并用 modified_time 过滤非当天数据,确保数据不会因为系统问题而遗漏;
  2. 然后,根据 log_time 获取后一天 15分钟的数据;针对此数据,按照主键根据 log_time 做升级排列去重。(因为最终需要获取的是最接近当天记录变化的数据(数据库日志将保留所有变化的数据,但是落地到 ODS 表的是根据主键去重获取最后状态变化的数据))
  3. 最后,将前两步的结果数据做全外连接,通过限制业务时间 proc_time 来获取所需要的数据。
  • 例:淘宝交易订单的数据漂移;
  •  “双 11”的交易订单中,有一大批在 11 月 11 日 23:59:59 左右支付的交易订单漂移到了 12 日。主要原因是用户下单支付后系统需要调用支付宝的接口而有所延迟,从而导致这些订单最终生成的时间咵天了。即 modified_time 和 log_time 都晚于 proc_time。
  • 难点:
  1. 如果订单只有一个支付业务过程,则可以用支付时间类限制就能获取到正确的数据。但是往往实际订单有多个业务过程:下单、支付、成功,每个业务过程都有响应的时间戳字段,并不只有支付数据会漂移。
  2. 若果直接通过多获取后一天的数据,然后限制这些时间,则可以获取到相关数据,倒数后一天的数据可能已经更新多次,直接获取到的那条记录已经是更新多次后的状态,数据的准确性存在问题。
  • 解决方法:
  1. 根据实际情况获取后一天 15 分钟的数据,并限制多个业务过程的时间戳字段(下单、支付、成功)都是 “双 11” 当天的,然后对这些数据按照订单的 modified_time 做升序排列,获取每个订单首次数据变更的那条记录;
  2. 根据 log_time 分别冗余前一天最后 15 分钟的数据和后一天凌晨开始 15 分钟的数据,并用 modified_time 过滤非当天数据,针对每个订单按照 log_time 进行降序排列,去每个订单当天最后一次数据变更的那条记录。
  3. 最后将两份数据根据订单做全外连接,精漂移数据回补到当天数据中。