Kafka源码精华解读-Kafka用到的数据结构与分段加锁
1.分段加锁
生产者客户端每发送一条消息,都会调用org.apache.kafka.clients.producer.internals.RecordAccumulator#append,因此它是高并发方法,需要保证线程安全。在高并发海量吞吐的场景下,如何才能保证消息有序、高吞吐地发送是值得思考的问题。
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
try {
// check if we have an in-progress batch
/**
* 步骤一:先根据分区找到应该插入到哪个队列里面。
* 如果有已经存在的队列,那么我们就使用存在队列
* 如果队列不存在,那么我们新创建一个队列
*
* 我们肯定是有了存储批次的队列,但是大家一定要知道一个事
* 我们代码第一次执行到这儿,获取其实就是一个空的队列。
*
* 现在代码第二次执行进来。
* 假设 分区还是之前的那个分区。
*
* 这个方法里面我们之前分析,里面就是针对batchs进行的操作
* 里面kafka自己封装了一个数据结构:CopyOnWriteMap (这个数据结构本来就是线程安全的)
*
*
* 根据当前消息的信息,获取一个队列
*
*
* 线程一,线程二,线程三
*/
Deque<RecordBatch> dq = getOrCreateDeque(tp);
/**
* 假设我们现在有线程一,线程二,线程三
*
*/
synchronized (dq) {
//线程一进来了
//线程二进来了
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
/**
* 步骤二:
* 尝试往队列里面的批次里添加数据
*
* 一开始添加数据肯定是失败的,我们目前只是以后了队列
* 数据是需要存储在批次对象里面(这个批次对象是需要分配内存的)
* 我们目前还没有分配内存,所以如果按场景驱动的方式,
* 代码第一次运行到这儿其实是不成功的。
*/
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
//线程一 进来的时候,
//第一次进来的时候appendResult的值就为null
if (appendResult != null)
return appendResult;
}//释放锁
// we don't have an in-progress record batch try to allocate a new batch
/**
* 步骤三:计算一个批次的大小
* 在消息的大小和批次的大小之间取一个最大值,用这个值作为当前这个批次的大小。
* 有可能我们的一个消息的大小比一个设定好的批次的大小还要大。
* 默认一个批次的大小是16K。
* 所以我们看到这段代码以后,应该给我们一个启示。
* 如果我们生产者发送数的时候,如果我们的消息的大小都是超过16K,
* 说明其实就是一条消息就是一个批次,那也就是说消息是一条一条被发送出去的。
* 那如果是这样的话,批次这个概念的设计就没有意义了
* 所以大家一定要根据自定公司的数据大小的情况去设置批次的大小。
*
*
*
*/
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
/**
* 步骤四:
* 根据批次的大小去分配内存
*
*
* 线程一,线程二,线程三,执行到这儿都会申请内存
* 假设每个线程 都申请了 16k的内存。
*
* 线程1 16k
* 线程2 16k
* 线程3 16k
*
*/
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
//假设线程一 进来了。
//线程二进来了
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
/**
* 步骤五:
* 尝试把数据写入到批次里面。
* 代码第一次执行到这儿的时候 依然还是失败的(appendResult==null)
* 目前虽然已经分配了内存
* 但是还没有创建批次,那我们向往批次里面写数据
* 还是不能写的。
*
* 线程二进来执行这段代码的时候,是成功的。
*/
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
//失败的意思就是appendResult 还是会等于null
if (appendResult != null) {
//释放内存
//线程二到这儿,其实他自己已经把数据写到批次了。所以
//他的内存就没有什么用了,就把内存个释放了(还给内存池了。)
free.deallocate(buffer);
return appendResult;
}
/**
* 步骤六:
* 根据内存大小封装批次
*
*
* 线程一到这儿 会根据内存封装出来一个批次。
*/
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
//尝试往这个批次里面写数据,到这个时候 我们的代码会执行成功。
//线程一,就往批次里面写数据,这个时候就写成功了。
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
/**
* 步骤七:
* 把这个批次放入到这个队列的队尾
*
*
* 线程一把批次添加到队尾
*/
dq.addLast(batch);
incomplete.add(batch);
return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
}//释放锁
} finally {
appendsInProgress.decrementAndGet();
}
}
使用到java.util.concurrent.atomic.AtomicInteger
private final AtomicInteger appendsInProgress;
appendsInProgress.incrementAndGet();
appendsInProgress.decrementAndGet();
代码看起来可能有点奇怪,写了一堆synchronize,为啥不直接在完整的synchronize块中完成?这恰恰正是设计者的高明之处,其意义在于尽可能将锁的粒度更加精细化进一步提高并发,从上一节 2.5.1 得知,向BufferPool申请内存时可能会导致阻塞,假设一种场景:线程1发送的消息比较大,需要向BufferPool申请新的内存块,而此时因为BufferPool空间不足,随后进入阻塞,但此时它仍然持有Deque的锁;线程2发送的消息很小,Deque最后一个ProducerBatch的剩余空间足够,但由于线程1持有了Deque的锁导致阻塞,若类似线程2情况的线程较多时,势必会造成大量不必要的线程阻塞,降低吞吐量和并发。
2.CopyOnWriteMap 读多写少
org.apache.kafka.clients.producer.internals.RecordAccumulator类里边构造方法初始化
//CopyOnWriteMap的这样的一个数据类型。
//这个数据结构在jdk里面是没有的,是kafka自己设计的。
//这个数据结构设计得很好,因为有了这个数据结构
//整体的提升了封装批次的这个流程的性能!!
//JDK juc包下面:CopyOnWriteArrayList
//把这个空的队列存入batches 这个数据结构里面
//TopicPartition 分区 -》 Deque<RecordBatch> 队列
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
//CopyOnWriteMap的这样的一个数据类型。
//这个数据结构在jdk里面是没有的,是kafka自己设计的。
//这个数据结构设计得很好,因为有了这个数据结构
//整体的提升了封装批次的这个流程的性能!!
//JDK juc包下面:CopyOnWriteArrayList
this.batches = new CopyOnWriteMap<>();
public RecordAccumulator(int batchSize,
long totalSize,
CompressionType compression,
long lingerMs,
long retryBackoffMs,
Metrics metrics,
Time time) {
this.drainIndex = 0;
this.closed = false;
this.flushesInProgress = new AtomicInteger(0);
this.appendsInProgress = new AtomicInteger(0);
this.batchSize = batchSize;
this.compression = compression;
this.lingerMs = lingerMs;
this.retryBackoffMs = retryBackoffMs;
//CopyOnWriteMap的这样的一个数据类型。
//这个数据结构在jdk里面是没有的,是kafka自己设计的。
//这个数据结构设计得很好,因为有了这个数据结构
//整体的提升了封装批次的这个流程的性能!!
//JDK juc包下面:CopyOnWriteArrayList
this.batches = new CopyOnWriteMap<>();
String metricGrpName = "producer-metrics";
this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
this.incomplete = new IncompleteRecordBatches();
this.muted = new HashSet<>();
this.time = time;
registerMetrics(metrics, metricGrpName);
}
/**
* Get the deque for the given topic-partition, creating it if necessary.
*/
private Deque<RecordBatch> getOrCreateDeque(TopicPartition tp) {
/**
* CopyonWriteMap:
* get
* put
*
*/
//直接从batches里面获取当前分区对应的存储队列
Deque<RecordBatch> d = this.batches.get(tp);
//我们现在用到是场景驱动的方式,代码第一次执行到这儿的死活
//是获取不到队列的,也就是说d 这个变量的值为null
if (d != null)
return d;
//代码继续执行,创建出来一个新的空队列,
d = new ArrayDeque<>();
//把这个空的队列存入batches 这个数据结构里面
Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);
if (previous == null)
return d;
else
//直接返回新的结果
return previous;
}
这段代码作用是取出分区对应的 ProducerBatch 队列
其中this.batches采用了CopyOnWriteMap数据结构来存放
客户端每发送一条消息都会调用一次append方法,假设Topic有3个分区,总共发送1000万条消息就需要调用1000万次getOrCreateDeque(tp),其中get调用了1000万次,putIfAbsent仅调用了3次,可见这是一个高并发读多写少的场景。针对此场景,KAFKA精心设计了CopyOnWriteMap,CopyOnWriteMap允许线程并发访问,读操作没有加锁限制,性能较高,而写操作需要先在堆内存创建新对象,再将原对象的内容拷贝至新对象,写操作需要上锁。这种数据结构的优点和缺点都非常明显,
优点是:
1.采用读写分离的思想,读操作性能很高,几乎无需额外开销,十分适用于读多写少的场景;
2.map采用volatile关键字修饰,保证了写操作对map的修改对其它线程可见;
缺点是:
每次写操作都要内存复制一份,数据量大时对内存开销较大,容易导致频繁GC;
无法保证数据的强一致性,毕竟读写是作用于新老对象;
@Override
public synchronized V putIfAbsent(K k, V v) {
//如果我们传进来的这个key不存在
if (!containsKey(k))
//那么就调用里面内部的put方法
return put(k, v);
else
//返回结果
return get(k);
}
@Override
public V get(Object k) {
return map.get(k);
}
3.ConcurrentSkipListMap 跳表数据结构
Log.scala
//java juc下面的一个数据结构
//首先这个是跳表实现的一个并发安全的Map集合。
//文件名作为key(base offset)
//value就是一个segment
//目的就是为了能我们根据offset的大小快速的定位到segment
/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] =
//java juc下面的一个数据结构
//首先这个是跳表实现的一个并发安全的Map集合。
//文件名作为key(base offset)
//value就是一个segment
//目的就是为了能我们根据offset的大小快速的定位到segment
new ConcurrentSkipListMap[java.lang.Long, LogSegment]
/* Load the log segments from the log files on disk */
private def loadSegments() {
// create the log directory if it doesn't exist
dir.mkdirs()
var swapFiles = Set[File]()
// first do a pass through the files in the log directory and remove any temporary files
// and find any interrupted swap operations
for(file <- dir.listFiles if file.isFile) {
if(!file.canRead)
throw new IOException("Could not read file " + file)
val filename = file.getName
if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
// if the file ends in .deleted or .cleaned, delete it
file.delete()
} else if(filename.endsWith(SwapFileSuffix)) {
// we crashed in the middle of a swap operation, to recover:
// if a log, delete the .index file, complete the swap operation later
// if an index just delete it, it will be rebuilt
val baseName = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
if(baseName.getPath.endsWith(IndexFileSuffix)) {
file.delete()
} else if(baseName.getPath.endsWith(LogFileSuffix)){
// delete the index
val index = new File(CoreUtils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
index.delete()
swapFiles += file
}
}
}
// now do a second pass and load all the .log and all index files
for(file <- dir.listFiles if file.isFile) {
val filename = file.getName
if(filename.endsWith(IndexFileSuffix) || filename.endsWith(TimeIndexFileSuffix)) {
// if it is an index file, make sure it has a corresponding .log file
val logFile =
if (filename.endsWith(TimeIndexFileSuffix))
new File(file.getAbsolutePath.replace(TimeIndexFileSuffix, LogFileSuffix))
else
new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
if(!logFile.exists) {
warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
file.delete()
}
} else if(filename.endsWith(LogFileSuffix)) {
// if its a log file, load the corresponding log segment
val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
val indexFile = Log.indexFilename(dir, start)
val timeIndexFile = Log.timeIndexFilename(dir, start)
val indexFileExists = indexFile.exists()
val segment = new LogSegment(dir = dir,
startOffset = start,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time,
fileAlreadyExists = true)
if (indexFileExists) {
try {
segment.index.sanityCheck()
segment.timeIndex.sanityCheck()
} catch {
case e: java.lang.IllegalArgumentException =>
warn(s"Found a corrupted index file due to ${e.getMessage}}. deleting ${timeIndexFile.getAbsolutePath}, " +
s"${indexFile.getAbsolutePath} and rebuilding index...")
indexFile.delete()
timeIndexFile.delete()
segment.recover(config.maxMessageSize)
}
} else {
error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
segment.recover(config.maxMessageSize)
}
segments.put(start, segment)
}
}
if(logSegments.isEmpty) {
// no existing segments, create a new mutable segment beginning at offset 0
segments.put(0L, new LogSegment(dir = dir,
startOffset = 0,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time,
fileAlreadyExists = false,
initFileSize = this.initFileSize(),
preallocate = config.preallocate))
} else {
recoverLog()
// reset the index size of the currently active log segment to allow more entries
activeSegment.index.resize(config.maxIndexSize)
activeSegment.timeIndex.resize(config.maxIndexSize)
}
java.util.concurrent.ConcurrentSkipListMap 代码:
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
return doPut(key, value, false);
}
private V doPut(K key, V value, boolean onlyIfAbsent) {
Node<K,V> z; // added node
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
if (n != null) {
Object v; int c;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
if ((c = cpr(cmp, key, n.key)) > 0) {
b = n;
n = f;
continue;
}
if (c == 0) {
if (onlyIfAbsent || n.casValue(v, value)) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
break; // restart if lost race to replace value
}
// else c < 0; fall through
}
z = new Node<K,V>(key, value, n);
if (!b.casNext(n, z))
break; // restart if lost race to append to b
break outer;
}
}
int rnd = ThreadLocalRandom.nextSecondarySeed();
if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
int level = 1, max;
while (((rnd >>>= 1) & 1) != 0)
++level;
Index<K,V> idx = null;
HeadIndex<K,V> h = head;
if (level <= (max = h.level)) {
for (int i = 1; i <= level; ++i)
idx = new Index<K,V>(z, idx, null);
}
else { // try to grow by one level
level = max + 1; // hold in array and later pick the one to use
@SuppressWarnings("unchecked")Index<K,V>[] idxs =
(Index<K,V>[])new Index<?,?>[level+1];
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);
for (;;) {
h = head;
int oldLevel = h.level;
if (level <= oldLevel) // lost race to add level
break;
HeadIndex<K,V> newh = h;
Node<K,V> oldbase = h.node;
for (int j = oldLevel+1; j <= level; ++j)
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
if (casHead(h, newh)) {
h = newh;
idx = idxs[level = oldLevel];
break;
}
}
}
// find insertion points and splice in
splice: for (int insertionLevel = level;;) {
int j = h.level;
for (Index<K,V> q = h, r = q.right, t = idx;;) {
if (q == null || t == null)
break splice;
if (r != null) {
Node<K,V> n = r.node;
// compare before deletion check avoids needing recheck
int c = cpr(cmp, key, n.key);
if (n.value == null) {
if (!q.unlink(r))
break;
r = q.right;
continue;
}
if (c > 0) {
q = r;
r = r.right;
continue;
}
}
if (j == insertionLevel) {
if (!q.link(r, t))
break; // restart
if (t.node.value == null) {
findNode(key);
break splice;
}
if (--insertionLevel == 0)
break splice;
}
if (--j >= insertionLevel && j < level)
t = t.down;
q = q.down;
r = q.right;
}
}
}
return null;
}
==================================================================
ConcurrentSkipListMap是在JDK 1.6中新增的,为了对高并发场景下的有序Map提供更好的支持,它有几个特点:
高并发场景
key是有序的
添加、删除、查找操作都是基于跳表结构(Skip List)实现的
key和value都不能为null
跳表(Skip List)是一种类似于链表的数据结构,其查询、插入、删除的时间复杂度都是 O(logn)。
在传统的单链表结构中,查找某个元素需要从链表的头部按顺序遍历,直到找到目标元素为止,查找的时间复杂度为O(n)。
而跳表结合了树和链表的特点,其特性如下:
跳表由很多层组成;
每一层都是一个有序的链表;
最底层的链表包含所有元素;
对于每一层的任意一个节点,不仅有指向其下一个节点的指针,也有指向其下一层的指针;
如果一个元素出现在Level n层的链表中,则它在Level n层以下的链表也都会出现。
Skip List例子:
下图是一种可能的跳表结构:
如图,[1]和[40]节点有3层,[8]和[18]节点有2层。每一层都是有序的链表。
如果要查找目标节点[15],大致过程如下:
首先查看[1]节点的第1层,发现[1]节点的下一个节点为[40],大于15,那么查找[1]节点的下一层;
查找[1]节点的第2层,发现[1]节点的下一个节点为[8],小于15,接着查看下一个节点,发现下一个节点是[18],大于15,因此查找[8]节点的下一层;
查找[8]节点的第2层,发现[8]节点的下一个节点是[10],小于15,接着查看下一个节点[13],小于15,接着查看下一个节点[15],发现其值等于15,因此找到了目标节点,结束查询。
跳表实际上是一种 空间换时间 的数据结构。
ConcurrentSkipListMap用到了两种结构的节点。
ConcurrentSkipListMap 的节点主要由 Node, Index, HeadIndex 构成;下面是一个典型的ConcurrentSkipListMap 的实例的结构图:
Node节点代表了真正存储数据的节点,包含了key、value、指向下一个节点的指针next:
static final class Node<K,V> {
final K key; // 键
V val; // 值
Node<K,V> next; // 指向下一个节点的指针
Node(K key, V value, Node<K,V> next) {
this.key = key;
this.val = value;
this.next = next;
}
}
Index节点代表了跳表的层级,包含了当前节点node、下一层down、当前层的下一个节点right:
static final class Index<K,V> {
final Node<K,V> node; // 当前节点
final Index<K,V> down; // 下一层
Index<K,V> right; // 当前层的下一个节点
Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
this.node = node;
this.down = down;
this.right = right;
}
}
下面是对ConcurrentSkipListMap的简单使用的一个例子:
public class ConcurrentSkipListMapTest {
public static void main(String[] args) {
ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<>();
map.put(4, "4");
map.put(5, "5");
map.put(1, "1");
map.put(6, "6");
map.put(2, "2");
System.out.println(map.keySet());
System.out.println(map);
System.out.println(map.descendingKeySet());
System.out.println(map.descendingMap());
}
}
CopyOnWriteMap
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.utils;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
/**
* A simple read-optimized map implementation that synchronizes only writes and does a full copy on each modification
*
* 1) 这个数据结构是在高并发的情况下是线程安全的。
* 2) 采用的读写分离的思想设计的数据结构
* 每次插入(写数据)数据的时候都开辟新的内存空间
* 所以会有个小缺点,就是插入数据的时候,会比较耗费内存空间。
* 3)这样的一个数据结构,适合写少读多的场景。
* 读数据的时候性能很高。
*
* batchs这个对象存储数据的时候,就是使用的这个数据结构。
* 对于batches来说,它面对的场景就是读多写少的场景。
*
*batches:
* 读数据:
* 每生产一条消息,都会从batches里面读取数据。
* 假如每秒中生产10万条消息,是不是就意味着每秒要读取10万次。
* 所以绝对是一个高并发的场景。
* 写数据:
* 假设有100个分区,那么就是会插入100次数据。
* 并且队列只需要插入一次就可以了。
* 所以这是一个低频的操作。
*
* 高性能:
*
* 读写分离读设计方案:适合的场景就是读多写少。
* 读多:
*
* 写少:
*
*
*
*/
public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
/**
* 核心的变量就是一个map
* 这个map有个特点,它的修饰符是volatile关键字。
* 在多线程的情况下,如果这个map的值发生变化,其他线程也是可见的。
*
* get
* put
*/
private volatile Map<K, V> map;
public CopyOnWriteMap() {
this.map = Collections.emptyMap();
}
public CopyOnWriteMap(Map<K, V> map) {
this.map = Collections.unmodifiableMap(map);
}
@Override
public boolean containsKey(Object k) {
return map.containsKey(k);
}
@Override
public boolean containsValue(Object v) {
return map.containsValue(v);
}
@Override
public Set<java.util.Map.Entry<K, V>> entrySet() {
return map.entrySet();
}
/**
* 没有加锁,读取数据的时候性能很高(高并发的场景下,肯定性能很高)
* 并且是线程安全的。
* 因为人家采用的读写分离的思想。
* @param k
* @return
*/
@Override
public V get(Object k) {
return map.get(k);
}
@Override
public boolean isEmpty() {
return map.isEmpty();
}
@Override
public Set<K> keySet() {
return map.keySet();
}
@Override
public int size() {
return map.size();
}
@Override
public Collection<V> values() {
return map.values();
}
@Override
public synchronized void clear() {
this.map = Collections.emptyMap();
}
/**
* 1):
* 整个方法使用的是synchronized关键字去修饰的,说明这个方法是线程安全。
* 即使加了锁,这段代码的性能依然很好,因为里面都是纯内存的操作。
* 2)
* 这种设计方式,采用的是读写分离的设计思想。
* 读操作和写操作 是相互不影响的。
* 所以我们读数据的操作就是线程安全的。
*3)
* 最后把值赋给了map,map是用volatile关键字修饰的。
* 说明这个map是具有可见性的,这样的话,如果get数据的时候,这儿的值发生了变化,也是能感知到的。
* @param k
* @param v
* @return
*/
@Override
public synchronized V put(K k, V v) {
//新的内存空间
//读写分离
//往新的内存空间里面插入
//读,读数据就老读空间里面去
Map<K, V> copy = new HashMap<K, V>(this.map);
//插入数据
V prev = copy.put(k, v);
//赋值给map
this.map = Collections.unmodifiableMap(copy);
return prev;
}
@Override
public synchronized void putAll(Map<? extends K, ? extends V> entries) {
Map<K, V> copy = new HashMap<K, V>(this.map);
copy.putAll(entries);
this.map = Collections.unmodifiableMap(copy);
}
@Override
public synchronized V remove(Object key) {
Map<K, V> copy = new HashMap<K, V>(this.map);
V prev = copy.remove(key);
this.map = Collections.unmodifiableMap(copy);
return prev;
}
@Override
public synchronized V putIfAbsent(K k, V v) {
//如果我们传进来的这个key不存在
if (!containsKey(k))
//那么就调用里面内部的put方法
return put(k, v);
else
//返回结果
return get(k);
}
@Override
public synchronized boolean remove(Object k, Object v) {
if (containsKey(k) && get(k).equals(v)) {
remove(k);
return true;
} else {
return false;
}
}
@Override
public synchronized boolean replace(K k, V original, V replacement) {
if (containsKey(k) && get(k).equals(original)) {
put(k, replacement);
return true;
} else {
return false;
}
}
@Override
public synchronized V replace(K k, V v) {
if (containsKey(k)) {
return put(k, v);
} else {
return null;
}
}
}