分享

memory barrier 内存屏障 编译器导致的乱序

 码农书馆 2021-11-19

小结:

1、

很多时候,编译器和 CPU 引起内存乱序访问不会带来什么问题,但一些特殊情况下,程序逻辑的正确性依赖于内存访问顺序,这时候内存乱序访问会带来逻辑上的错误,

2、

 

 https://github.com/torvalds/linux/blob/master/Documentation/memory-barriers.txt#L111

============================
ABSTRACT MEMORY ACCESS MODEL
============================

Consider the following abstract model of the system:

		            :                :
		            :                :
		            :                :
		+-------+   :   +--------+   :   +-------+
		|       |   :   |        |   :   |       |
		|       |   :   |        |   :   |       |
		| CPU 1 |<----->| Memory |<----->| CPU 2 |
		|       |   :   |        |   :   |       |
		|       |   :   |        |   :   |       |
		+-------+   :   +--------+   :   +-------+
		    ^       :       ^        :       ^
		    |       :       |        :       |
		    |       :       |        :       |
		    |       :       v        :       |
		    |       :   +--------+   :       |
		    |       :   |        |   :       |
		    |       :   |        |   :       |
		    +---------->| Device |<----------+
		            :   |        |   :
		            :   |        |   :
		            :   +--------+   :
		            :                :

 

Each CPU executes a program that generates memory access operations.  In the
abstract CPU, memory operation ordering is very relaxed, and a CPU may actually
perform the memory operations in any order it likes, provided program causality
appears to be maintained.  Similarly, the compiler may also arrange the
instructions it emits in any order it likes, provided it doesn't affect the
apparent operation of the program.

So in the above diagram, the effects of the memory operations performed by a
CPU are perceived by the rest of the system as the operations cross the
interface between the CPU and rest of the system (the dotted lines).


For example, consider the following sequence of events:

	CPU 1		CPU 2
	===============	===============
	{ A == 1; B == 2 }
	A = 3;		x = B;
	B = 4;		y = A;

The set of accesses as seen by the memory system in the middle can be arranged
in 24 different combinations:

	STORE A=3,	STORE B=4,	y=LOAD A->3,	x=LOAD B->4
	STORE A=3,	STORE B=4,	x=LOAD B->4,	y=LOAD A->3
	STORE A=3,	y=LOAD A->3,	STORE B=4,	x=LOAD B->4
	STORE A=3,	y=LOAD A->3,	x=LOAD B->2,	STORE B=4
	STORE A=3,	x=LOAD B->2,	STORE B=4,	y=LOAD A->3
	STORE A=3,	x=LOAD B->2,	y=LOAD A->3,	STORE B=4
	STORE B=4,	STORE A=3,	y=LOAD A->3,	x=LOAD B->4
	STORE B=4, ...
	...

and can thus result in four different combinations of values:

	x == 2, y == 1
	x == 2, y == 3
	x == 4, y == 1
	x == 4, y == 3


Furthermore, the stores committed by a CPU to the memory system may not be
perceived by the loads made by another CPU in the same order as the stores were
committed.


As a further example, consider this sequence of events:

	CPU 1		CPU 2
	===============	===============
	{ A == 1, B == 2, C == 3, P == &A, Q == &C }
	B = 4;		Q = P;
	P = &B;		D = *Q;

There is an obvious data dependency here, as the value loaded into D depends on
the address retrieved from P by CPU 2.  At the end of the sequence, any of the
following results are possible:

	(Q == &A) and (D == 1)
	(Q == &B) and (D == 2)
	(Q == &B) and (D == 4)

Note that CPU 2 will never try and load C into D because the CPU will load P
into Q before issuing the load of *Q.

 

 

golang-notes/memory_barrier.md at master · cch123/golang-notes https://github.com/cch123/golang-notes/blob/master/memory_barrier.md

 理解 Memory barrier(内存屏障)_上善若水-CSDN博客 https://blog.csdn.net/zhangxiao93/article/details/42966279

memory barrier

There are only two hard things in Computer Science: cache invalidation and naming things.

-- Phil Karlton

https:///bliki/TwoHardThings.html

memory barrier 也称为 memory fence。

CPU 架构

 ┌─────────────┐                ┌─────────────┐   
 │    CPU 0    │                │    CPU 1    │   
 └───────────┬─┘                └───────────┬─┘   
   ▲         │                     ▲        │     
   │         │                     │        │     
   │         │                     │        │     
   │         │                     │        │     
   │         ▼                     │        ▼     
   │    ┌────────┐                 │    ┌────────┐
   │◀───┤ Store  │                 │◀───┤ Store  │
   ├───▶│ Buffer │                 ├───▶│ Buffer │
   │    └────┬───┘                 │    └───┬────┘
   │         │                     │        │     
   │         │                     │        │     
   │         │                     │        │     
   │         │                     │        │     
   │         ▼                     │        ▼     
┌──┴────────────┐               ┌──┴────────────┐ 
│               │               │               │ 
│     Cache     │               │     Cache     │ 
│               │               │               │ 
└───────┬───────┘               └───────┬───────┘ 
        │                               │         
        │                               │         
        │                               │         
 ┌──────┴──────┐                 ┌──────┴──────┐  
 │ Invalidate  │                 │ Invalidate  │  
 │    Queue    │                 │    Queue    │  
 └──────┬──────┘                 └──────┬──────┘  
        │                               │         
        │         Interconnect          │         
        └───────────────┬───────────────┘         
                        │                         
                        │                         
                        │                         
                        │                         
                ┌───────┴───────┐                 
                │               │                 
                │    Memory     │                 
                │               │                 
                └───────────────┘                 

CPU 和内存间是个多层的架构,有些资料上会把 L1/L2/L3 都叫作内存,并把这个整体称为内存分层: memory hierachy。

CPU 的 cache 层需要保证其本身能保证一定的一致性,一般叫 cache coherence,这是通过 MESI 协议来保证的。

但是 MESI 协议中,如果要写入,需要把其它 cache 中的数据 invalidate 掉。这会导致 CPU 需要等待本地的 cacheline 变为 E(Exclusive)状态才能写入,也就是会带来等待,为了优化这种等待,最好是攒一波写,统一进行触发,所以积攒这些写入操作的地方就是 store buffer。

