资源预览内容
第1页 / 共11页
第2页 / 共11页
第3页 / 共11页
第4页 / 共11页
第5页 / 共11页
第6页 / 共11页
第7页 / 共11页
第8页 / 共11页
第9页 / 共11页
第10页 / 共11页
亲,该文档总共11页,到这儿已超出免费预览范围,如果喜欢就下载吧!
资源描述
hbasehbase 源码学习之源码学习之 putputhbase-0.90.5一,首先来看下 put 的构造函数:ts 为时间戳,public Put(byte row) this(row, null);, public Put(byte row, RowLock rowLock) this(row, HConstants.LATEST_TIMESTAMP, rowLock);, public Put(byte row, RowLock rowLock) this(row, HConstants.LATEST_TIMESTAMP, rowLock);,public Put(byte row, long ts) this(row, ts, null);,public Put(byte row, long ts, RowLock rowLock) if(row = null | row.length HConstants.MAX_ROW_LENGTH) throw new IllegalArgumentException(“Row key is invalid“);this.row = Arrays.copyOf(row, row.length);this.timestamp = ts;if(rowLock != null) this.lockId = rowLock.getLockId();看到的时候很明显了,有传入的参数 key,如果时间戳为空,则为 null,如果传入的 rowlock 不为空,那么就通过 rowLock.getLockId()拿到 lockId,赋值给当前 lockid. 638 棋牌 http:/www.rodlg.com另外还有个public Put(Put putToCopy) Copy constructor. Creates a Put operation cloned from the specified Put.this.writeToWAL = putToCopy.writeToWAL;HBase 中 WAL(Write Ahead Log) 的存储格式二,add 操作:public Put add(byte family, byte qualifier, long ts, byte value) List list = getKeyValueList(family);KeyValue kv = createPutKeyValue(family, qualifier, ts, value);list.add(kv);familyMap.put(kv.getFamily(), list);return this;List list先取出依据 family 从 familyMap 拿到 List list,如果 list 为空,则创建一个 list,然后依据参数 family, qualifier, ts, value 生成一个 KeyValue然后将 KeyValue 放入 familyMap 中private List getKeyValueList(byte family) List list = familyMap.get(family);if(list = null) list = new ArrayList(0);return list;早来看下 htable 的 put 方法:private void doPut(final List puts) throws IOException int n = 0;for (Put put : puts) validatePut(put);writeBuffer.add(put);currentWriteBufferSize += put.heapSize();/ we need to periodically see if the writebuffer is full instead of waiting until the end of the Listn+;if (n % DOPUT_WB_CHECK = 0 if (autoFlush | currentWriteBufferSize writeBufferSize) flushCommits();如果 currentWriteBufferSize writeBufferSize,此时就会调用 flushCommits()方法。天地棋牌 http:/www.dadiqipaigw.cnOverridepublic void flushCommits() throws IOException try connection.processBatchOfPuts(writeBuffer, tableName, pool); finally if (clearBufferOnFail) writeBuffer.clear();currentWriteBufferSize = 0; else / the write buffer was adjusted by processBatchOfPutscurrentWriteBufferSize = 0;for (Put aPut : writeBuffer) currentWriteBufferSize += aPut.heapSize();HConnectionManager.classconnection.processBatchOfPuts(writeBuffer, tableName, pool);最终调用的是 processBatch(List) list, tableName, pool, results);方法processBatch 内部有 retry 机制,/ sleep first, if this is a retrysleep 时间: long sleepTime = getPauseTime(tries);此后回依据参数调用 locateRegion 的去定位 RegionHRegionLocation loc = locateRegion(tableName, row.getRow(), true);private HRegionLocation locateRegion(final byte tableName,final byte row, boolean useCache)在这个函数中:if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME) try HServerAddress hsa =this.rootRegionTracker.waitRootRegionLocation(this.rpcTimeout);LOG.debug(“Lookedup root region location, connection=“ + this +“; hsa=“ + hsa);if (hsa = null) return null;return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, hsa); catch (InterruptedException e) Thread.currentThread().interrupt();return null; else if (Bytes.equals(tableName, HConstants.META_TABLE_NAME) return locateRegionInMeta(HConstants.ROOT_TABLE_NAME, tableName, row,useCache, metaRegionLock); else / Region not in the cache - have to go to the meta RSreturn locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row,useCache, userRegionLock);, 如果 tableName = -ROOT- 就会调用 waitRootRegionLocation 方法,通过zookeeper 得到 rootregion 的地址。返回一个 new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, hsa); 通过 zookeeper 得到 rootregion 的地址:RootRegionTracker.class-public HServerAddress waitRootRegionLocation(long timeout)throws InterruptedException return dataToHServerAddress(super.blockUntilAvailable(timeout);ZooKeeperNodeTracker.class 中:public synchronized byte blockUntilAvailable(long timeout)throws InterruptedException if (timeout 0) continue;wait(remaining);remaining = timeout - (System.currentTimeMillis() - startTime);return data;在 start 方法中可以看到 data:byte data = ZKUtil.getDataAndWatch(watcher, node);, 如果 tableName = .META.,就会调用 locateRegionInMeta 方法,locateRegionInMeta(HConstants.ROOT_TABLE_NAME, tableName, row,useCache, metaRegionLock);locateRegionInMeta 中:if (useCache) location = getCachedLocation(tableName, row);先去从缓存中拿,如果缓存中没有,得到 metakey,依据这个 key 首先定位 root 和meta region,然后HRegionInterface server =getHRegionConnection(metaLocation.getServerAddress();通过去定位 serveraddress,首先是通过 regionInfoRow = server.getClosestRowBefore 得到一个 regionInfoRow,在得到一个value,regionInfoRow.getValue,最终得到 serveraddress:serverAddress = Bytes.toString(value);, 如果不是.META.表也不是-ROOT-表,那么也会调用 locateRegionInMeta 方法, / Region not in the cache - have to go to the meta RSreturn locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row,useCache, userRegionLock);传入 meta 表,定位获得 serveraddress之后组装 actions,即 put,get,delete,等操作之后交给线程池一步
收藏 下载该资源
网站客服QQ:2055934822
金锄头文库版权所有
经营许可证:蜀ICP备13022795号 | 川公网安备 51140202000112号