InputFormat, OutputFormat, and SerDe in Hive | 四号程序员

 openlog 2014-04-10

Preface

By default, Hive uses TextInputFormat, in which one line represents one record. Within each record (one line), fields are separated by ^A by default.
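
As a small illustration of the default field layout, the sketch below splits one text record on the ^A character (code point 1). The class and method names are made up for this example; it only mimics the delimiter convention and is not how Hive parses rows internally.

```java
// Illustrative only: CtrlASplitSketch and splitFields are hypothetical names.
final class CtrlASplitSketch {

    // Splits one line on the ^A control character; the -1 limit keeps trailing empty fields.
    static String[] splitFields(String line) {
        return line.split("\u0001", -1);
    }

    public static void main(String[] args) {
        String[] fields = splitFields("1\u0001a");
        System.out.println(fields.length + " fields: " + fields[0] + ", " + fields[1]);
    }
}
```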

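Sometimes, however, we are faced with multi-line, structured documents that must be loaded into Hive for processing. In that case we need a custom InputFormat, OutputFormat, and SerDe.

First, let's sort out how the three relate to each other, quoting the official Hive documentation directly: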

SerDe is a short name for “Serializer and Deserializer.”
Hive uses SerDe (and FileFormat) to read and write table rows.
HDFS files –> InputFileFormat –> <key, value> –> Deserializer –> Row object
Row object –> Serializer –> <key, value> –> OutputFileFormat –> HDFS files

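To summarize, when Hive is given a file on HDFS, it processes it as follows (taking the read path as an example):

(1) Call the InputFormat to cut the file into documents; each document becomes one row (Row).
(2) Call the SerDe's Deserializer to split each row (Row) into fields.

When Hive executes an INSERT and writes Rows to a file, it mainly calls the OutputFormat and the SerDe's Serializer, in the reverse order of the read path.

This article customizes the InputFormat, OutputFormat, and SerDe so that Hive can work with the following custom document format: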

<DOC>
id=1
name=a
</DOC>
<DOC>
id=2
name=b
</DOC>
<DOC>
id=3
name=c
</DOC>
<DOC>
id=4
name=d
</DOC>

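As shown above, each document is delimited by <DOC> and </DOC>. Every line inside a document has the form key=value.

1. Custom InputFormat

Hive's InputFormat comes from the corresponding part of Hadoop. Note that it uses the old mapred API.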

package com.coder4.hive;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class DocFileInputFormat extends TextInputFormat implements
        JobConfigurable {

    // A <DOC>...</DOC> block that straddles a split boundary would be dropped
    // by DocRecordReader, so files are marked as non-splittable.
    @Override
    protected boolean isSplitable(FileSystem fs, Path file) {
        return false;
    }

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
            JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(split.toString());
        // The document-splitting logic lives in DocRecordReader
        return new DocRecordReader(job, (FileSplit) split);
    }
}

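This implementation leaves out details such as compression and decompression; if you need them, refer to the official Hadoop implementation.

The InputFormat above simply implements the interface. The business logic that cuts the input into documents lives in DocRecordReader.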

package com.coder4.hive;
 
import java.io.IOException;
 
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
 
public class DocRecordReader implements RecordReader<LongWritable, Text> {
 
    // Underlying line reader
    private LineRecordReader reader;
    // Reused key/value buffers for the current line
    private LongWritable lineKey = null;
    private Text lineValue = null;
    // Document state: buffered body and whether we are inside <DOC>...</DOC>
    private StringBuilder sb = new StringBuilder();
    private boolean inDoc = false;
    private static final String DOC_START = "<DOC>";
    private static final String DOC_END = "</DOC>";
 
    public DocRecordReader(JobConf job, FileSplit split) throws IOException {
        reader = new LineRecordReader(job, split);
        lineKey = reader.createKey();
        lineValue = reader.createValue();
    }
 
    @Override
    public void close() throws IOException {
        reader.close();
    }
 
    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
        while (true) {
            // get current line
            if (!reader.next(lineKey, lineValue)) {
                break;
            }
            if (!inDoc) {
                // not in doc, check if <doc>
                if (lineValue.toString().startsWith(DOC_START)) {
                    // reset doc status
                    inDoc = true;
                    // clean buff
                    sb.delete(0, sb.length());
                }
            } else {
                // indoc, check if </doc>
                if (lineValue.toString().startsWith(DOC_END)) {
                    // reset doc status
                    inDoc = false;
                    // set kv and return
                    key.set(key.get() + 1);
                    value.set(sb.toString());
                    return true;
                } else {
                    if (sb.length() != 0) {
                        sb.append("\n");
                    }
                    sb.append(lineValue.toString());
                }
            }
        }
        return false;
    }
 
    @Override
    public float getProgress() throws IOException {
        return reader.getProgress();
    }
 
    @Override
    public LongWritable createKey() {
        return new LongWritable(0);
    }
 
    @Override
    public Text createValue() {
        return new Text("");
    }
 
    @Override
    public long getPos() throws IOException {
        return reader.getPos();
    }
 
}

The code above uses LineRecordReader to read each line of the split. To save memory, lineValue and lineKey are reused across calls.
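
To make the grouping logic easier to follow in isolation, here is a Hadoop-free sketch of the same state machine, operating on a plain list of lines. DocGroupingSketch and groupDocs are names invented for this illustration and are not part of the article's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hadoop-free sketch of the grouping logic in DocRecordReader.next().
// DocGroupingSketch and groupDocs are illustrative names only.
final class DocGroupingSketch {

    private static final String DOC_START = "<DOC>";
    private static final String DOC_END = "</DOC>";

    // Collects the lines between each <DOC> ... </DOC> pair into one string per document.
    static List<String> groupDocs(List<String> lines) {
        List<String> docs = new ArrayList<String>();
        StringBuilder sb = new StringBuilder();
        boolean inDoc = false;
        for (String line : lines) {
            if (!inDoc) {
                // Outside a document: ignore everything until the next <DOC>
                if (line.startsWith(DOC_START)) {
                    inDoc = true;
                    sb.setLength(0);
                }
            } else if (line.startsWith(DOC_END)) {
                // End of document: emit the buffered body as one record
                inDoc = false;
                docs.add(sb.toString());
            } else {
                if (sb.length() != 0) {
                    sb.append("\n");
                }
                sb.append(line);
            }
        }
        // A trailing document without </DOC> is dropped, as in the reader above.
        return docs;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "<DOC>", "id=1", "name=a", "</DOC>",
                "<DOC>", "id=2", "name=b", "</DOC>");
        for (String doc : groupDocs(lines)) {
            System.out.println(doc.replace("\n", " | "));
        }
    }
}
```

Each returned string corresponds to the Text value that one call to next() hands to Hive as a single row.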

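2. Custom OutputFormat

The OutputFormat is responsible for writing. Note that here we can no longer simply copy the corresponding Hadoop interface; we have to implement HiveOutputFormat.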

package com.coder4.hive;
 
import java.io.IOException;
import java.util.Properties;
 
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;
 
@SuppressWarnings({ "rawtypes" })
public class DocFileOutputFormat<K extends WritableComparable, V extends Writable>
        extends TextOutputFormat<K, V> implements HiveOutputFormat<K, V> {
 
    public RecordWriter getHiveRecordWriter(JobConf job, Path outPath,
            Class<? extends Writable> valueClass, boolean isCompressed,
            Properties tableProperties, Progressable progress)
            throws IOException {
        FileSystem fs = outPath.getFileSystem(job);
        FSDataOutputStream out = fs.create(outPath);
 
        return new DocRecordWriter(out);
    }
}

Similarly, the business logic lives in the following RecordWriter:

package com.coder4.hive;
 
import java.io.IOException;
 
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.io.Writable;
 
public class DocRecordWriter implements RecordWriter {
 
    private FSDataOutputStream out;
    private final String DOC_START = "<DOC>";
    private final String DOC_END = "</DOC>";
 
    public DocRecordWriter(FSDataOutputStream o) {
        this.out = o;
    }
 
    @Override
    public void close(boolean abort) throws IOException {
        out.flush();
        out.close();
    }
 
    @Override
    public void write(Writable wr) throws IOException {
        write(DOC_START);
        write("\n");
        write(wr.toString());
        write("\n");
        write(DOC_END);
        write("\n");
    }
 
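    private void write(String str) throws IOException {
        // Use the encoded byte count, not the char count, so that
        // non-ASCII content is not truncated.
        byte[] bytes = str.getBytes("UTF-8");
        out.write(bytes, 0, bytes.length);
    }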
 
}

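3. Custom SerDe or UDF?

With the custom InputFormat and OutputFormat in place, each Split has been cut into multiple Rows (documents).

Next, we need to split each Row into Fields. There are two technical options:

(1) Write a UDF that splits the Row into key/value pairs and returns them as a Map<K, V>. The table then only needs a single STRING column.
(2) Implement a SerDe that converts the Row directly into the table's columns.

Let's look at the UDF approach first. It is a good fit for scenarios such as JSON parsing, where the field names are not fixed (or change frequently).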

package com.coder4.hive;
 
import java.util.Map;
import org.apache.hadoop.hive.ql.exec.UDF;
 
public class DocToMap extends UDF {
    public Map<String, String> evaluate(String s) {
        return Doc.deserialize(s);
    }
}

Here Doc.deserialize is just a custom method; it does not need to override anything or implement any interface.
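
The article does not show the Doc class itself. The following is one possible minimal version, assuming the document body is the newline-joined key=value text produced by DocRecordReader; treat it as a hypothetical stand-in rather than the author's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the article's Doc helper, whose source is not shown.
final class Doc {

    private Doc() {
    }

    // Parses "key=value" lines into a map; lines without a key or '=' are skipped.
    static Map<String, String> deserialize(String doc) {
        Map<String, String> kv = new LinkedHashMap<String, String>();
        if (doc == null) {
            return kv;
        }
        for (String line : doc.split("\n")) {
            int eq = line.indexOf('=');
            if (eq > 0) {
                // Split on the first '=' only, so values may themselves contain '='.
                kv.put(line.substring(0, eq), line.substring(eq + 1));
            }
        }
        return kv;
    }

    public static void main(String[] args) {
        System.out.println(deserialize("id=1\nname=a"));
    }
}
```

Returning an empty map for a null input keeps the UDF from throwing on NULL rows; whether that or returning null is preferable depends on how the queries treat missing documents.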

It is used as follows:

CREATE EXTERNAL TABLE IF NOT EXISTS test_table
(
  doc STRING
)
STORED AS
  INPUTFORMAT 'com.coder4.hive.DocFileInputFormat'
  OUTPUTFORMAT 'com.coder4.hive.DocFileOutputFormat'
LOCATION '/user/heyuan.lhy/doc/'
;
 
add jar /xxxxxxxx/hive-test.jar;
 
CREATE TEMPORARY FUNCTION doc_to_map AS 'com.coder4.hive.DocToMap';
 
SELECT
    raw['id'],
    raw['name']
FROM
(
    SELECT
        doc_to_map(doc) raw
    FROM
        test_table
) t;

4. Custom SerDe

Choosing a custom SerDe takes slightly more work to implement.

This section mainly draws on a blog post and the official source code:

http://svn./repos/asf/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java

http://blog./blog/2012/12/how-to-use-a-serde-in-apache-hive/

package com.coder4.hive;
 
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
 
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
 
public class MySerDe extends AbstractSerDe {
 
    // params
    private List<String> columnNames = null;
    private List<TypeInfo> columnTypes = null;
    private ObjectInspector objectInspector = null;
    // separators
    private String nullString = null;
    private String lineSep = null;
    private String kvSep = null;
 
    @Override
    public void initialize(Configuration conf, Properties tbl)
            throws SerDeException {
        // Read sep
        lineSep = "\n";
        kvSep = "=";
        nullString = tbl.getProperty(Constants.SERIALIZATION_NULL_FORMAT, "");
 
        // Read Column Names
        String columnNameProp = tbl.getProperty(Constants.LIST_COLUMNS);
        if (columnNameProp != null && columnNameProp.length() > 0) {
            columnNames = Arrays.asList(columnNameProp.split(","));
        } else {
            columnNames = new ArrayList<String>();
        }
 
        // Read Column Types
        String columnTypeProp = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
        // default all string
        if (columnTypeProp == null) {
            String[] types = new String[columnNames.size()];
            Arrays.fill(types, 0, types.length, Constants.STRING_TYPE_NAME);
            columnTypeProp = StringUtils.join(types, ":");
        }
        columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProp);
 
        // Check column and types equals
        if (columnTypes.size() != columnNames.size()) {
            throw new SerDeException("len(columnNames) != len(columnTypes)");
        }
 
        // Create ObjectInspectors from the type information for each column
        List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>();
        ObjectInspector oi;
        for (int c = 0; c < columnNames.size(); c++) {
            oi = TypeInfoUtils
                    .getStandardJavaObjectInspectorFromTypeInfo(columnTypes
                            .get(c));
            columnOIs.add(oi);
        }
        objectInspector = ObjectInspectorFactory
                .getStandardStructObjectInspector(columnNames, columnOIs);
 
    }
 
    @Override
    public Object deserialize(Writable wr) throws SerDeException {
        // Split to kv pair
        if (wr == null)
            return null;
        Map<String, String> kvMap = new HashMap<String, String>();
        Text text = (Text) wr;
        for (String kv : text.toString().split(lineSep)) {
            // limit 2: split on the first separator only, so values may contain '='
            String[] pair = kv.split(kvSep, 2);
            if (pair.length == 2) {
                kvMap.put(pair[0], pair[1]);
            }
        }
 
        // Set according to col_names and col_types
        ArrayList<Object> row = new ArrayList<Object>();
        String colName = null;
        TypeInfo type_info = null;
        Object obj = null;
        for (int i = 0; i < columnNames.size(); i++) {
            colName = columnNames.get(i);
            type_info = columnTypes.get(i);
            obj = null;
            if (type_info.getCategory() == ObjectInspector.Category.PRIMITIVE) {
                PrimitiveTypeInfo p_type_info = (PrimitiveTypeInfo) type_info;
                switch (p_type_info.getPrimitiveCategory()) {
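                case STRING:
                    obj = StringUtils.defaultString(kvMap.get(colName), "");
                    break;
                case LONG:
                    try {
                        obj = Long.parseLong(kvMap.get(colName));
                    } catch (NumberFormatException e) {
                        // missing or malformed value: leave the field as NULL
                    }
                    break;
                case INT:
                    // INT columns use an Integer object inspector, so parse as int
                    try {
                        obj = Integer.parseInt(kvMap.get(colName));
                    } catch (NumberFormatException e) {
                        // missing or malformed value: leave the field as NULL
                    }
                    break;
                default:
                    // other primitive types are not handled; the field stays NULL
                    break;
                }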
            }
            row.add(obj);
        }
 
        return row;
    }
 
    @Override
    public ObjectInspector getObjectInspector() throws SerDeException {
        return objectInspector;
    }
 
    @Override
    public SerDeStats getSerDeStats() {
        // Not supported yet
        return null;
    }
 
    @Override
    public Class<? extends Writable> getSerializedClass() {
        // Rows are exchanged with the file format as Text
        return Text.class;
    }
 
    @Override
    public Writable serialize(Object arg0, ObjectInspector arg1)
            throws SerDeException {
        // Not supported yet
        return null;
    }
 
}

The final Hive table definition is:

add jar /xxxxxxxx/hive-test.jar;
 
CREATE EXTERNAL TABLE IF NOT EXISTS test_table
(
  id BIGINT,
  name STRING
)
ROW FORMAT SERDE 'com.coder4.hive.MySerDe'
STORED AS
  INPUTFORMAT 'com.coder4.hive.DocFileInputFormat'
  OUTPUTFORMAT 'com.coder4.hive.DocFileOutputFormat'
LOCATION '/user/heyuan.lhy/doc/'
;

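Our custom SerDe splits the body of each <DOC> on k=v; when a key is named id or name, its value is placed in the corresponding column.

5. Test and results

First, we placed a file in the HDFS directory /user/heyuan.lhy/doc/ with the following content: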

<DOC>
id=1
name=a
</DOC>
<DOC>
id=2
name=b
</DOC>
<DOC>
id=3
name=c
</DOC>
<DOC>
id=4
name=d
</DOC>

With the table schema defined as in section 4, we run a SELECT:

SELECT * FROM test_table;
OK
1       a
2       b
3       c
4       d

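As you can see, the id and name fields were parsed out separately.

Because our SerDe does not implement the serialize method, it cannot be used for writing.

If writing is needed, the UDF + Map approach can be used instead.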
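
Should write support be wanted in the SerDe itself, serialize would have to turn a row back into key=value lines. The sketch below shows only that string-building step, without any Hive dependencies; RowSerializeSketch and toDocBody are hypothetical names. In an actual SerDe the values would be read through the ObjectInspector passed to serialize and the result wrapped in a Text, which is omitted here.

```java
import java.util.Arrays;
import java.util.List;

// Hadoop-free sketch of the text a serialize() implementation could emit for one row.
// RowSerializeSketch and toDocBody are illustrative names, not Hive APIs.
final class RowSerializeSketch {

    // Joins column names and values into "key=value" lines; null values are skipped.
    static String toDocBody(List<String> columnNames, List<Object> values) {
        if (columnNames.size() != values.size()) {
            throw new IllegalArgumentException("column/value count mismatch");
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < columnNames.size(); i++) {
            Object v = values.get(i);
            if (v == null) {
                continue;
            }
            if (sb.length() != 0) {
                sb.append("\n");
            }
            sb.append(columnNames.get(i)).append("=").append(v);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> cols = Arrays.asList("id", "name");
        List<Object> row = Arrays.<Object>asList(1L, "a");
        System.out.println(toDocBody(cols, row));
    }
}
```

The <DOC> and </DOC> wrapper lines are deliberately not produced here, since DocRecordWriter already adds them around whatever body it is given.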