如果 CPU 收到了其它核心发来的 invalidate 消息(BusRdX),这时候 CPU 需要将本地的 cache invalidate 掉,又涉及了 cacheline 的修改和 store buffer 的处理,需要等待。为了消除这个等待,CPU 会把这些 invalidate 消息放在 invalidate queue 中,只要保证该消息之后一定会被处理后才对相应的 cache line 进行处理,那么就可以立即对其它核心发来的 invalidate 消息进行响应,以避免阻塞其它核心的流程处理。

这样的多层结构在我们编写程序的时候,也给我们带来了一些困扰。

不过我们先来看看 cache 层怎么保证多核心间数据的一致性,即 mesi 协议。

mesi 协议

 p0p1p2p3
initial state I I I I
p0 reads X E I I I
p1 reads X S S I I
p2 reads X S S S I
p3 writes X I I I M
p0 readx X S I I S

下面是不同类型的处理器请求和总线侧的请求:

处理器向 Cache 发请求包括下面这些操作:

PrRd: 处理器请求读取一个 Cache block PrWr: 处理器请求写入一个 Cache block

总线侧的请求包含:

BusRd: 监听到请求,表示当前有其它处理器正在发起对某个 Cache block 的读请求

BusRdX: 监听到请求,表示当前有其它处理器正在发起对某个其未拥有的 Cache block 的写请求

BusUpgr: 监听到请求,表示有另一个处理器正在发起对某 Cache block 的写请求,该处理器已经持有此 Cache block 块

Flush: 监听到请求,表示整个 cache 块已被另一个处理器写入到主存中

FlushOpt: 监听到请求,表示一个完整的 cache 块已经被发送到总线,以提供给另一个处理器使用(Cache 到 Cache 传数)

Cache 到 Cache 的传送可以降低 read miss 导致的延迟,如果不这样做,需要先将该 block 写回到主存,再读取,延迟会大大增加,在基于总线的系统中,这个结论是正确的。但在多核架构中,coherence 是在 L2 caches 这一级保证的,从 L3 中取可能要比从另一个 L2 中取还要快。

mesi 协议解决了多核环境下,内存多层级带来的问题。使得 cache 层对于 CPU 来说可以认为是透明的,不存在的。单一地址的变量的写入,可以以线性的逻辑进行理解。

但 mesi 协议有两个问题没有解决,一种是 RMW 操作,或者叫 CAS;一种是 ADD 操作。因为这两种操作都需要先读到原始值,进行修改,然后再写回到内存中。

同时,在 CPU 架构中我们看到 CPU 除了 cache 这一层之外,还存在 store buffer,而 store buffer 导致的内存乱序问题,mesi 协议是解决不了的,这是 memory consistency 范畴讨论的问题。

下面一节是 wikipedia 上对于 store buffer 和 invalidate queue 的描述,可能比我开头写的要权威一些。

store buffer 和 invalidate queue

Store Buffer:

当写入到一行 invalidate 状态的 cache line 时便会使用到 store buffer。写如果要继续执行,CPU 需要先发出一条 read-invalid 消息(因为需要确保所有其它缓存了当前内存地址的 CPU 的 cache line 都被 invalidate 掉),然后将写推入到 store buffer 中,当最终 cache line 达到当前 CPU 时再执行这个写操作。

CPU 存在 store buffer 的直接影响是,当 CPU 提交一个写操作时,这个写操作不会立即写入到 cache 中。因而,无论什么时候 CPU 需要从 cache line 中读取,都需要先扫描它自己的 store buffer 来确认是否存在相同的 line,因为有可能当前 CPU 在这次操作之前曾经写入过 cache,但该数据还没有被刷入过 cache(之前的写操作还在 store buffer 中等待)。需要注意的是,虽然 CPU 可以读取其之前写入到 store buffer 中的值,但其它 CPU 并不能在该 CPU 将 store buffer 中的内容 flush 到 cache 之前看到这些值。即 store buffer 是不能跨核心访问的,CPU 核心看不到其它核心的 store buffer。

Invalidate Queues:

为了处理 invalidation 消息,CPU 实现了 invalidate queue,借以处理新达到的 invalidate 请求,在这些请求到达时,可以马上进行响应,但可以不马上处理。取而代之的,invalidation 消息只是会被推进一个 invalidation 队列,并在之后尽快处理(但不是马上)。因此,CPU 可能并不知道在它 cache 里的某个 cache line 是 invalid 状态的,因为 invalidation 队列包含有收到但还没有处理的 invalidation 消息,这是因为 CPU 和 invalidation 队列从物理上来讲是位于 cache 的两侧的。

从结果上来讲,memory barrier 是必须的。一个 store barrier 会把 store buffer flush 掉,确保所有的写操作都被应用到 CPU 的 cache。一个 read barrier 会把 invalidation queue flush 掉,也就确保了其它 CPU 的写入对执行 flush 操作的当前这个 CPU 可见。再进一步,MMU 没有办法扫描 store buffer,会导致类似的问题。这种效果对于单线程处理器来说已经是会发生的了。

因为前面提到的 store buffer 的存在,会导致多核心运行用户代码时,读和写以非程序顺序的顺序完成。下面我们用工具来对这种乱序进行一些观察。

CPU 导致乱序

使用 litmus 进行形式化验证:

cat sb.litmus

X86 SB
{ x=0; y=0; }
 P0          | P1          ;
 MOV [x],$1  | MOV [y],$1  ;
 MOV EAX,[y] | MOV EAX,[x] ;
locations [x;y;]
exists (0:EAX=0 /\ 1:EAX=0)
~ ❯❯❯ bin/litmus7 ./sb.litmus
%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Results for ./sb.litmus %
%%%%%%%%%%%%%%%%%%%%%%%%%%%
X86 SB

{x=0; y=0;}

 P0          | P1          ;
 MOV [x],$1  | MOV [y],$1  ;
 MOV EAX,[y] | MOV EAX,[x] ;

locations [x; y;]
exists (0:EAX=0 /\ 1:EAX=0)
Generated assembler
	##START _litmus_P0
	movl	$1, -4(%rbx,%rcx,4)
	movl	-4(%rsi,%rcx,4), %eax
	##START _litmus_P1
	movl	$1, -4(%rsi,%rcx,4)
	movl	-4(%rbx,%rcx,4), %eax

