Findhy's Blog

Art is long, Life is short.

Hadoop数据倾斜问题总结

| Comments

首先解释什么是数据倾斜问题,举一个例子,图书馆有A0、A1、A2…A9共10个书架,A0书架有1000本书,而A1到A9书架有100本书,现在要统计图书的总数量,这时我们会发现A1到A9很快就统计完了,而A0书架需要统计很长时间,显然A0成为整个统计过程的瓶颈。

Hadoop最根本的思想就是分而治之,数据倾斜会导致分布式的计算效率依赖单个节点,这与它的初衷是背道而驰的,但是Hadoop又没有从根本层面解决这个问题,所以需要我们在业务设计和开发时来解决这个问题,最好的解决方法是设计时避免让这个问题出现。如果无法避免,那么解决数据倾斜问题和核心思想只有一个:让map输出的key均匀分布到各个reduce中去(mapjoin例外,直接在内存中解决,牺牲内存空间提高效率,只适用于较小的输入数据)

产生数据倾斜的具体原因以及解决方案可以参考阿里的这篇博客:http://www.alidata.org/archives/2109

本文主要想总结一下各种处理方案的思路:

HBase通过Mysql建立二级索引

| Comments

HBase查询数据有三种模式:Rowkey查询、Rowkey范围scan和全表扫描,HBase的Rowkey查询效率很高,毫秒级的,所以我们应该尽量对HBase使用Rowkey查询,而避免范围和全表扫描,思路有很多种,一种是Rowkey的巧妙设计,例如Rowkey是userId#timestamp,这样就可以支持用户和时间戳的查询了,这种方式最简单但也比较局限,例如无法支持较复杂的查询条件,还有Rowkey设计与region数据分布的问题,我们总是希望数据在多个region上均匀分布,又希望经常被一起查的数据在同一个region上,所以全靠Rowkey设计来解决还是比较局限的,本文介绍通过Mysql来建立HBase二级索引的方式,来实现HBase高效率的分页查询、复杂条件组合查询。

思路

  • 在Mysql中设计HBase的索引表,Rowkey作为主键,其它查询条件作为其它字段,由于数据量很大,Mysql索引表需要做分表
  • 通过Python编码实现定时将HBase的Rowkey扫描到Mysql中,这里注意一个点,每次扫描完将最后的Rowkey记录在redis中,下次扫描从这个Rowkey开始
  • 前端分页查询先查询索引表,这样就能够实现和Mysql一样的分页效果,总行数还有直接跳转到具体页,HBase是实现不了直接跳转到具体页的
  • 从Mysql中根据查询条件查询到具体的Rowkey,再用这些Rowkey到HBase中批量查询,每一个都是Rowkey直接定位,批量查询效率也是毫秒左右返回,整体查询效率是在秒级响应的

HBase分页

| Comments

HBase分页的难点在于两点:获取总行数困难、没有行标号。根据HBase的特点获取数据只有三种模式:RowKey唯一定位、Rowkey范围扫描和全表扫描。那么分页可以采用Rowkey的范围扫描,每次扫描一定范围内的数据并且通过PageFilter过滤来限定扫描的行数,如果我们每页10条,那么每次就扫描11条,记住第11条的Rowkey,当点击下一页的时候,将startRowkey设置为第11条的Rowkey,就实现了分页的效果,下面是相关的核心代码,具体项目可以参考github-HBase-page

public class PageHBase {

    private Integer currentPageNo=1;//当前页码
    private Integer pageSize=5;//每页显示行数
    private Integer totalCount;//总行数
    private Integer totalPage;//总页数
    private Integer direction;//下一页:1 上一页2
    private Boolean hasNext=false;//是否有下一页
    private String nextPageRowkey;//下一页起始rowkey
    private List<Map<String, String>> resultList;//结果集List
    private Map<String,String> paramMap=new HashMap<String,String>();//分页查询参数
    private Map<Integer,String> pageStartRowMap=new HashMap<Integer,String>();//每页对应的startRow,key为currentPageNo,value为Rowkey
    private Scan scan=new Scan();

    public Scan getScan(String startRowkey,String endRowkey){
        scan.setCaching(100);
        if(direction==1&&hasNext){
            scan.setStartRow(Bytes.toBytes(startRowkey));
            scan.setStopRow(Bytes.toBytes(endRowkey));
        }else{
            if(pageStartRowMap.get(currentPageNo)!=null){
                scan.setStartRow(Bytes.toBytes(pageStartRowMap.get(currentPageNo)));
                scan.setStopRow(Bytes.toBytes(endRowkey));
            }else{
                scan.setStartRow(Bytes.toBytes(startRowkey));
                scan.setStopRow(Bytes.toBytes(endRowkey));
            }
        }
        this.hasNext=false;
        this.nextPageRowkey=null;
        return scan;
    }
}


