original link:
http://findhy.github.io/blog/2016/02/15/hello-baidu/
written by Findhy
posted at http://findhy.github.io
Hadoop数据倾斜问题总结
首先解释什么是数据倾斜问题,举一个例子,图书馆有A0、A1、A2…A9共10个书架,A0书架有1000本书,而A1到A9书架有100本书,现在要统计图书的总数量,这时我们会发现A1到A9很快就统计完了,而A0书架需要统计很长时间,显然A0成为整个统计过程的瓶颈。
Hadoop最根本的思想就是分而治之,数据倾斜会导致分布式的计算效率依赖单个节点,这与它的初衷是背道而驰的,但是Hadoop又没有从根本层面解决这个问题,所以需要我们在业务设计和开发时来解决这个问题,最好的解决方法是设计时避免让这个问题出现。如果无法避免,那么解决数据倾斜问题和核心思想只有一个:让map输出的key均匀分布到各个reduce中去(mapjoin例外,直接在内存中解决,牺牲内存空间提高效率,只适用于较小的输入数据)
产生数据倾斜的具体原因以及解决方案可以参考阿里的这篇博客:http://www.alidata.org/archives/2109
本文主要想总结一下各种处理方案的思路:
HBase通过Mysql建立二级索引
HBase查询数据有三种模式:Rowkey查询、Rowkey范围scan和全表扫描,HBase的Rowkey查询效率很高,毫秒级的,所以我们应该尽量对HBase使用Rowkey查询,而避免范围和全表扫描,思路有很多种,一种是Rowkey的巧妙设计,例如Rowkey是userId#timestamp,这样就可以支持用户和时间戳的查询了,这种方式最简单但也比较局限,例如无法支持较复杂的查询条件,还有Rowkey设计与region数据分布的问题,我们总是希望数据在多个region上均匀分布,又希望经常被一起查的数据在同一个region上,所以全靠Rowkey设计来解决还是比较局限的,本文介绍通过Mysql来建立HBase二级索引的方式,来实现HBase高效率的分页查询、复杂条件组合查询。
思路
- 在Mysql中设计HBase的索引表,Rowkey作为主键,其它查询条件作为其它字段,由于数据量很大,Mysql索引表需要做分表
- 通过Python编码实现定时将HBase的Rowkey扫描到Mysql中,这里注意一个点,每次扫描完将最后的Rowkey记录在redis中,下次扫描从这个Rowkey开始
- 前端分页查询先查询索引表,这样就能够实现和Mysql一样的分页效果,总行数还有直接跳转到具体页,HBase是实现不了直接跳转到具体页的
- 从Mysql中根据查询条件查询到具体的Rowkey,再用这些Rowkey到HBase中批量查询,每一个都是Rowkey直接定位,批量查询效率也是毫秒左右返回,整体查询效率是在秒级响应的
HBase分页
HBase分页的难点在于两点:获取总行数困难、没有行标号。根据HBase的特点获取数据只有三种模式:RowKey唯一定位、Rowkey范围扫描和全表扫描。那么分页可以采用Rowkey的范围扫描,每次扫描一定范围内的数据并且通过PageFilter过滤来限定扫描的行数,如果我们每页10条,那么每次就扫描11条,记住第11条的Rowkey,当点击下一页的时候,将startRowkey设置为第11条的Rowkey,就实现了分页的效果,下面是相关的核心代码,具体项目可以参考github-HBase-page
public class PageHBase {
private Integer currentPageNo=1;//当前页码
private Integer pageSize=5;//每页显示行数
private Integer totalCount;//总行数
private Integer totalPage;//总页数
private Integer direction;//下一页:1 上一页2
private Boolean hasNext=false;//是否有下一页
private String nextPageRowkey;//下一页起始rowkey
private List<Map<String, String>> resultList;//结果集List
private Map<String,String> paramMap=new HashMap<String,String>();//分页查询参数
private Map<Integer,String> pageStartRowMap=new HashMap<Integer,String>();//每页对应的startRow,key为currentPageNo,value为Rowkey
private Scan scan=new Scan();
public Scan getScan(String startRowkey,String endRowkey){
scan.setCaching(100);
if(direction==1&&hasNext){
scan.setStartRow(Bytes.toBytes(startRowkey));
scan.setStopRow(Bytes.toBytes(endRowkey));
}else{
if(pageStartRowMap.get(currentPageNo)!=null){
scan.setStartRow(Bytes.toBytes(pageStartRowMap.get(currentPageNo)));
scan.setStopRow(Bytes.toBytes(endRowkey));
}else{
scan.setStartRow(Bytes.toBytes(startRowkey));
scan.setStopRow(Bytes.toBytes(endRowkey));
}
}
this.hasNext=false;
this.nextPageRowkey=null;
return scan;
}
}
public class AdunionHbaseService {
private static final Logger log = LoggerFactory
.getLogger(AdunionHbaseService.class);
public static final String CLICK_COLUMN_NAME="c";
public static final String CLICK_COLUMN_KEY="v";
public static final String POSTBACK_COLUMN_NAME="p";
public static final String POSTBACK_COLUMN_KEY="v";
public static final String ADUNION_TABLE_NAME="adunion_active";
private Configuration configuration;
private String clientPort;
private String retriesNumber;
private String zookeeperQuorum;
public AdunionHbaseService(String zookeeeper,String port,String retries){
try {
configuration = HBaseConfiguration.create();
this.setClientPort(port);
this.setRetriesNumber(retries);
this.setZookeeperQuorum(zookeeeper);
configuration.set("hbase.zookeeper.property.clientPort", this.getClientPort());
configuration.set("hbase.client.retries.number", this.getRetriesNumber());
configuration.set("hbase.zookeeper.quorum",this.getZookeeperQuorum());
} catch (Exception e) {
log.error(e.getMessage());
}
}
public PageHBase getPageHBaseData(String tableName, String columnName,String columnKey,String businessId,String publisherId,String offerId,
Date startDate,Date endDate, PageHBase pager) {
HConnection con = null;
HTable table = null;
ResultScanner rs=null;
List<Map<String,String>> resultList=new ArrayList<Map<String,String>>();
try {
if(startDate==null||endDate==null) return pager;
con = HConnectionManager.createConnection(configuration);
table=(HTable)con.getTable(tableName);
StringBuffer startRow=new StringBuffer();
StringBuffer endRow=new StringBuffer();
startRow.append(startDate.getTime());
endRow.append(endDate.getTime());
if(pager.getNextPageRowkey()==null) pager.setNextPageRowkey(startRow.toString());
Scan scan=pager.getScan(pager.getNextPageRowkey(),endRow.toString());
List<Filter> filters=new ArrayList<Filter>();
if(StringUtils.isNotBlank(publisherId)){
SingleColumnValueFilter filter=new SingleColumnValueFilter(
Bytes.toBytes(columnName),
Bytes.toBytes("p"),
CompareFilter.CompareOp.EQUAL,
Bytes.toBytes(publisherId));
filters.add(filter);
}
if(StringUtils.isNotBlank(offerId)){
SingleColumnValueFilter filter=new SingleColumnValueFilter(
Bytes.toBytes(columnName),
Bytes.toBytes("o"),
CompareFilter.CompareOp.EQUAL,
Bytes.toBytes(offerId));
filters.add(filter);
}
if(StringUtils.isNotBlank(businessId)){
SingleColumnValueFilter filter=new SingleColumnValueFilter(
Bytes.toBytes(columnName),
Bytes.toBytes("b"),
CompareFilter.CompareOp.EQUAL,
Bytes.toBytes(businessId));
filters.add(filter);
}
Filter pageFilter=new PageFilter(pager.getPageSize()+1);
Filter familyFilter=new FamilyFilter(CompareFilter.CompareOp.EQUAL,
new BinaryComparator(Bytes.toBytes(columnName)));
Filter qualifierFilter=new QualifierFilter(CompareFilter.CompareOp.EQUAL,
new BinaryComparator(Bytes.toBytes(columnKey)));
filters.add(familyFilter);
filters.add(qualifierFilter);
filters.add(pageFilter);
FilterList filterList=new FilterList(filters);
scan.setFilter(filterList);
rs = table.getScanner(scan);
int totalRow=0;
if(rs!=null){
for(Result result : rs){
totalRow++;
if(totalRow==1){
pager.getPageStartRowMap().put(pager.getCurrentPageNo(),Bytes.toString(result.getRow()));
pager.setTotalPage(pager.getPageStartRowMap().size());
}
if(totalRow>pager.getPageSize()){
pager.setNextPageRowkey(new String(result.getRow()));
pager.setHasNext(true);
}else{
Map<String,String> map = new HashMap<String,String>();
for(KeyValue keyValue:result.raw()){
String family=new String(keyValue.getFamily());
byte[] value=keyValue.getValue();
//map.put("rowkey", new String(result.getRow()));
Map<Object,Object> jsonMap=JsonUtil.getMapFromJsonObjStr(new String(value));
for(Object obj:jsonMap.keySet()){
map.put(obj.toString(), jsonMap.get(obj).toString());
}
}
resultList.add(map);
}
}
}
pager.setResultList(resultList);
}catch(IOException ioe){
log.error(ioe.getMessage());
} catch (Exception e) {
log.error(e.getMessage());
}finally{
try {
rs.close();
} catch (Exception e) {
log.error(e.getMessage());
}
try {
table.close();
} catch (Exception e) {
log.error(e.getMessage());
}
try {
con.close();
} catch (Exception e) {
log.error(e.getMessage());
}
}
return pager;
}
}<p class='post-footer'>
original link:
<a href='http://findhy.github.io/blog/2014/11/28/hbase-page/'>http://findhy.github.io/blog/2014/11/28/hbase-page/</a><br/>
written by <a href='http://findhy.github.io'>Findhy</a>
posted at <a href='http://findhy.github.io'>http://findhy.github.io</a>
</p>
Hadoop集群NameNode运行中进入安全模式的问题:NameNode Low on Available Disk Space
问题描述:Hadoop集群在运行过程,早上来的时候发现NameNode自动进入安全模式,导致系统运行的MR任务失败。
最开始怀疑是HDFS的块复制没有达到最低的要求,通过hadoop fsck -blocks查看,Minimally replicated blocks为100%,说明blocks都达到了复制要求,最后查看了NameNode的日志发现几行警告信息:
2014-11-12 21:31:38,478 WARN org.apache.hadoop.hdfs.server.namenode.NameNodeResourceChecker (org.apache.hadoop.hdfs.server.namenode.FSNamesystem$NameNodeResourceMonitor@1f1dd4b6): Space available o
n volume 'null' is 13119488, which is below the configured reserved amount 104857600
2014-11-12 21:31:38,478 WARN org.apache.hadoop.hdfs.server.namenode.FSNamesystem (org.apache.hadoop.hdfs.server.namenode.FSNamesystem$NameNodeResourceMonitor@1f1dd4b6): NameNode low on available di
sk space. Entering safe mode.
2014-11-12 21:31:38,478 INFO org.apache.hadoop.hdfs.StateChange (org.apache.hadoop.hdfs.server.namenode.FSNamesystem$NameNodeResourceMonitor@1f1dd4b6): STATE* Safe mode is ON.
Resources are low on NN. Please add or free up more resources then turn off safe mode manually. NOTE: If you turn off safe mode before adding resources, the NN will immediately return to safe mode
. Use "hdfs dfsadmin -safemode leave" to turn safe mode off.
通过日志分析发现这样的错误:NameNode low on available disk space. Entering safe mode
在网上查到hadoop-jira上面有这样的记录:https://issues.apache.org/jira/browse/HDFS-4425
原因是Hadoop的源码里面有一个类 NameNodeResourceChecker
负责检查NameNode的磁盘空间,如果磁盘空间低于100M则进入到安全模式,导致系统不可用,那么问题来了,是什么原因导致磁盘空间暴涨了呢?最后发现是HBase的DEBUG日志没有关闭导致日志文件非常大,最终磁盘空间占满,NameNode不可用。
Hadoop Streaming编程(Python)
Hadoop Streaming是Hadoop提供的一个工具,可以允许开发者以非Java的编程语言开发Mapper和Reducer,而且在开发过程中,我们只需要专注于数据的输入和输出处理,其它的事情都交给这个工具来做。本例是用Python实现一个简单的Wordcount例子,环境为:
- Hadoop:CDH4.3.2
- Python:2.6.6
- OS:Centos 6.5
- Hadoop Streaming:hadoop-2.0.0-cdh4.3.2/share/hadoop/tools/lib/hadoop-streaming-2.0.0-cdh4.3.2.jar
基于云的Hadoop架构
今天看了Netflix分享的基于AWS的大数据平台Hadoop架构,感觉这是未来大数据平台的发展模式,将整个数据仓库部署在云端,开发者不需要考虑集群的资源扩展,也不需要接触复杂的底层命令,通过一个简单RESTFul接口就可以提交我们的MapReduce任务,还可以实时看到任务和运行状态。
S3cmd 多账户配置
关于s3cmd的安装配置参考这篇文章:s3cmd Configure,本文介绍s3cmd多账户配置。
Introduction to Item-Based Recommendations With Hadoop
本文是Mahout官网上的Item-Based CF on Hadoop的例子,原文在这里Introduction to Item-Based Recommendations with Hadoop。
Mahout Tutorial
Mahout 是一个开源的Java实现的可伸缩的机器学习库,它实现了常见的聚类和推荐等机器学习算法,并且可运行在 Hadoop 分布式平台处理上PB的数据,Mahout 社区和文档非常丰富,是目前搭建推荐系统或者其它算法平台的首选方案。