Test SB Allowed
Histogram (4 states)
96    *>0:EAX=0; 1:EAX=0; x=1; y=1;
499878:>0:EAX=1; 1:EAX=0; x=1; y=1;
499862:>0:EAX=0; 1:EAX=1; x=1; y=1;
164   :>0:EAX=1; 1:EAX=1; x=1; y=1;
Ok

Witnesses
Positive: 96, Negative: 999904
Condition exists (0:EAX=0 /\ 1:EAX=0) is validated
Hash=2d53e83cd627ba17ab11c875525e078b
Observation SB Sometimes 96 999904
Time SB 0.11

在两个核心上运行汇编指令,意料之外的情况 100w 次中出现了 96 次。虽然很少,但确实是客观存在的情况。

barrier

从功能上来讲,barrier 有四种:

#LoadLoad#LoadStore
#StoreLoad #StoreStore

具体说明:

barrier namedesc
#LoadLoad 阻止不相关的 Load 操作发生重排
#LoadStore 阻止 Store 被重排到 Load 之前
#StoreLoad 阻止 Load 被重排到 Store 之前
#StoreStore 阻止 Store 被重排到 Store 之前

需要注意的是,这里所说的重排都是内存一致性范畴,即全局视角的不同变量操作。如果是同一个变量的读写的话,显然是不会发生重排的,因为不按照 program order 来执行程序的话,相当于对用户的程序发生了破坏行为。

lfence, sfence, mfence

Intel x86/x64 平台,total store ordering(但未来不一定还是 TSO):

mm_lfence("load fence": wait for all loads to complete)

不保证在 lfence 之前的写全局可见(globally visible)。并不是把读操作都序列化。只是保证这些读操作和 lfence 之间的先后关系。

mm_sfence("store fence": wait for all stores to complete)

The Semantics of a Store Fence On all Intel and AMD processors, a store fence has two effects:

Serializes all memory stores. All stores that precede the store fence in program order will become globally observable before any stores that succeed the store fence in program order become globally observable. Note that a store fence is ordered, by definition, with other store fences. In the cycle that follows the cycle when the store fence instruction retires, all stores that precede the store fence in program order are guaranteed to be globally observable. Any store that succeeds the store fence in program order may or may not be globally observable immediately after retiring the store fence. See Section 7.4 of the Intel optimization manual and Section 7.5 of the Intel developer manual Volume 2. Global observability will be discussed in the next section. However, that “SFENCE operation” is not actually a fully store serializing operation even though there is the word SFENCE in its name. This is clarified by the emphasized part. I think there is a typo here; it should read “store fence” not “store, fence.” The manual specifies that while clearing IA32_RTIT_CTL.TraceEn serializes all previous stores with respect to all later stores, it does not by itself ensure global observability of previous stores. It is exactly for this reason why the Linux kernel uses a full store fence (called wmb, which is basically SFENCE) after clearing TraceEn.

mm_mfence("mem fence": wait for all memory operations to complete)

mfence 会等待当前核心中的 store buffer 排空之后再执行后续指令。

https:///questions/27595595/when-are-x86-lfence-sfence-and-mfence-instructions-required

直接考虑硬件的 fence 指令太复杂了,因为硬件提供的 fence 指令和我们前面说的 RW RR WW WR barrier 逻辑并不是严格对应的。

我们在写程序的时候的思考过程应该是,先从程序逻辑出发,然后考虑需要使用哪种 barrier/fence(LL LS SS SL),然后再找对应硬件平台上对应的 fence 指令。

acquire/release 语义

barriers

https:///20130922/acquire-and-release-fences/

在 x86/64 平台上,只有 StoreLoad 乱序,所以你使用 acquire release 时,实际上生成的 fence 是 NOP。

在 Go 语言中也不需要操心这个问题,Go 语言的 atomic 默认是最强的内存序保证,即 sequential consistency。该一致性保证由 Go 保证,在所有运行 Go 的硬件平台上都是一致的。当然,这里说的只是 sync/atomic 暴露出来的接口。Go 在 runtime 层有较弱内存序的相关接口,位置在: runtime/internal/atomic。

memory order 参数

硬件会提供其 memory order,而语言本身可能也会有自己的 memory order,在 C/C++ 语言中会根据传给 atomic 的参数来决定其使用的 memory order,从而进行一些重排,这里的重排不但有硬件重排,还有编译器级别的重排。

下面是 C++ 中对 memory_order 参数的描述:

std::memory_order specifies how memory accesses, including regular, non-atomic memory accesses, are to be ordered around an atomic operation. Absent any constraints on a multi-core system, when multiple threads simultaneously read and write to several variables, one thread can observe the values change in an order different from the order another thread wrote them. Indeed, the apparent order of changes can even differ among multiple reader threads. Some similar effects can occur even on uniprocessor systems due to compiler transformations allowed by the memory model.

The default behavior of all atomic operations in the library provides for sequentially consistent ordering (see discussion below). That default can hurt performance, but the library's atomic operations can be given an additional std::memory_order argument to specify the exact constraints, beyond atomicity, that the compiler and processor must enforce for that operation.

这也是一个 Go 不需要操心的问题。

sequential consistency

Sequential consistency is a strong safety property for concurrent systems. Informally, sequential consistency implies that operations appear to take place in some total order, and that that order is consistent with the order of operations on each individual process.

可以理解为,同一块内存每次一定只有一个核心能执行。是最符合人的直觉的多线程执行顺序,可以用顺序的逻辑来理解程序执行结果。可以理解为完全没有读写重排。

其执行流程类似下图:

┌──────────────┐    ┌──────────────┐   ┌──────────────┐    ┌──────────────┐
 Processor 1       Processor 2      Processor 3       Processor 4  
└──────────────┘    └──────────────┘   └──────────────┘    └──────────────┘
                                                                          
                                                                          
                                                                          
                                                                          
                                                                          
                                                                          
                                                                          
        └──────────────────────────┐                                       
                                                                          
                                                                          
                                                                          
                                                                          
                                                                          
                                                                          
                                                                          
                         ┌──────────────────┐                              
                                                                         
                               Memory                                    
                                                                         
                         └──────────────────┘                              

