博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark缓存策略
阅读量:5846 次
发布时间:2019-06-18

本文共 2906 字,大约阅读时间需要 9 分钟。

当对同一个rdd多次执行action时,如果在磁盘上则每次执行action都会从磁盘将数据加载,如果将其缓存到内存中会提高再次action的读取速度,Spark缓存主要有cache()和persist()两种,当缓存一个rdd时,每一个节点上都会存放这个rdd的partition,当要使用rdd的时候可以直接从内存读出。
cache源码:
def cache(self):        """        Persist this RDD with the default storage level (C{MEMORY_ONLY}).        """        self.is_cached = True        self.persist(StorageLevel.MEMORY_ONLY)        return self

从源码可以看出,cache底层调用的是persist方法,传入的参数是:StorageLevel.MEMORY_ONLY,再看persist()方法:

def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):        self.is_cached = True        javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)        self._jrdd.persist(javaStorageLevel)        return self

persist方法,传入的参数是StorageLevel,从StorageLevel的源码可以看出它的值总共有6种,因此persist()相比cache()在缓存形式上更为丰富,不仅支持内存的方式,还支持内存和磁盘、内存副本等方式。

 

StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False)StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False)StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False)StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)StorageLevel.OFF_HEAP = StorageLevel(True, True, True, False, 1)

 

持久化到内存和直接从磁盘读取时间对比:

import osimport timefrom pyspark import SparkContext, SparkConfconf = SparkConf()sc = SparkContext(conf=conf)current_dir = os.path.dirname(os.path.realpath(__file__))file_path = "{}/name_age.txt".format(current_dir)def cached():    start_time = time.time()    text_rdd = sc.textFile("file://{}".format(file_path)).cache()    text_rdd.count()    text_rdd.count()    end_time = time.time()    print("{}:{}".format("first cache", end_time - start_time))    start1_time = time.time()    text1_rdd = sc.textFile("file://{}".format(file_path)).cache()    text1_rdd.count()    text1_rdd.count()    end1_time = time.time()    print("{}:{}".format("second cache", end1_time - start1_time))def uncached():    start_time = time.time()    text_rdd = sc.textFile("file://{}".format(file_path))    text_rdd.count()    text_rdd.count()    end_time = time.time()    print("{}:{}".format("first uncache", end_time - start_time))    start1_time = time.time()    text1_rdd = sc.textFile("file://{}".format(file_path))    text1_rdd.count()    text1_rdd.count()    end1_time = time.time()    print("{}:{}".format("second uncache", end1_time - start1_time))sc.stop()执行cached()结果:first cache:1.7104301452636719                                                  second cache:0.2717571258544922执行uncached()结果:first uncache:1.4453039169311523                                                second uncache:0.49161386489868164

从执行结果可以看出,当第二次执行rdd.count()时,有cache情况下是0.2717571258544922;无cache情况下是0.49161386489868164,由于我的内存空间不足,所以不太明显,当数据量大且内存充足的时候,持久化到内存的效率会远远高于磁盘。

对pyspark有兴趣的小伙伴可以关注我的github, 持续更新

 

转载于:https://www.cnblogs.com/FG123/p/9748772.html

你可能感兴趣的文章
Linux下修改文件创建时间(修改文件更改时间)
查看>>
我的友情链接
查看>>
make menuconfig出错(ncurses)解决方案
查看>>
ceph-objectstore-tool工具使用示例
查看>>
关于gevent的协程间通信及队列和事件event用法
查看>>
VC 中 字符串编程
查看>>
【移动开发】Android中图片过大造成内存溢出,OOM(OutOfMemory)异常解决方法
查看>>
在Linux中执行.sh脚本,异常/bin/sh^M: bad interpreter: No such file or directory
查看>>
备份事务日志 (SQL Server)
查看>>
Ubuntu操作系统安装使用教程
查看>>
一个GUI对话框界面
查看>>
Linux中sort 排序
查看>>
oracle 11g dataguard 物理备份搭建实验
查看>>
bash脚本编程循环结构+linux软件包管理(1)
查看>>
ssh环境搭建步骤
查看>>
学习LINXU目标
查看>>
jQuery+jsPlumb简易流程设计
查看>>
利用Solid Converter PDF提取Excel表格并打印
查看>>
MDN JavaScript重温笔记
查看>>
笔记本视频监控器
查看>>