背景

在搜狗网页搜索,我们会基于用户行为日志,使用Hadoop相关技术,对搜索流量、用户行为进行分析。但随着数据规模扩大、分析维度增多,之前的数据分析流程逐渐不能满足高速增长的需求。在2016年初,我们着手对现有数据分析流程进行改造,旨在规范数据分析流程、提高数据稳定性与正确性,并提升数据分析人员的开发效率,以应对繁多复杂的数据分析需求。

基于Kafka进行实时日志传输

日志收集是数据分析的起点,在搜狗有数十种产品线、以及各类服务器日志需要进行收集传输。早期的日志采集流程如下:

线上服务器在本地记录日志,每隔1小时将生成的日志上传HDFS,再经过一次跨级群拷贝将日志拷贝至计算集群HDFS,最后再通过定时脚本将HDFS的原始日志解析并导入至Hive。可见老数据传输流程存在如下问题:

为此,我们引入了Kafka改造数据传输流程,新的流程如下:

在新的数据传输流程中,所有的日志均通过网络进行传输,最终写入Kafka,对于批处理流程,将数据导入Hive,同时也可接入一些流计算引擎直接对Kafka的数据进行实时计算。线上服务器在不同的机房均有部署,我们在Kafka集群前增加了一层Scribed集群,与线上服务器进行部署在同机房,当Kafka集群异常后,数据堆在Scribed集群,不会在线上服务器堆积。

对于Kafka导入Hive,我们基于Flume开发了一个Sink插件flume-hive-batch-sink,有如下特性:

经过架构调整后,数据传输流程延迟由2~3小时缩短至2分钟,ORC行列存储使得压缩率提升30%,计算性能大幅提升,同时为后续使用使用流计算引擎进行实时分析提供了可能。

规范数据仓库流程

由Kafka经过导入到Hive的表,只是经过简单的日志模板解析,我们叫它Default表。后续我们会经过ETL流程对Default表进行数据清洗,最终生成的表,我们叫它Custom表。在ETL过程中,我们会对不同类别的日志进行join、PV日志与点击日志join、抽取字段、过滤去重、维度指标计算等操作。最后,我们会推荐用户最终使用Custom表进行统计,字段丰富,并且经过过滤去重,易用性强,并且由于用户无需编写复杂的过滤逻辑,使不同用户编写的统计代码的数据准确性有了保证。

早期我们使用Pig/Hive脚本进行ETL流程的编写,开发效率较低,需要使用Pig Latin或HiveQL表达复杂的逻辑,而且还需要编写UDF并打包成jar文件,通常一个复杂的ETL流程需要2~3天的开发周期。

近期我们引入了Spark DataFrame API来编写ETL任务,可以混用SQL和Scala,使得开发效率大幅提升。同时基于Spark计算引擎,计算效率也较MapReduce引擎有所提升。同时,我们还引入了统一的建表和ETL接口,用户只需要实现API返回一个DataFrame即可,无需关注导入Hive的具体流程,进一步降低用户的开发门槛。如下为一个简单的ETL流程样例代码,继承HiveTransformer接口,实现transform方法,返回一个DataFrame即可:

class ExampleHiveETLProcessor extends HiveTransformer {
  override def transform(sqlContext: HiveContext, date: String) = {
    def toCity = udf((ip: String) => {
      IPUtils.ipToCity(ip)
    })

    val df = sqlContext.sql(
      s"""
         |SELECT t1.timestamp, t1.userId, ti.ip, t1.query, t2.clickUrl
         |FROM default.pvTable t1 JOIN default.clickTable t2 ON t1.id = t2.id
         |WHERE t1.date = $date AND t2.date = $date
      """.stripMargin)

    df.withColumn("city", toCity(df("ip")))
  }
}

通过使用DataFrame API和ETL API,用户的开发效率大幅提升,一个ETL流程的开发周期缩短至半天,由于ORC和Spark引擎的应用,计算性能也得到一定提升。

更新SQL引擎技术栈

早期我们的SQL引擎技术栈较为单一,只是使用Pig/Hive基于数据仓库进行统计,计算耗时较久。我们对于SQL的需求有如下几种:

对于定时统计任务,我们逐渐迁移至Spark平台,主要考虑点是Spark API表达力更强,容易与其他数据源进行集成,性能较MapReduce有也所提升,逐渐统一计算引擎也便于我们进行统一管理和优化。在使用Spark过程中,也遇到一些坑,主要是在大数据量下容易跑挂,一般用户通常无法定位问题,需要我们进行介入排查,当然随着Spark的快速发展,相信稳定性会逐渐提升,Spark 1.6版本已经有较大提升,之前写过一篇博客介绍了一些坑,详见:Spark性能优化总结

对于即系查询,我们引入了Presto查询引擎,Presto是Facebook开源的分布式SQL查询引擎,适用于交互式分析查询,数据量支持GB到PB字节,响应时间小于1秒到几分钟。我们主要使用Presto去查询Hive数据仓库,做一些临时性数据分析、数据探索,对于简单的查询基本可以秒级返回结果。这块我们目前的使用规模不大,尚处于探索阶段,后续我们会把它上线内部的一个在线SQL查询系统,暴露给更多的用户使用。

我们之前的内部数据分析系统,一般采用HBase进行存储,用户需编写一些复杂的MapReduce入库程序,并写使用Get/Scan API对HBase进行查询,这对我们的数据统计人员而言开发成本有些高,他们更熟悉SQL语言而非Java API。我们后续引入了Phoenix,作为HBase上的SQL引擎。Phoenix对用户暴露的接口很友好,用户可以使用Create Table语句建表,使用JDBC接口进行查询,我们还为用户提供了Phoenix入库的API,使得用户可以方便地完成导入。不过设计Phoenix表时,需要设计好主键(对应HBase的rowkey),并确定好自己的查询方式,做到按rowkey进行区域查询,而非扫全表。对于多维度,可以考虑创建二级索引,不过会降低写入速度。Phoenix还支持salt、skip scan、pre split等功能,表设计和优化时也可以考虑下。

开发者SDK - Bigdatakit

随着Hadoop生态的发展,各种新技术层出不穷,对我们用户的负担越来越大,需要学习各种新技术、搭建各种开发运行环境,Hadoop的技术门槛逐渐成为用户的拦路虎。为此,我们开始建立Bigdatakit项目,愿景是简化用户的大数据分析流程。我们希望暴露给用户的是功能接口,如ETL API、流计算API,技术实现被隐藏在底层,用户只需要用最少的代码实现自己的业务逻辑即可。

如下是一个使用Spark Streaming实时输出Kafka Topic内容的程序,用户只需要使用Java或Scala语言实现LineProcessor的process方法,来处理Kafka中存储的每一行数据即可:

public class EchoLineProcessor implements LineProcessor {
  public void process(String message) {
    System.out.println("process: " + message);
  }
}

然后使用bigdatakit命令行工具提交任务即可,只需要指定topic等必要参数即可:

bigdatakit spark-streaming \
  -Dtopics=myTopic \
  -Dprocessor=com.sogou.bigdatakit.example.streaming.EchoLineProcessor \
  bigdatakit-example-spark-streaming-1.0-SNAPSHOT.jar

总结与展望

通过对数据传输、数据仓库、SQL引擎、开发者SDK的改进,数据分析流程得到改善和规范,数据实时性和计算性能得到增强,开发者的开发效率也得到提升。我们后续会进一步优化我们的数据分析流程,并且会尝试探索一些流式计算的架构。