内存上有一个开关,可以拨到任意一个 Processor 上,拨动到位后 Processor 即开始访问和修改内存。

保证 Sequential Consistency 的话,性能会非常差。体现不了多核心的优势。Intel 的 TSO 虽然是较强的 Memory Model,但也会有 WR 的重排。

cache coherency vs memory consistency

MESI 协议使 cache 层对 CPU 透明。多线程程序不需要担心某个核心读到过期数据,或者多个核心同时写入到一行 cache line 的不同部分,而导致 half write 的 cache line 被同步到主存中。

然而这种一致性机制没有办法解决 read-modify-write 操作的问题,比如 increment 操作,compare and swap 等操作。MESI 协议并不会阻止两个核心读到相同的内存内容,然后每个核心分别对其加一,再将新的相同的值写回到内存中,相当于将两次加一操作合并成了一次。

在现代 CPU 上,Lock 前缀会将 cache line 上锁,从而使 read-modify-write 操作在逻辑上具备原子性。下面的说明进行了简化,不过应该可以说明问题。

Unlocked increment:

  1. Acquire cache line, shareable is fine. Read the value.
  2. Add one to the read value.
  3. Acquire cache line exclusive (if not already E or M) and lock it.
  4. Write the new value to the cache line.
  5. Change the cache line to modified and unlock it.

Locked increment:

  1. Acquire cache line exclusive (if not already E or M) and lock it.
  2. Read value.
  3. Add one to it.
  4. Write the new value to the cache line.
  5. Change the cache line to modified and unlock it.

在 unlocked increment 操作中,cache line 只在写内存操作时被锁住,和所有类型的写一样。在 locked increment 中,cache line 在整个指令阶段都被持有,从读一直到最后的写操作。

某些 CPU 除了 cache 之外,还有其它的组件会影响内存的可见性。比如一些 CPU 有一个 read prefetcher(预取器)或者 write buffer,会导致内存操作执行乱序。不管有什么组件,LOCK 前缀(或其它 CPU 上的等价方式)可以避免一些内存操作上的顺序问题。

https:///questions/29880015/lock-prefix-vs-mesi-protocol

总结一下,cache coherence 解决的是单一地址的写问题,可以使多核心对同一地址的写入序列化。而 memory consistency 说的是不同地址的读写的顺序问题。即全局视角对读写的观测顺序问题。解决 cache coherence 的协议(MESI)并不能解决 CAS 类的问题。同时也解决不了 memory consistency,即不同内存地址读写的可见性问题,要解决 memory consistency 的问题,需要使用 memory barrier 之类的工具。

编译器导致乱序

snippet 1

X = 0
for i in range(100):
    X = 1
    print X

snippet 2

X = 1
for i in range(100):
    print X

snippet 1 和 snippet 2 从逻辑上等价的。

如果这时候,假设有 Processor 2 同时在执行一条指令:

X = 0

P2 中的指令和 snippet 1 交错执行时,可能产生的结果是:111101111..

P2 中的指令和 snippet 2 交错执行时,可能产生的结果是:11100000…​

多核心下,编译器对代码的优化也可能导致内存读写的重排。

有人说这个例子不够有说服力,我们看看参考资料中的另一个例子:

int a, b;
int foo()
{
    a = b + 1;
    b = 0; 
    return 1;
}

输出汇编:

mov eax, DWORD PTR b[rip]
add eax, 1
mov DWORD PTR a[rip], eax    // --> store to a
mov DWORD PTR b[rip], 0      // --> store to b

开启 O2 优化后,输出汇编:

mov eax, DWORD PTR b[rip]
mov DWORD PTR b[rip], 0      // --> store to b
add eax, 1
mov DWORD PTR a[rip], eax    // --> store to a

可见 compiler 也是可能会修改赋值的顺序的。

atomic/lock 操作成本 in Go

package main

import (
	"sync/atomic"
	"testing"
)

var a int64

func BenchmarkAtomic(b *testing.B) {
	for i := 0; i < b.N; i++ {
		atomic.StoreInt64(&a, int64(i))
	}
}

func BenchmarkNormal(b *testing.B) {
	for i := 0; i < b.N; i++ {
		a = 1
	}
}

func BenchmarkAtomicParallel(b *testing.B) {
	b.SetParallelism(100)
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			atomic.StoreInt64(&a, int64(0))
		}
	})
}

结果:

goos: darwin
goarch: amd64
BenchmarkAtomic-4           	200000000	         7.01 ns/op
BenchmarkNormal-4           	2000000000	         0.63 ns/op
BenchmarkAtomicParallel-4   	100000000	        15.8 ns/op
PASS
ok  	_/Users/didi/test/go/atomic_bench	5.051s

可见,atomic 耗时比普通赋值操作要高一个数量级,在有竞争的情况下会更加慢。

false sharing / true sharing

true sharing 的概念比较好理解,在对全局变量或局部变量进行多线程修改时,就是一种形式的共享,而且非常字面意思,就是 true sharing。true sharing 带来的明显的问题,例如 RWMutex scales poorly 的官方 issue,即 RWMutex 的 RLock 会对 RWMutex 这个对象的 readerCount 原子加一。本质上就是一种 true sharing。

false sharing 指的是那些意料之外的共享。我们知道 CPU 是以 cacheline 为单位进行内存加载的,L1 的 cache line 大小一般是 64 bytes,如果两个变量,或者两个结构体在内存上相邻,那么在 CPU 加载并修改前一个变量的时候,会把第二个变量也加载到 cache line 中,这时候如果恰好另一个核心在使用第二个变量,那么在 P1 修改掉第一个变量的时候,会把整个 cache line invalidate 掉,这时候 P2 要修改第二个变量的话,就需要再重新加载该 cache line。导致了无谓的加载。

在 Go 的 runtime 中有不少例子,特别是那些 per-P 的结构,大多都有针对 false sharing 的优化:

runtime/time.go

var timers [timersLen]struct {
	timersBucket

	// The padding should eliminate false sharing
	// between timersBucket values.
	pad [cpu.CacheLinePadSize - unsafe.Sizeof(timersBucket{})%cpu.CacheLinePadSize]byte
}

