大数据框架整理
作者:水分子Andy 发布时间:[ 2017/1/22 10:42:46 ] 推荐标签:框架 大数据
kafka
1、kafka和jms的区别
2、kafka的topic理解
topic是逻辑存在的,真正在物理磁盘中的体现是partitioner,一个topic可以对应多个partition,不同的paritition存放在不同的broker中,以提高并发存储能力。
3、partitioner
partition是topic信息在屋里存储中的具体体现,在磁盘中它是一个文件夹,名字是topic名字_partition编号。4、segment
每个partition对对应多个segment文件,默认大小是1G,为了快速定位到指定的offset位置。
5、kafka为什么这么快
1/使用了操作系统使用的pagecache缓存,缓存大,缓存到一定量的数据时,以顺序写入的方 式写入到磁盘中。
因为:磁盘顺序写入的方式非常的快=>600MB/s,而随机存储只有100kb/s左右。
2/使用操作系统的sendfile技术。在读取信息发送的时候,不需要经过用户区,而是在os端直接发送,可以减少很多步骤。
6、为什么要多个partitioner7、为什么每个partitioner需要切分为多个segment文件
8、kafka的HA
对partitioner分区进行备份,利用zookeeper的选举机制选择leader。数据的生产存储和消费读取都是有leader负责,其他的replicatition只是负责备份而已。
9、kafka如何用shell脚本来讲一个文件读写进去?10、kafka如何用JavaAPI实现生产者和消费者?
大数据一站式解决方案:Scala和Spark部分
scala回顾
1、如何定义变量
2、如何定义函数、方法,如何在将函数作为方法的参数传入进去?
3、条件判断语句,循环控制语句
4、集合操作:Array、list、set、tuple、map (注意:可变和不可变的区别)5、样例类的使用6、trit、抽象类的使用7、主构造器和辅助构造器的使用
8、scala的高级特性
高阶函数:作为值得函数、匿名函数、闭包、柯里化
隐式转换:一个类对象中,如果他没有摸一个功能,但是我们有想要它实现,可以使用英式转换的方式。
object MyPredef{
//定义隐式转换方法
implicit def fileReadToRichFile(file: File)=new RichFile(file)
}
使用:
import MyPredef._9、Actor
写起来像多线程,用起来像socket10、akka
ActorSystem.actorOf()创建一个Actor,
创建的同时,是执行Actor中的prestart方法,去初始化一些信息。
Spark RDD
1、SparkRDD叫做:弹性分布式数据集,其实是一个类,用来描述:任务的数据从哪里读取、用那个算进行计算、得到的结果有存放在哪里、RDD之间的依赖关系是款以来还是窄依赖
2、RDD有五个特点
一系列分区
每个算子作用在每个分区上
一系列依赖关系
有位置(如果从HDFS上读取数据)
3、RDD的两种算子Transformation和Action
Transformation是懒加载,只是定义了这个算子的任务,该如何做,但是还没有做。
Action是立即执行,当执行到Action时,会触发DAGSchudle切分stage,切分完成后,有TaskScheduler将任务通过DriverActor发送到executor中执行。
4、RDD的几个复杂的Transformation
->combineByKey(x=>x,(a:List[String],b:String) => a :+ b,
(m:List[String],n:List[String])=> m ++ n)
第一个参数表示分组后的第一个值如何处理,
第二个参数表示后续的值和前一个值如何处理,
第三个参数表示,map端处理完成后,在reduce端如何对这些list进行处理。
->aggregate("初始量,可以是String也可以是int")(第一个func,第二个func)
初始量作用于没一个分区,第一个func作用于map端,第二个func作用于reduce端。
->reduceByKey(_+_) 作用于map端和reduce端,可以进行局部聚合。
其实reduceByKey和aggregateByKey在底层都调用了combineByKey方法来实现响应的功能。
->mapPartitions
对每一个分区进行操作,直接在里面使用匿名函数即可
当然如果逻辑非常复杂也是可以考虑在外面先定义好这个函数之后在传输进去。
rdd1.mapPartitions((it:Iterator[String]) => {
it.toList.map(x => (x,1)).iterator
})
->mapPartitionsWithIndex
首先定义一个函数,当然也可以写在里面作为匿名函数
val func = (index:Int, it:Iterator[Int]) => {
it.toList.map(x => ("index:" + index, x)).iterator
}
rdd1.mapPartitionsWithIndex(func).collect
5、RDD自定义Partitioner
//自定义分区器,重写里面的getPartition方法和numPartitions方法。
//构造这个对象的时候,把所有情况的信息传输过来,然后在里面进行分类处理。
class HostPartition(hostArr:Array[String]) extends Partitioner{
//对所有的数据进行分类,每一种类型对应一个int编号。所以使用map比较合适。
val map = new mutable.HashMap[String,Int]()
for(index <- 0 until(hostArr.length)){
map.put(hostArr(index),index)
}
//重写getPartition的方法。
override def getPartition(key: Any): Int = {
map.getOrElse(key.toString,0)
}
override def numPartitions: Int = hostArr.length
}
应用:
val hostPartition: HostPartition = new HostPartition(hostList)
val allPartitionRDD: RDD[(String, (String, Int))] = host_url_count.partitionBy(hostPartition)
6、自定义排序规则 ==>定义一个
case class Gril(yanzhi:Int,nianling:Int) extends Ordered[Gril] with Serializable{
override def compare(that: Gril): Int = {
val yanzhiResult: Int = this.yanzhi.compareTo(that.yanzhi)
if(yanzhiResult == 0){
return this.nianling.compareTo(that.nianling)
}
return yanzhiResult
}
}
应用:
val rdd2: RDD[(String, Int, Int)] = rdd1.sortBy(msg => Gril(msg._2,msg._3))
Spark的SQLContext 1、Spark整合Hive和HDFS 只需要将Hive的hive-site.xml ; hadoop的core-site.xml和hdfs-site.xml拷贝到Spark的conf目录下即可。Spark知道如何使用hive的表,同时也知道去哪个NameNode哪里都数据了。
2、DataFrame是什么?
是一个分布式数据集,对RDD的封装。RDD有的方法他基本上都有
3、DataFrame如何创建?
三种方式:->RDD + case class
->RDD + structType
->sqlContext.read.format.options(Map())
4、DataFrame首先需要注册成表结构之后才可以使用sqlContext来操作。
dF.registerTempTable("person")
5、使用sqlContext ==> 返回一个DataFrame
sqlContext.sql("select * from person")
6、DataFrame将数据写入到HDFS或者mysql中
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "815325")
//如果数据库中没有这个表,那么他也会创建一张表(很强大)
本文内容不用于商业目的,如涉及知识产权问题,请权利人联系SPASVO小编(021-61079698-8054),我们将立即处理,马上删除。
相关推荐
更新发布
功能测试和接口测试的区别
2023/3/23 14:23:39如何写好测试用例文档
2023/3/22 16:17:39常用的选择回归测试的方式有哪些?
2022/6/14 16:14:27测试流程中需要重点把关几个过程?
2021/10/18 15:37:44性能测试的七种方法
2021/9/17 15:19:29全链路压测优化思路
2021/9/14 15:42:25性能测试流程浅谈
2021/5/28 17:25:47常见的APP性能测试指标
2021/5/8 17:01:11热门文章
常见的移动App Bug??崩溃的测试用例设计如何用Jmeter做压力测试QC使用说明APP压力测试入门教程移动app测试中的主要问题jenkins+testng+ant+webdriver持续集成测试使用JMeter进行HTTP负载测试Selenium 2.0 WebDriver 使用指南