提升Hive操作Amazon S3读写数据的性能

通常通过Amazon S3读取和写入数据的速度也要慢于使用HDFS,即使Amazon S3存储桶与基于Amazon EC2基础架构上运行的Hadoop集群,原因如下:

1、在Amazon S3上,重命名是非常昂贵的复制操作。 在提交阶段,重命名表面的性能下降,其中包括:

  • MapReduce FileOutputCommitter
  • DistCp在复制操作后重命名
  • Hadoop FileSystem shell输入操作,如果垃圾箱已启用,则输入-rm

2、写操作只能从close()操作完成后才开始,操作可能需要很长时间并且其中的某些过程可能会超时;

3、性能问题也可能由于HTTP请求的大小而发生。

Hive操作S3权限的优化

参数推荐值参数说明

hive.warehouse.subdir.inherit.permsfalse由于S3没有文件权限的概念,请设置hive.warehouse.subdir.inherit.perms = false以减少文件权限检查的次数hive.metastore.pre.event.listeners(空)由于S3没有目录权限的概念,请设置hive.metastore.pre.event.listeners =(设置为空值)以减少S3中目录权限检查的次数 以上参数设置在hive-site.xml中。

提升Hive操作Amazon S3读写数据的性能

Hive提高读取ORC格式Job的优化

参数推荐值参数说明hive.orc.compute.splits.num.threads默认值为10如果使用ORC格式并且希望改进拆分计算时间,则可以增加hive.orc.compute.splits.num.threads(默认值为10)。此参数控制计算拆分中涉及的并行线程的数量。请注意,对于Parquet,它仍然是单线程的,所以拆分计算可能需要更长的Parquet和S3hive.orc.splits.include.file.footertrue如果将ORC格式与ETL文件拆分策略一起使用,则可以设置hive.orc.splits.include.file.footer = true以搭载拆分有效内容中的文件页脚信息 可以使用Hive CLI中的--hiveconf选项或使用Beeline中的set命令来设置这些参数。

加速ETL作业

参数推荐值参数说明hive.stats.fetch.partition.statsfalsehive.stats.fetch.partition.stats如果没有可用的统计信息或hive.stats.fetch.partition.stats = false,则查询启动可能会稍慢。在这种情况下,Hive最终会为每个要访问的文件查看文件大小,调整hive.metastore.fshandler.threads有助于减少Metastore操作所需的总时间fs.trash.interval0由于操作涉及将文件移动到垃圾箱(副本+删除),因此在S3中,drop table可能会很慢,因此可以将fs.trash.interval = 0设置为完全跳过垃圾桶可以使用Hive CLI中的--hiveconf选项或使用Beeline中的set命令来设置这些参数。

在Hive中加速插入参数

推 荐值参数说明hive.mv.files.thread默认值为15插入数据时,Hive将数据从临时文件夹移动到最终位置。这个移动操作实际上是一个复制+删除操作,这在Amazon S3中很贵;向S3写入的数据越多,操作的成本就越高。为了加速该过程,您可以调整hive.mv.files.thread(默认值为15),具体取决于数据集的大小。提高容器分配性能

参数推荐值参数说明yarn.scheduler.capacity.node-locality-delay0由于AWS不具备机架位置的概念,设置yarn.scheduler.capacity.node-locality-delay = 0可以启用更快的容器,更多信息,请参阅Capacity Scheduler相关文档优化HTTP GET请求注:这些优化参数,当前还初于实验性阶段,将来其行为可能会改变

实验性的fadvice策略介绍

S3A文件系统客户端支持输入策略的概念,类似于POSIX fadvise()API调用的概念。 这会调整S3A客户端的行为以优化各种用例的HTTP GET请求。 要优化HTTP GET请求,可以使用S3A实验输入策略fs.s3a.experimental.input.fadvise:

策略描述sequential (default)整个文档在一个HTTP请求中被请求,被顺序读取,通过跳过中间数据支持在预读范围内的前向搜索,这会让读取操作吞吐量达到最大,但会带来非常昂贵的后向搜索。normal当前的行为同sequential是一样的random针对随机IO进行了优化,特别是针对Hadoop的`PositionedReadable`操作 - 使用seek(offset);read(byte_buffer)`操作同样也有提升。

与顺序读取的不同,HTTP请求读取不是整个文件,而是通过`read`操作设置所需要读取的数据长度的范围 - 如果有必要,可能通过对通过setReadahead()的值的范围进行四舍五入的操作。

通过降低关闭现有HTTP请求的成本,这对于文件IO通过一系列`PositionedReadable.read()`和`PositionedReadable.readFully()`调用来访问二进制文件是非常有效的。顺序读取文件是很昂贵的,因为现在许多HTTP请求必须通过文件读取。 对于一些需要顺序读取文件的操作(如复制,DistCp,读取gzip或其他压缩格式,解析.csv文件等),顺序策略是适当的, 这是默认设置,所以你不需要配置它。

对于高性能随机访问IO(例如,访问ORC文件)的特定情况,您可以考虑在以下情况下使用随机策略:

  • 使用PositionedReadable API读取数据;
  • 要顺序读取很多内容才能够找到需要的内容;
  • 同时需要读取文件的前面和后面的内容。
  • 很少使用单字符read()调用或者少量的缓冲区read(buffer)操作;
  • 应用程序的地理位置区域与Amazon S3数据存储区域在一起或者很近,也就是说,运行应用程序的EC2 VM与Amazon S3存储区位于同一区域;

实验性的fadvice策略配置

创建文件系统实例时,必须在配置选项fs.s3a.experimental.input.fadvise中设置所需的fadvise策略,它只能在每个文件系统的基础上设置,而不能基于每个文件读取进行设置, 你可以在core-site.xml中设置它:

[html] view plain copy

  1. <property>
  2. <name>fs.s3a.experimental.input.fadvise</name>
  3. <value>random</value>
  4. <description>Policy for reading files.
  5. Values: 'random', 'sequential' or 'normal'
  6. </description>
  7. </property>

也可以在spark-defaults.xml配置文件中进行如下设置:

[html] view plain copy

  1. spark.hadoop.fs.s3a.experimental.input.fadvise random

请注意,这种随机访问性能是以连续IO为代价的 - 这包括读取使用gzip压缩的文件。

用S3A快速上传写入数据(注:这些优化参数,当前还初于实验性阶段,将来其行为可能会改变)

S3对象存储的本质,写入到S3A OutputStream的数据不会是逐步写入的 - 相反,在默认情况下,它会先缓存到磁盘,直到流的close()方法关闭完成才开始写入S3中。这可能会使输出变慢,因为:

  • OutputStream.close()的执行时间与缓冲的数据量成正比,与带宽成反比。那是O(数据/带宽)。
  • 带宽是从主机到S3的可用带宽:在上载时同一进程,服务器或网络中的其他工作可能会增加上传时间,因此会增加close()调用的持续时间。
  • 如果在调用OutputStream.close()之前上载数据的进程失败,则所有数据都将丢失。
  • 承载fs.s3a.buffer.dir中定义的临时目录的磁盘必须具有存储整个缓冲文件的能力。

综上所述,从S3端点进一步处理的时间越长,或者EC2 VM越小,完成工作的时间就越长。这会在应用程序代码中造成问题:

  • 代码通常假设close()调用很快;延误可能会在运营中造成瓶颈。
  • 非常缓慢的上传有时会导致应用程序超时 - 通常,上传期间线程阻塞会停止报告进度,从而触发超时。
  • 在上传开始之前,流式传输大量数据可能会消耗所有磁盘空间。

解决写入慢的问题的方案一直在研发之中,目前可以通过“S3A快速上传”来解决这个问题,要使用这个功能,需要增加一些配置。

S3A快速上传”的特点

“S3A快速上传”特点包括:

  • 通过参数fs.s3a.multipart.size设置文件上传的大小,也就是说,分段上传开始的阈值和每次上传的大小是相同的;
  • 缓冲区缓冲到磁盘(默认)或堆内内存或堆外内存;
  • 后台线程并行上传块;
  • 一旦缓冲数据超过此分区大小,立即开始上传块;
  • 将数据缓存到磁盘时,使用fs.s3a.buffer.dir中列出的一个或多个目录,可以缓冲的数据大小限于可用磁盘空间;
  • 生成输出统计信息作为文件系统的度量标准,包括活动和待处理块上载的统计信息;
  • close()执行的时间由上传剩余数据量决定,而不是文件的总大小;

通过块的增量写入,“S3A快速上传”的上传时间至少与“传统”机制一样快,特别是对长时输出流以及生成大量数据时具有显着优势, 内存缓冲机制也可以对S3端点附近运行时提供加速,因为此时中间数据存储是通过内存而不是磁盘。

启用“S3A快速上传”

要启用快速上载机制,请将fs.s3a.fast.upload属性设置为true:

[html] view plain copy

  1. <property>
  2. <name>fs.s3a.fast.upload</name>
  3. <value>true</value>
  4. <description>
  5. Use the incremental block upload mechanism with
  6. the buffering mechanism set in fs.s3a.fast.upload.buffer.
  7. The number of threads performing uploads in the filesystem is defined
  8. by fs.s3a.threads.max; the queue of waiting uploads limited by
  9. fs.s3a.max.total.tasks.
  10. The size of each buffer is set by fs.s3a.multipart.size.
  11. </description>
  12. </property>

核心配置选项

以下主要配置选项可用于“S3A快速上传”:

[html] view plain copy

  1. <property>
  2. <name>fs.s3a.fast.upload.buffer</name>
  3. <value>disk</value>
  4. <description>
  5. The buffering mechanism to use when using S3A fast upload
  6. (fs.s3a.fast.upload=true). Values: disk, array, bytebuffer.
  7. This configuration option has no effect if fs.s3a.fast.upload is false.
  8. "disk" will use the directories listed in fs.s3a.buffer.dir as
  9. the location(s) to save data prior to being uploaded.
  10. "array" uses arrays in the JVM heap
  11. "bytebuffer" uses off-heap memory within the JVM.
  12. Both "array" and "bytebuffer" will consume memory in a single stream up to the number
  13. of blocks set by: fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks.
  14. If using either of these mechanisms, keep this value low
  15. The total number of threads performing work across all threads is set by
  16. fs.s3a.threads.max, with fs.s3a.max.total.tasks values setting the number of queued
  17. work items.
  18. </description>
  19. </property>
  20. <property>
  21. <name>fs.s3a.multipart.size</name>
  22. <value>100M</value>
  23. <description>How big (in bytes) to split upload or copy operations up into.
  24. A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
  25. </description>
  26. </property>
  27. <property>
  28. <name>fs.s3a.fast.upload.active.blocks</name>
  29. <value>8</value>
  30. <description>
  31. Maximum Number of blocks a single output stream can have
  32. active (uploading, or queued to the central FileSystem
  33. instance's pool of queued operations.
  34. This stops a single stream overloading the shared thread pool.
  35. </description>
  36. </property>

注意:

  • 如果写入流中的数据量的大小低于fs.s3a.multipart.size中设置的数据量的大小,则上传将在OutputStream.close()操作完成后执行 - 与原始输出流一样。
  • 已发布的Hadoop度量标准监视器,包含活动队列长度和上传操作计数,以确定何时存在工作积压或数据生成速率与网络带宽之间的不匹配情况。每个流的统计信息也可以通过在当前流上调用toString()方法来记录。
  • 增量写入不可见;只有在close()调用中的多部分操作完成时,才能列出或读取该对象,在上载完成之前该对象将被阻塞。

使用磁盘缓冲区快速上载

当fs.s3a.fast.upload.buffer设置为磁盘时,所有数据在上载前都会缓存到本地硬盘。这最大限度地减少了消耗的内存量,因此消除了堆大小作为排队上载中的限制因素 - 与fs.s3a.fast.upload = false时使用的原始“直接到磁盘”缓冲完全一样。

[html] view plain copy

  1. <property>
  2. <name>fs.s3a.fast.upload</name>
  3. <value>true</value>
  4. </property>
  5. <property>
  6. <name>fs.s3a.fast.upload.buffer</name>
  7. <value>disk</value>
  8. </property>
  9. <property>
  10. <name>fs.s3a.buffer.dir</name>
  11. <value></value>
  12. <description>Comma separated list of temporary directories use for
  13. storing blocks of data prior to their being uploaded to S3.
  14. When unset, the Hadoop temporary directory hadoop.tmp.dir is used</description>
  15. </property>

这是默认的缓冲机制, 可以缓冲的数据量受可用磁盘空间量的限制。

使用ByteBuffers快速上传

当fs.s3a.fast.upload.buffer设置为bytebuffer时,所有数据在上传前都会缓存在“直接”ByteBuffers中。 这可能比缓冲到磁盘更快,并且如果磁盘空间很小(例如,微小的EC2 VM),可能没有多少磁盘空间可供缓冲。

ByteBuffers在JVM的内存中创建,但不在Java堆本身中创建。 可以缓冲的数据量由Java运行时,操作系统以及YARN应用程序限制每个容器请求的内存量。

上传到S3的带宽越慢,内存耗尽的风险就越大 - 因此在调整上传线程设置时需要更加小心,以减少可以缓存的等待上载的最大数据量(请参阅下文)。

[html] view plain copy

  1. <property>
  2. <name>fs.s3a.fast.upload</name>
  3. <value>true</value>
  4. </property>
  5. <property>
  6. <name>fs.s3a.fast.upload.buffer</name>
  7. <value>bytebuffer</value>
  8. </property>

使用Arrays快速上传

当fs.s3a.fast.upload.buffer设置为Arrays时,所有数据都会在上传之前缓存在JVM堆中的字节数组中, 这比缓冲到磁盘更快。

可以缓冲的数据量受JVM堆堆的可用大小的限制, 写入S3带宽越慢,堆溢出的风险就越大, 这种风险可以通过调整上传线程设置来缓解(见下文)。

[html] view plain copy

  1. <property>
  2. <name>fs.s3a.fast.upload</name>
  3. <value>true</value>
  4. </property>
  5. <property>
  6. <name>fs.s3a.fast.upload.buffer</name>
  7. <value>array</value>
  8. </property>

S3A快速上传线程调整

数组和字节缓冲区缓冲机制都会分别消耗大量的堆内或堆外内存,磁盘缓冲机制不占用太多内存,但会消耗硬盘容量。

如果在单个进程中写入的输出流很多,则所用内存或磁盘的数量是所有流的活动内存/磁盘使用量的倍数。

如果想减少内存耗尽的风险,尤其是在数据缓冲在内存中的情况下。有一些可以调整的参数:

  1. 调整文件系统中可用于数据上载或任何其他排队文件系统操作的线程总数,这在fs.s3a.threads.max参数中设置;
  2. 调整排队等待执行的操作数:fs.s3a.max.total.tasks;
  3. 调整单个输出流可以激活的任务块的数量,即由线程上载或在文件系统线程队列中排队的块数:fs.s3a.fast.upload.active.blocks;
  4. 空闲线程在退出之前可以留在线程池中多久:fs.s3a.threads.keepalivetime;
  5. 当达到单个流的活动块的最大允许数量时,在该活动块的一个或多个上传完成之前,不能从该流上传更多块。那就是:write()调用会触发一个现在完整的数据块的上传,直到队列中有容量时才会被阻塞;

结论

  1. 由于在fs.s3a.threads.max中设置的线程池是共享的(并且打算在所有线程中使用),因此这里设置较大的数字,就可以允许更多的并行操作。但是由于上传需要网络带宽,添加更多线程并不一定能够保证起到加速作用,因而该参数与当前网络状况相关;
  2. 线程池的额外任务队列(fs.s3a.max.total.tasks)涵盖了所有正在进行的后台S3A操作(未来计划包括:并行化重命名操作,异步目录操作);
  3. 当使用内存缓冲时,可以通过设置fs.s3a.fast.upload.active.blocks来限制每个流操作可以使用的内存;
  4. 使用磁盘缓冲时,设置较大fs.s3a.fast.upload.active.blocks的值,虽然不会消耗太多内存,但它可能导致大量的块与其他文件系统操作之间的竞争;

我们建议先把fs.s3a.fast.upload.active.blocks的值设置的低一点 - 至少足以启动后台上传,但不会使用系统的其他部分超载,然后通过试验,不断的调整该值,看看是否能够提供更高的吞吐量 - 尤其是要针EC2上运行的虚拟机多做些测试。

[html] view plain copy

  1. <property>
  2. <name>fs.s3a.fast.upload.active.blocks</name>
  3. <value>4</value>
  4. <description>
  5. Maximum Number of blocks a single output stream can have
  6. active (uploading, or queued to the central FileSystem
  7. instance's pool of queued operations.
  8. This stops a single stream overloading the shared thread pool.
  9. </description>
  10. </property>
  11. <property>
  12. <name>fs.s3a.threads.max</name>
  13. <value>10</value>
  14. <description>The total number of threads available in the filesystem for data
  15. uploads *or any other queued filesystem operation*.</description>
  16. </property>
  17. <property>
  18. <name>fs.s3a.max.total.tasks</name>
  19. <value>5</value>
  20. <description>The number of operations which can be queued for execution</description>
  21. </property>
  22. <property>
  23. <name>fs.s3a.threads.keepalivetime</name>
  24. <value>60</value>
  25. <description>Number of seconds a thread can be idle before being
  26. terminated.</description>
  27. </property>

改善负载均衡行为

Amazon S3使用一组前端服务器来提供对底层数据的访问,有关使用哪个前端服务器的决定是通过负载均衡DNS服务处理的,当查找Amazon S3存储桶的IP地址时,根据前端服务器的当前负载来选择返回到客户端的IP地址。

随着时间的推移,前端的负载会发生变化,因此那些被认为是“轻负载”的服务器会发生变化,这意味着如果DNS值被缓存了很长时间,应用程序最终可能会与超负荷的服务器连接;或者在出现故障的情况下,他们最终可能会尝试与已经不存服务器连接。

而且,出于历史安全原因,在小应用程序时代,默认情况下,JVM的DNS TTL设置为“无穷大”。

要改善AWS负载平衡,请将与Amazon S3配合使用的应用程序的DNS生存时间设置为低于默认值的值,这请参阅AWS文档中的设置DNS名称查找的JVM TTL。

相关推荐