runtime/sema.go

var semtable [semTabSize]struct {
	root semaRoot
	pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}

用户态的代码对 false sharing 其实关注的比较少。

runtime 中的 publicationBarrier

TODO

https://github.com/golang/go/issues/35541

参考资料:

https://homes.cs./~bornholt/post/memory-models.html

https://people.eecs./~rcs/research/interactive_latency.html

https://monkeysayhi./2017/12/28/%E4%B8%80%E6%96%87%E8%A7%A3%E5%86%B3%E5%86%85%E5%AD%98%E5%B1%8F%E9%9A%9C/

https://blog.csdn.net/zhangxiao93/article/details/42966279

http:///2017/09/22/memory-barrier/

https://blog.csdn.net/dd864140130/article/details/56494925

https:///20120515/memory-reordering-caught-in-the-act/

https:///20120710/memory-barriers-are-like-source-control-operations/

https:///20120625/memory-ordering-at-compile-time/

https:///20120612/an-introduction-to-lock-free-programming/

https:///20130922/acquire-and-release-fences/

https://webcourse.cs./234267/Spring2016/ho/WCFiles/tirgul%205%20mesi.pdf

https://www.scss./Jeremy.Jones/VivioJS/caches/MESIHelp.htm

http://15418.courses.cs./spring2017/lectures

https://software.intel.com/en-us/articles/how-memory-is-accessed

https://software.intel.com/en-us/articles/detect-and-avoid-memory-bottlenecks#_Move_Instructions_into

https:///questions/29880015/lock-prefix-vs-mesi-protocol

https://github.com/torvalds/linux/blob/master/Documentation/memory-barriers.txt

 

 

参考文献列表:
http://en./wiki/Memory_barrier
http://en./wiki/Out-of-order_execution
https://www./doc/Documentation/memory-barriers.txt

本文例子均在 Linux(g++)下验证通过,CPU 为 X86-64 处理器架构。所有罗列的 Linux 内核代码也均在(或只在)X86-64 下有效。

本文首先通过范例(以及内核代码)来解释 Memory barrier,然后介绍一个利用 Memory barrier 实现的无锁环形缓冲区。

Memory barrier 简介

程序在运行时内存实际的访问顺序和程序代码编写的访问顺序不一定一致,这就是内存乱序访问。内存乱序访问行为出现的理由是为了提升程序运行时的性能。内存乱序访问主要发生在两个阶段:

  1. 编译时,编译器优化导致内存乱序访问(指令重排)
  2. 运行时,多 CPU 间交互引起内存乱序访问

Memory barrier 能够让 CPU 或编译器在内存访问上有序。一个 Memory barrier 之前的内存访问操作必定先于其之后的完成。Memory barrier 包括两类:

  1. 编译器 barrier
  2. CPU Memory barrier

很多时候,编译器和 CPU 引起内存乱序访问不会带来什么问题,但一些特殊情况下,程序逻辑的正确性依赖于内存访问顺序,这时候内存乱序访问会带来逻辑上的错误,例如:

  1. // thread 1
  2. while (!ok);
  3. do(x);
  4.  
  5. // thread 2
  6. x = 42;
  7. ok = 1;

此段代码中,ok 初始化为 0,线程 1 等待 ok 被设置为 1 后执行 do 函数。假如说,线程 2 对内存的写操作乱序执行,也就是 x 赋值后于 ok 赋值完成,那么 do 函数接受的实参就很可能出乎程序员的意料,不为 42。

编译时内存乱序访问

在编译时,编译器对代码做出优化时可能改变实际执行指令的顺序(例如 gcc 下 O2 或 O3 都会改变实际执行指令的顺序):

  1. // test.cpp
  2. int x, y, r;
  3. void f()
  4. {
  5. x = r;
  6. y = 1;
  7. }

编译器优化的结果可能导致 y = 1 在 x = r 之前执行完成。首先直接编译此源文件:

  1. g++ -S test.cpp

得到相关的汇编代码如下:

  1. movl r(%rip), %eax
  2. movl %eax, x(%rip)
  3. movl $1, y(%rip)

这里我们看到,x = r 和 y = 1 并没有乱序。现使用优化选项 O2(或 O3)编译上面的代码(g++ -O2 -S test.cpp),生成汇编代码如下:

  1. movl r(%rip), %eax
  2. movl $1, y(%rip)
  3. movl %eax, x(%rip)

我们可以清楚的看到经过编译器优化之后 movl $1, y(%rip) 先于 movl %eax, x(%rip) 执行。避免编译时内存乱序访问的办法就是使用编译器 barrier(又叫优化 barrier)。Linux 内核提供函数 barrier() 用于让编译器保证其之前的内存访问先于其之后的完成。内核实现 barrier() 如下(X86-64 架构):

  1. #define barrier() __asm__ __volatile__("" ::: "memory")

现在把此编译器 barrier 加入代码中:

  1. int x, y, r;
  2. void f()
  3. {
  4. x = r;
  5. __asm__ __volatile__("" ::: "memory");
  6. y = 1;
  7. }

这样就避免了编译器优化带来的内存乱序访问的问题了(如果有兴趣可以再看看编译之后的汇编代码)。本例中,我们还可以使用 volatile 这个关键字来避免编译时内存乱序访问(而无法避免后面要说的运行时内存乱序访问)。volatile 关键字能够让相关的变量之间在内存访问上避免乱序,这里可以修改 x 和 y 的定义来解决问题:

  1. volatile int x, y;
  2. int r;
  3. void f()
  4. {
  5. x = r;
  6. y = 1;
  7. }

现加上了 volatile 关键字,这使得 x 相对于 y、y 相对于 x 在内存访问上有序。在 Linux 内核中,提供了一个宏 ACCESS_ONCE 来避免编译器对于连续的 ACCESS_ONCE 实例进行指令重排。其实 ACCESS_ONCE 实现源码如下:

  1. #define ACCESS_ONCE(x) (*(volatile typeof(x) *)&(x))

此代码只是将变量 x 转换为 volatile 的而已。现在我们就有了第三个修改方案:

  1. int x, y, r;
  2. void f()
  3. {
  4. ACCESS_ONCE(x) = r;
  5. ACCESS_ONCE(y) = 1;
  6. }

