Bulkload improve #150

Open · wants to merge 3 commits into base: master
@@ -51,9 +51,9 @@ void post(MessageQueryIndex index) {
return;
}
//upload via bulk load
// hFileIndexStore.appendData(index, consumer);
hFileIndexStore.appendData(index, consumer);
// indexBatchBackup
indexBatchBackup.add(index, consumer);
// indexBatchBackup.add(index, consumer);
}

@Override
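Note (reviewer sketch, not part of the diff): this first hunk switches the index hook from the Put-based `indexBatchBackup` to the HFile bulk-load path, so the hook now reads roughly as below. The guard lines above the `return;` are elided in the diff and only hinted at here, and as the `HFileIndexStore` diff further down shows, the `consumer` callback is now invoked only after a bulk load completes.

```java
// Sketch of the resulting hook; the preceding checks are elided in the diff above.
void post(MessageQueryIndex index) {
    // ... early-return guard elided ...
    // upload via bulk load instead of the Put-based batch backup
    hFileIndexStore.appendData(index, consumer);
    // indexBatchBackup.add(index, consumer);  // previous path, now disabled
}
```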
@@ -7,14 +7,12 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Pair;
import org.hbase.async.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,9 +28,7 @@

import java.io.IOException;
import java.net.InetAddress;
import java.util.Date;
import java.util.Map;
import java.util.TreeMap;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

@@ -72,7 +68,9 @@ public class HFileIndexStore {
private FileSystem fs;
private MessageQueryIndex lastIndex;
private HFile.Writer writer;
private Map<byte[], KeyValue> map = new TreeMap<>(new org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator());
private Pair<byte[][], byte[][]> startEndKeys;
private TreeMap<byte[], KeyValue>[] datas;
private int datasSize;

public HFileIndexStore(BackupKeyGenerator keyGenerator) throws IOException {
this.config = new DefaultBackupConfig(DynamicConfigLoader.load("backup.properties", false));
@@ -99,6 +97,8 @@ public HFileIndexStore(BackupKeyGenerator keyGenerator) throws IOException {
this.MESSAGE_SIZE_PER_HFILE = this.config.getDynamicConfig().getInt(MESSAGE_SIZE_PER_HFILE_CONFIG_KEY, DEFAULT_MESSAGE_SIZE_PER_HFILE);
this.conn = ConnectionFactory.createConnection(this.conf);
this.fs = FileSystem.get(this.conf);
this.startEndKeys = this.conn.getRegionLocator(TableName.valueOf(TABLE_NAME)).getStartEndKeys();
this.datas = new TreeMap[this.startEndKeys.getFirst().length];
}

public void appendData(MessageQueryIndex index, Consumer<MessageQueryIndex> consumer) {
@@ -129,16 +129,31 @@ public void appendData(MessageQueryIndex index, Consumer<MessageQueryIndex> consumer) {
long currentTime = System.currentTimeMillis();
KeyValue kv = new KeyValue(key, FAMILY_NAME, QUALIFIERS_NAME, currentTime, value);
//LOGGER.info("message subject subjectkey:" + subjectKey + " messageid:" + messageId + " key:" + new String(key));
//first add it to the treemap
map.put(key, kv);
if (map.size() >= MESSAGE_SIZE_PER_HFILE) {
datasSize++;
//find the index in startEndKeys of the region this key belongs to
//if the key is not an exact region boundary, Arrays.binarySearch returns -(insertion point) - 1, so adjust it
int idx = Arrays.binarySearch(startEndKeys.getFirst(), key, org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR);
if (idx <= -2)
idx = -(idx + 1) - 1;
else if (idx == -1)
idx = 0;
if (datas[idx] == null)
datas[idx] = new TreeMap<>(new org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator());
datas[idx].put(key, kv);
if (datasSize >= MESSAGE_SIZE_PER_HFILE) {
//bulk load start time
long startTime = System.currentTimeMillis();
datasSize = 0;
try {
Path HFilePath = new Path(HFILE_PATH, new String(key));
writeToHfile(HFilePath);
for (TreeMap<byte[], KeyValue> map : datas) {
if (map == null) continue;
Path HFilePath = new Path(HFILE_PATH, "key" + new String(map.firstKey()));
writeToHfile(HFilePath, map);
map.clear();
}
startEndKeys = conn.getRegionLocator(TableName.valueOf(TABLE_NAME)).getStartEndKeys();
datas = new TreeMap[startEndKeys.getFirst().length];
bulkLoad(HFILE_PATH);
map.clear();
if (consumer != null) consumer.accept(lastIndex);
} catch (IOException e) {
LOGGER.error("Message Index Bulk Load fail", e);
@@ -148,7 +163,7 @@ public void appendData(MessageQueryIndex index, Consumer<MessageQueryIndex> consumer) {
}
}

private void writeToHfile(Path path) throws IOException {
private void writeToHfile(Path path, TreeMap<byte[], KeyValue> map) throws IOException {
HFileContext fileContext = new HFileContext();
try {
writer = HFile.getWriterFactory(conf, new CacheConfig(tempConf))
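Note (reviewer sketch, not part of the diff): the region lookup added above leans on the `Arrays.binarySearch` contract: a non-negative result means the row key equals a region start key exactly, otherwise the result is `-(insertion point) - 1` and the owning region is the one just before the insertion point. A small self-contained illustration of that adjustment, using HBase's `Bytes.BYTES_COMPARATOR` and hypothetical region boundaries:

```java
import java.util.Arrays;
import org.apache.hadoop.hbase.util.Bytes;

public class RegionIndexLookupSketch {

    // Index into startKeys of the region that should contain rowKey.
    static int regionIndex(byte[][] startKeys, byte[] rowKey) {
        int idx = Arrays.binarySearch(startKeys, rowKey, Bytes.BYTES_COMPARATOR);
        if (idx < 0) {
            // Not an exact boundary: binarySearch returned -(insertion point) - 1,
            // so the owning region starts at (insertion point - 1). The clamp to 0
            // mirrors the idx == -1 branch in the diff; with the first region's
            // start key being the empty byte array it is only a safety net.
            idx = Math.max(0, -(idx + 1) - 1);
        }
        return idx;
    }

    public static void main(String[] args) {
        // Hypothetical table with regions [ "", "g" ), [ "g", "p" ), [ "p", "" )
        byte[][] startKeys = {new byte[0], Bytes.toBytes("g"), Bytes.toBytes("p")};
        System.out.println(regionIndex(startKeys, Bytes.toBytes("abc"))); // 0
        System.out.println(regionIndex(startKeys, Bytes.toBytes("g")));   // 1 (exact boundary)
        System.out.println(regionIndex(startKeys, Bytes.toBytes("zzz"))); // 2
    }
}
```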
@@ -16,6 +16,7 @@
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Pair;
import org.hbase.async.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,9 +78,10 @@ public class HFileRecordStore {
private final int MESSAGE_SIZE_PER_HFILE;
private Connection conn;
private FileSystem fs;
private byte[] lastKey;
private HFile.Writer writer;
private Map<byte[], KeyValue> map = new TreeMap<>(new org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator());
private Pair<byte[][], byte[][]> startEndKeys;
private TreeMap<byte[], KeyValue>[] datas;
private int datasSize;

public HFileRecordStore(BackupKeyGenerator keyGenerator, RocksDBStore rocksDBStore) throws IOException {
this.config = new DefaultBackupConfig(DynamicConfigLoader.load("backup.properties", false));
@@ -106,6 +108,8 @@ public HFileRecordStore(BackupKeyGenerator keyGenerator, RocksDBStore rocksDBStore) throws IOException {
this.MESSAGE_SIZE_PER_HFILE = this.config.getDynamicConfig().getInt(MESSAGE_SIZE_PER_HFILE_CONFIG_KEY, DEFAULT_MESSAGE_SIZE_PER_HFILE);
this.conn = ConnectionFactory.createConnection(this.conf);
this.fs = FileSystem.get(this.conf);
this.startEndKeys = this.conn.getRegionLocator(TableName.valueOf(TABLE_NAME)).getStartEndKeys();
this.datas = new TreeMap[this.startEndKeys.getFirst().length];
}

public void appendData(ActionRecord record, Consumer<MessageQueryIndex> consumer) {
@@ -139,17 +143,32 @@ public void appendData(ActionRecord record, Consumer<MessageQueryIndex> consumer) {
long currentTime = System.currentTimeMillis();
KeyValue kv = new KeyValue(key, FAMILY_NAME, QUALIFIERS_NAME, currentTime, value);
//LOGGER.info("message trace subject:" + message.getSubject() + " key:" + new String(key));
map.put(key, kv);
lastKey = key;
datasSize++;
//find the index in startEndKeys of the region this key belongs to
//if the key is not an exact region boundary, Arrays.binarySearch returns -(insertion point) - 1, so adjust it
int idx = Arrays.binarySearch(startEndKeys.getFirst(), key, org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR);
if (idx <= -2)
idx = -(idx + 1) - 1;
else if (idx == -1)
idx = 0;
if (datas[idx] == null)
datas[idx] = new TreeMap<>(new org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator());
datas[idx].put(key, kv);
}
if (map.size() >= MESSAGE_SIZE_PER_HFILE) {
if (datasSize >= MESSAGE_SIZE_PER_HFILE) {
//bulk load start time
long startTime = System.currentTimeMillis();
datasSize = 0;
try {
Path HFilePath = new Path(HFILE_PATH, new String(lastKey));
writeToHfile(HFilePath);
for (TreeMap<byte[], KeyValue> map : datas) {
if (map == null) continue;
Path HFilePath = new Path(HFILE_PATH, "key" + new String(map.firstKey()));
writeToHfile(HFilePath, map);
map.clear();
}
startEndKeys = conn.getRegionLocator(TableName.valueOf(TABLE_NAME)).getStartEndKeys();
datas = new TreeMap[startEndKeys.getFirst().length];
bulkLoad(HFILE_PATH);
map.clear();
} catch (IOException e) {
LOGGER.error("Record Info Bulk Load fail", e);
} finally {
@@ -158,7 +177,7 @@ public void appendData(ActionRecord record, Consumer<MessageQueryIndex> consumer) {
}
}

private void writeToHfile(Path path) throws IOException {
private void writeToHfile(Path path, TreeMap<byte[], KeyValue> map) throws IOException {
HFileContext fileContext = new HFileContext();
try {
writer = HFile.getWriterFactory(conf, new CacheConfig(tempConf))
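Note (reviewer sketch, not part of the diff): both stores now share the same flush shape. When `datasSize` reaches `MESSAGE_SIZE_PER_HFILE`, every non-empty per-region `TreeMap` is written to its own HFile (the `Bytes.ByteArrayComparator` ordering hands `HFile.Writer#append` the ascending row keys it requires), `startEndKeys` is refreshed so the buckets track any region splits, and the directory is passed to `bulkLoad(HFILE_PATH)`. That helper is not shown in this diff; the sketch below is one way the flush could look against the HBase 1.x client API, and it assumes the per-family subdirectory layout that `LoadIncrementalHFiles.doBulkLoad` expects.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.TreeMap;

public class BulkLoadFlushSketch {

    /** Write each per-region TreeMap to its own HFile, then bulk load the directory. */
    static void flush(Configuration conf, Connection conn, FileSystem fs,
                      TableName table, byte[] family, Path hfileDir,
                      TreeMap<byte[], KeyValue>[] datas) throws Exception {
        // LoadIncrementalHFiles expects one subdirectory per column family.
        Path familyDir = new Path(hfileDir, Bytes.toString(family));
        for (TreeMap<byte[], KeyValue> perRegion : datas) {
            if (perRegion == null || perRegion.isEmpty()) continue;
            Path hfilePath = new Path(familyDir, "key" + Bytes.toString(perRegion.firstKey()));
            HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
                    .withPath(fs, hfilePath)
                    .withFileContext(new HFileContext())
                    .create();
            try {
                // Ascending row-key order from the TreeMap, as HFile.Writer requires.
                for (KeyValue kv : perRegion.values()) {
                    writer.append(kv);
                }
            } finally {
                writer.close();
            }
            perRegion.clear();
        }
        try (Table htable = conn.getTable(table);
             Admin admin = conn.getAdmin();
             RegionLocator locator = conn.getRegionLocator(table)) {
            new LoadIncrementalHFiles(conf).doBulkLoad(hfileDir, admin, htable, locator);
        }
    }
}
```

Refreshing the region boundaries on every flush, as the diff does, keeps the per-region buckets aligned if a region splits between flushes.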
@@ -96,9 +96,9 @@ public void appendLogs(long startOffset, ByteBuf body) {
if (skipBackSubjects.getBoolean(realSubject, false)) {
continue;
}
recordBackup.add(new ActionRecord(action), null);
// recordBackup.add(new ActionRecord(action), null);
//switch to uploading via bulk load
// hFileRecordStore.appendData(new ActionRecord(action), null);
hFileRecordStore.appendData(new ActionRecord(action), null);
checkpointManager.addSyncActionLogOffset(body.readerIndex() - start);
}
}