public class AdunionHbaseService {

    private static final Logger log = LoggerFactory
            .getLogger(AdunionHbaseService.class);
    public static final String CLICK_COLUMN_NAME="c";
    public static final String CLICK_COLUMN_KEY="v";
    public static final String POSTBACK_COLUMN_NAME="p";
    public static final String POSTBACK_COLUMN_KEY="v";
    public static final String ADUNION_TABLE_NAME="adunion_active";
    private Configuration configuration;
    private String clientPort;
    private String retriesNumber;
    private String zookeeperQuorum;

    public AdunionHbaseService(String zookeeeper,String port,String retries){
        try {
            configuration = HBaseConfiguration.create();
            this.setClientPort(port);
            this.setRetriesNumber(retries);
            this.setZookeeperQuorum(zookeeeper);
            configuration.set("hbase.zookeeper.property.clientPort", this.getClientPort());
            configuration.set("hbase.client.retries.number", this.getRetriesNumber());
            configuration.set("hbase.zookeeper.quorum",this.getZookeeperQuorum());
        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }

    public PageHBase getPageHBaseData(String tableName, String columnName,String columnKey,String businessId,String publisherId,String offerId,
            Date startDate,Date endDate, PageHBase pager) {
        HConnection con = null;
        HTable table = null;
        ResultScanner rs=null;
        List<Map<String,String>> resultList=new ArrayList<Map<String,String>>();
        try {
            if(startDate==null||endDate==null) return pager;

            con = HConnectionManager.createConnection(configuration);
            table=(HTable)con.getTable(tableName);

            StringBuffer startRow=new StringBuffer();
            StringBuffer endRow=new StringBuffer();
            startRow.append(startDate.getTime());
            endRow.append(endDate.getTime());
            if(pager.getNextPageRowkey()==null) pager.setNextPageRowkey(startRow.toString());
            Scan scan=pager.getScan(pager.getNextPageRowkey(),endRow.toString());

            List<Filter> filters=new ArrayList<Filter>();

            if(StringUtils.isNotBlank(publisherId)){
                SingleColumnValueFilter filter=new SingleColumnValueFilter(
                        Bytes.toBytes(columnName),
                        Bytes.toBytes("p"),
                        CompareFilter.CompareOp.EQUAL,
                        Bytes.toBytes(publisherId));
                filters.add(filter);
            }
            if(StringUtils.isNotBlank(offerId)){
                SingleColumnValueFilter filter=new SingleColumnValueFilter(
                        Bytes.toBytes(columnName),
                        Bytes.toBytes("o"),
                        CompareFilter.CompareOp.EQUAL,
                        Bytes.toBytes(offerId));
                filters.add(filter);
            }
            if(StringUtils.isNotBlank(businessId)){
                SingleColumnValueFilter filter=new SingleColumnValueFilter(
                        Bytes.toBytes(columnName),
                        Bytes.toBytes("b"),
                        CompareFilter.CompareOp.EQUAL,
                        Bytes.toBytes(businessId));
                filters.add(filter);
            }
            Filter pageFilter=new PageFilter(pager.getPageSize()+1);
            Filter familyFilter=new FamilyFilter(CompareFilter.CompareOp.EQUAL,
                    new BinaryComparator(Bytes.toBytes(columnName)));
            Filter qualifierFilter=new QualifierFilter(CompareFilter.CompareOp.EQUAL,
                    new BinaryComparator(Bytes.toBytes(columnKey)));
            filters.add(familyFilter);
            filters.add(qualifierFilter);
            filters.add(pageFilter);
            FilterList filterList=new FilterList(filters);
            scan.setFilter(filterList);
            rs = table.getScanner(scan);
            int totalRow=0;
            if(rs!=null){
                for(Result result : rs){
                    totalRow++;
                    if(totalRow==1){
                        pager.getPageStartRowMap().put(pager.getCurrentPageNo(),Bytes.toString(result.getRow()));
    pager.setTotalPage(pager.getPageStartRowMap().size());
    }
                    if(totalRow>pager.getPageSize()){
                        pager.setNextPageRowkey(new String(result.getRow()));
                        pager.setHasNext(true);
                    }else{
                        Map<String,String> map = new HashMap<String,String>();
                        for(KeyValue keyValue:result.raw()){
                            String family=new String(keyValue.getFamily());
                            byte[] value=keyValue.getValue();
                            //map.put("rowkey", new String(result.getRow()));
                            Map<Object,Object> jsonMap=JsonUtil.getMapFromJsonObjStr(new String(value));
                            for(Object obj:jsonMap.keySet()){
                                map.put(obj.toString(), jsonMap.get(obj).toString());
                            }
                        }
                        resultList.add(map);
                    }
                }
            }
            pager.setResultList(resultList);
        }catch(IOException ioe){
            log.error(ioe.getMessage());
        } catch (Exception e) {
            log.error(e.getMessage());
        }finally{
            try {
                rs.close();
            } catch (Exception e) {
                log.error(e.getMessage());
            }
            try {
                table.close();
            } catch (Exception e) {
                log.error(e.getMessage());
            }
            try {
                con.close();
            } catch (Exception e) {
                log.error(e.getMessage());
            }
        }
        return pager;
    }
}<p class='post-footer'>
        original link:
        <a href='http://findhy.github.io/blog/2014/11/28/hbase-page/'>http://findhy.github.io/blog/2014/11/28/hbase-page/</a><br/>
        written by <a href='http://findhy.github.io'>Findhy</a>
        &nbsp;posted at <a href='http://findhy.github.io'>http://findhy.github.io</a>
        </p>

Hadoop集群NameNode运行中进入安全模式的问题:NameNode Low on Available Disk Space

| Comments

问题描述:Hadoop集群在运行过程,早上来的时候发现NameNode自动进入安全模式,导致系统运行的MR任务失败。

最开始怀疑是HDFS的块复制没有达到最低的要求,通过hadoop fsck -blocks查看,Minimally replicated blocks为100%,说明blocks都达到了复制要求,最后查看了NameNode的日志发现几行警告信息:

2014-11-12 21:31:38,478 WARN org.apache.hadoop.hdfs.server.namenode.NameNodeResourceChecker (org.apache.hadoop.hdfs.server.namenode.FSNamesystem$NameNodeResourceMonitor@1f1dd4b6): Space available o
n volume 'null' is 13119488, which is below the configured reserved amount 104857600
2014-11-12 21:31:38,478 WARN org.apache.hadoop.hdfs.server.namenode.FSNamesystem (org.apache.hadoop.hdfs.server.namenode.FSNamesystem$NameNodeResourceMonitor@1f1dd4b6): NameNode low on available di
sk space. Entering safe mode.
2014-11-12 21:31:38,478 INFO org.apache.hadoop.hdfs.StateChange (org.apache.hadoop.hdfs.server.namenode.FSNamesystem$NameNodeResourceMonitor@1f1dd4b6): STATE* Safe mode is ON. 
Resources are low on NN. Please add or free up more resources then turn off safe mode manually. NOTE:  If you turn off safe mode before adding resources, the NN will immediately return to safe mode
. Use "hdfs dfsadmin -safemode leave" to turn safe mode off.

通过日志分析发现这样的错误:NameNode low on available disk space. Entering safe mode

在网上查到hadoop-jira上面有这样的记录:https://issues.apache.org/jira/browse/HDFS-4425

另外这里还有详细的解释:https://support.pivotal.io/hc/en-us/articles/201455807-Namenode-logs-reports-Space-available-on-volume-null-is-below-threshold-and-enters-safe-mode

原因是Hadoop的源码里面有一个类 NameNodeResourceChecker 负责检查NameNode的磁盘空间,如果磁盘空间低于100M则进入到安全模式,导致系统不可用,那么问题来了,是什么原因导致磁盘空间暴涨了呢?最后发现是HBase的DEBUG日志没有关闭导致日志文件非常大,最终磁盘空间占满,NameNode不可用。

Hadoop Streaming编程(Python)

| Comments

Hadoop Streaming是Hadoop提供的一个工具,可以允许开发者以非Java的编程语言开发Mapper和Reducer,而且在开发过程中,我们只需要专注于数据的输入和输出处理,其它的事情都交给这个工具来做。本例是用Python实现一个简单的Wordcount例子,环境为:

  • Hadoop:CDH4.3.2
  • Python:2.6.6
  • OS:Centos 6.5
  • Hadoop Streaming:hadoop-2.0.0-cdh4.3.2/share/hadoop/tools/lib/hadoop-streaming-2.0.0-cdh4.3.2.jar

基于云的Hadoop架构

| Comments

今天看了Netflix分享的基于AWS的大数据平台Hadoop架构,感觉这是未来大数据平台的发展模式,将整个数据仓库部署在云端,开发者不需要考虑集群的资源扩展,也不需要接触复杂的底层命令,通过一个简单RESTFul接口就可以提交我们的MapReduce任务,还可以实时看到任务和运行状态。

Mahout Tutorial

| Comments

Mahout 是一个开源的Java实现的可伸缩的机器学习库,它实现了常见的聚类和推荐等机器学习算法,并且可运行在 Hadoop 分布式平台处理上PB的数据,Mahout 社区和文档非常丰富,是目前搭建推荐系统或者其它算法平台的首选方案。