到此基本上就阐述完了我们的编译时内存乱序访问的问题。下面开始介绍运行时内存乱序访问。

运行时内存乱序访问

在运行时,CPU 虽然会乱序执行指令,但是在单个 CPU 的上,硬件能够保证程序执行时所有的内存访问操作看起来像是按程序代码编写的顺序执行的,这时候 Memory barrier 没有必要使用(不考虑编译器优化的情况下)。这里我们了解一下 CPU 乱序执行的行为。在乱序执行时,一个处理器真正执行指令的顺序由可用的输入数据决定,而非程序员编写的顺序。
早期的处理器为有序处理器(In-order processors),有序处理器处理指令通常有以下几步:

  1. 指令获取
  2. 如果指令的输入操作对象(input operands)可用(例如已经在寄存器中了),则将此指令分发到适当的功能单元中。如果一个或者多个操作对象不可用(通常是由于需要从内存中获取),则处理器会等待直到它们可用
  3. 指令被适当的功能单元执行
  4. 功能单元将结果写回寄存器堆(Register file,一个 CPU 中的一组寄存器)

相比之下,乱序处理器(Out-of-order processors)处理指令通常有以下几步:

  1. 指令获取
  2. 指令被分发到指令队列
  3. 指令在指令队列中等待,直到输入操作对象可用(一旦输入操作对象可用,指令就可以离开队列,即便更早的指令未被执行)
  4. 指令被分配到适当的功能单元并执行
  5. 执行结果被放入队列(而不立即写入寄存器堆)
  6. 只有所有更早请求执行的指令的执行结果被写入寄存器堆后,指令执行的结果才被写入寄存器堆(执行结果重排序,让执行看起来是有序的)

从上面的执行过程可以看出,乱序执行相比有序执行能够避免等待不可用的操作对象(有序执行的第二步)从而提高了效率。现代的机器上,处理器运行的速度比内存快很多,有序处理器花在等待可用数据的时间里已经可以处理大量指令了。
现在思考一下乱序处理器处理指令的过程,我们能得到几个结论:

  1. 对于单个 CPU 指令获取是有序的(通过队列实现)
  2. 对于单个 CPU 指令执行结果也是有序返回寄存器堆的(通过队列实现)

由此可知,在单 CPU 上,不考虑编译器优化导致乱序的前提下,多线程执行不存在内存乱序访问的问题。我们从内核源码也可以得到类似的结论(代码不完全的摘录):

  1. #ifdef CONFIG_SMP
  2. #define smp_mb() mb()
  3. #else
  4. #define smp_mb() barrier()
  5. #endif

这里可以看到,如果是 SMP 则使用 mb,mb 被定义为 CPU Memory barrier(后面会讲到),而非 SMP 时,直接使用编译器 barrier。

在多 CPU 的机器上,问题又不一样了。每个 CPU 都存在 cache(cache 主要是为了弥补 CPU 和内存之间较慢的访问速度),当一个特定数据第一次被特定一个 CPU 获取时,此数据显然不在 CPU 的 cache 中(这就是 cache miss)。此 cache miss 意味着 CPU 需要从内存中获取数据(这个过程需要 CPU 等待数百个周期),此数据将被加载到 CPU 的 cache 中,这样后续就能直接从 cache 上快速访问。当某个 CPU 进行写操作时,它必须确保其他的 CPU 已经将此数据从它们的 cache 中移除(以便保证一致性),只有在移除操作完成后此 CPU 才能安全的修改数据。显然,存在多个 cache 时,我们必须通过一个 cache 一致性协议来避免数据不一致的问题,而这个通讯的过程就可能导致乱序访问的出现,也就是这里说的运行时内存乱序访问。这里不再深入讨论整个细节,这是一个比较复杂的问题,有兴趣可以研究http://www./users/paulmck/scalability/paper/whymb.2010.06.07c.pdf 一文,其详细的分析了整个过程。

现在通过一个例子来说明多 CPU 下内存乱序访问:

  1. // test2.cpp
  2. #include <pthread.h>
  3. #include <assert.h>
  4.  
  5. // -------------------
  6. int cpu_thread1 = 0;
  7. int cpu_thread2 = 1;
  8.  
  9. volatile int x, y, r1, r2;
  10.  
  11. void start()
  12. {
  13. x = y = r1 = r2 = 0;
  14. }
  15.  
  16. void end()
  17. {
  18. assert(!(r1 == 0 && r2 == 0));
  19. }
  20.  
  21. void run1()
  22. {
  23. x = 1;
  24. r1 = y;
  25. }
  26.  
  27. void run2()
  28. {
  29. y = 1;
  30. r2 = x;
  31. }
  32.  
  33. // -------------------
  34. static pthread_barrier_t barrier_start;
  35. static pthread_barrier_t barrier_end;
  36.  
  37. static void* thread1(void*)
  38. {
  39. while (1) {
  40. pthread_barrier_wait(&barrier_start);
  41. run1();
  42. pthread_barrier_wait(&barrier_end);
  43. }
  44.  
  45. return NULL;
  46. }
  47.  
  48. static void* thread2(void*)
  49. {
  50. while (1) {
  51. pthread_barrier_wait(&barrier_start);
  52. run2();
  53. pthread_barrier_wait(&barrier_end);
  54. }
  55.  
  56. return NULL;
  57. }
  58.  
  59. int main()
  60. {
  61. assert(pthread_barrier_init(&barrier_start, NULL, 3) == 0);
  62. assert(pthread_barrier_init(&barrier_end, NULL, 3) == 0);
  63.  
  64. pthread_t t1;
  65. pthread_t t2;
  66. assert(pthread_create(&t1, NULL, thread1, NULL) == 0);
  67. assert(pthread_create(&t2, NULL, thread2, NULL) == 0);
  68.  
  69. cpu_set_t cs;
  70. CPU_ZERO(&cs);
  71. CPU_SET(cpu_thread1, &cs);
  72. assert(pthread_setaffinity_np(t1, sizeof(cs), &cs) == 0);
  73. CPU_ZERO(&cs);
  74. CPU_SET(cpu_thread2, &cs);
  75. assert(pthread_setaffinity_np(t2, sizeof(cs), &cs) == 0);
  76.  
  77. while (1) {
  78. start();
  79. pthread_barrier_wait(&barrier_start);
  80. pthread_barrier_wait(&barrier_end);
  81. end();
  82. }
  83.  
  84. return 0;
  85. }

这里创建了两个线程来运行测试代码(需要测试的代码将放置在 run 函数中)。我使用了 pthread barrier(区别于本文讨论的 Memory barrier)主要为了让两个子线程能够同时运行它们的 run 函数。此段代码不停的尝试同时运行两个线程的 run 函数,以便得出我们期望的结果。在每次运行 run 函数前会调用一次 start 函数(进行数据初始化),run 运行后会调用一次 end 函数(进行结果检查)。run1 和 run2 两个函数运行在哪个 CPU 上则通过 cpu_thread1 和 cpu_thread2 两个变量控制。
先编译此程序:g++ -lpthread -o test2 test2.cpp(这里未优化,目的是为了避免编译器优化的干扰)。需要注意的是,两个线程运行在两个不同的 CPU 上(CPU 0 和 CPU 1)。只要内存不出现乱序访问,那么 r1 和 r2 不可能同时为 0,因此断言失败表示存在内存乱序访问。编译之后运行此程序,会发现存在一定概率导致断言失败。为了进一步说明问题,我们把 cpu_thread2 的值改为 0,换而言之就是让两个线程跑在同一个 CPU 下,再运行程序发现断言不再失败。

最后,我们使用 CPU Memory barrier 来解决内存乱序访问的问题(X86-64 架构下):

  1. int cpu_thread1 = 0;
  2. int cpu_thread2 = 1;
  3.  
  4. void run1()
  5. {
  6. x = 1;
  7. __asm__ __volatile__("mfence" ::: "memory");
  8. r1 = y;
  9. }
  10.  
  11. void run2()
  12. {
  13. y = 1;
  14. __asm__ __volatile__("mfence" ::: "memory");
  15. r2 = x;
  16. }

准备使用 Memory barrier

Memory barrier 常用场合包括:

  1. 实现同步原语(synchronization primitives)
  2. 实现无锁数据结构(lock-free data structures)
  3. 驱动程序

实际的应用程序开发中,开发者可能完全不知道 Memory barrier 就可以开发正确的多线程程序,这主要是因为各种同步机制中已经隐含了 Memory barrier(但和实际的 Memory barrier 有细微差别),这就使得不直接使用 Memory barrier 不会存在任何问题。但是如果你希望编写诸如无锁数据结构,那么 Memory barrier 还是很有用的。

通常来说,在单个 CPU 上,存在依赖的内存访问有序:

  1. Q = P;
  2. D = *Q;

这里内存操作有序。然而在 Alpha CPU 上,存在依赖的内存读取操作不一定有序,需要使用数据依赖 barrier(由于 Alpha 不常见,这里就不详细解释了)。

在 Linux 内核中,除了前面说到的编译器 barrier — barrier() 和 ACCESS_ONCE(),还有 CPU Memory barrier:

  1. 通用 barrier,保证读写操作有序的,mb() 和 smp_mb()
  2. 写操作 barrier,仅保证写操作有序的,wmb() 和 smp_wmb()
  3. 读操作 barrier,仅保证读操作有序的,rmb() 和 smp_rmb()

注意,所有的 CPU Memory barrier(除了数据依赖 barrier 之外)都隐含了编译器 barrier。这里的 smp 开头的 Memory barrier 会根据配置在单处理器上直接使用编译器 barrier,而在 SMP 上才使用 CPU Memory barrier(也就是 mb()、wmb()、rmb(),回忆上面相关内核代码)。

最后需要注意一点的是,CPU Memory barrier 中某些类型的 Memory barrier 需要成对使用,否则会出错,详细来说就是:一个写操作 barrier 需要和读操作(或数据依赖)barrier 一起使用(当然,通用 barrier 也是可以的),反之依然。

Memory barrier 的范例

读内核代码进一步学习 Memory barrier 的使用。
Linux 内核实现的无锁(只有一个读线程和一个写线程时)环形缓冲区 kfifo 就使用到了 Memory barrier,实现源码如下:

  1. /*
  2. * A simple kernel FIFO implementation.
  3. *
  4. * Copyright (C) 2004 Stelian Pop <stelian@popies.net>
  5. *
  6. * This program is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU General Public License as published by
  8. * the Free Software Foundation; either version 2 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU General Public License
  17. * along with this program; if not, write to the Free Software
  18. * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  19. *
  20. */
  21.  
  22. #include <linux/kernel.h>
  23. #include <linux/module.h>
  24. #include <linux/slab.h>
  25. #include <linux/err.h>
  26. #include <linux/kfifo.h>
  27. #include <linux/log2.h>
  28.  
  29. /**
  30. * kfifo_init - allocates a new FIFO using a preallocated buffer
  31. * @buffer: the preallocated buffer to be used.
  32. * @size: the size of the internal buffer, this have to be a power of 2.
  33. * @gfp_mask: get_free_pages mask, passed to kmalloc()
  34. * @lock: the lock to be used to protect the fifo buffer
  35. *
  36. * Do NOT pass the kfifo to kfifo_free() after use! Simply free the
  37. * &struct kfifo with kfree().
  38. */
  39. struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size,
  40. gfp_t gfp_mask, spinlock_t *lock)
  41. {
  42. struct kfifo *fifo;
  43.  
  44. /* size must be a power of 2 */
  45. BUG_ON(!is_power_of_2(size));
  46.  
  47. fifo = kmalloc(sizeof(struct kfifo), gfp_mask);
  48. if (!fifo)
  49. return ERR_PTR(-ENOMEM);
  50.  
  51. fifo->buffer = buffer;
  52. fifo->size = size;
  53. fifo->in = fifo->out = 0;
  54. fifo->lock = lock;
  55.  
  56. return fifo;
  57. }
  58. EXPORT_SYMBOL(kfifo_init);
  59.  
  60. /**
  61. * kfifo_alloc - allocates a new FIFO and its internal buffer
  62. * @size: the size of the internal buffer to be allocated.
  63. * @gfp_mask: get_free_pages mask, passed to kmalloc()
  64. * @lock: the lock to be used to protect the fifo buffer
  65. *
  66. * The size will be rounded-up to a power of 2.
  67. */
  68. struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
  69. {
  70. unsigned char *buffer;
  71. struct kfifo *ret;
  72.  
  73. /*
  74. * round up to the next power of 2, since our 'let the indices
  75. * wrap' technique works only in this case.
  76. */
  77. if (!is_power_of_2(size)) {
  78. BUG_ON(size > 0x80000000);
  79. size = roundup_pow_of_two(size);
  80. }
  81.  
  82. buffer = kmalloc(size, gfp_mask);
  83. if (!buffer)
  84. return ERR_PTR(-ENOMEM);
  85.  
  86. ret = kfifo_init(buffer, size, gfp_mask, lock);
  87.  
  88. if (IS_ERR(ret))
  89. kfree(buffer);
  90.  
  91. return ret;
  92. }
  93. EXPORT_SYMBOL(kfifo_alloc);
  94.  
  95. /**
  96. * kfifo_free - frees the FIFO
  97. * @fifo: the fifo to be freed.
  98. */
  99. void kfifo_free(struct kfifo *fifo)
  100. {
  101. kfree(fifo->buffer);
  102. kfree(fifo);
  103. }
  104. EXPORT_SYMBOL(kfifo_free);
  105.  
  106. /**
  107. * __kfifo_put - puts some data into the FIFO, no locking version
  108. * @fifo: the fifo to be used.
  109. * @buffer: the data to be added.
  110. * @len: the length of the data to be added.
  111. *
  112. * This function copies at most @len bytes from the @buffer into
  113. * the FIFO depending on the free space, and returns the number of
  114. * bytes copied.
  115. *
  116. * Note that with only one concurrent reader and one concurrent
  117. * writer, you don't need extra locking to use these functions.
  118. */
  119. unsigned int __kfifo_put(struct kfifo *fifo,
  120. const unsigned char *buffer, unsigned int len)
  121. {
  122. unsigned int l;
  123.  
  124. len = min(len, fifo->size - fifo->in + fifo->out);
  125.  
  126. /*
  127. * Ensure that we sample the fifo->out index -before- we
  128. * start putting bytes into the kfifo.
  129. */
  130.  
  131. smp_mb();
  132.  
  133. /* first put the data starting from fifo->in to buffer end */
  134. l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
  135. memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
  136.  
  137. /* then put the rest (if any) at the beginning of the buffer */
  138. memcpy(fifo->buffer, buffer + l, len - l);
  139.  
  140. /*
  141. * Ensure that we add the bytes to the kfifo -before-
  142. * we update the fifo->in index.
  143. */
  144.  
  145. smp_wmb();
  146.  
  147. fifo->in += len;
  148.  
  149. return len;
  150. }
  151. EXPORT_SYMBOL(__kfifo_put);
  152.  
  153. /**
  154. * __kfifo_get - gets some data from the FIFO, no locking version
  155. * @fifo: the fifo to be used.
  156. * @buffer: where the data must be copied.
  157. * @len: the size of the destination buffer.
  158. *
  159. * This function copies at most @len bytes from the FIFO into the
  160. * @buffer and returns the number of copied bytes.
  161. *
  162. * Note that with only one concurrent reader and one concurrent
  163. * writer, you don't need extra locking to use these functions.
  164. */
  165. unsigned int __kfifo_get(struct kfifo *fifo,
  166. unsigned char *buffer, unsigned int len)
  167. {
  168. unsigned int l;
  169.  
  170. len = min(len, fifo->in - fifo->out);
  171.  
  172. /*
  173. * Ensure that we sample the fifo->in index -before- we
  174. * start removing bytes from the kfifo.
  175. */
  176.  
  177. smp_rmb();
  178.  
  179. /* first get the data from fifo->out until the end of the buffer */
  180. l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
  181. memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
  182.  
  183. /* then get the rest (if any) from the beginning of the buffer */
  184. memcpy(buffer + l, fifo->buffer, len - l);
  185.  
  186. /*
  187. * Ensure that we remove the bytes from the kfifo -before-
  188. * we update the fifo->out index.
  189. */
  190.  
  191. smp_mb();
  192.  
  193. fifo->out += len;
  194.  
  195. return len;
  196. }
  197. EXPORT_SYMBOL(__kfifo_get);

为了更好的理解上面的源码,这里顺带说一下此实现使用到的一些和本文主题无关的技巧:

  1. 使用与操作来求取环形缓冲区的下标,相比取余操作来求取下标的做法效率要高不少。使用与操作求取下标的前提是环形缓冲区的大小必须是 2 的 N 次方,换而言之就是说环形缓冲区的大小为一个仅有一个 1 的二进制数,那么 index & (size – 1) 则为求取的下标(这不难理解)
  2. 使用了 in 和 out 两个索引且 in 和 out 是一直递增的(此做法比较巧妙),这样能够避免一些复杂的条件判断(某些实现下,in == out 时还无法区分缓冲区是空还是满)

这里,索引 in 和 out 被两个线程访问。in 和 out 指明了缓冲区中实际数据的边界,也就是 in 和 out 同缓冲区数据存在访问上的顺序关系,由于未使用同步机制,那么保证顺序关系就需要使用到 Memory barrier 了。索引 in 和 out 都分别只被一个线程修改,而被两个线程读取。__kfifo_put 先通过 in 和 out 来确定可以向缓冲区中写入数据量的多少,这时,out 索引应该先被读取后才能真正的将用户 buffer 中的数据写入缓冲区,因此这里使用到了 smp_mb(),对应的,__kfifo_get 也使用 smp_mb() 来确保修改 out 索引之前缓冲区中数据已经被成功读取并写入用户 buffer 中了。对于 in 索引,在 __kfifo_put 中,通过 smp_wmb() 保证先向缓冲区写入数据后才修改 in 索引,由于这里只需要保证写入操作有序,故选用写操作 barrier,在 __kfifo_get 中,通过 smp_rmb() 保证先读取了 in 索引(这时候 in 索引用于确定缓冲区中实际存在多少可读数据)才开始读取缓冲区中数据(并写入用户 buffer 中),由于这里只需要保证读取操作有序,故选用读操作 barrier。

 

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多