diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2016-10-03 21:15:00 +0200 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2016-10-03 21:15:00 +0200 |
commit | 00bcf5cdd6c0e2e92ce3dd852ca68a3b779fa4ec (patch) | |
tree | 387b2ed2ac0e35776714515665f25ddb5fee1de3 | |
parent | Merge branch 'efi-core-for-linus' of git://git.kernel.org/pub/scm/linux/kerne... (diff) | |
parent | x86/cmpxchg, locking/atomics: Remove superfluous definitions (diff) | |
download | linux-00bcf5cdd6c0e2e92ce3dd852ca68a3b779fa4ec.tar.xz linux-00bcf5cdd6c0e2e92ce3dd852ca68a3b779fa4ec.zip |
Merge branch 'locking-core-for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/tip/tip
Pull locking updates from Ingo Molnar:
"The main changes in this cycle were:
- rwsem micro-optimizations (Davidlohr Bueso)
- Improve the implementation and optimize the performance of
percpu-rwsems. (Peter Zijlstra.)
- Convert all lglock users to better facilities such as percpu-rwsems
or percpu-spinlocks and remove lglocks. (Peter Zijlstra)
- Remove the ticket (spin)lock implementation. (Peter Zijlstra)
- Korean translation of memory-barriers.txt and related fixes to the
English document. (SeongJae Park)
- misc fixes and cleanups"
* 'locking-core-for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/tip/tip: (24 commits)
x86/cmpxchg, locking/atomics: Remove superfluous definitions
x86, locking/spinlocks: Remove ticket (spin)lock implementation
locking/lglock: Remove lglock implementation
stop_machine: Remove stop_cpus_lock and lg_double_lock/unlock()
fs/locks: Use percpu_down_read_preempt_disable()
locking/percpu-rwsem: Add down_read_preempt_disable()
fs/locks: Replace lg_local with a per-cpu spinlock
fs/locks: Replace lg_global with a percpu-rwsem
locking/percpu-rwsem: Add DEFINE_STATIC_PERCPU_RWSEMand percpu_rwsem_assert_held()
locking/pv-qspinlock: Use cmpxchg_release() in __pv_queued_spin_unlock()
locking/rwsem, x86: Drop a bogus cc clobber
futex: Add some more function commentry
locking/hung_task: Show all locks
locking/rwsem: Scan the wait_list for readers only once
locking/rwsem: Remove a few useless comments
locking/rwsem: Return void in __rwsem_mark_wake()
locking, rcu, cgroup: Avoid synchronize_sched() in __cgroup_procs_write()
locking/Documentation: Add Korean translation
locking/Documentation: Fix a typo of example result
locking/Documentation: Fix wrong section reference
...
31 files changed, 3540 insertions, 1337 deletions
diff --git a/Documentation/ko_KR/memory-barriers.txt b/Documentation/ko_KR/memory-barriers.txt new file mode 100644 index 000000000000..34d3d380893d --- /dev/null +++ b/Documentation/ko_KR/memory-barriers.txt @@ -0,0 +1,3135 @@ +NOTE: +This is a version of Documentation/memory-barriers.txt translated into Korean. +This document is maintained by SeongJae Park <sj38.park@gmail.com>. +If you find any difference between this document and the original file or +a problem with the translation, please contact the maintainer of this file. + +Please also note that the purpose of this file is to be easier to +read for non English (read: Korean) speakers and is not intended as +a fork. So if you have any comments or updates for this file please +update the original English file first. The English version is +definitive, and readers should look there if they have any doubt. + +=================================== +이 문서는 +Documentation/memory-barriers.txt +의 한글 번역입니다. + +역자: 박성재 <sj38.park@gmail.com> +=================================== + + + ========================= + 리눅스 커널 메모리 배리어 + ========================= + +저자: David Howells <dhowells@redhat.com> + Paul E. McKenney <paulmck@linux.vnet.ibm.com> + Will Deacon <will.deacon@arm.com> + Peter Zijlstra <peterz@infradead.org> + +======== +면책조항 +======== + +이 문서는 명세서가 아닙니다; 이 문서는 완벽하지 않은데, 간결성을 위해 의도된 +부분도 있고, 의도하진 않았지만 사람에 의해 쓰였다보니 불완전한 부분도 있습니다. +이 문서는 리눅스에서 제공하는 다양한 메모리 배리어들을 사용하기 위한 +안내서입니다만, 뭔가 이상하다 싶으면 (그런게 많을 겁니다) 질문을 부탁드립니다. + +다시 말하지만, 이 문서는 리눅스가 하드웨어에 기대하는 사항에 대한 명세서가 +아닙니다. + +이 문서의 목적은 두가지입니다: + + (1) 어떤 특정 배리어에 대해 기대할 수 있는 최소한의 기능을 명세하기 위해서, + 그리고 + + (2) 사용 가능한 배리어들에 대해 어떻게 사용해야 하는지에 대한 안내를 제공하기 + 위해서. + +어떤 아키텍쳐는 특정한 배리어들에 대해서는 여기서 이야기하는 최소한의 +요구사항들보다 많은 기능을 제공할 수도 있습니다만, 여기서 이야기하는 +요구사항들을 충족하지 않는 아키텍쳐가 있다면 그 아키텍쳐가 잘못된 것이란 점을 +알아두시기 바랍니다. + +또한, 특정 아키텍쳐에서 일부 배리어는 해당 아키텍쳐의 특수한 동작 방식으로 인해 +해당 배리어의 명시적 사용이 불필요해서 no-op 이 될수도 있음을 알아두시기 +바랍니다. + +역자: 본 번역 역시 완벽하지 않은데, 이 역시 부분적으로는 의도된 것이기도 +합니다. 여타 기술 문서들이 그렇듯 완벽한 이해를 위해서는 번역문과 원문을 함께 +읽으시되 번역문을 하나의 가이드로 활용하시길 추천드리며, 발견되는 오역 등에 +대해서는 언제든 의견을 부탁드립니다. 과한 번역으로 인한 오해를 최소화하기 위해 +애매한 부분이 있을 경우에는 어색함이 있더라도 원래의 용어를 차용합니다. + + +===== +목차: +===== + + (*) 추상 메모리 액세스 모델. + + - 디바이스 오퍼레이션. + - 보장사항. + + (*) 메모리 배리어란 무엇인가? + + - 메모리 배리어의 종류. + - 메모리 배리어에 대해 가정해선 안될 것. + - 데이터 의존성 배리어. + - 컨트롤 의존성. + - SMP 배리어 짝맞추기. + - 메모리 배리어 시퀀스의 예. + - 읽기 메모리 배리어 vs 로드 예측. + - 이행성 + + (*) 명시적 커널 배리어. + + - 컴파일러 배리어. + - CPU 메모리 배리어. + - MMIO 쓰기 배리어. + + (*) 암묵적 커널 메모리 배리어. + + - 락 Acquisition 함수. + - 인터럽트 비활성화 함수. + - 슬립과 웨이크업 함수. + - 그외의 함수들. + + (*) CPU 간 ACQUIRING 배리어의 효과. + + - Acquire vs 메모리 액세스. + - Acquire vs I/O 액세스. + + (*) 메모리 배리어가 필요한 곳 + + - 프로세서간 상호 작용. + - 어토믹 오퍼레이션. + - 디바이스 액세스. + - 인터럽트. + + (*) 커널 I/O 배리어의 효과. + + (*) 가정되는 가장 완화된 실행 순서 모델. + + (*) CPU 캐시의 영향. + + - 캐시 일관성. + - 캐시 일관성 vs DMA. + - 캐시 일관성 vs MMIO. + + (*) CPU 들이 저지르는 일들. + + - 그리고, Alpha 가 있다. + - 가상 머신 게스트. + + (*) 사용 예. + + - 순환식 버퍼. + + (*) 참고 문헌. + + +======================= +추상 메모리 액세스 모델 +======================= + +다음과 같이 추상화된 시스템 모델을 생각해 봅시다: + + : : + : : + : : + +-------+ : +--------+ : +-------+ + | | : | | : | | + | | : | | : | | + | CPU 1 |<----->| Memory |<----->| CPU 2 | + | | : | | : | | + | | : | | : | | + +-------+ : +--------+ : +-------+ + ^ : ^ : ^ + | : | : | + | : | : | + | : v : | + | : +--------+ : | + | : | | : | + | : | | : | + +---------->| Device |<----------+ + : | | : + : | | : + : +--------+ : + : : + +프로그램은 여러 메모리 액세스 오퍼레이션을 발생시키고, 각각의 CPU 는 그런 +프로그램들을 실행합니다. 추상화된 CPU 모델에서 메모리 오퍼레이션들의 순서는 +매우 완화되어 있고, CPU 는 프로그램이 인과관계를 어기지 않는 상태로 관리된다고 +보일 수만 있다면 메모리 오퍼레이션을 자신이 원하는 어떤 순서대로든 재배치해 +동작시킬 수 있습니다. 비슷하게, 컴파일러 또한 프로그램의 정상적 동작을 해치지 +않는 한도 내에서는 어떤 순서로든 자신이 원하는 대로 인스트럭션을 재배치 할 수 +있습니다. + +따라서 위의 다이어그램에서 한 CPU가 동작시키는 메모리 오퍼레이션이 만들어내는 +변화는 해당 오퍼레이션이 CPU 와 시스템의 다른 부분들 사이의 인터페이스(점선)를 +지나가면서 시스템의 나머지 부분들에 인지됩니다. + + +예를 들어, 다음의 일련의 이벤트들을 생각해 봅시다: + + CPU 1 CPU 2 + =============== =============== + { A == 1; B == 2 } + A = 3; x = B; + B = 4; y = A; + +다이어그램의 가운데에 위치한 메모리 시스템에 보여지게 되는 액세스들은 다음의 총 +24개의 조합으로 재구성될 수 있습니다: + + STORE A=3, STORE B=4, y=LOAD A->3, x=LOAD B->4 + STORE A=3, STORE B=4, x=LOAD B->4, y=LOAD A->3 + STORE A=3, y=LOAD A->3, STORE B=4, x=LOAD B->4 + STORE A=3, y=LOAD A->3, x=LOAD B->2, STORE B=4 + STORE A=3, x=LOAD B->2, STORE B=4, y=LOAD A->3 + STORE A=3, x=LOAD B->2, y=LOAD A->3, STORE B=4 + STORE B=4, STORE A=3, y=LOAD A->3, x=LOAD B->4 + STORE B=4, ... + ... + +따라서 다음의 네가지 조합의 값들이 나올 수 있습니다: + + x == 2, y == 1 + x == 2, y == 3 + x == 4, y == 1 + x == 4, y == 3 + + +한발 더 나아가서, 한 CPU 가 메모리 시스템에 반영한 스토어 오퍼레이션들의 결과는 +다른 CPU 에서의 로드 오퍼레이션을 통해 인지되는데, 이 때 스토어가 반영된 순서와 +다른 순서로 인지될 수도 있습니다. + + +예로, 아래의 일련의 이벤트들을 생각해 봅시다: + + CPU 1 CPU 2 + =============== =============== + { A == 1, B == 2, C == 3, P == &A, Q == &C } + B = 4; Q = P; + P = &B D = *Q; + +D 로 읽혀지는 값은 CPU 2 에서 P 로부터 읽혀진 주소값에 의존적이기 때문에 여기엔 +분명한 데이터 의존성이 있습니다. 하지만 이 이벤트들의 실행 결과로는 아래의 +결과들이 모두 나타날 수 있습니다: + + (Q == &A) and (D == 1) + (Q == &B) and (D == 2) + (Q == &B) and (D == 4) + +CPU 2 는 *Q 의 로드를 요청하기 전에 P 를 Q 에 넣기 때문에 D 에 C 를 집어넣는 +일은 없음을 알아두세요. + + +디바이스 오퍼레이션 +------------------- + +일부 디바이스는 자신의 컨트롤 인터페이스를 메모리의 특정 영역으로 매핑해서 +제공하는데(Memory mapped I/O), 해당 컨트롤 레지스터에 접근하는 순서는 매우 +중요합니다. 예를 들어, 어드레스 포트 레지스터 (A) 와 데이터 포트 레지스터 (D) +를 통해 접근되는 내부 레지스터 집합을 갖는 이더넷 카드를 생각해 봅시다. 내부의 +5번 레지스터를 읽기 위해 다음의 코드가 사용될 수 있습니다: + + *A = 5; + x = *D; + +하지만, 이건 다음의 두 조합 중 하나로 만들어질 수 있습니다: + + STORE *A = 5, x = LOAD *D + x = LOAD *D, STORE *A = 5 + +두번째 조합은 데이터를 읽어온 _후에_ 주소를 설정하므로, 오동작을 일으킬 겁니다. + + +보장사항 +-------- + +CPU 에게 기대할 수 있는 최소한의 보장사항 몇가지가 있습니다: + + (*) 어떤 CPU 든, 의존성이 존재하는 메모리 액세스들은 해당 CPU 자신에게 + 있어서는 순서대로 메모리 시스템에 수행 요청됩니다. 즉, 다음에 대해서: + + Q = READ_ONCE(P); smp_read_barrier_depends(); D = READ_ONCE(*Q); + + CPU 는 다음과 같은 메모리 오퍼레이션 시퀀스를 수행 요청합니다: + + Q = LOAD P, D = LOAD *Q + + 그리고 그 시퀀스 내에서의 순서는 항상 지켜집니다. 대부분의 시스템에서 + smp_read_barrier_depends() 는 아무일도 안하지만 DEC Alpha 에서는 + 명시적으로 사용되어야 합니다. 보통의 경우에는 smp_read_barrier_depends() + 를 직접 사용하는 대신 rcu_dereference() 같은 것들을 사용해야 함을 + 알아두세요. + + (*) 특정 CPU 내에서 겹치는 영역의 메모리에 행해지는 로드와 스토어 들은 해당 + CPU 안에서는 순서가 바뀌지 않은 것으로 보여집니다. 즉, 다음에 대해서: + + a = READ_ONCE(*X); WRITE_ONCE(*X, b); + + CPU 는 다음의 메모리 오퍼레이션 시퀀스만을 메모리에 요청할 겁니다: + + a = LOAD *X, STORE *X = b + + 그리고 다음에 대해서는: + + WRITE_ONCE(*X, c); d = READ_ONCE(*X); + + CPU 는 다음의 수행 요청만을 만들어 냅니다: + + STORE *X = c, d = LOAD *X + + (로드 오퍼레이션과 스토어 오퍼레이션이 겹치는 메모리 영역에 대해 + 수행된다면 해당 오퍼레이션들은 겹친다고 표현됩니다). + +그리고 _반드시_ 또는 _절대로_ 가정하거나 가정하지 말아야 하는 것들이 있습니다: + + (*) 컴파일러가 READ_ONCE() 나 WRITE_ONCE() 로 보호되지 않은 메모리 액세스를 + 당신이 원하는 대로 할 것이라는 가정은 _절대로_ 해선 안됩니다. 그것들이 + 없다면, 컴파일러는 컴파일러 배리어 섹션에서 다루게 될, 모든 "창의적인" + 변경들을 만들어낼 권한을 갖게 됩니다. + + (*) 개별적인 로드와 스토어들이 주어진 순서대로 요청될 것이라는 가정은 _절대로_ + 하지 말아야 합니다. 이 말은 곧: + + X = *A; Y = *B; *D = Z; + + 는 다음의 것들 중 어느 것으로든 만들어질 수 있다는 의미입니다: + + X = LOAD *A, Y = LOAD *B, STORE *D = Z + X = LOAD *A, STORE *D = Z, Y = LOAD *B + Y = LOAD *B, X = LOAD *A, STORE *D = Z + Y = LOAD *B, STORE *D = Z, X = LOAD *A + STORE *D = Z, X = LOAD *A, Y = LOAD *B + STORE *D = Z, Y = LOAD *B, X = LOAD *A + + (*) 겹치는 메모리 액세스들은 합쳐지거나 버려질 수 있음을 _반드시_ 가정해야 + 합니다. 다음의 코드는: + + X = *A; Y = *(A + 4); + + 다음의 것들 중 뭐든 될 수 있습니다: + + X = LOAD *A; Y = LOAD *(A + 4); + Y = LOAD *(A + 4); X = LOAD *A; + {X, Y} = LOAD {*A, *(A + 4) }; + + 그리고: + + *A = X; *(A + 4) = Y; + + 는 다음 중 뭐든 될 수 있습니다: + + STORE *A = X; STORE *(A + 4) = Y; + STORE *(A + 4) = Y; STORE *A = X; + STORE {*A, *(A + 4) } = {X, Y}; + +그리고 보장사항에 반대되는 것들(anti-guarantees)이 있습니다: + + (*) 이 보장사항들은 bitfield 에는 적용되지 않는데, 컴파일러들은 bitfield 를 + 수정하는 코드를 생성할 때 원자성 없는(non-atomic) 읽고-수정하고-쓰는 + 인스트럭션들의 조합을 만드는 경우가 많기 때문입니다. 병렬 알고리즘의 + 동기화에 bitfield 를 사용하려 하지 마십시오. + + (*) bitfield 들이 여러 락으로 보호되는 경우라 하더라도, 하나의 bitfield 의 + 모든 필드들은 하나의 락으로 보호되어야 합니다. 만약 한 bitfield 의 두 + 필드가 서로 다른 락으로 보호된다면, 컴파일러의 원자성 없는 + 읽고-수정하고-쓰는 인스트럭션 조합은 한 필드에의 업데이트가 근처의 + 필드에도 영향을 끼치게 할 수 있습니다. + + (*) 이 보장사항들은 적절하게 정렬되고 크기가 잡힌 스칼라 변수들에 대해서만 + 적용됩니다. "적절하게 크기가 잡힌" 이라함은 현재로써는 "char", "short", + "int" 그리고 "long" 과 같은 크기의 변수들을 의미합니다. "적절하게 정렬된" + 은 자연스런 정렬을 의미하는데, 따라서 "char" 에 대해서는 아무 제약이 없고, + "short" 에 대해서는 2바이트 정렬을, "int" 에는 4바이트 정렬을, 그리고 + "long" 에 대해서는 32-bit 시스템인지 64-bit 시스템인지에 따라 4바이트 또는 + 8바이트 정렬을 의미합니다. 이 보장사항들은 C11 표준에서 소개되었으므로, + C11 전의 오래된 컴파일러(예를 들어, gcc 4.6) 를 사용할 때엔 주의하시기 + 바랍니다. 표준에 이 보장사항들은 "memory location" 을 정의하는 3.14 + 섹션에 다음과 같이 설명되어 있습니다: + (역자: 인용문이므로 번역하지 않습니다) + + memory location + either an object of scalar type, or a maximal sequence + of adjacent bit-fields all having nonzero width + + NOTE 1: Two threads of execution can update and access + separate memory locations without interfering with + each other. + + NOTE 2: A bit-field and an adjacent non-bit-field member + are in separate memory locations. The same applies + to two bit-fields, if one is declared inside a nested + structure declaration and the other is not, or if the two + are separated by a zero-length bit-field declaration, + or if they are separated by a non-bit-field member + declaration. It is not safe to concurrently update two + bit-fields in the same structure if all members declared + between them are also bit-fields, no matter what the + sizes of those intervening bit-fields happen to be. + + +========================= +메모리 배리어란 무엇인가? +========================= + +앞에서 봤듯이, 상호간 의존성이 없는 메모리 오퍼레이션들은 실제로는 무작위적 +순서로 수행될 수 있으며, 이는 CPU 와 CPU 간의 상호작용이나 I/O 에 문제가 될 수 +있습니다. 따라서 컴파일러와 CPU 가 순서를 바꾸는데 제약을 걸 수 있도록 개입할 +수 있는 어떤 방법이 필요합니다. + +메모리 배리어는 그런 개입 수단입니다. 메모리 배리어는 배리어를 사이에 둔 앞과 +뒤 양측의 메모리 오퍼레이션들 간에 부분적 순서가 존재하도록 하는 효과를 줍니다. + +시스템의 CPU 들과 여러 디바이스들은 성능을 올리기 위해 명령어 재배치, 실행 +유예, 메모리 오퍼레이션들의 조합, 예측적 로드(speculative load), 브랜치 +예측(speculative branch prediction), 다양한 종류의 캐싱(caching) 등의 다양한 +트릭을 사용할 수 있기 때문에 이런 강제력은 중요합니다. 메모리 배리어들은 이런 +트릭들을 무효로 하거나 억제하는 목적으로 사용되어져서 코드가 여러 CPU 와 +디바이스들 간의 상호작용을 정상적으로 제어할 수 있게 해줍니다. + + +메모리 배리어의 종류 +-------------------- + +메모리 배리어는 네개의 기본 타입으로 분류됩니다: + + (1) 쓰기 (또는 스토어) 메모리 배리어. + + 쓰기 메모리 배리어는 시스템의 다른 컴포넌트들에 해당 배리어보다 앞서 + 명시된 모든 STORE 오퍼레이션들이 해당 배리어 뒤에 명시된 모든 STORE + 오퍼레이션들보다 먼저 수행된 것으로 보일 것을 보장합니다. + + 쓰기 배리어는 스토어 오퍼레이션들에 대한 부분적 순서 세우기입니다; 로드 + 오퍼레이션들에 대해서는 어떤 영향도 끼치지 않습니다. + + CPU 는 시간의 흐름에 따라 메모리 시스템에 일련의 스토어 오퍼레이션들을 + 하나씩 요청해 집어넣습니다. 쓰기 배리어 앞의 모든 스토어 오퍼레이션들은 + 쓰기 배리어 뒤의 모든 스토어 오퍼레이션들보다 _앞서_ 수행될 겁니다. + + [!] 쓰기 배리어들은 읽기 또는 데이터 의존성 배리어와 함께 짝을 맞춰 + 사용되어야만 함을 알아두세요; "SMP 배리어 짝맞추기" 서브섹션을 참고하세요. + + + (2) 데이터 의존성 배리어. + + 데이터 의존성 배리어는 읽기 배리어의 보다 완화된 형태입니다. 두개의 로드 + 오퍼레이션이 있고 두번째 것이 첫번째 것의 결과에 의존하고 있을 때(예: + 두번째 로드가 참조할 주소를 첫번째 로드가 읽는 경우), 두번째 로드가 읽어올 + 데이터는 첫번째 로드에 의해 그 주소가 얻어지기 전에 업데이트 되어 있음을 + 보장하기 위해서 데이터 의존성 배리어가 필요할 수 있습니다. + + 데이터 의존성 배리어는 상호 의존적인 로드 오퍼레이션들 사이의 부분적 순서 + 세우기입니다; 스토어 오퍼레이션들이나 독립적인 로드들, 또는 중복되는 + 로드들에 대해서는 어떤 영향도 끼치지 않습니다. + + (1) 에서 언급했듯이, 시스템의 CPU 들은 메모리 시스템에 일련의 스토어 + 오퍼레이션들을 던져 넣고 있으며, 거기에 관심이 있는 다른 CPU 는 그 + 오퍼레이션들을 메모리 시스템이 실행한 결과를 인지할 수 있습니다. 이처럼 + 다른 CPU 의 스토어 오퍼레이션의 결과에 관심을 두고 있는 CPU 가 수행 요청한 + 데이터 의존성 배리어는, 배리어 앞의 어떤 로드 오퍼레이션이 다른 CPU 에서 + 던져 넣은 스토어 오퍼레이션과 같은 영역을 향했다면, 그런 스토어 + 오퍼레이션들이 만들어내는 결과가 데이터 의존성 배리어 뒤의 로드 + 오퍼레이션들에게는 보일 것을 보장합니다. + + 이 순서 세우기 제약에 대한 그림을 보기 위해선 "메모리 배리어 시퀀스의 예" + 서브섹션을 참고하시기 바랍니다. + + [!] 첫번째 로드는 반드시 _데이터_ 의존성을 가져야지 컨트롤 의존성을 가져야 + 하는게 아님을 알아두십시오. 만약 두번째 로드를 위한 주소가 첫번째 로드에 + 의존적이지만 그 의존성은 조건적이지 그 주소 자체를 가져오는게 아니라면, + 그것은 _컨트롤_ 의존성이고, 이 경우에는 읽기 배리어나 그보다 강력한 + 무언가가 필요합니다. 더 자세한 내용을 위해서는 "컨트롤 의존성" 서브섹션을 + 참고하시기 바랍니다. + + [!] 데이터 의존성 배리어는 보통 쓰기 배리어들과 함께 짝을 맞춰 사용되어야 + 합니다; "SMP 배리어 짝맞추기" 서브섹션을 참고하세요. + + + (3) 읽기 (또는 로드) 메모리 배리어. + + 읽기 배리어는 데이터 의존성 배리어 기능의 보장사항에 더해서 배리어보다 + 앞서 명시된 모든 LOAD 오퍼레이션들이 배리어 뒤에 명시되는 모든 LOAD + 오퍼레이션들보다 먼저 행해진 것으로 시스템의 다른 컴포넌트들에 보여질 것을 + 보장합니다. + + 읽기 배리어는 로드 오퍼레이션에 행해지는 부분적 순서 세우기입니다; 스토어 + 오퍼레이션에 대해서는 어떤 영향도 끼치지 않습니다. + + 읽기 메모리 배리어는 데이터 의존성 배리어를 내장하므로 데이터 의존성 + 배리어를 대신할 수 있습니다. + + [!] 읽기 배리어는 일반적으로 쓰기 배리어들과 함께 짝을 맞춰 사용되어야 + 합니다; "SMP 배리어 짝맞추기" 서브섹션을 참고하세요. + + + (4) 범용 메모리 배리어. + + 범용(general) 메모리 배리어는 배리어보다 앞서 명시된 모든 LOAD 와 STORE + 오퍼레이션들이 배리어 뒤에 명시된 모든 LOAD 와 STORE 오퍼레이션들보다 + 먼저 수행된 것으로 시스템의 나머지 컴포넌트들에 보이게 됨을 보장합니다. + + 범용 메모리 배리어는 로드와 스토어 모두에 대한 부분적 순서 세우기입니다. + + 범용 메모리 배리어는 읽기 메모리 배리어, 쓰기 메모리 배리어 모두를 + 내장하므로, 두 배리어를 모두 대신할 수 있습니다. + + +그리고 두개의 명시적이지 않은 타입이 있습니다: + + (5) ACQUIRE 오퍼레이션. + + 이 타입의 오퍼레이션은 단방향의 투과성 배리어처럼 동작합니다. ACQUIRE + 오퍼레이션 뒤의 모든 메모리 오퍼레이션들이 ACQUIRE 오퍼레이션 후에 + 일어난 것으로 시스템의 나머지 컴포넌트들에 보이게 될 것이 보장됩니다. + LOCK 오퍼레이션과 smp_load_acquire(), smp_cond_acquire() 오퍼레이션도 + ACQUIRE 오퍼레이션에 포함됩니다. smp_cond_acquire() 오퍼레이션은 컨트롤 + 의존성과 smp_rmb() 를 사용해서 ACQUIRE 의 의미적 요구사항(semantic)을 + 충족시킵니다. + + ACQUIRE 오퍼레이션 앞의 메모리 오퍼레이션들은 ACQUIRE 오퍼레이션 완료 후에 + 수행된 것처럼 보일 수 있습니다. + + ACQUIRE 오퍼레이션은 거의 항상 RELEASE 오퍼레이션과 짝을 지어 사용되어야 + 합니다. + + + (6) RELEASE 오퍼레이션. + + 이 타입의 오퍼레이션들도 단방향 투과성 배리어처럼 동작합니다. RELEASE + 오퍼레이션 앞의 모든 메모리 오퍼레이션들은 RELEASE 오퍼레이션 전에 완료된 + 것으로 시스템의 다른 컴포넌트들에 보여질 것이 보장됩니다. UNLOCK 류의 + 오퍼레이션들과 smp_store_release() 오퍼레이션도 RELEASE 오퍼레이션의 + 일종입니다. + + RELEASE 오퍼레이션 뒤의 메모리 오퍼레이션들은 RELEASE 오퍼레이션이 + 완료되기 전에 행해진 것처럼 보일 수 있습니다. + + ACQUIRE 와 RELEASE 오퍼레이션의 사용은 일반적으로 다른 메모리 배리어의 + 필요성을 없앱니다 (하지만 "MMIO 쓰기 배리어" 서브섹션에서 설명되는 예외를 + 알아두세요). 또한, RELEASE+ACQUIRE 조합은 범용 메모리 배리어처럼 동작할 + 것을 보장하지 -않습니다-. 하지만, 어떤 변수에 대한 RELEASE 오퍼레이션을 + 앞서는 메모리 액세스들의 수행 결과는 이 RELEASE 오퍼레이션을 뒤이어 같은 + 변수에 대해 수행된 ACQUIRE 오퍼레이션을 뒤따르는 메모리 액세스에는 보여질 + 것이 보장됩니다. 다르게 말하자면, 주어진 변수의 크리티컬 섹션에서는, 해당 + 변수에 대한 앞의 크리티컬 섹션에서의 모든 액세스들이 완료되었을 것을 + 보장합니다. + + 즉, ACQUIRE 는 최소한의 "취득" 동작처럼, 그리고 RELEASE 는 최소한의 "공개" + 처럼 동작한다는 의미입니다. + +atomic_ops.txt 에서 설명되는 어토믹 오퍼레이션들 중에는 완전히 순서잡힌 것들과 +(배리어를 사용하지 않는) 완화된 순서의 것들 외에 ACQUIRE 와 RELEASE 부류의 +것들도 존재합니다. 로드와 스토어를 모두 수행하는 조합된 어토믹 오퍼레이션에서, +ACQUIRE 는 해당 오퍼레이션의 로드 부분에만 적용되고 RELEASE 는 해당 +오퍼레이션의 스토어 부분에만 적용됩니다. + +메모리 배리어들은 두 CPU 간, 또는 CPU 와 디바이스 간에 상호작용의 가능성이 있을 +때에만 필요합니다. 만약 어떤 코드에 그런 상호작용이 없을 것이 보장된다면, 해당 +코드에서는 메모리 배리어를 사용할 필요가 없습니다. + + +이것들은 _최소한의_ 보장사항들임을 알아두세요. 다른 아키텍쳐에서는 더 강력한 +보장사항을 제공할 수도 있습니다만, 그런 보장사항은 아키텍쳐 종속적 코드 이외의 +부분에서는 신뢰되지 _않을_ 겁니다. + + +메모리 배리어에 대해 가정해선 안될 것 +------------------------------------- + +리눅스 커널 메모리 배리어들이 보장하지 않는 것들이 있습니다: + + (*) 메모리 배리어 앞에서 명시된 어떤 메모리 액세스도 메모리 배리어 명령의 수행 + 완료 시점까지 _완료_ 될 것이란 보장은 없습니다; 배리어가 하는 일은 CPU 의 + 액세스 큐에 특정 타입의 액세스들은 넘을 수 없는 선을 긋는 것으로 생각될 수 + 있습니다. + + (*) 한 CPU 에서 메모리 배리어를 수행하는게 시스템의 다른 CPU 나 하드웨어에 + 어떤 직접적인 영향을 끼친다는 보장은 존재하지 않습니다. 배리어 수행이 + 만드는 간접적 영향은 두번째 CPU 가 첫번째 CPU 의 액세스들의 결과를 + 바라보는 순서가 됩니다만, 다음 항목을 보세요: + + (*) 첫번째 CPU 가 두번째 CPU 의 메모리 액세스들의 결과를 바라볼 때, _설령_ + 두번째 CPU 가 메모리 배리어를 사용한다 해도, 첫번째 CPU _또한_ 그에 맞는 + 메모리 배리어를 사용하지 않는다면 ("SMP 배리어 짝맞추기" 서브섹션을 + 참고하세요) 그 결과가 올바른 순서로 보여진다는 보장은 없습니다. + + (*) CPU 바깥의 하드웨어[*] 가 메모리 액세스들의 순서를 바꾸지 않는다는 보장은 + 존재하지 않습니다. CPU 캐시 일관성 메커니즘은 메모리 배리어의 간접적 + 영향을 CPU 사이에 전파하긴 하지만, 순서대로 전파하지는 않을 수 있습니다. + + [*] 버스 마스터링 DMA 와 일관성에 대해서는 다음을 참고하시기 바랍니다: + + Documentation/PCI/pci.txt + Documentation/DMA-API-HOWTO.txt + Documentation/DMA-API.txt + + +데이터 의존성 배리어 +-------------------- + +데이터 의존성 배리어의 사용에 있어 지켜야 하는 사항들은 약간 미묘하고, 데이터 +의존성 배리어가 사용되어야 하는 상황도 항상 명백하지는 않습니다. 설명을 위해 +다음의 이벤트 시퀀스를 생각해 봅시다: + + CPU 1 CPU 2 + =============== =============== + { A == 1, B == 2, C == 3, P == &A, Q == &C } + B = 4; + <쓰기 배리어> + WRITE_ONCE(P, &B) + Q = READ_ONCE(P); + D = *Q; + +여기엔 분명한 데이터 의존성이 존재하므로, 이 시퀀스가 끝났을 때 Q 는 &A 또는 &B +일 것이고, 따라서: + + (Q == &A) 는 (D == 1) 를, + (Q == &B) 는 (D == 4) 를 의미합니다. + +하지만! CPU 2 는 B 의 업데이트를 인식하기 전에 P 의 업데이트를 인식할 수 있고, +따라서 다음의 결과가 가능합니다: + + (Q == &B) and (D == 2) ???? + +이런 결과는 일관성이나 인과 관계 유지가 실패한 것처럼 보일 수도 있겠지만, +그렇지 않습니다, 그리고 이 현상은 (DEC Alpha 와 같은) 여러 CPU 에서 실제로 +발견될 수 있습니다. + +이 문제 상황을 제대로 해결하기 위해, 데이터 의존성 배리어나 그보다 강화된 +무언가가 주소를 읽어올 때와 데이터를 읽어올 때 사이에 추가되어야만 합니다: + + CPU 1 CPU 2 + =============== =============== + { A == 1, B == 2, C == 3, P == &A, Q == &C } + B = 4; + <쓰기 배리어> + WRITE_ONCE(P, &B); + Q = READ_ONCE(P); + <데이터 의존성 배리어> + D = *Q; + +이 변경은 앞의 처음 두가지 결과 중 하나만이 발생할 수 있고, 세번째의 결과는 +발생할 수 없도록 합니다. + +데이터 의존성 배리어는 의존적 쓰기에 대해서도 순서를 잡아줍니다: + + CPU 1 CPU 2 + =============== =============== + { A == 1, B == 2, C = 3, P == &A, Q == &C } + B = 4; + <쓰기 배리어> + WRITE_ONCE(P, &B); + Q = READ_ONCE(P); + <데이터 의존성 배리어> + *Q = 5; + +이 데이터 의존성 배리어는 Q 로의 읽기가 *Q 로의 스토어와 순서를 맞추게 +해줍니다. 이는 다음과 같은 결과를 막습니다: + + (Q == &B) && (B == 4) + +이런 패턴은 드물게 사용되어야 함을 알아 두시기 바랍니다. 무엇보다도, 의존성 +순서 규칙의 의도는 쓰기 작업을 -예방- 해서 그로 인해 발생하는 비싼 캐시 미스도 +없애려는 것입니다. 이 패턴은 드물게 발생하는 에러 조건 같은것들을 기록하는데 +사용될 수 있고, 이렇게 배리어를 사용해 순서를 지키게 함으로써 그런 기록이 +사라지는 것을 막습니다. + + +[!] 상당히 비직관적인 이 상황은 분리된 캐시를 가진 기계, 예를 들어 한 캐시 +뱅크가 짝수번 캐시 라인을 처리하고 다른 뱅크는 홀수번 캐시 라인을 처리하는 기계 +등에서 가장 잘 발생합니다. 포인터 P 는 홀수 번호의 캐시 라인에 있고, 변수 B 는 +짝수 번호 캐시 라인에 있다고 생각해 봅시다. 그런 상태에서 읽기 작업을 하는 CPU +의 짝수번 뱅크는 할 일이 쌓여 매우 바쁘지만 홀수번 뱅크는 할 일이 없어 아무 +일도 하지 않고 있었다면, 포인터 P 는 새 값 (&B) 을, 그리고 변수 B 는 옛날 값 +(2) 을 가지고 있는 상태가 보여질 수도 있습니다. + + +데이터 의존성 배리어는 매우 중요한데, 예를 들어 RCU 시스템에서 그렇습니다. +include/linux/rcupdate.h 의 rcu_assign_pointer() 와 rcu_dereference() 를 +참고하세요. 여기서 데이터 의존성 배리어는 RCU 로 관리되는 포인터의 타겟을 현재 +타겟에서 수정된 새로운 타겟으로 바꾸는 작업에서 새로 수정된 타겟이 초기화가 +완료되지 않은 채로 보여지는 일이 일어나지 않게 해줍니다. + +더 많은 예를 위해선 "캐시 일관성" 서브섹션을 참고하세요. + + +컨트롤 의존성 +------------- + +로드-로드 컨트롤 의존성은 데이터 의존성 배리어만으로는 정확히 동작할 수가 +없어서 읽기 메모리 배리어를 필요로 합니다. 아래의 코드를 봅시다: + + q = READ_ONCE(a); + if (q) { + <데이터 의존성 배리어> /* BUG: No data dependency!!! */ + p = READ_ONCE(b); + } + +이 코드는 원하는 대로의 효과를 내지 못할 수 있는데, 이 코드에는 데이터 의존성이 +아니라 컨트롤 의존성이 존재하기 때문으로, 이런 상황에서 CPU 는 실행 속도를 더 +빠르게 하기 위해 분기 조건의 결과를 예측하고 코드를 재배치 할 수 있어서 다른 +CPU 는 b 로부터의 로드 오퍼레이션이 a 로부터의 로드 오퍼레이션보다 먼저 발생한 +걸로 인식할 수 있습니다. 여기에 정말로 필요했던 건 다음과 같습니다: + + q = READ_ONCE(a); + if (q) { + <읽기 배리어> + p = READ_ONCE(b); + } + +하지만, 스토어 오퍼레이션은 예측적으로 수행되지 않습니다. 즉, 다음 예에서와 +같이 로드-스토어 컨트롤 의존성이 존재하는 경우에는 순서가 -지켜진다-는 +의미입니다. + + q = READ_ONCE(a); + if (q) { + WRITE_ONCE(b, p); + } + +컨트롤 의존성은 보통 다른 타입의 배리어들과 짝을 맞춰 사용됩니다. 그렇다곤 +하나, READ_ONCE() 는 반드시 사용해야 함을 부디 명심하세요! READ_ONCE() 가 +없다면, 컴파일러가 'a' 로부터의 로드를 'a' 로부터의 또다른 로드와, 'b' 로의 +스토어를 'b' 로의 또다른 스토어와 조합해 버려 매우 비직관적인 결과를 초래할 수 +있습니다. + +이걸로 끝이 아닌게, 컴파일러가 변수 'a' 의 값이 항상 0이 아니라고 증명할 수 +있다면, 앞의 예에서 "if" 문을 없애서 다음과 같이 최적화 할 수도 있습니다: + + q = a; + b = p; /* BUG: Compiler and CPU can both reorder!!! */ + +그러니 READ_ONCE() 를 반드시 사용하세요. + +다음과 같이 "if" 문의 양갈래 브랜치에 모두 존재하는 동일한 스토어에 대해 순서를 +강제하고 싶은 경우가 있을 수 있습니다: + + q = READ_ONCE(a); + if (q) { + barrier(); + WRITE_ONCE(b, p); + do_something(); + } else { + barrier(); + WRITE_ONCE(b, p); + do_something_else(); + } + +안타깝게도, 현재의 컴파일러들은 높은 최적화 레벨에서는 이걸 다음과 같이 +바꿔버립니다: + + q = READ_ONCE(a); + barrier(); + WRITE_ONCE(b, p); /* BUG: No ordering vs. load from a!!! */ + if (q) { + /* WRITE_ONCE(b, p); -- moved up, BUG!!! */ + do_something(); + } else { + /* WRITE_ONCE(b, p); -- moved up, BUG!!! */ + do_something_else(); + } + +이제 'a' 에서의 로드와 'b' 로의 스토어 사이에는 조건적 관계가 없기 때문에 CPU +는 이들의 순서를 바꿀 수 있게 됩니다: 이런 경우에 조건적 관계는 반드시 +필요한데, 모든 컴파일러 최적화가 이루어지고 난 후의 어셈블리 코드에서도 +마찬가지입니다. 따라서, 이 예에서 순서를 지키기 위해서는 smp_store_release() +와 같은 명시적 메모리 배리어가 필요합니다: + + q = READ_ONCE(a); + if (q) { + smp_store_release(&b, p); + do_something(); + } else { + smp_store_release(&b, p); + do_something_else(); + } + +반면에 명시적 메모리 배리어가 없다면, 이런 경우의 순서는 스토어 오퍼레이션들이 +서로 다를 때에만 보장되는데, 예를 들면 다음과 같은 경우입니다: + + q = READ_ONCE(a); + if (q) { + WRITE_ONCE(b, p); + do_something(); + } else { + WRITE_ONCE(b, r); + do_something_else(); + } + +처음의 READ_ONCE() 는 컴파일러가 'a' 의 값을 증명해내는 것을 막기 위해 여전히 +필요합니다. + +또한, 로컬 변수 'q' 를 가지고 하는 일에 대해 주의해야 하는데, 그러지 않으면 +컴파일러는 그 값을 추측하고 또다시 필요한 조건관계를 없애버릴 수 있습니다. +예를 들면: + + q = READ_ONCE(a); + if (q % MAX) { + WRITE_ONCE(b, p); + do_something(); + } else { + WRITE_ONCE(b, r); + do_something_else(); + } + +만약 MAX 가 1 로 정의된 상수라면, 컴파일러는 (q % MAX) 는 0이란 것을 알아채고, +위의 코드를 아래와 같이 바꿔버릴 수 있습니다: + + q = READ_ONCE(a); + WRITE_ONCE(b, p); + do_something_else(); + +이렇게 되면, CPU 는 변수 'a' 로부터의 로드와 변수 'b' 로의 스토어 사이의 순서를 +지켜줄 필요가 없어집니다. barrier() 를 추가해 해결해 보고 싶겠지만, 그건 +도움이 안됩니다. 조건 관계는 사라졌고, barrier() 는 이를 되돌리지 못합니다. +따라서, 이 순서를 지켜야 한다면, MAX 가 1 보다 크다는 것을, 다음과 같은 방법을 +사용해 분명히 해야 합니다: + + q = READ_ONCE(a); + BUILD_BUG_ON(MAX <= 1); /* Order load from a with store to b. */ + if (q % MAX) { + WRITE_ONCE(b, p); + do_something(); + } else { + WRITE_ONCE(b, r); + do_something_else(); + } + +'b' 로의 스토어들은 여전히 서로 다름을 알아두세요. 만약 그것들이 동일하면, +앞에서 이야기했듯, 컴파일러가 그 스토어 오퍼레이션들을 'if' 문 바깥으로 +끄집어낼 수 있습니다. + +또한 이진 조건문 평가에 너무 의존하지 않도록 조심해야 합니다. 다음의 예를 +봅시다: + + q = READ_ONCE(a); + if (q || 1 > 0) + WRITE_ONCE(b, 1); + +첫번째 조건만으로는 브랜치 조건 전체를 거짓으로 만들 수 없고 두번째 조건은 항상 +참이기 때문에, 컴파일러는 이 예를 다음과 같이 바꿔서 컨트롤 의존성을 없애버릴 +수 있습니다: + + q = READ_ONCE(a); + WRITE_ONCE(b, 1); + +이 예는 컴파일러가 코드를 추측으로 수정할 수 없도록 분명히 해야 한다는 점을 +강조합니다. 조금 더 일반적으로 말해서, READ_ONCE() 는 컴파일러에게 주어진 로드 +오퍼레이션을 위한 코드를 정말로 만들도록 하지만, 컴파일러가 그렇게 만들어진 +코드의 수행 결과를 사용하도록 강제하지는 않습니다. + +마지막으로, 컨트롤 의존성은 이행성 (transitivity) 을 제공하지 -않습니다-. 이건 +x 와 y 가 둘 다 0 이라는 초기값을 가졌다는 가정 하의 두개의 예제로 +보이겠습니다: + + CPU 0 CPU 1 + ======================= ======================= + r1 = READ_ONCE(x); r2 = READ_ONCE(y); + if (r1 > 0) if (r2 > 0) + WRITE_ONCE(y, 1); WRITE_ONCE(x, 1); + + assert(!(r1 == 1 && r2 == 1)); + +이 두 CPU 예제에서 assert() 의 조건은 항상 참일 것입니다. 그리고, 만약 컨트롤 +의존성이 이행성을 (실제로는 그러지 않지만) 보장한다면, 다음의 CPU 가 추가되어도 +아래의 assert() 조건은 참이 될것입니다: + + CPU 2 + ===================== + WRITE_ONCE(x, 2); + + assert(!(r1 == 2 && r2 == 1 && x == 2)); /* FAILS!!! */ + +하지만 컨트롤 의존성은 이행성을 제공하지 -않기- 때문에, 세개의 CPU 예제가 실행 +완료된 후에 위의 assert() 의 조건은 거짓으로 평가될 수 있습니다. 세개의 CPU +예제가 순서를 지키길 원한다면, CPU 0 와 CPU 1 코드의 로드와 스토어 사이, "if" +문 바로 다음에 smp_mb()를 넣어야 합니다. 더 나아가서, 최초의 두 CPU 예제는 +매우 위험하므로 사용되지 않아야 합니다. + +이 두개의 예제는 다음 논문: +http://www.cl.cam.ac.uk/users/pes20/ppc-supplemental/test6.pdf 와 +이 사이트: https://www.cl.cam.ac.uk/~pes20/ppcmem/index.html 에 나온 LB 와 WWC +리트머스 테스트입니다. + +요약하자면: + + (*) 컨트롤 의존성은 앞의 로드들을 뒤의 스토어들에 대해 순서를 맞춰줍니다. + 하지만, 그 외의 어떤 순서도 보장하지 -않습니다-: 앞의 로드와 뒤의 로드들 + 사이에도, 앞의 스토어와 뒤의 스토어들 사이에도요. 이런 다른 형태의 + 순서가 필요하다면 smp_rmb() 나 smp_wmb()를, 또는, 앞의 스토어들과 뒤의 + 로드들 사이의 순서를 위해서는 smp_mb() 를 사용하세요. + + (*) "if" 문의 양갈래 브랜치가 같은 변수에의 동일한 스토어로 시작한다면, 그 + 스토어들은 각 스토어 앞에 smp_mb() 를 넣거나 smp_store_release() 를 + 사용해서 스토어를 하는 식으로 순서를 맞춰줘야 합니다. 이 문제를 해결하기 + 위해 "if" 문의 양갈래 브랜치의 시작 지점에 barrier() 를 넣는 것만으로는 + 충분한 해결이 되지 않는데, 이는 앞의 예에서 본것과 같이, 컴파일러의 + 최적화는 barrier() 가 의미하는 바를 지키면서도 컨트롤 의존성을 손상시킬 + 수 있기 때문이라는 점을 부디 알아두시기 바랍니다. + + (*) 컨트롤 의존성은 앞의 로드와 뒤의 스토어 사이에 최소 하나의, 실행 + 시점에서의 조건관계를 필요로 하며, 이 조건관계는 앞의 로드와 관계되어야 + 합니다. 만약 컴파일러가 조건 관계를 최적화로 없앨수 있다면, 순서도 + 최적화로 없애버렸을 겁니다. READ_ONCE() 와 WRITE_ONCE() 의 주의 깊은 + 사용은 주어진 조건 관계를 유지하는데 도움이 될 수 있습니다. + + (*) 컨트롤 의존성을 위해선 컴파일러가 조건관계를 없애버리는 것을 막아야 + 합니다. 주의 깊은 READ_ONCE() 나 atomic{,64}_read() 의 사용이 컨트롤 + 의존성이 사라지지 않게 하는데 도움을 줄 수 있습니다. 더 많은 정보를 + 위해선 "컴파일러 배리어" 섹션을 참고하시기 바랍니다. + + (*) 컨트롤 의존성은 보통 다른 타입의 배리어들과 짝을 맞춰 사용됩니다. + + (*) 컨트롤 의존성은 이행성을 제공하지 -않습니다-. 이행성이 필요하다면, + smp_mb() 를 사용하세요. + + +SMP 배리어 짝맞추기 +-------------------- + +CPU 간 상호작용을 다룰 때에 일부 타입의 메모리 배리어는 항상 짝을 맞춰 +사용되어야 합니다. 적절하게 짝을 맞추지 않은 코드는 사실상 에러에 가깝습니다. + +범용 배리어들은 범용 배리어끼리도 짝을 맞추지만 이행성이 없는 대부분의 다른 +타입의 배리어들과도 짝을 맞춥니다. ACQUIRE 배리어는 RELEASE 배리어와 짝을 +맞춥니다만, 둘 다 범용 배리어를 포함해 다른 배리어들과도 짝을 맞출 수 있습니다. +쓰기 배리어는 데이터 의존성 배리어나 컨트롤 의존성, ACQUIRE 배리어, RELEASE +배리어, 읽기 배리어, 또는 범용 배리어와 짝을 맞춥니다. 비슷하게 읽기 배리어나 +컨트롤 의존성, 또는 데이터 의존성 배리어는 쓰기 배리어나 ACQUIRE 배리어, +RELEASE 배리어, 또는 범용 배리어와 짝을 맞추는데, 다음과 같습니다: + + CPU 1 CPU 2 + =============== =============== + WRITE_ONCE(a, 1); + <쓰기 배리어> + WRITE_ONCE(b, 2); x = READ_ONCE(b); + <읽기 배리어> + y = READ_ONCE(a); + +또는: + + CPU 1 CPU 2 + =============== =============================== + a = 1; + <쓰기 배리어> + WRITE_ONCE(b, &a); x = READ_ONCE(b); + <데이터 의존성 배리어> + y = *x; + +또는: + + CPU 1 CPU 2 + =============== =============================== + r1 = READ_ONCE(y); + <범용 배리어> + WRITE_ONCE(y, 1); if (r2 = READ_ONCE(x)) { + <묵시적 컨트롤 의존성> + WRITE_ONCE(y, 1); + } + + assert(r1 == 0 || r2 == 0); + +기본적으로, 여기서의 읽기 배리어는 "더 완화된" 타입일 순 있어도 항상 존재해야 +합니다. + +[!] 쓰기 배리어 앞의 스토어 오퍼레이션은 일반적으로 읽기 배리어나 데이터 +의존성 배리어 뒤의 로드 오퍼레이션과 매치될 것이고, 반대도 마찬가지입니다: + + CPU 1 CPU 2 + =================== =================== + WRITE_ONCE(a, 1); }---- --->{ v = READ_ONCE(c); + WRITE_ONCE(b, 2); } \ / { w = READ_ONCE(d); + <쓰기 배리어> \ <읽기 배리어> + WRITE_ONCE(c, 3); } / \ { x = READ_ONCE(a); + WRITE_ONCE(d, 4); }---- --->{ y = READ_ONCE(b); + + +메모리 배리어 시퀀스의 예 +------------------------- + +첫째, 쓰기 배리어는 스토어 오퍼레이션들의 부분적 순서 세우기로 동작합니다. +아래의 이벤트 시퀀스를 보세요: + + CPU 1 + ======================= + STORE A = 1 + STORE B = 2 + STORE C = 3 + <쓰기 배리어> + STORE D = 4 + STORE E = 5 + +이 이벤트 시퀀스는 메모리 일관성 시스템에 원소끼리의 순서가 존재하지 않는 집합 +{ STORE A, STORE B, STORE C } 가 역시 원소끼리의 순서가 존재하지 않는 집합 +{ STORE D, STORE E } 보다 먼저 일어난 것으로 시스템의 나머지 요소들에 보이도록 +전달됩니다: + + +-------+ : : + | | +------+ + | |------>| C=3 | } /\ + | | : +------+ }----- \ -----> 시스템의 나머지 요소에 + | | : | A=1 | } \/ 보여질 수 있는 이벤트들 + | | : +------+ } + | CPU 1 | : | B=2 | } + | | +------+ } + | | wwwwwwwwwwwwwwww } <--- 여기서 쓰기 배리어는 배리어 앞의 + | | +------+ } 모든 스토어가 배리어 뒤의 스토어 + | | : | E=5 | } 전에 메모리 시스템에 전달되도록 + | | : +------+ } 합니다 + | |------>| D=4 | } + | | +------+ + +-------+ : : + | + | CPU 1 에 의해 메모리 시스템에 전달되는 + | 일련의 스토어 오퍼레이션들 + V + + +둘째, 데이터 의존성 배리어는 데이터 의존적 로드 오퍼레이션들의 부분적 순서 +세우기로 동작합니다. 다음 일련의 이벤트들을 보세요: + + CPU 1 CPU 2 + ======================= ======================= + { B = 7; X = 9; Y = 8; C = &Y } + STORE A = 1 + STORE B = 2 + <쓰기 배리어> + STORE C = &B LOAD X + STORE D = 4 LOAD C (gets &B) + LOAD *C (reads B) + +여기에 별다른 개입이 없다면, CPU 1 의 쓰기 배리어에도 불구하고 CPU 2 는 CPU 1 +의 이벤트들을 완전히 무작위적 순서로 인지하게 됩니다: + + +-------+ : : : : + | | +------+ +-------+ | CPU 2 에 인지되는 + | |------>| B=2 |----- --->| Y->8 | | 업데이트 이벤트 + | | : +------+ \ +-------+ | 시퀀스 + | CPU 1 | : | A=1 | \ --->| C->&Y | V + | | +------+ | +-------+ + | | wwwwwwwwwwwwwwww | : : + | | +------+ | : : + | | : | C=&B |--- | : : +-------+ + | | : +------+ \ | +-------+ | | + | |------>| D=4 | ----------->| C->&B |------>| | + | | +------+ | +-------+ | | + +-------+ : : | : : | | + | : : | | + | : : | CPU 2 | + | +-------+ | | + 분명히 잘못된 ---> | | B->7 |------>| | + B 의 값 인지 (!) | +-------+ | | + | : : | | + | +-------+ | | + X 의 로드가 B 의 ---> \ | X->9 |------>| | + 일관성 유지를 \ +-------+ | | + 지연시킴 ----->| B->2 | +-------+ + +-------+ + : : + + +앞의 예에서, CPU 2 는 (B 의 값이 될) *C 의 값 읽기가 C 의 LOAD 뒤에 이어짐에도 +B 가 7 이라는 결과를 얻습니다. + +하지만, 만약 데이터 의존성 배리어가 C 의 로드와 *C (즉, B) 의 로드 사이에 +있었다면: + + CPU 1 CPU 2 + ======================= ======================= + { B = 7; X = 9; Y = 8; C = &Y } + STORE A = 1 + STORE B = 2 + <쓰기 배리어> + STORE C = &B LOAD X + STORE D = 4 LOAD C (gets &B) + <데이터 의존성 배리어> + LOAD *C (reads B) + +다음과 같이 됩니다: + + +-------+ : : : : + | | +------+ +-------+ + | |------>| B=2 |----- --->| Y->8 | + | | : +------+ \ +-------+ + | CPU 1 | : | A=1 | \ --->| C->&Y | + | | +------+ | +-------+ + | | wwwwwwwwwwwwwwww | : : + | | +------+ | : : + | | : | C=&B |--- | : : +-------+ + | | : +------+ \ | +-------+ | | + | |------>| D=4 | ----------->| C->&B |------>| | + | | +------+ | +-------+ | | + +-------+ : : | : : | | + | : : | | + | : : | CPU 2 | + | +-------+ | | + | | X->9 |------>| | + | +-------+ | | + C 로의 스토어 앞의 ---> \ ddddddddddddddddd | | + 모든 이벤트 결과가 \ +-------+ | | + 뒤의 로드에게 ----->| B->2 |------>| | + 보이게 강제한다 +-------+ | | + : : +-------+ + + +셋째, 읽기 배리어는 로드 오퍼레이션들에의 부분적 순서 세우기로 동작합니다. +아래의 일련의 이벤트를 봅시다: + + CPU 1 CPU 2 + ======================= ======================= + { A = 0, B = 9 } + STORE A=1 + <쓰기 배리어> + STORE B=2 + LOAD B + LOAD A + +CPU 1 은 쓰기 배리어를 쳤지만, 별다른 개입이 없다면 CPU 2 는 CPU 1 에서 행해진 +이벤트의 결과를 무작위적 순서로 인지하게 됩니다. + + +-------+ : : : : + | | +------+ +-------+ + | |------>| A=1 |------ --->| A->0 | + | | +------+ \ +-------+ + | CPU 1 | wwwwwwwwwwwwwwww \ --->| B->9 | + | | +------+ | +-------+ + | |------>| B=2 |--- | : : + | | +------+ \ | : : +-------+ + +-------+ : : \ | +-------+ | | + ---------->| B->2 |------>| | + | +-------+ | CPU 2 | + | | A->0 |------>| | + | +-------+ | | + | : : +-------+ + \ : : + \ +-------+ + ---->| A->1 | + +-------+ + : : + + +하지만, 만약 읽기 배리어가 B 의 로드와 A 의 로드 사이에 존재한다면: + + CPU 1 CPU 2 + ======================= ======================= + { A = 0, B = 9 } + STORE A=1 + <쓰기 배리어> + STORE B=2 + LOAD B + <읽기 배리어> + LOAD A + +CPU 1 에 의해 만들어진 부분적 순서가 CPU 2 에도 그대로 인지됩니다: + + +-------+ : : : : + | | +------+ +-------+ + | |------>| A=1 |------ --->| A->0 | + | | +------+ \ +-------+ + | CPU 1 | wwwwwwwwwwwwwwww \ --->| B->9 | + | | +------+ | +-------+ + | |------>| B=2 |--- | : : + | | +------+ \ | : : +-------+ + +-------+ : : \ | +-------+ | | + ---------->| B->2 |------>| | + | +-------+ | CPU 2 | + | : : | | + | : : | | + 여기서 읽기 배리어는 ----> \ rrrrrrrrrrrrrrrrr | | + B 로의 스토어 전의 \ +-------+ | | + 모든 결과를 CPU 2 에 ---->| A->1 |------>| | + 보이도록 한다 +-------+ | | + : : +-------+ + + +더 완벽한 설명을 위해, A 의 로드가 읽기 배리어 앞과 뒤에 있으면 어떻게 될지 +생각해 봅시다: + + CPU 1 CPU 2 + ======================= ======================= + { A = 0, B = 9 } + STORE A=1 + <쓰기 배리어> + STORE B=2 + LOAD B + LOAD A [first load of A] + <읽기 배리어> + LOAD A [second load of A] + +A 의 로드 두개가 모두 B 의 로드 뒤에 있지만, 서로 다른 값을 얻어올 수 +있습니다: + + +-------+ : : : : + | | +------+ +-------+ + | |------>| A=1 |------ --->| A->0 | + | | +------+ \ +-------+ + | CPU 1 | wwwwwwwwwwwwwwww \ --->| B->9 | + | | +------+ | +-------+ + | |------>| B=2 |--- | : : + | | +------+ \ | : : +-------+ + +-------+ : : \ | +-------+ | | + ---------->| B->2 |------>| | + | +-------+ | CPU 2 | + | : : | | + | : : | | + | +-------+ | | + | | A->0 |------>| 1st | + | +-------+ | | + 여기서 읽기 배리어는 ----> \ rrrrrrrrrrrrrrrrr | | + B 로의 스토어 전의 \ +-------+ | | + 모든 결과를 CPU 2 에 ---->| A->1 |------>| 2nd | + 보이도록 한다 +-------+ | | + : : +-------+ + + +하지만 CPU 1 에서의 A 업데이트는 읽기 배리어가 완료되기 전에도 보일 수도 +있긴 합니다: + + +-------+ : : : : + | | +------+ +-------+ + | |------>| A=1 |------ --->| A->0 | + | | +------+ \ +-------+ + | CPU 1 | wwwwwwwwwwwwwwww \ --->| B->9 | + | | +------+ | +-------+ + | |------>| B=2 |--- | : : + | | +------+ \ | : : +-------+ + +-------+ : : \ | +-------+ | | + ---------->| B->2 |------>| | + | +-------+ | CPU 2 | + | : : | | + \ : : | | + \ +-------+ | | + ---->| A->1 |------>| 1st | + +-------+ | | + rrrrrrrrrrrrrrrrr | | + +-------+ | | + | A->1 |------>| 2nd | + +-------+ | | + : : +-------+ + + +여기서 보장되는 건, 만약 B 의 로드가 B == 2 라는 결과를 봤다면, A 에의 두번째 +로드는 항상 A == 1 을 보게 될 것이라는 겁니다. A 에의 첫번째 로드에는 그런 +보장이 없습니다; A == 0 이거나 A == 1 이거나 둘 중 하나의 결과를 보게 될겁니다. + + +읽기 메모리 배리어 VS 로드 예측 +------------------------------- + +많은 CPU들이 로드를 예측적으로 (speculatively) 합니다: 어떤 데이터를 메모리에서 +로드해야 하게 될지 예측을 했다면, 해당 데이터를 로드하는 인스트럭션을 실제로는 +아직 만나지 않았더라도 다른 로드 작업이 없어 버스 (bus) 가 아무 일도 하고 있지 +않다면, 그 데이터를 로드합니다. 이후에 실제 로드 인스트럭션이 실행되면 CPU 가 +이미 그 값을 가지고 있기 때문에 그 로드 인스트럭션은 즉시 완료됩니다. + +해당 CPU 는 실제로는 그 값이 필요치 않았다는 사실이 나중에 드러날 수도 있는데 - +해당 로드 인스트럭션이 브랜치로 우회되거나 했을 수 있겠죠 - , 그렇게 되면 앞서 +읽어둔 값을 버리거나 나중의 사용을 위해 캐시에 넣어둘 수 있습니다. + +다음을 생각해 봅시다: + + CPU 1 CPU 2 + ======================= ======================= + LOAD B + DIVIDE } 나누기 명령은 일반적으로 + DIVIDE } 긴 시간을 필요로 합니다 + LOAD A + +는 이렇게 될 수 있습니다: + + : : +-------+ + +-------+ | | + --->| B->2 |------>| | + +-------+ | CPU 2 | + : :DIVIDE | | + +-------+ | | + 나누기 하느라 바쁜 ---> --->| A->0 |~~~~ | | + CPU 는 A 의 LOAD 를 +-------+ ~ | | + 예측해서 수행한다 : : ~ | | + : :DIVIDE | | + : : ~ | | + 나누기가 끝나면 ---> ---> : : ~-->| | + CPU 는 해당 LOAD 를 : : | | + 즉각 완료한다 : : +-------+ + + +읽기 배리어나 데이터 의존성 배리어를 두번째 로드 직전에 놓는다면: + + CPU 1 CPU 2 + ======================= ======================= + LOAD B + DIVIDE + DIVIDE + <읽기 배리어> + LOAD A + +예측으로 얻어진 값은 사용된 배리어의 타입에 따라서 해당 값이 옳은지 검토되게 +됩니다. 만약 해당 메모리 영역에 변화가 없었다면, 예측으로 얻어두었던 값이 +사용됩니다: + + : : +-------+ + +-------+ | | + --->| B->2 |------>| | + +-------+ | CPU 2 | + : :DIVIDE | | + +-------+ | | + 나누기 하느라 바쁜 ---> --->| A->0 |~~~~ | | + CPU 는 A 의 LOAD 를 +-------+ ~ | | + 예측한다 : : ~ | | + : :DIVIDE | | + : : ~ | | + : : ~ | | + rrrrrrrrrrrrrrrr~ | | + : : ~ | | + : : ~-->| | + : : | | + : : +-------+ + + +하지만 다른 CPU 에서 업데이트나 무효화가 있었다면, 그 예측은 무효화되고 그 값은 +다시 읽혀집니다: + + : : +-------+ + +-------+ | | + --->| B->2 |------>| | + +-------+ | CPU 2 | + : :DIVIDE | | + +-------+ | | + 나누기 하느라 바쁜 ---> --->| A->0 |~~~~ | | + CPU 는 A 의 LOAD 를 +-------+ ~ | | + 예측한다 : : ~ | | + : :DIVIDE | | + : : ~ | | + : : ~ | | + rrrrrrrrrrrrrrrrr | | + +-------+ | | + 예측성 동작은 무효화 되고 ---> --->| A->1 |------>| | + 업데이트된 값이 다시 읽혀진다 +-------+ | | + : : +-------+ + + +이행성 +------ + +이행성(transitivity)은 실제의 컴퓨터 시스템에서 항상 제공되지는 않는, 순서 +맞추기에 대한 상당히 직관적인 개념입니다. 다음의 예가 이행성을 보여줍니다: + + CPU 1 CPU 2 CPU 3 + ======================= ======================= ======================= + { X = 0, Y = 0 } + STORE X=1 LOAD X STORE Y=1 + <범용 배리어> <범용 배리어> + LOAD Y LOAD X + +CPU 2 의 X 로드가 1을 리턴했고 Y 로드가 0을 리턴했다고 해봅시다. 이는 CPU 2 의 +X 로드가 CPU 1 의 X 스토어 뒤에 이루어졌고 CPU 2 의 Y 로드는 CPU 3 의 Y 스토어 +전에 이루어졌음을 의미합니다. 그럼 "CPU 3 의 X 로드는 0을 리턴할 수 있나요?" + +CPU 2 의 X 로드는 CPU 1 의 스토어 후에 이루어졌으니, CPU 3 의 X 로드는 1을 +리턴하는게 자연스럽습니다. 이런 생각이 이행성의 한 예입니다: CPU A 에서 실행된 +로드가 CPU B 에서의 같은 변수에 대한 로드를 뒤따른다면, CPU A 의 로드는 CPU B +의 로드가 내놓은 값과 같거나 그 후의 값을 내놓아야 합니다. + +리눅스 커널에서 범용 배리어의 사용은 이행성을 보장합니다. 따라서, 앞의 예에서 +CPU 2 의 X 로드가 1을, Y 로드는 0을 리턴했다면, CPU 3 의 X 로드는 반드시 1을 +리턴합니다. + +하지만, 읽기나 쓰기 배리어에 대해서는 이행성이 보장되지 -않습니다-. 예를 들어, +앞의 예에서 CPU 2 의 범용 배리어가 아래처럼 읽기 배리어로 바뀐 경우를 생각해 +봅시다: + + CPU 1 CPU 2 CPU 3 + ======================= ======================= ======================= + { X = 0, Y = 0 } + STORE X=1 LOAD X STORE Y=1 + <읽기 배리어> <범용 배리어> + LOAD Y LOAD X + +이 코드는 이행성을 갖지 않습니다: 이 예에서는, CPU 2 의 X 로드가 1을 +리턴하고, Y 로드는 0을 리턴하지만 CPU 3 의 X 로드가 0을 리턴하는 것도 완전히 +합법적입니다. + +CPU 2 의 읽기 배리어가 자신의 읽기는 순서를 맞춰줘도, CPU 1 의 스토어와의 +순서를 맞춰준다고는 보장할 수 없다는게 핵심입니다. 따라서, CPU 1 과 CPU 2 가 +버퍼나 캐시를 공유하는 시스템에서 이 예제 코드가 실행된다면, CPU 2 는 CPU 1 이 +쓴 값에 좀 빨리 접근할 수 있을 것입니다. 따라서 CPU 1 과 CPU 2 의 접근으로 +조합된 순서를 모든 CPU 가 동의할 수 있도록 하기 위해 범용 배리어가 필요합니다. + +범용 배리어는 "글로벌 이행성"을 제공해서, 모든 CPU 들이 오퍼레이션들의 순서에 +동의하게 할 것입니다. 반면, release-acquire 조합은 "로컬 이행성" 만을 +제공해서, 해당 조합이 사용된 CPU 들만이 해당 액세스들의 조합된 순서에 동의함이 +보장됩니다. 예를 들어, 존경스런 Herman Hollerith 의 C 코드로 보면: + + int u, v, x, y, z; + + void cpu0(void) + { + r0 = smp_load_acquire(&x); + WRITE_ONCE(u, 1); + smp_store_release(&y, 1); + } + + void cpu1(void) + { + r1 = smp_load_acquire(&y); + r4 = READ_ONCE(v); + r5 = READ_ONCE(u); + smp_store_release(&z, 1); + } + + void cpu2(void) + { + r2 = smp_load_acquire(&z); + smp_store_release(&x, 1); + } + + void cpu3(void) + { + WRITE_ONCE(v, 1); + smp_mb(); + r3 = READ_ONCE(u); + } + +cpu0(), cpu1(), 그리고 cpu2() 는 smp_store_release()/smp_load_acquire() 쌍의 +연결을 통한 로컬 이행성에 동참하고 있으므로, 다음과 같은 결과는 나오지 않을 +겁니다: + + r0 == 1 && r1 == 1 && r2 == 1 + +더 나아가서, cpu0() 와 cpu1() 사이의 release-acquire 관계로 인해, cpu1() 은 +cpu0() 의 쓰기를 봐야만 하므로, 다음과 같은 결과도 없을 겁니다: + + r1 == 1 && r5 == 0 + +하지만, release-acquire 타동성은 동참한 CPU 들에만 적용되므로 cpu3() 에는 +적용되지 않습니다. 따라서, 다음과 같은 결과가 가능합니다: + + r0 == 0 && r1 == 1 && r2 == 1 && r3 == 0 && r4 == 0 + +비슷하게, 다음과 같은 결과도 가능합니다: + + r0 == 0 && r1 == 1 && r2 == 1 && r3 == 0 && r4 == 0 && r5 == 1 + +cpu0(), cpu1(), 그리고 cpu2() 는 그들의 읽기와 쓰기를 순서대로 보게 되지만, +release-acquire 체인에 관여되지 않은 CPU 들은 그 순서에 이견을 가질 수 +있습니다. 이런 이견은 smp_load_acquire() 와 smp_store_release() 의 구현에 +사용되는 완화된 메모리 배리어 인스트럭션들은 항상 배리어 앞의 스토어들을 뒤의 +로드들에 앞세울 필요는 없다는 사실에서 기인합니다. 이 말은 cpu3() 는 cpu0() 의 +u 로의 스토어를 cpu1() 의 v 로부터의 로드 뒤에 일어난 것으로 볼 수 있다는 +뜻입니다, cpu0() 와 cpu1() 은 이 두 오퍼레이션이 의도된 순서대로 일어났음에 +모두 동의하는데도 말입니다. + +하지만, smp_load_acquire() 는 마술이 아님을 명심하시기 바랍니다. 구체적으로, +이 함수는 단순히 순서 규칙을 지키며 인자로부터의 읽기를 수행합니다. 이것은 +어떤 특정한 값이 읽힐 것인지는 보장하지 -않습니다-. 따라서, 다음과 같은 결과도 +가능합니다: + + r0 == 0 && r1 == 0 && r2 == 0 && r5 == 0 + +이런 결과는 어떤 것도 재배치 되지 않는, 순차적 일관성을 가진 가상의 +시스템에서도 일어날 수 있음을 기억해 두시기 바랍니다. + +다시 말하지만, 당신의 코드가 글로벌 이행성을 필요로 한다면, 범용 배리어를 +사용하십시오. + + +================== +명시적 커널 배리어 +================== + +리눅스 커널은 서로 다른 단계에서 동작하는 다양한 배리어들을 가지고 있습니다: + + (*) 컴파일러 배리어. + + (*) CPU 메모리 배리어. + + (*) MMIO 쓰기 배리어. + + +컴파일러 배리어 +--------------- + +리눅스 커널은 컴파일러가 메모리 액세스를 재배치 하는 것을 막아주는 명시적인 +컴파일러 배리어를 가지고 있습니다: + + barrier(); + +이건 범용 배리어입니다 -- barrier() 의 읽기-읽기 나 쓰기-쓰기 변종은 없습니다. +하지만, READ_ONCE() 와 WRITE_ONCE() 는 특정 액세스들에 대해서만 동작하는 +barrier() 의 완화된 형태로 볼 수 있습니다. + +barrier() 함수는 다음과 같은 효과를 갖습니다: + + (*) 컴파일러가 barrier() 뒤의 액세스들이 barrier() 앞의 액세스보다 앞으로 + 재배치되지 못하게 합니다. 예를 들어, 인터럽트 핸들러 코드와 인터럽트 당한 + 코드 사이의 통신을 신중히 하기 위해 사용될 수 있습니다. + + (*) 루프에서, 컴파일러가 루프 조건에 사용된 변수를 매 이터레이션마다 + 메모리에서 로드하지 않아도 되도록 최적화 하는걸 방지합니다. + +READ_ONCE() 와 WRITE_ONCE() 함수는 싱글 쓰레드 코드에서는 문제 없지만 동시성이 +있는 코드에서는 문제가 될 수 있는 모든 최적화를 막습니다. 이런 류의 최적화에 +대한 예를 몇가지 들어보면 다음과 같습니다: + + (*) 컴파일러는 같은 변수에 대한 로드와 스토어를 재배치 할 수 있고, 어떤 + 경우에는 CPU가 같은 변수로부터의 로드들을 재배치할 수도 있습니다. 이는 + 다음의 코드가: + + a[0] = x; + a[1] = x; + + x 의 예전 값이 a[1] 에, 새 값이 a[0] 에 있게 할 수 있다는 뜻입니다. + 컴파일러와 CPU가 이런 일을 못하게 하려면 다음과 같이 해야 합니다: + + a[0] = READ_ONCE(x); + a[1] = READ_ONCE(x); + + 즉, READ_ONCE() 와 WRITE_ONCE() 는 여러 CPU 에서 하나의 변수에 가해지는 + 액세스들에 캐시 일관성을 제공합니다. + + (*) 컴파일러는 같은 변수에 대한 연속적인 로드들을 병합할 수 있습니다. 그런 + 병합 작업으로 컴파일러는 다음의 코드를: + + while (tmp = a) + do_something_with(tmp); + + 다음과 같이, 싱글 쓰레드 코드에서는 말이 되지만 개발자의 의도와 전혀 맞지 + 않는 방향으로 "최적화" 할 수 있습니다: + + if (tmp = a) + for (;;) + do_something_with(tmp); + + 컴파일러가 이런 짓을 하지 못하게 하려면 READ_ONCE() 를 사용하세요: + + while (tmp = READ_ONCE(a)) + do_something_with(tmp); + + (*) 예컨대 레지스터 사용량이 많아 컴파일러가 모든 데이터를 레지스터에 담을 수 + 없는 경우, 컴파일러는 변수를 다시 로드할 수 있습니다. 따라서 컴파일러는 + 앞의 예에서 변수 'tmp' 사용을 최적화로 없애버릴 수 있습니다: + + while (tmp = a) + do_something_with(tmp); + + 이 코드는 다음과 같이 싱글 쓰레드에서는 완벽하지만 동시성이 존재하는 + 경우엔 치명적인 코드로 바뀔 수 있습니다: + + while (a) + do_something_with(a); + + 예를 들어, 최적화된 이 코드는 변수 a 가 다른 CPU 에 의해 "while" 문과 + do_something_with() 호출 사이에 바뀌어 do_something_with() 에 0을 넘길 + 수도 있습니다. + + 이번에도, 컴파일러가 그런 짓을 하는걸 막기 위해 READ_ONCE() 를 사용하세요: + + while (tmp = READ_ONCE(a)) + do_something_with(tmp); + + 레지스터가 부족한 상황을 겪는 경우, 컴파일러는 tmp 를 스택에 저장해둘 수도 + 있습니다. 컴파일러가 변수를 다시 읽어들이는건 이렇게 저장해두고 후에 다시 + 읽어들이는데 드는 오버헤드 때문입니다. 그렇게 하는게 싱글 쓰레드 + 코드에서는 안전하므로, 안전하지 않은 경우에는 컴파일러에게 직접 알려줘야 + 합니다. + + (*) 컴파일러는 그 값이 무엇일지 알고 있다면 로드를 아예 안할 수도 있습니다. + 예를 들어, 다음의 코드는 변수 'a' 의 값이 항상 0임을 증명할 수 있다면: + + while (tmp = a) + do_something_with(tmp); + + 이렇게 최적화 되어버릴 수 있습니다: + + do { } while (0); + + 이 변환은 싱글 쓰레드 코드에서는 도움이 되는데 로드와 브랜치를 제거했기 + 때문입니다. 문제는 컴파일러가 'a' 의 값을 업데이트 하는건 현재의 CPU 하나 + 뿐이라는 가정 위에서 증명을 했다는데 있습니다. 만약 변수 'a' 가 공유되어 + 있다면, 컴파일러의 증명은 틀린 것이 될겁니다. 컴파일러는 그 자신이 + 생각하는 것만큼 많은 것을 알고 있지 못함을 컴파일러에게 알리기 위해 + READ_ONCE() 를 사용하세요: + + while (tmp = READ_ONCE(a)) + do_something_with(tmp); + + 하지만 컴파일러는 READ_ONCE() 뒤에 나오는 값에 대해서도 눈길을 두고 있음을 + 기억하세요. 예를 들어, 다음의 코드에서 MAX 는 전처리기 매크로로, 1의 값을 + 갖는다고 해봅시다: + + while ((tmp = READ_ONCE(a)) % MAX) + do_something_with(tmp); + + 이렇게 되면 컴파일러는 MAX 를 가지고 수행되는 "%" 오퍼레이터의 결과가 항상 + 0이라는 것을 알게 되고, 컴파일러가 코드를 실질적으로는 존재하지 않는 + 것처럼 최적화 하는 것이 허용되어 버립니다. ('a' 변수의 로드는 여전히 + 행해질 겁니다.) + + (*) 비슷하게, 컴파일러는 변수가 저장하려 하는 값을 이미 가지고 있다는 것을 + 알면 스토어 자체를 제거할 수 있습니다. 이번에도, 컴파일러는 현재의 CPU + 만이 그 변수에 값을 쓰는 오로지 하나의 존재라고 생각하여 공유된 변수에 + 대해서는 잘못된 일을 하게 됩니다. 예를 들어, 다음과 같은 경우가 있을 수 + 있습니다: + + a = 0; + ... 변수 a 에 스토어를 하지 않는 코드 ... + a = 0; + + 컴파일러는 변수 'a' 의 값은 이미 0이라는 것을 알고, 따라서 두번째 스토어를 + 삭제할 겁니다. 만약 다른 CPU 가 그 사이 변수 'a' 에 다른 값을 썼다면 + 황당한 결과가 나올 겁니다. + + 컴파일러가 그런 잘못된 추측을 하지 않도록 WRITE_ONCE() 를 사용하세요: + + WRITE_ONCE(a, 0); + ... 변수 a 에 스토어를 하지 않는 코드 ... + WRITE_ONCE(a, 0); + + (*) 컴파일러는 하지 말라고 하지 않으면 메모리 액세스들을 재배치 할 수 + 있습니다. 예를 들어, 다음의 프로세스 레벨 코드와 인터럽트 핸들러 사이의 + 상호작용을 생각해 봅시다: + + void process_level(void) + { + msg = get_message(); + flag = true; + } + + void interrupt_handler(void) + { + if (flag) + process_message(msg); + } + + 이 코드에는 컴파일러가 process_level() 을 다음과 같이 변환하는 것을 막을 + 수단이 없고, 이런 변환은 싱글쓰레드에서라면 실제로 훌륭한 선택일 수 + 있습니다: + + void process_level(void) + { + flag = true; + msg = get_message(); + } + + 이 두개의 문장 사이에 인터럽트가 발생한다면, interrupt_handler() 는 의미를 + 알 수 없는 메세지를 받을 수도 있습니다. 이걸 막기 위해 다음과 같이 + WRITE_ONCE() 를 사용하세요: + + void process_level(void) + { + WRITE_ONCE(msg, get_message()); + WRITE_ONCE(flag, true); + } + + void interrupt_handler(void) + { + if (READ_ONCE(flag)) + process_message(READ_ONCE(msg)); + } + + interrupt_handler() 안에서도 중첩된 인터럽트나 NMI 와 같이 인터럽트 핸들러 + 역시 'flag' 와 'msg' 에 접근하는 또다른 무언가에 인터럽트 될 수 있다면 + READ_ONCE() 와 WRITE_ONCE() 를 사용해야 함을 기억해 두세요. 만약 그런 + 가능성이 없다면, interrupt_handler() 안에서는 문서화 목적이 아니라면 + READ_ONCE() 와 WRITE_ONCE() 는 필요치 않습니다. (근래의 리눅스 커널에서 + 중첩된 인터럽트는 보통 잘 일어나지 않음도 기억해 두세요, 실제로, 어떤 + 인터럽트 핸들러가 인터럽트가 활성화된 채로 리턴하면 WARN_ONCE() 가 + 실행됩니다.) + + 컴파일러는 READ_ONCE() 와 WRITE_ONCE() 뒤의 READ_ONCE() 나 WRITE_ONCE(), + barrier(), 또는 비슷한 것들을 담고 있지 않은 코드를 움직일 수 있을 것으로 + 가정되어야 합니다. + + 이 효과는 barrier() 를 통해서도 만들 수 있지만, READ_ONCE() 와 + WRITE_ONCE() 가 좀 더 안목 높은 선택입니다: READ_ONCE() 와 WRITE_ONCE()는 + 컴파일러에 주어진 메모리 영역에 대해서만 최적화 가능성을 포기하도록 + 하지만, barrier() 는 컴파일러가 지금까지 기계의 레지스터에 캐시해 놓은 + 모든 메모리 영역의 값을 버려야 하게 하기 때문입니다. 물론, 컴파일러는 + READ_ONCE() 와 WRITE_ONCE() 가 일어난 순서도 지켜줍니다, CPU 는 당연히 + 그 순서를 지킬 의무가 없지만요. + + (*) 컴파일러는 다음의 예에서와 같이 변수에의 스토어를 날조해낼 수도 있습니다: + + if (a) + b = a; + else + b = 42; + + 컴파일러는 아래와 같은 최적화로 브랜치를 줄일 겁니다: + + b = 42; + if (a) + b = a; + + 싱글 쓰레드 코드에서 이 최적화는 안전할 뿐 아니라 브랜치 갯수를 + 줄여줍니다. 하지만 안타깝게도, 동시성이 있는 코드에서는 이 최적화는 다른 + CPU 가 'b' 를 로드할 때, -- 'a' 가 0이 아닌데도 -- 가짜인 값, 42를 보게 + 되는 경우를 가능하게 합니다. 이걸 방지하기 위해 WRITE_ONCE() 를 + 사용하세요: + + if (a) + WRITE_ONCE(b, a); + else + WRITE_ONCE(b, 42); + + 컴파일러는 로드를 만들어낼 수도 있습니다. 일반적으로는 문제를 일으키지 + 않지만, 캐시 라인 바운싱을 일으켜 성능과 확장성을 떨어뜨릴 수 있습니다. + 날조된 로드를 막기 위해선 READ_ONCE() 를 사용하세요. + + (*) 정렬된 메모리 주소에 위치한, 한번의 메모리 참조 인스트럭션으로 액세스 + 가능한 크기의 데이터는 하나의 큰 액세스가 여러개의 작은 액세스들로 + 대체되는 "로드 티어링(load tearing)" 과 "스토어 티어링(store tearing)" 을 + 방지합니다. 예를 들어, 주어진 아키텍쳐가 7-bit imeediate field 를 갖는 + 16-bit 스토어 인스트럭션을 제공한다면, 컴파일러는 다음의 32-bit 스토어를 + 구현하는데에 두개의 16-bit store-immediate 명령을 사용하려 할겁니다: + + p = 0x00010002; + + 스토어 할 상수를 만들고 그 값을 스토어 하기 위해 두개가 넘는 인스트럭션을 + 사용하게 되는, 이런 종류의 최적화를 GCC 는 실제로 함을 부디 알아 두십시오. + 이 최적화는 싱글 쓰레드 코드에서는 성공적인 최적화 입니다. 실제로, 근래에 + 발생한 (그리고 고쳐진) 버그는 GCC 가 volatile 스토어에 비정상적으로 이 + 최적화를 사용하게 했습니다. 그런 버그가 없다면, 다음의 예에서 + WRITE_ONCE() 의 사용은 스토어 티어링을 방지합니다: + + WRITE_ONCE(p, 0x00010002); + + Packed 구조체의 사용 역시 다음의 예처럼 로드 / 스토어 티어링을 유발할 수 + 있습니다: + + struct __attribute__((__packed__)) foo { + short a; + int b; + short c; + }; + struct foo foo1, foo2; + ... + + foo2.a = foo1.a; + foo2.b = foo1.b; + foo2.c = foo1.c; + + READ_ONCE() 나 WRITE_ONCE() 도 없고 volatile 마킹도 없기 때문에, + 컴파일러는 이 세개의 대입문을 두개의 32-bit 로드와 두개의 32-bit 스토어로 + 변환할 수 있습니다. 이는 'foo1.b' 의 값의 로드 티어링과 'foo2.b' 의 + 스토어 티어링을 초래할 겁니다. 이 예에서도 READ_ONCE() 와 WRITE_ONCE() + 가 티어링을 막을 수 있습니다: + + foo2.a = foo1.a; + WRITE_ONCE(foo2.b, READ_ONCE(foo1.b)); + foo2.c = foo1.c; + +그렇지만, volatile 로 마크된 변수에 대해서는 READ_ONCE() 와 WRITE_ONCE() 가 +필요치 않습니다. 예를 들어, 'jiffies' 는 volatile 로 마크되어 있기 때문에, +READ_ONCE(jiffies) 라고 할 필요가 없습니다. READ_ONCE() 와 WRITE_ONCE() 가 +실은 volatile 캐스팅으로 구현되어 있어서 인자가 이미 volatile 로 마크되어 +있다면 또다른 효과를 내지는 않기 때문입니다. + +이 컴파일러 배리어들은 CPU 에는 직접적 효과를 전혀 만들지 않기 때문에, 결국은 +재배치가 일어날 수도 있음을 부디 기억해 두십시오. + + +CPU 메모리 배리어 +----------------- + +리눅스 커널은 다음의 여덟개 기본 CPU 메모리 배리어를 가지고 있습니다: + + TYPE MANDATORY SMP CONDITIONAL + =============== ======================= =========================== + 범용 mb() smp_mb() + 쓰기 wmb() smp_wmb() + 읽기 rmb() smp_rmb() + 데이터 의존성 read_barrier_depends() smp_read_barrier_depends() + + +데이터 의존성 배리어를 제외한 모든 메모리 배리어는 컴파일러 배리어를 +포함합니다. 데이터 의존성은 컴파일러에의 추가적인 순서 보장을 포함하지 +않습니다. + +방백: 데이터 의존성이 있는 경우, 컴파일러는 해당 로드를 올바른 순서로 일으킬 +것으로 (예: `a[b]` 는 a[b] 를 로드 하기 전에 b 의 값을 먼저 로드한다) +기대되지만, C 언어 사양에는 컴파일러가 b 의 값을 추측 (예: 1 과 같음) 해서 +b 로드 전에 a 로드를 하는 코드 (예: tmp = a[1]; if (b != 1) tmp = a[b]; ) 를 +만들지 않아야 한다는 내용 같은 건 없습니다. 또한 컴파일러는 a[b] 를 로드한 +후에 b 를 또다시 로드할 수도 있어서, a[b] 보다 최신 버전의 b 값을 가질 수도 +있습니다. 이런 문제들의 해결책에 대한 의견 일치는 아직 없습니다만, 일단 +READ_ONCE() 매크로부터 보기 시작하는게 좋은 시작이 될겁니다. + +SMP 메모리 배리어들은 유니프로세서로 컴파일된 시스템에서는 컴파일러 배리어로 +바뀌는데, 하나의 CPU 는 스스로 일관성을 유지하고, 겹치는 액세스들 역시 올바른 +순서로 행해질 것으로 생각되기 때문입니다. 하지만, 아래의 "Virtual Machine +Guests" 서브섹션을 참고하십시오. + +[!] SMP 시스템에서 공유메모리로의 접근들을 순서 세워야 할 때, SMP 메모리 +배리어는 _반드시_ 사용되어야 함을 기억하세요, 그대신 락을 사용하는 것으로도 +충분하긴 하지만 말이죠. + +Mandatory 배리어들은 SMP 시스템에서도 UP 시스템에서도 SMP 효과만 통제하기에는 +불필요한 오버헤드를 갖기 때문에 SMP 효과만 통제하면 되는 곳에는 사용되지 않아야 +합니다. 하지만, 느슨한 순서 규칙의 메모리 I/O 윈도우를 통한 MMIO 의 효과를 +통제할 때에는 mandatory 배리어들이 사용될 수 있습니다. 이 배리어들은 +컴파일러와 CPU 모두 재배치를 못하도록 함으로써 메모리 오퍼레이션들이 디바이스에 +보여지는 순서에도 영향을 주기 때문에, SMP 가 아닌 시스템이라 할지라도 필요할 수 +있습니다. + + +일부 고급 배리어 함수들도 있습니다: + + (*) smp_store_mb(var, value) + + 이 함수는 특정 변수에 특정 값을 대입하고 범용 메모리 배리어를 칩니다. + UP 컴파일에서는 컴파일러 배리어보다 더한 것을 친다고는 보장되지 않습니다. + + + (*) smp_mb__before_atomic(); + (*) smp_mb__after_atomic(); + + 이것들은 값을 리턴하지 않는 (더하기, 빼기, 증가, 감소와 같은) 어토믹 + 함수들을 위한, 특히 그것들이 레퍼런스 카운팅에 사용될 때를 위한 + 함수들입니다. 이 함수들은 메모리 배리어를 내포하고 있지는 않습니다. + + 이것들은 값을 리턴하지 않으며 어토믹한 (set_bit 과 clear_bit 같은) 비트 + 연산에도 사용될 수 있습니다. + + 한 예로, 객체 하나를 무효한 것으로 표시하고 그 객체의 레퍼런스 카운트를 + 감소시키는 다음 코드를 보세요: + + obj->dead = 1; + smp_mb__before_atomic(); + atomic_dec(&obj->ref_count); + + 이 코드는 객체의 업데이트된 death 마크가 레퍼런스 카운터 감소 동작 + *전에* 보일 것을 보장합니다. + + 더 많은 정보를 위해선 Documentation/atomic_ops.txt 문서를 참고하세요. + 어디서 이것들을 사용해야 할지 궁금하다면 "어토믹 오퍼레이션" 서브섹션을 + 참고하세요. + + + (*) lockless_dereference(); + + 이 함수는 smp_read_barrier_depends() 데이터 의존성 배리어를 사용하는 + 포인터 읽어오기 래퍼(wrapper) 함수로 생각될 수 있습니다. + + 객체의 라이프타임이 RCU 외의 메커니즘으로 관리된다는 점을 제외하면 + rcu_dereference() 와도 유사한데, 예를 들면 객체가 시스템이 꺼질 때에만 + 제거되는 경우 등입니다. 또한, lockless_dereference() 은 RCU 와 함께 + 사용될수도, RCU 없이 사용될 수도 있는 일부 데이터 구조에 사용되고 + 있습니다. + + + (*) dma_wmb(); + (*) dma_rmb(); + + 이것들은 CPU 와 DMA 가능한 디바이스에서 모두 액세스 가능한 공유 메모리의 + 읽기, 쓰기 작업들의 순서를 보장하기 위해 consistent memory 에서 사용하기 + 위한 것들입니다. + + 예를 들어, 디바이스와 메모리를 공유하며, 디스크립터 상태 값을 사용해 + 디스크립터가 디바이스에 속해 있는지 아니면 CPU 에 속해 있는지 표시하고, + 공지용 초인종(doorbell) 을 사용해 업데이트된 디스크립터가 디바이스에 사용 + 가능해졌음을 공지하는 디바이스 드라이버를 생각해 봅시다: + + if (desc->status != DEVICE_OWN) { + /* 디스크립터를 소유하기 전에는 데이터를 읽지 않음 */ + dma_rmb(); + + /* 데이터를 읽고 씀 */ + read_data = desc->data; + desc->data = write_data; + + /* 상태 업데이트 전 수정사항을 반영 */ + dma_wmb(); + + /* 소유권을 수정 */ + desc->status = DEVICE_OWN; + + /* MMIO 를 통해 디바이스에 공지를 하기 전에 메모리를 동기화 */ + wmb(); + + /* 업데이트된 디스크립터의 디바이스에 공지 */ + writel(DESC_NOTIFY, doorbell); + } + + dma_rmb() 는 디스크립터로부터 데이터를 읽어오기 전에 디바이스가 소유권을 + 내놓았음을 보장하게 하고, dma_wmb() 는 디바이스가 자신이 소유권을 다시 + 가졌음을 보기 전에 디스크립터에 데이터가 쓰였음을 보장합니다. wmb() 는 + 캐시 일관성이 없는 (cache incoherent) MMIO 영역에 쓰기를 시도하기 전에 + 캐시 일관성이 있는 메모리 (cache coherent memory) 쓰기가 완료되었음을 + 보장해주기 위해 필요합니다. + + consistent memory 에 대한 자세한 내용을 위해선 Documentation/DMA-API.txt + 문서를 참고하세요. + + +MMIO 쓰기 배리어 +---------------- + +리눅스 커널은 또한 memory-mapped I/O 쓰기를 위한 특별한 배리어도 가지고 +있습니다: + + mmiowb(); + +이것은 mandatory 쓰기 배리어의 변종으로, 완화된 순서 규칙의 I/O 영역에으로의 +쓰기가 부분적으로 순서를 맞추도록 해줍니다. 이 함수는 CPU->하드웨어 사이를 +넘어서 실제 하드웨어에까지 일부 수준의 영향을 끼칩니다. + +더 많은 정보를 위해선 "Acquire vs I/O 액세스" 서브섹션을 참고하세요. + + +========================= +암묵적 커널 메모리 배리어 +========================= + +리눅스 커널의 일부 함수들은 메모리 배리어를 내장하고 있는데, 락(lock)과 +스케쥴링 관련 함수들이 대부분입니다. + +여기선 _최소한의_ 보장을 설명합니다; 특정 아키텍쳐에서는 이 설명보다 더 많은 +보장을 제공할 수도 있습니다만 해당 아키텍쳐에 종속적인 코드 외의 부분에서는 +그런 보장을 기대해선 안될겁니다. + + +락 ACQUISITION 함수 +------------------- + +리눅스 커널은 다양한 락 구성체를 가지고 있습니다: + + (*) 스핀 락 + (*) R/W 스핀 락 + (*) 뮤텍스 + (*) 세마포어 + (*) R/W 세마포어 + +각 구성체마다 모든 경우에 "ACQUIRE" 오퍼레이션과 "RELEASE" 오퍼레이션의 변종이 +존재합니다. 이 오퍼레이션들은 모두 적절한 배리어를 내포하고 있습니다: + + (1) ACQUIRE 오퍼레이션의 영향: + + ACQUIRE 뒤에서 요청된 메모리 오퍼레이션은 ACQUIRE 오퍼레이션이 완료된 + 뒤에 완료됩니다. + + ACQUIRE 앞에서 요청된 메모리 오퍼레이션은 ACQUIRE 오퍼레이션이 완료된 후에 + 완료될 수 있습니다. smp_mb__before_spinlock() 뒤에 ACQUIRE 가 실행되는 + 코드 블록은 블록 앞의 스토어를 블록 뒤의 로드와 스토어에 대해 순서 + 맞춥니다. 이건 smp_mb() 보다 완화된 것임을 기억하세요! 많은 아키텍쳐에서 + smp_mb__before_spinlock() 은 사실 아무일도 하지 않습니다. + + (2) RELEASE 오퍼레이션의 영향: + + RELEASE 앞에서 요청된 메모리 오퍼레이션은 RELEASE 오퍼레이션이 완료되기 + 전에 완료됩니다. + + RELEASE 뒤에서 요청된 메모리 오퍼레이션은 RELEASE 오퍼레이션 완료 전에 + 완료될 수 있습니다. + + (3) ACQUIRE vs ACQUIRE 영향: + + 어떤 ACQUIRE 오퍼레이션보다 앞에서 요청된 모든 ACQUIRE 오퍼레이션은 그 + ACQUIRE 오퍼레이션 전에 완료됩니다. + + (4) ACQUIRE vs RELEASE implication: + + 어떤 RELEASE 오퍼레이션보다 앞서 요청된 ACQUIRE 오퍼레이션은 그 RELEASE + 오퍼레이션보다 먼저 완료됩니다. + + (5) 실패한 조건적 ACQUIRE 영향: + + ACQUIRE 오퍼레이션의 일부 락(lock) 변종은 락이 곧바로 획득하기에는 + 불가능한 상태이거나 락이 획득 가능해지도록 기다리는 도중 시그널을 받거나 + 해서 실패할 수 있습니다. 실패한 락은 어떤 배리어도 내포하지 않습니다. + +[!] 참고: 락 ACQUIRE 와 RELEASE 가 단방향 배리어여서 나타나는 현상 중 하나는 +크리티컬 섹션 바깥의 인스트럭션의 영향이 크리티컬 섹션 내부로도 들어올 수 +있다는 것입니다. + +RELEASE 후에 요청되는 ACQUIRE 는 전체 메모리 배리어라 여겨지면 안되는데, +ACQUIRE 앞의 액세스가 ACQUIRE 후에 수행될 수 있고, RELEASE 후의 액세스가 +RELEASE 전에 수행될 수도 있으며, 그 두개의 액세스가 서로를 지나칠 수도 있기 +때문입니다: + + *A = a; + ACQUIRE M + RELEASE M + *B = b; + +는 다음과 같이 될 수도 있습니다: + + ACQUIRE M, STORE *B, STORE *A, RELEASE M + +ACQUIRE 와 RELEASE 가 락 획득과 해제라면, 그리고 락의 ACQUIRE 와 RELEASE 가 +같은 락 변수에 대한 것이라면, 해당 락을 쥐고 있지 않은 다른 CPU 의 시야에는 +이와 같은 재배치가 일어나는 것으로 보일 수 있습니다. 요약하자면, ACQUIRE 에 +이어 RELEASE 오퍼레이션을 순차적으로 실행하는 행위가 전체 메모리 배리어로 +생각되어선 -안됩니다-. + +비슷하게, 앞의 반대 케이스인 RELEASE 와 ACQUIRE 두개 오퍼레이션의 순차적 실행 +역시 전체 메모리 배리어를 내포하지 않습니다. 따라서, RELEASE, ACQUIRE 로 +규정되는 크리티컬 섹션의 CPU 수행은 RELEASE 와 ACQUIRE 를 가로지를 수 있으므로, +다음과 같은 코드는: + + *A = a; + RELEASE M + ACQUIRE N + *B = b; + +다음과 같이 수행될 수 있습니다: + + ACQUIRE N, STORE *B, STORE *A, RELEASE M + +이런 재배치는 데드락을 일으킬 수도 있을 것처럼 보일 수 있습니다. 하지만, 그런 +데드락의 조짐이 있다면 RELEASE 는 단순히 완료될 것이므로 데드락은 존재할 수 +없습니다. + + 이게 어떻게 올바른 동작을 할 수 있을까요? + + 우리가 이야기 하고 있는건 재배치를 하는 CPU 에 대한 이야기이지, + 컴파일러에 대한 것이 아니란 점이 핵심입니다. 컴파일러 (또는, 개발자) + 가 오퍼레이션들을 이렇게 재배치하면, 데드락이 일어날 수 -있습-니다. + + 하지만 CPU 가 오퍼레이션들을 재배치 했다는걸 생각해 보세요. 이 예에서, + 어셈블리 코드 상으로는 언락이 락을 앞서게 되어 있습니다. CPU 가 이를 + 재배치해서 뒤의 락 오퍼레이션을 먼저 실행하게 됩니다. 만약 데드락이 + 존재한다면, 이 락 오퍼레이션은 그저 스핀을 하며 계속해서 락을 + 시도합니다 (또는, 한참 후에겠지만, 잠듭니다). CPU 는 언젠가는 + (어셈블리 코드에서는 락을 앞서는) 언락 오퍼레이션을 실행하는데, 이 언락 + 오퍼레이션이 잠재적 데드락을 해결하고, 락 오퍼레이션도 뒤이어 성공하게 + 됩니다. + + 하지만 만약 락이 잠을 자는 타입이었다면요? 그런 경우에 코드는 + 스케쥴러로 들어가려 할 거고, 여기서 결국은 메모리 배리어를 만나게 + 되는데, 이 메모리 배리어는 앞의 언락 오퍼레이션이 완료되도록 만들고, + 데드락은 이번에도 해결됩니다. 잠을 자는 행위와 언락 사이의 경주 상황 + (race) 도 있을 수 있겠습니다만, 락 관련 기능들은 그런 경주 상황을 모든 + 경우에 제대로 해결할 수 있어야 합니다. + +락과 세마포어는 UP 컴파일된 시스템에서의 순서에 대해 보장을 하지 않기 때문에, +그런 상황에서 인터럽트 비활성화 오퍼레이션과 함께가 아니라면 어떤 일에도 - 특히 +I/O 액세스와 관련해서는 - 제대로 사용될 수 없을 겁니다. + +"CPU 간 ACQUIRING 배리어 효과" 섹션도 참고하시기 바랍니다. + + +예를 들어, 다음과 같은 코드를 생각해 봅시다: + + *A = a; + *B = b; + ACQUIRE + *C = c; + *D = d; + RELEASE + *E = e; + *F = f; + +여기선 다음의 이벤트 시퀀스가 생길 수 있습니다: + + ACQUIRE, {*F,*A}, *E, {*C,*D}, *B, RELEASE + + [+] {*F,*A} 는 조합된 액세스를 의미합니다. + +하지만 다음과 같은 건 불가능하죠: + + {*F,*A}, *B, ACQUIRE, *C, *D, RELEASE, *E + *A, *B, *C, ACQUIRE, *D, RELEASE, *E, *F + *A, *B, ACQUIRE, *C, RELEASE, *D, *E, *F + *B, ACQUIRE, *C, *D, RELEASE, {*F,*A}, *E + + + +인터럽트 비활성화 함수 +---------------------- + +인터럽트를 비활성화 하는 함수 (ACQUIRE 와 동일) 와 인터럽트를 활성화 하는 함수 +(RELEASE 와 동일) 는 컴파일러 배리어처럼만 동작합니다. 따라서, 별도의 메모리 +배리어나 I/O 배리어가 필요한 상황이라면 그 배리어들은 인터럽트 비활성화 함수 +외의 방법으로 제공되어야만 합니다. + + +슬립과 웨이크업 함수 +-------------------- + +글로벌 데이터에 표시된 이벤트에 의해 프로세스를 잠에 빠트리는 것과 깨우는 것은 +해당 이벤트를 기다리는 태스크의 태스크 상태와 그 이벤트를 알리기 위해 사용되는 +글로벌 데이터, 두 데이터간의 상호작용으로 볼 수 있습니다. 이것이 옳은 순서대로 +일어남을 분명히 하기 위해, 프로세스를 잠에 들게 하는 기능과 깨우는 기능은 +몇가지 배리어를 내포합니다. + +먼저, 잠을 재우는 쪽은 일반적으로 다음과 같은 이벤트 시퀀스를 따릅니다: + + for (;;) { + set_current_state(TASK_UNINTERRUPTIBLE); + if (event_indicated) + break; + schedule(); + } + +set_current_state() 에 의해, 태스크 상태가 바뀐 후 범용 메모리 배리어가 +자동으로 삽입됩니다: + + CPU 1 + =============================== + set_current_state(); + smp_store_mb(); + STORE current->state + <범용 배리어> + LOAD event_indicated + +set_current_state() 는 다음의 것들로 감싸질 수도 있습니다: + + prepare_to_wait(); + prepare_to_wait_exclusive(); + +이것들 역시 상태를 설정한 후 범용 메모리 배리어를 삽입합니다. +앞의 전체 시퀀스는 다음과 같은 함수들로 한번에 수행 가능한데, 이것들은 모두 +올바른 장소에 메모리 배리어를 삽입합니다: + + wait_event(); + wait_event_interruptible(); + wait_event_interruptible_exclusive(); + wait_event_interruptible_timeout(); + wait_event_killable(); + wait_event_timeout(); + wait_on_bit(); + wait_on_bit_lock(); + + +두번째로, 깨우기를 수행하는 코드는 일반적으로 다음과 같을 겁니다: + + event_indicated = 1; + wake_up(&event_wait_queue); + +또는: + + event_indicated = 1; + wake_up_process(event_daemon); + +wake_up() 류에 의해 쓰기 메모리 배리어가 내포됩니다. 만약 그것들이 뭔가를 +깨운다면요. 이 배리어는 태스크 상태가 지워지기 전에 수행되므로, 이벤트를 +알리기 위한 STORE 와 태스크 상태를 TASK_RUNNING 으로 설정하는 STORE 사이에 +위치하게 됩니다. + + CPU 1 CPU 2 + =============================== =============================== + set_current_state(); STORE event_indicated + smp_store_mb(); wake_up(); + STORE current->state <쓰기 배리어> + <범용 배리어> STORE current->state + LOAD event_indicated + +한번더 말합니다만, 이 쓰기 메모리 배리어는 이 코드가 정말로 뭔가를 깨울 때에만 +실행됩니다. 이걸 설명하기 위해, X 와 Y 는 모두 0 으로 초기화 되어 있다는 가정 +하에 아래의 이벤트 시퀀스를 생각해 봅시다: + + CPU 1 CPU 2 + =============================== =============================== + X = 1; STORE event_indicated + smp_mb(); wake_up(); + Y = 1; wait_event(wq, Y == 1); + wake_up(); load from Y sees 1, no memory barrier + load from X might see 0 + +위 예제에서의 경우와 달리 깨우기가 정말로 행해졌다면, CPU 2 의 X 로드는 1 을 +본다고 보장될 수 있을 겁니다. + +사용 가능한 깨우기류 함수들로 다음과 같은 것들이 있습니다: + + complete(); + wake_up(); + wake_up_all(); + wake_up_bit(); + wake_up_interruptible(); + wake_up_interruptible_all(); + wake_up_interruptible_nr(); + wake_up_interruptible_poll(); + wake_up_interruptible_sync(); + wake_up_interruptible_sync_poll(); + wake_up_locked(); + wake_up_locked_poll(); + wake_up_nr(); + wake_up_poll(); + wake_up_process(); + + +[!] 잠재우는 코드와 깨우는 코드에 내포되는 메모리 배리어들은 깨우기 전에 +이루어진 스토어를 잠재우는 코드가 set_current_state() 를 호출한 후에 행하는 +로드에 대해 순서를 맞추지 _않는다는_ 점을 기억하세요. 예를 들어, 잠재우는 +코드가 다음과 같고: + + set_current_state(TASK_INTERRUPTIBLE); + if (event_indicated) + break; + __set_current_state(TASK_RUNNING); + do_something(my_data); + +깨우는 코드는 다음과 같다면: + + my_data = value; + event_indicated = 1; + wake_up(&event_wait_queue); + +event_indecated 에의 변경이 잠재우는 코드에게 my_data 에의 변경 후에 이루어진 +것으로 인지될 것이라는 보장이 없습니다. 이런 경우에는 양쪽 코드 모두 각각의 +데이터 액세스 사이에 메모리 배리어를 직접 쳐야 합니다. 따라서 앞의 재우는 +코드는 다음과 같이: + + set_current_state(TASK_INTERRUPTIBLE); + if (event_indicated) { + smp_rmb(); + do_something(my_data); + } + +그리고 깨우는 코드는 다음과 같이 되어야 합니다: + + my_data = value; + smp_wmb(); + event_indicated = 1; + wake_up(&event_wait_queue); + + +그외의 함수들 +------------- + +그외의 배리어를 내포하는 함수들은 다음과 같습니다: + + (*) schedule() 과 그 유사한 것들이 완전한 메모리 배리어를 내포합니다. + + +============================== +CPU 간 ACQUIRING 배리어의 효과 +============================== + +SMP 시스템에서의 락 기능들은 더욱 강력한 형태의 배리어를 제공합니다: 이 +배리어는 동일한 락을 사용하는 다른 CPU 들의 메모리 액세스 순서에도 영향을 +끼칩니다. + + +ACQUIRE VS 메모리 액세스 +------------------------ + +다음의 예를 생각해 봅시다: 시스템은 두개의 스핀락 (M) 과 (Q), 그리고 세개의 CPU +를 가지고 있습니다; 여기에 다음의 이벤트 시퀀스가 발생합니다: + + CPU 1 CPU 2 + =============================== =============================== + WRITE_ONCE(*A, a); WRITE_ONCE(*E, e); + ACQUIRE M ACQUIRE Q + WRITE_ONCE(*B, b); WRITE_ONCE(*F, f); + WRITE_ONCE(*C, c); WRITE_ONCE(*G, g); + RELEASE M RELEASE Q + WRITE_ONCE(*D, d); WRITE_ONCE(*H, h); + +*A 로의 액세스부터 *H 로의 액세스까지가 어떤 순서로 CPU 3 에게 보여질지에 +대해서는 각 CPU 에서의 락 사용에 의해 내포되어 있는 제약을 제외하고는 어떤 +보장도 존재하지 않습니다. 예를 들어, CPU 3 에게 다음과 같은 순서로 보여지는 +것이 가능합니다: + + *E, ACQUIRE M, ACQUIRE Q, *G, *C, *F, *A, *B, RELEASE Q, *D, *H, RELEASE M + +하지만 다음과 같이 보이지는 않을 겁니다: + + *B, *C or *D preceding ACQUIRE M + *A, *B or *C following RELEASE M + *F, *G or *H preceding ACQUIRE Q + *E, *F or *G following RELEASE Q + + + +ACQUIRE VS I/O 액세스 +---------------------- + +특정한 (특히 NUMA 가 관련된) 환경 하에서 두개의 CPU 에서 동일한 스핀락으로 +보호되는 두개의 크리티컬 섹션 안의 I/O 액세스는 PCI 브릿지에 겹쳐진 I/O +액세스로 보일 수 있는데, PCI 브릿지는 캐시 일관성 프로토콜과 합을 맞춰야 할 +의무가 없으므로, 필요한 읽기 메모리 배리어가 요청되지 않기 때문입니다. + +예를 들어서: + + CPU 1 CPU 2 + =============================== =============================== + spin_lock(Q) + writel(0, ADDR) + writel(1, DATA); + spin_unlock(Q); + spin_lock(Q); + writel(4, ADDR); + writel(5, DATA); + spin_unlock(Q); + +는 PCI 브릿지에 다음과 같이 보일 수 있습니다: + + STORE *ADDR = 0, STORE *ADDR = 4, STORE *DATA = 1, STORE *DATA = 5 + +이렇게 되면 하드웨어의 오동작을 일으킬 수 있습니다. + + +이런 경우엔 잡아둔 스핀락을 내려놓기 전에 mmiowb() 를 수행해야 하는데, 예를 +들면 다음과 같습니다: + + CPU 1 CPU 2 + =============================== =============================== + spin_lock(Q) + writel(0, ADDR) + writel(1, DATA); + mmiowb(); + spin_unlock(Q); + spin_lock(Q); + writel(4, ADDR); + writel(5, DATA); + mmiowb(); + spin_unlock(Q); + +이 코드는 CPU 1 에서 요청된 두개의 스토어가 PCI 브릿지에 CPU 2 에서 요청된 +스토어들보다 먼저 보여짐을 보장합니다. + + +또한, 같은 디바이스에서 스토어를 이어 로드가 수행되면 이 로드는 로드가 수행되기 +전에 스토어가 완료되기를 강제하므로 mmiowb() 의 필요가 없어집니다: + + CPU 1 CPU 2 + =============================== =============================== + spin_lock(Q) + writel(0, ADDR) + a = readl(DATA); + spin_unlock(Q); + spin_lock(Q); + writel(4, ADDR); + b = readl(DATA); + spin_unlock(Q); + + +더 많은 정보를 위해선 Documenataion/DocBook/deviceiobook.tmpl 을 참고하세요. + + +========================= +메모리 배리어가 필요한 곳 +========================= + +설령 SMP 커널을 사용하더라도 싱글 쓰레드로 동작하는 코드는 올바르게 동작하는 +것으로 보여질 것이기 때문에, 평범한 시스템 운영중에 메모리 오퍼레이션 재배치는 +일반적으로 문제가 되지 않습니다. 하지만, 재배치가 문제가 _될 수 있는_ 네가지 +환경이 있습니다: + + (*) 프로세서간 상호 작용. + + (*) 어토믹 오퍼레이션. + + (*) 디바이스 액세스. + + (*) 인터럽트. + + +프로세서간 상호 작용 +-------------------- + +두개 이상의 프로세서를 가진 시스템이 있다면, 시스템의 두개 이상의 CPU 는 동시에 +같은 데이터에 대한 작업을 할 수 있습니다. 이는 동기화 문제를 일으킬 수 있고, +이 문제를 해결하는 일반적 방법은 락을 사용하는 것입니다. 하지만, 락은 상당히 +비용이 비싸서 가능하면 락을 사용하지 않고 일을 처리하는 것이 낫습니다. 이런 +경우, 두 CPU 모두에 영향을 끼치는 오퍼레이션들은 오동작을 막기 위해 신중하게 +순서가 맞춰져야 합니다. + +예를 들어, R/W 세마포어의 느린 수행경로 (slow path) 를 생각해 봅시다. +세마포어를 위해 대기를 하는 하나의 프로세스가 자신의 스택 중 일부를 이 +세마포어의 대기 프로세스 리스트에 링크한 채로 있습니다: + + struct rw_semaphore { + ... + spinlock_t lock; + struct list_head waiters; + }; + + struct rwsem_waiter { + struct list_head list; + struct task_struct *task; + }; + +특정 대기 상태 프로세스를 깨우기 위해, up_read() 나 up_write() 함수는 다음과 +같은 일을 합니다: + + (1) 다음 대기 상태 프로세스 레코드는 어디있는지 알기 위해 이 대기 상태 + 프로세스 레코드의 next 포인터를 읽습니다; + + (2) 이 대기 상태 프로세스의 task 구조체로의 포인터를 읽습니다; + + (3) 이 대기 상태 프로세스가 세마포어를 획득했음을 알리기 위해 task + 포인터를 초기화 합니다; + + (4) 해당 태스크에 대해 wake_up_process() 를 호출합니다; 그리고 + + (5) 해당 대기 상태 프로세스의 task 구조체를 잡고 있던 레퍼런스를 해제합니다. + +달리 말하자면, 다음 이벤트 시퀀스를 수행해야 합니다: + + LOAD waiter->list.next; + LOAD waiter->task; + STORE waiter->task; + CALL wakeup + RELEASE task + +그리고 이 이벤트들이 다른 순서로 수행된다면, 오동작이 일어날 수 있습니다. + +한번 세마포어의 대기줄에 들어갔고 세마포어 락을 놓았다면, 해당 대기 프로세스는 +락을 다시는 잡지 않습니다; 대신 자신의 task 포인터가 초기화 되길 기다립니다. +그 레코드는 대기 프로세스의 스택에 있기 때문에, 리스트의 next 포인터가 읽혀지기 +_전에_ task 포인터가 지워진다면, 다른 CPU 는 해당 대기 프로세스를 시작해 버리고 +up*() 함수가 next 포인터를 읽기 전에 대기 프로세스의 스택을 마구 건드릴 수 +있습니다. + +그렇게 되면 위의 이벤트 시퀀스에 어떤 일이 일어나는지 생각해 보죠: + + CPU 1 CPU 2 + =============================== =============================== + down_xxx() + Queue waiter + Sleep + up_yyy() + LOAD waiter->task; + STORE waiter->task; + Woken up by other event + <preempt> + Resume processing + down_xxx() returns + call foo() + foo() clobbers *waiter + </preempt> + LOAD waiter->list.next; + --- OOPS --- + +이 문제는 세마포어 락의 사용으로 해결될 수도 있겠지만, 그렇게 되면 깨어난 후에 +down_xxx() 함수가 불필요하게 스핀락을 또다시 얻어야만 합니다. + +이 문제를 해결하는 방법은 범용 SMP 메모리 배리어를 추가하는 겁니다: + + LOAD waiter->list.next; + LOAD waiter->task; + smp_mb(); + STORE waiter->task; + CALL wakeup + RELEASE task + +이 경우에, 배리어는 시스템의 나머지 CPU 들에게 모든 배리어 앞의 메모리 액세스가 +배리어 뒤의 메모리 액세스보다 앞서 일어난 것으로 보이게 만듭니다. 배리어 앞의 +메모리 액세스들이 배리어 명령 자체가 완료되는 시점까지 완료된다고는 보장하지 +_않습니다_. + +(이게 문제가 되지 않을) 단일 프로세서 시스템에서 smp_mb() 는 실제로는 그저 +컴파일러가 CPU 안에서의 순서를 바꾸거나 하지 않고 주어진 순서대로 명령을 +내리도록 하는 컴파일러 배리어일 뿐입니다. 오직 하나의 CPU 만 있으니, CPU 의 +의존성 순서 로직이 그 외의 모든것을 알아서 처리할 겁니다. + + +어토믹 오퍼레이션 +----------------- + +어토믹 오퍼레이션은 기술적으로 프로세서간 상호작용으로 분류되며 그 중 일부는 +전체 메모리 배리어를 내포하고 또 일부는 내포하지 않지만, 커널에서 상당히 +의존적으로 사용하는 기능 중 하나입니다. + +메모리의 어떤 상태를 수정하고 해당 상태에 대한 (예전의 또는 최신의) 정보를 +리턴하는 어토믹 오퍼레이션은 모두 SMP-조건적 범용 메모리 배리어(smp_mb())를 +실제 오퍼레이션의 앞과 뒤에 내포합니다. 이런 오퍼레이션은 다음의 것들을 +포함합니다: + + xchg(); + atomic_xchg(); atomic_long_xchg(); + atomic_inc_return(); atomic_long_inc_return(); + atomic_dec_return(); atomic_long_dec_return(); + atomic_add_return(); atomic_long_add_return(); + atomic_sub_return(); atomic_long_sub_return(); + atomic_inc_and_test(); atomic_long_inc_and_test(); + atomic_dec_and_test(); atomic_long_dec_and_test(); + atomic_sub_and_test(); atomic_long_sub_and_test(); + atomic_add_negative(); atomic_long_add_negative(); + test_and_set_bit(); + test_and_clear_bit(); + test_and_change_bit(); + + /* exchange 조건이 성공할 때 */ + cmpxchg(); + atomic_cmpxchg(); atomic_long_cmpxchg(); + atomic_add_unless(); atomic_long_add_unless(); + +이것들은 메모리 배리어 효과가 필요한 ACQUIRE 부류와 RELEASE 부류 오퍼레이션들을 +구현할 때, 그리고 객체 해제를 위해 레퍼런스 카운터를 조정할 때, 암묵적 메모리 +배리어 효과가 필요한 곳 등에 사용됩니다. + + +다음의 오퍼레이션들은 메모리 배리어를 내포하지 _않기_ 때문에 문제가 될 수 +있지만, RELEASE 부류의 오퍼레이션들과 같은 것들을 구현할 때 사용될 수도 +있습니다: + + atomic_set(); + set_bit(); + clear_bit(); + change_bit(); + +이것들을 사용할 때에는 필요하다면 적절한 (예를 들면 smp_mb__before_atomic() +같은) 메모리 배리어가 명시적으로 함께 사용되어야 합니다. + + +아래의 것들도 메모리 배리어를 내포하지 _않기_ 때문에, 일부 환경에서는 (예를 +들면 smp_mb__before_atomic() 과 같은) 명시적인 메모리 배리어 사용이 필요합니다. + + atomic_add(); + atomic_sub(); + atomic_inc(); + atomic_dec(); + +이것들이 통계 생성을 위해 사용된다면, 그리고 통계 데이터 사이에 관계가 존재하지 +않는다면 메모리 배리어는 필요치 않을 겁니다. + +객체의 수명을 관리하기 위해 레퍼런스 카운팅 목적으로 사용된다면, 레퍼런스 +카운터는 락으로 보호되는 섹션에서만 조정되거나 호출하는 쪽이 이미 충분한 +레퍼런스를 잡고 있을 것이기 때문에 메모리 배리어는 아마 필요 없을 겁니다. + +만약 어떤 락을 구성하기 위해 사용된다면, 락 관련 동작은 일반적으로 작업을 특정 +순서대로 진행해야 하므로 메모리 배리어가 필요할 수 있습니다. + +기본적으로, 각 사용처에서는 메모리 배리어가 필요한지 아닌지 충분히 고려해야 +합니다. + +아래의 오퍼레이션들은 특별한 락 관련 동작들입니다: + + test_and_set_bit_lock(); + clear_bit_unlock(); + __clear_bit_unlock(); + +이것들은 ACQUIRE 류와 RELEASE 류의 오퍼레이션들을 구현합니다. 락 관련 도구를 +구현할 때에는 이것들을 좀 더 선호하는 편이 나은데, 이것들의 구현은 많은 +아키텍쳐에서 최적화 될 수 있기 때문입니다. + +[!] 이런 상황에 사용할 수 있는 특수한 메모리 배리어 도구들이 있습니다만, 일부 +CPU 에서는 사용되는 어토믹 인스트럭션 자체에 메모리 배리어가 내포되어 있어서 +어토믹 오퍼레이션과 메모리 배리어를 함께 사용하는 게 불필요한 일이 될 수 +있는데, 그런 경우에 이 특수 메모리 배리어 도구들은 no-op 이 되어 실질적으로 +아무일도 하지 않습니다. + +더 많은 내용을 위해선 Documentation/atomic_ops.txt 를 참고하세요. + + +디바이스 액세스 +--------------- + +많은 디바이스가 메모리 매핑 기법으로 제어될 수 있는데, 그렇게 제어되는 +디바이스는 CPU 에는 단지 특정 메모리 영역의 집합처럼 보이게 됩니다. 드라이버는 +그런 디바이스를 제어하기 위해 정확히 올바른 순서로 올바른 메모리 액세스를 +만들어야 합니다. + +하지만, 액세스들을 재배치 하거나 조합하거나 병합하는게 더 효율적이라 판단하는 +영리한 CPU 나 컴파일러들을 사용하면 드라이버 코드의 조심스럽게 순서 맞춰진 +액세스들이 디바이스에는 요청된 순서대로 도착하지 못하게 할 수 있는 - 디바이스가 +오동작을 하게 할 - 잠재적 문제가 생길 수 있습니다. + +리눅스 커널 내부에서, I/O 는 어떻게 액세스들을 적절히 순차적이게 만들 수 있는지 +알고 있는, - inb() 나 writel() 과 같은 - 적절한 액세스 루틴을 통해 이루어져야만 +합니다. 이것들은 대부분의 경우에는 명시적 메모리 배리어 와 함께 사용될 필요가 +없습니다만, 다음의 두가지 상황에서는 명시적 메모리 배리어가 필요할 수 있습니다: + + (1) 일부 시스템에서 I/O 스토어는 모든 CPU 에 일관되게 순서 맞춰지지 않는데, + 따라서 _모든_ 일반적인 드라이버들에 락이 사용되어야만 하고 이 크리티컬 + 섹션을 빠져나오기 전에 mmiowb() 가 꼭 호출되어야 합니다. + + (2) 만약 액세스 함수들이 완화된 메모리 액세스 속성을 갖는 I/O 메모리 윈도우를 + 사용한다면, 순서를 강제하기 위해선 _mandatory_ 메모리 배리어가 필요합니다. + +더 많은 정보를 위해선 Documentation/DocBook/deviceiobook.tmpl 을 참고하십시오. + + +인터럽트 +-------- + +드라이버는 자신의 인터럽트 서비스 루틴에 의해 인터럽트 당할 수 있기 때문에 +드라이버의 이 두 부분은 서로의 디바이스 제어 또는 액세스 부분과 상호 간섭할 수 +있습니다. + +스스로에게 인터럽트 당하는 걸 불가능하게 하고, 드라이버의 크리티컬한 +오퍼레이션들을 모두 인터럽트가 불가능하게 된 영역에 집어넣거나 하는 방법 (락의 +한 형태) 으로 이런 상호 간섭을 - 최소한 부분적으로라도 - 줄일 수 있습니다. +드라이버의 인터럽트 루틴이 실행 중인 동안, 해당 드라이버의 코어는 같은 CPU 에서 +수행되지 않을 것이며, 현재의 인터럽트가 처리되는 중에는 또다시 인터럽트가 +일어나지 못하도록 되어 있으니 인터럽트 핸들러는 그에 대해서는 락을 잡지 않아도 +됩니다. + +하지만, 어드레스 레지스터와 데이터 레지스터를 갖는 이더넷 카드를 다루는 +드라이버를 생각해 봅시다. 만약 이 드라이버의 코어가 인터럽트를 비활성화시킨 +채로 이더넷 카드와 대화하고 드라이버의 인터럽트 핸들러가 호출되었다면: + + LOCAL IRQ DISABLE + writew(ADDR, 3); + writew(DATA, y); + LOCAL IRQ ENABLE + <interrupt> + writew(ADDR, 4); + q = readw(DATA); + </interrupt> + +만약 순서 규칙이 충분히 완화되어 있다면 데이터 레지스터에의 스토어는 어드레스 +레지스터에 두번째로 행해지는 스토어 뒤에 일어날 수도 있습니다: + + STORE *ADDR = 3, STORE *ADDR = 4, STORE *DATA = y, q = LOAD *DATA + + +만약 순서 규칙이 충분히 완화되어 있고 묵시적으로든 명시적으로든 배리어가 +사용되지 않았다면 인터럽트 비활성화 섹션에서 일어난 액세스가 바깥으로 새어서 +인터럽트 내에서 일어난 액세스와 섞일 수 있다고 - 그리고 그 반대도 - 가정해야만 +합니다. + +그런 영역 안에서 일어나는 I/O 액세스들은 엄격한 순서 규칙의 I/O 레지스터에 +묵시적 I/O 배리어를 형성하는 동기적 (synchronous) 로드 오퍼레이션을 포함하기 +때문에 일반적으로는 이런게 문제가 되지 않습니다. 만약 이걸로는 충분치 않다면 +mmiowb() 가 명시적으로 사용될 필요가 있습니다. + + +하나의 인터럽트 루틴과 별도의 CPU 에서 수행중이며 서로 통신을 하는 두 루틴 +사이에도 비슷한 상황이 일어날 수 있습니다. 만약 그런 경우가 발생할 가능성이 +있다면, 순서를 보장하기 위해 인터럽트 비활성화 락이 사용되어져야만 합니다. + + +====================== +커널 I/O 배리어의 효과 +====================== + +I/O 메모리에 액세스할 때, 드라이버는 적절한 액세스 함수를 사용해야 합니다: + + (*) inX(), outX(): + + 이것들은 메모리 공간보다는 I/O 공간에 이야기를 하려는 의도로 + 만들어졌습니다만, 그건 기본적으로 CPU 마다 다른 컨셉입니다. i386 과 + x86_64 프로세서들은 특별한 I/O 공간 액세스 사이클과 명령어를 실제로 가지고 + 있지만, 다른 많은 CPU 들에는 그런 컨셉이 존재하지 않습니다. + + 다른 것들 중에서도 PCI 버스가 I/O 공간 컨셉을 정의하는데, 이는 - i386 과 + x86_64 같은 CPU 에서 - CPU 의 I/O 공간 컨셉으로 쉽게 매치됩니다. 하지만, + 대체할 I/O 공간이 없는 CPU 에서는 CPU 의 메모리 맵의 가상 I/O 공간으로 + 매핑될 수도 있습니다. + + 이 공간으로의 액세스는 (i386 등에서는) 완전하게 동기화 됩니다만, 중간의 + (PCI 호스트 브리지와 같은) 브리지들은 이를 완전히 보장하진 않을수도 + 있습니다. + + 이것들의 상호간의 순서는 완전하게 보장됩니다. + + 다른 타입의 메모리 오퍼레이션, I/O 오퍼레이션에 대한 순서는 완전하게 + 보장되지는 않습니다. + + (*) readX(), writeX(): + + 이것들이 수행 요청되는 CPU 에서 서로에게 완전히 순서가 맞춰지고 독립적으로 + 수행되는지에 대한 보장 여부는 이들이 액세스 하는 메모리 윈도우에 정의된 + 특성에 의해 결정됩니다. 예를 들어, 최신의 i386 아키텍쳐 머신에서는 MTRR + 레지스터로 이 특성이 조정됩니다. + + 일반적으로는, 프리페치 (prefetch) 가능한 디바이스를 액세스 하는게 + 아니라면, 이것들은 완전히 순서가 맞춰지고 결합되지 않게 보장될 겁니다. + + 하지만, (PCI 브리지와 같은) 중간의 하드웨어는 자신이 원한다면 집행을 + 연기시킬 수 있습니다; 스토어 명령을 실제로 하드웨어로 내려보내기(flush) + 위해서는 같은 위치로부터 로드를 하는 방법이 있습니다만[*], PCI 의 경우는 + 같은 디바이스나 환경 구성 영역에서의 로드만으로도 충분할 겁니다. + + [*] 주의! 쓰여진 것과 같은 위치로부터의 로드를 시도하는 것은 오동작을 + 일으킬 수도 있습니다 - 예로 16650 Rx/Tx 시리얼 레지스터를 생각해 + 보세요. + + 프리페치 가능한 I/O 메모리가 사용되면, 스토어 명령들이 순서를 지키도록 + 하기 위해 mmiowb() 배리어가 필요할 수 있습니다. + + PCI 트랜잭션 사이의 상호작용에 대해 더 많은 정보를 위해선 PCI 명세서를 + 참고하시기 바랍니다. + + (*) readX_relaxed(), writeX_relaxed() + + 이것들은 readX() 와 writeX() 랑 비슷하지만, 더 완화된 메모리 순서 보장을 + 제공합니다. 구체적으로, 이것들은 일반적 메모리 액세스 (예: DMA 버퍼) 에도 + LOCK 이나 UNLOCK 오퍼레이션들에도 순서를 보장하지 않습니다. LOCK 이나 + UNLOCK 오퍼레이션들에 맞춰지는 순서가 필요하다면, mmiowb() 배리어가 사용될 + 수 있습니다. 같은 주변 장치에의 완화된 액세스끼리는 순서가 지켜짐을 알아 + 두시기 바랍니다. + + (*) ioreadX(), iowriteX() + + 이것들은 inX()/outX() 나 readX()/writeX() 처럼 실제로 수행하는 액세스의 + 종류에 따라 적절하게 수행될 것입니다. + + +=================================== +가정되는 가장 완화된 실행 순서 모델 +=================================== + +컨셉적으로 CPU 는 주어진 프로그램에 대해 프로그램 그 자체에는 인과성 (program +causality) 을 지키는 것처럼 보이게 하지만 일반적으로는 순서를 거의 지켜주지 +않는다고 가정되어야만 합니다. (i386 이나 x86_64 같은) 일부 CPU 들은 코드 +재배치에 (powerpc 나 frv 와 같은) 다른 것들에 비해 강한 제약을 갖지만, 아키텍쳐 +종속적 코드 이외의 코드에서는 순서에 대한 제약이 가장 완화된 경우 (DEC Alpha) +를 가정해야 합니다. + +이 말은, CPU 에게 주어지는 인스트럭션 스트림 내의 한 인스트럭션이 앞의 +인스트럭션에 종속적이라면 앞의 인스트럭션은 뒤의 종속적 인스트럭션이 실행되기 +전에 완료[*]될 수 있어야 한다는 제약 (달리 말해서, 인과성이 지켜지는 것으로 +보이게 함) 외에는 자신이 원하는 순서대로 - 심지어 병렬적으로도 - 그 스트림을 +실행할 수 있음을 의미합니다 + + [*] 일부 인스트럭션은 하나 이상의 영향 - 조건 코드를 바꾼다던지, 레지스터나 + 메모리를 바꾼다던지 - 을 만들어내며, 다른 인스트럭션은 다른 효과에 + 종속적일 수 있습니다. + +CPU 는 최종적으로 아무 효과도 만들지 않는 인스트럭션 시퀀스는 없애버릴 수도 +있습니다. 예를 들어, 만약 두개의 연속되는 인스트럭션이 둘 다 같은 레지스터에 +직접적인 값 (immediate value) 을 집어넣는다면, 첫번째 인스트럭션은 버려질 수도 +있습니다. + + +비슷하게, 컴파일러 역시 프로그램의 인과성만 지켜준다면 인스트럭션 스트림을 +자신이 보기에 올바르다 생각되는대로 재배치 할 수 있습니다. + + +=============== +CPU 캐시의 영향 +=============== + +캐시된 메모리 오퍼레이션들이 시스템 전체에 어떻게 인지되는지는 CPU 와 메모리 +사이에 존재하는 캐시들, 그리고 시스템 상태의 일관성을 관리하는 메모리 일관성 +시스템에 상당 부분 영향을 받습니다. + +한 CPU 가 시스템의 다른 부분들과 캐시를 통해 상호작용한다면, 메모리 시스템은 +CPU 의 캐시들을 포함해야 하며, CPU 와 CPU 자신의 캐시 사이에서의 동작을 위한 +메모리 배리어를 가져야 합니다. (메모리 배리어는 논리적으로는 다음 그림의 +점선에서 동작합니다): + + <--- CPU ---> : <----------- Memory -----------> + : + +--------+ +--------+ : +--------+ +-----------+ + | | | | : | | | | +--------+ + | CPU | | Memory | : | CPU | | | | | + | Core |--->| Access |----->| Cache |<-->| | | | + | | | Queue | : | | | |--->| Memory | + | | | | : | | | | | | + +--------+ +--------+ : +--------+ | | | | + : | Cache | +--------+ + : | Coherency | + : | Mechanism | +--------+ + +--------+ +--------+ : +--------+ | | | | + | | | | : | | | | | | + | CPU | | Memory | : | CPU | | |--->| Device | + | Core |--->| Access |----->| Cache |<-->| | | | + | | | Queue | : | | | | | | + | | | | : | | | | +--------+ + +--------+ +--------+ : +--------+ +-----------+ + : + : + +특정 로드나 스토어는 해당 오퍼레이션을 요청한 CPU 의 캐시 내에서 동작을 완료할 +수도 있기 때문에 해당 CPU 의 바깥에는 보이지 않을 수 있지만, 다른 CPU 가 관심을 +갖는다면 캐시 일관성 메커니즘이 해당 캐시라인을 해당 CPU 에게 전달하고, 해당 +메모리 영역에 대한 오퍼레이션이 발생할 때마다 그 영향을 전파시키기 때문에, 해당 +오퍼레이션은 메모리에 실제로 액세스를 한것처럼 나타날 것입니다. + +CPU 코어는 프로그램의 인과성이 유지된다고만 여겨진다면 인스트럭션들을 어떤 +순서로든 재배치해서 수행할 수 있습니다. 일부 인스트럭션들은 로드나 스토어 +오퍼레이션을 만드는데 이 오퍼레이션들은 이후 수행될 메모리 액세스 큐에 들어가게 +됩니다. 코어는 이 오퍼레이션들을 해당 큐에 어떤 순서로든 원하는대로 넣을 수 +있고, 다른 인스트럭션의 완료를 기다리도록 강제되기 전까지는 수행을 계속합니다. + +메모리 배리어가 하는 일은 CPU 쪽에서 메모리 쪽으로 넘어가는 액세스들의 순서, +그리고 그 액세스의 결과가 시스템의 다른 관찰자들에게 인지되는 순서를 제어하는 +것입니다. + +[!] CPU 들은 항상 그들 자신의 로드와 스토어는 프로그램 순서대로 일어난 것으로 +보기 때문에, 주어진 CPU 내에서는 메모리 배리어를 사용할 필요가 _없습니다_. + +[!] MMIO 나 다른 디바이스 액세스들은 캐시 시스템을 우회할 수도 있습니다. 우회 +여부는 디바이스가 액세스 되는 메모리 윈도우의 특성에 의해 결정될 수도 있고, CPU +가 가지고 있을 수 있는 특수한 디바이스 통신 인스트럭션의 사용에 의해서 결정될 +수도 있습니다. + + +캐시 일관성 +----------- + +하지만 삶은 앞에서 이야기한 것처럼 단순하지 않습니다: 캐시들은 일관적일 것으로 +기대되지만, 그 일관성이 순서에도 적용될 거라는 보장은 없습니다. 한 CPU 에서 +만들어진 변경 사항은 최종적으로는 시스템의 모든 CPU 에게 보여지게 되지만, 다른 +CPU 들에게도 같은 순서로 보이게 될 거라는 보장은 없다는 뜻입니다. + + +두개의 CPU (1 & 2) 가 달려 있고, 각 CPU 에 두개의 데이터 캐시(CPU 1 은 A/B 를, +CPU 2 는 C/D 를 갖습니다)가 병렬로 연결되어 있는 시스템을 다룬다고 생각해 +봅시다: + + : + : +--------+ + : +---------+ | | + +--------+ : +--->| Cache A |<------->| | + | | : | +---------+ | | + | CPU 1 |<---+ | | + | | : | +---------+ | | + +--------+ : +--->| Cache B |<------->| | + : +---------+ | | + : | Memory | + : +---------+ | System | + +--------+ : +--->| Cache C |<------->| | + | | : | +---------+ | | + | CPU 2 |<---+ | | + | | : | +---------+ | | + +--------+ : +--->| Cache D |<------->| | + : +---------+ | | + : +--------+ + : + +이 시스템이 다음과 같은 특성을 갖는다 생각해 봅시다: + + (*) 홀수번 캐시라인은 캐시 A, 캐시 C 또는 메모리에 위치할 수 있음; + + (*) 짝수번 캐시라인은 캐시 B, 캐시 D 또는 메모리에 위치할 수 있음; + + (*) CPU 코어가 한개의 캐시에 접근하는 동안, 다른 캐시는 - 더티 캐시라인을 + 메모리에 내리거나 추측성 로드를 하거나 하기 위해 - 시스템의 다른 부분에 + 액세스 하기 위해 버스를 사용할 수 있음; + + (*) 각 캐시는 시스템의 나머지 부분들과 일관성을 맞추기 위해 해당 캐시에 + 적용되어야 할 오퍼레이션들의 큐를 가짐; + + (*) 이 일관성 큐는 캐시에 이미 존재하는 라인에 가해지는 평범한 로드에 의해서는 + 비워지지 않는데, 큐의 오퍼레이션들이 이 로드의 결과에 영향을 끼칠 수 있다 + 할지라도 그러함. + +이제, 첫번째 CPU 에서 두개의 쓰기 오퍼레이션을 만드는데, 해당 CPU 의 캐시에 +요청된 순서로 오퍼레이션이 도달됨을 보장하기 위해 두 오퍼레이션 사이에 쓰기 +배리어를 사용하는 상황을 상상해 봅시다: + + CPU 1 CPU 2 COMMENT + =============== =============== ======================================= + u == 0, v == 1 and p == &u, q == &u + v = 2; + smp_wmb(); v 의 변경이 p 의 변경 전에 보일 것을 + 분명히 함 + <A:modify v=2> v 는 이제 캐시 A 에 독점적으로 존재함 + p = &v; + <B:modify p=&v> p 는 이제 캐시 B 에 독점적으로 존재함 + +여기서의 쓰기 메모리 배리어는 CPU 1 의 캐시가 올바른 순서로 업데이트 된 것으로 +시스템의 다른 CPU 들이 인지하게 만듭니다. 하지만, 이제 두번째 CPU 가 그 값들을 +읽으려 하는 상황을 생각해 봅시다: + + CPU 1 CPU 2 COMMENT + =============== =============== ======================================= + ... + q = p; + x = *q; + +위의 두개의 읽기 오퍼레이션은 예상된 순서로 일어나지 못할 수 있는데, 두번째 CPU +의 한 캐시에 다른 캐시 이벤트가 발생해 v 를 담고 있는 캐시라인의 해당 캐시에의 +업데이트가 지연되는 사이, p 를 담고 있는 캐시라인은 두번째 CPU 의 다른 캐시에 +업데이트 되어버렸을 수 있기 때문입니다. + + CPU 1 CPU 2 COMMENT + =============== =============== ======================================= + u == 0, v == 1 and p == &u, q == &u + v = 2; + smp_wmb(); + <A:modify v=2> <C:busy> + <C:queue v=2> + p = &v; q = p; + <D:request p> + <B:modify p=&v> <D:commit p=&v> + <D:read p> + x = *q; + <C:read *q> 캐시에 업데이트 되기 전의 v 를 읽음 + <C:unbusy> + <C:commit v=2> + +기본적으로, 두개의 캐시라인 모두 CPU 2 에 최종적으로는 업데이트 될 것이지만, +별도의 개입 없이는, 업데이트의 순서가 CPU 1 에서 만들어진 순서와 동일할 +것이라는 보장이 없습니다. + + +여기에 개입하기 위해선, 데이터 의존성 배리어나 읽기 배리어를 로드 오퍼레이션들 +사이에 넣어야 합니다. 이렇게 함으로써 캐시가 다음 요청을 처리하기 전에 일관성 +큐를 처리하도록 강제하게 됩니다. + + CPU 1 CPU 2 COMMENT + =============== =============== ======================================= + u == 0, v == 1 and p == &u, q == &u + v = 2; + smp_wmb(); + <A:modify v=2> <C:busy> + <C:queue v=2> + p = &v; q = p; + <D:request p> + <B:modify p=&v> <D:commit p=&v> + <D:read p> + smp_read_barrier_depends() + <C:unbusy> + <C:commit v=2> + x = *q; + <C:read *q> 캐시에 업데이트 된 v 를 읽음 + + +이런 부류의 문제는 DEC Alpha 계열 프로세서들에서 발견될 수 있는데, 이들은 +데이터 버스를 좀 더 잘 사용해 성능을 개선할 수 있는, 분할된 캐시를 가지고 있기 +때문입니다. 대부분의 CPU 는 하나의 읽기 오퍼레이션의 메모리 액세스가 다른 읽기 +오퍼레이션에 의존적이라면 데이터 의존성 배리어를 내포시킵니다만, 모두가 그런건 +아니기 때문에 이점에 의존해선 안됩니다. + +다른 CPU 들도 분할된 캐시를 가지고 있을 수 있지만, 그런 CPU 들은 평범한 메모리 +액세스를 위해서도 이 분할된 캐시들 사이의 조정을 해야만 합니다. Alpha 는 가장 +약한 메모리 순서 시맨틱 (semantic) 을 선택함으로써 메모리 배리어가 명시적으로 +사용되지 않았을 때에는 그런 조정이 필요하지 않게 했습니다. + + +캐시 일관성 VS DMA +------------------ + +모든 시스템이 DMA 를 하는 디바이스에 대해서까지 캐시 일관성을 유지하지는 +않습니다. 그런 경우, DMA 를 시도하는 디바이스는 RAM 으로부터 잘못된 데이터를 +읽을 수 있는데, 더티 캐시 라인이 CPU 의 캐시에 머무르고 있고, 바뀐 값이 아직 +RAM 에 써지지 않았을 수 있기 때문입니다. 이 문제를 해결하기 위해선, 커널의 +적절한 부분에서 각 CPU 캐시의 문제되는 비트들을 플러시 (flush) 시켜야만 합니다 +(그리고 그것들을 무효화 - invalidation - 시킬 수도 있겠죠). + +또한, 디바이스에 의해 RAM 에 DMA 로 쓰여진 값은 디바이스가 쓰기를 완료한 후에 +CPU 의 캐시에서 RAM 으로 쓰여지는 더티 캐시 라인에 의해 덮어써질 수도 있고, CPU +의 캐시에 존재하는 캐시 라인이 해당 캐시에서 삭제되고 다시 값을 읽어들이기 +전까지는 RAM 이 업데이트 되었다는 사실 자체가 숨겨져 버릴 수도 있습니다. 이 +문제를 해결하기 위해선, 커널의 적절한 부분에서 각 CPU 의 캐시 안의 문제가 되는 +비트들을 무효화 시켜야 합니다. + +캐시 관리에 대한 더 많은 정보를 위해선 Documentation/cachetlb.txt 를 +참고하세요. + + +캐시 일관성 VS MMIO +------------------- + +Memory mapped I/O 는 일반적으로 CPU 의 메모리 공간 내의 한 윈도우의 특정 부분 +내의 메모리 지역에 이루어지는데, 이 윈도우는 일반적인, RAM 으로 향하는 +윈도우와는 다른 특성을 갖습니다. + +그런 특성 가운데 하나는, 일반적으로 그런 액세스는 캐시를 완전히 우회하고 +디바이스 버스로 곧바로 향한다는 것입니다. 이 말은 MMIO 액세스는 먼저 +시작되어서 캐시에서 완료된 메모리 액세스를 추월할 수 있다는 뜻입니다. 이런 +경우엔 메모리 배리어만으로는 충분치 않고, 만약 캐시된 메모리 쓰기 오퍼레이션과 +MMIO 액세스가 어떤 방식으로든 의존적이라면 해당 캐시는 두 오퍼레이션 사이에 +비워져(flush)야만 합니다. + + +====================== +CPU 들이 저지르는 일들 +====================== + +프로그래머는 CPU 가 메모리 오퍼레이션들을 정확히 요청한대로 수행해 줄 것이라고 +생각하는데, 예를 들어 다음과 같은 코드를 CPU 에게 넘긴다면: + + a = READ_ONCE(*A); + WRITE_ONCE(*B, b); + c = READ_ONCE(*C); + d = READ_ONCE(*D); + WRITE_ONCE(*E, e); + +CPU 는 다음 인스트럭션을 처리하기 전에 현재의 인스트럭션을 위한 메모리 +오퍼레이션을 완료할 것이라 생각하고, 따라서 시스템 외부에서 관찰하기에도 정해진 +순서대로 오퍼레이션이 수행될 것으로 예상합니다: + + LOAD *A, STORE *B, LOAD *C, LOAD *D, STORE *E. + + +당연하지만, 실제로는 훨씬 엉망입니다. 많은 CPU 와 컴파일러에서 앞의 가정은 +성립하지 못하는데 그 이유는 다음과 같습니다: + + (*) 로드 오퍼레이션들은 실행을 계속 해나가기 위해 곧바로 완료될 필요가 있는 + 경우가 많은 반면, 스토어 오퍼레이션들은 종종 별다른 문제 없이 유예될 수 + 있습니다; + + (*) 로드 오퍼레이션들은 예측적으로 수행될 수 있으며, 필요없는 로드였다고 + 증명된 예측적 로드의 결과는 버려집니다; + + (*) 로드 오퍼레이션들은 예측적으로 수행될 수 있으므로, 예상된 이벤트의 + 시퀀스와 다른 시간에 로드가 이뤄질 수 있습니다; + + (*) 메모리 액세스 순서는 CPU 버스와 캐시를 좀 더 잘 사용할 수 있도록 재배치 + 될 수 있습니다; + + (*) 로드와 스토어는 인접한 위치에의 액세스들을 일괄적으로 처리할 수 있는 + 메모리나 I/O 하드웨어 (메모리와 PCI 디바이스 둘 다 이게 가능할 수 + 있습니다) 에 대해 요청되는 경우, 개별 오퍼레이션을 위한 트랜잭션 설정 + 비용을 아끼기 위해 조합되어 실행될 수 있습니다; 그리고 + + (*) 해당 CPU 의 데이터 캐시가 순서에 영향을 끼칠 수도 있고, 캐시 일관성 + 메커니즘이 - 스토어가 실제로 캐시에 도달한다면 - 이 문제를 완화시킬 수는 + 있지만 이 일관성 관리가 다른 CPU 들에도 같은 순서로 전달된다는 보장은 + 없습니다. + +따라서, 앞의 코드에 대해 다른 CPU 가 보는 결과는 다음과 같을 수 있습니다: + + LOAD *A, ..., LOAD {*C,*D}, STORE *E, STORE *B + + ("LOAD {*C,*D}" 는 조합된 로드입니다) + + +하지만, CPU 는 스스로는 일관적일 것을 보장합니다: CPU _자신_ 의 액세스들은 +자신에게는 메모리 배리어가 없음에도 불구하고 정확히 순서 세워진 것으로 보여질 +것입니다. 예를 들어 다음의 코드가 주어졌다면: + + U = READ_ONCE(*A); + WRITE_ONCE(*A, V); + WRITE_ONCE(*A, W); + X = READ_ONCE(*A); + WRITE_ONCE(*A, Y); + Z = READ_ONCE(*A); + +그리고 외부의 영향에 의한 간섭이 없다고 가정하면, 최종 결과는 다음과 같이 +나타날 것이라고 예상될 수 있습니다: + + U == *A 의 최초 값 + X == W + Z == Y + *A == Y + +앞의 코드는 CPU 가 다음의 메모리 액세스 시퀀스를 만들도록 할겁니다: + + U=LOAD *A, STORE *A=V, STORE *A=W, X=LOAD *A, STORE *A=Y, Z=LOAD *A + +하지만, 별다른 개입이 없고 프로그램의 시야에 이 세상이 여전히 일관적이라고 +보인다는 보장만 지켜진다면 이 시퀀스는 어떤 조합으로든 재구성될 수 있으며, 각 +액세스들은 합쳐지거나 버려질 수 있습니다. 일부 아키텍쳐에서 CPU 는 같은 위치에 +대한 연속적인 로드 오퍼레이션들을 재배치 할 수 있기 때문에 앞의 예에서의 +READ_ONCE() 와 WRITE_ONCE() 는 반드시 존재해야 함을 알아두세요. 그런 종류의 +아키텍쳐에서 READ_ONCE() 와 WRITE_ONCE() 는 이 문제를 막기 위해 필요한 일을 +뭐가 됐든지 하게 되는데, 예를 들어 Itanium 에서는 READ_ONCE() 와 WRITE_ONCE() +가 사용하는 volatile 캐스팅은 GCC 가 그런 재배치를 방지하는 특수 인스트럭션인 +ld.acq 와 stl.rel 인스트럭션을 각각 만들어 내도록 합니다. + +컴파일러 역시 이 시퀀스의 액세스들을 CPU 가 보기도 전에 합치거나 버리거나 뒤로 +미뤄버릴 수 있습니다. + +예를 들어: + + *A = V; + *A = W; + +는 다음과 같이 변형될 수 있습니다: + + *A = W; + +따라서, 쓰기 배리어나 WRITE_ONCE() 가 없다면 *A 로의 V 값의 저장의 효과는 +사라진다고 가정될 수 있습니다. 비슷하게: + + *A = Y; + Z = *A; + +는, 메모리 배리어나 READ_ONCE() 와 WRITE_ONCE() 없이는 다음과 같이 변형될 수 +있습니다: + + *A = Y; + Z = Y; + +그리고 이 LOAD 오퍼레이션은 CPU 바깥에는 아예 보이지 않습니다. + + +그리고, ALPHA 가 있다 +--------------------- + +DEC Alpha CPU 는 가장 완화된 메모리 순서의 CPU 중 하나입니다. 뿐만 아니라, +Alpha CPU 의 일부 버전은 분할된 데이터 캐시를 가지고 있어서, 의미적으로 +관계되어 있는 두개의 캐시 라인이 서로 다른 시간에 업데이트 되는게 가능합니다. +이게 데이터 의존성 배리어가 정말 필요해지는 부분인데, 데이터 의존성 배리어는 +메모리 일관성 시스템과 함께 두개의 캐시를 동기화 시켜서, 포인터 변경과 새로운 +데이터의 발견을 올바른 순서로 일어나게 하기 때문입니다. + +리눅스 커널의 메모리 배리어 모델은 Alpha 에 기초해서 정의되었습니다. + +위의 "캐시 일관성" 서브섹션을 참고하세요. + + +가상 머신 게스트 +---------------- + +가상 머신에서 동작하는 게스트들은 게스트 자체는 SMP 지원 없이 컴파일 되었다 +해도 SMP 영향을 받을 수 있습니다. 이건 UP 커널을 사용하면서 SMP 호스트와 +결부되어 발생하는 부작용입니다. 이 경우에는 mandatory 배리어를 사용해서 문제를 +해결할 수 있겠지만 그런 해결은 대부분의 경우 최적의 해결책이 아닙니다. + +이 문제를 완벽하게 해결하기 위해, 로우 레벨의 virt_mb() 등의 매크로를 사용할 수 +있습니다. 이것들은 SMP 가 활성화 되어 있다면 smp_mb() 등과 동일한 효과를 +갖습니다만, SMP 와 SMP 아닌 시스템 모두에 대해 동일한 코드를 만들어냅니다. +예를 들어, 가상 머신 게스트들은 (SMP 일 수 있는) 호스트와 동기화를 할 때에는 +smp_mb() 가 아니라 virt_mb() 를 사용해야 합니다. + +이것들은 smp_mb() 류의 것들과 모든 부분에서 동일하며, 특히, MMIO 의 영향에 +대해서는 간여하지 않습니다: MMIO 의 영향을 제어하려면, mandatory 배리어를 +사용하시기 바랍니다. + + +======= +사용 예 +======= + +순환식 버퍼 +----------- + +메모리 배리어는 순환식 버퍼를 생성자(producer)와 소비자(consumer) 사이의 +동기화에 락을 사용하지 않고 구현하는데에 사용될 수 있습니다. 더 자세한 내용을 +위해선 다음을 참고하세요: + + Documentation/circular-buffers.txt + + +========= +참고 문헌 +========= + +Alpha AXP Architecture Reference Manual, Second Edition (Sites & Witek, +Digital Press) + Chapter 5.2: Physical Address Space Characteristics + Chapter 5.4: Caches and Write Buffers + Chapter 5.5: Data Sharing + Chapter 5.6: Read/Write Ordering + +AMD64 Architecture Programmer's Manual Volume 2: System Programming + Chapter 7.1: Memory-Access Ordering + Chapter 7.4: Buffering and Combining Memory Writes + +IA-32 Intel Architecture Software Developer's Manual, Volume 3: +System Programming Guide + Chapter 7.1: Locked Atomic Operations + Chapter 7.2: Memory Ordering + Chapter 7.4: Serializing Instructions + +The SPARC Architecture Manual, Version 9 + Chapter 8: Memory Models + Appendix D: Formal Specification of the Memory Models + Appendix J: Programming with the Memory Models + +UltraSPARC Programmer Reference Manual + Chapter 5: Memory Accesses and Cacheability + Chapter 15: Sparc-V9 Memory Models + +UltraSPARC III Cu User's Manual + Chapter 9: Memory Models + +UltraSPARC IIIi Processor User's Manual + Chapter 8: Memory Models + +UltraSPARC Architecture 2005 + Chapter 9: Memory + Appendix D: Formal Specifications of the Memory Models + +UltraSPARC T1 Supplement to the UltraSPARC Architecture 2005 + Chapter 8: Memory Models + Appendix F: Caches and Cache Coherency + +Solaris Internals, Core Kernel Architecture, p63-68: + Chapter 3.3: Hardware Considerations for Locks and + Synchronization + +Unix Systems for Modern Architectures, Symmetric Multiprocessing and Caching +for Kernel Programmers: + Chapter 13: Other Memory Models + +Intel Itanium Architecture Software Developer's Manual: Volume 1: + Section 2.6: Speculation + Section 4.4: Memory Access diff --git a/Documentation/locking/lglock.txt b/Documentation/locking/lglock.txt deleted file mode 100644 index a6971e34fabe..000000000000 --- a/Documentation/locking/lglock.txt +++ /dev/null @@ -1,166 +0,0 @@ -lglock - local/global locks for mostly local access patterns ------------------------------------------------------------- - -Origin: Nick Piggin's VFS scalability series introduced during - 2.6.35++ [1] [2] -Location: kernel/locking/lglock.c - include/linux/lglock.h -Users: currently only the VFS and stop_machine related code - -Design Goal: ------------- - -Improve scalability of globally used large data sets that are -distributed over all CPUs as per_cpu elements. - -To manage global data structures that are partitioned over all CPUs -as per_cpu elements but can be mostly handled by CPU local actions -lglock will be used where the majority of accesses are cpu local -reading and occasional cpu local writing with very infrequent -global write access. - - -* deal with things locally whenever possible - - very fast access to the local per_cpu data - - reasonably fast access to specific per_cpu data on a different - CPU -* while making global action possible when needed - - by expensive access to all CPUs locks - effectively - resulting in a globally visible critical section. - -Design: -------- - -Basically it is an array of per_cpu spinlocks with the -lg_local_lock/unlock accessing the local CPUs lock object and the -lg_local_lock_cpu/unlock_cpu accessing a remote CPUs lock object -the lg_local_lock has to disable preemption as migration protection so -that the reference to the local CPUs lock does not go out of scope. -Due to the lg_local_lock/unlock only touching cpu-local resources it -is fast. Taking the local lock on a different CPU will be more -expensive but still relatively cheap. - -One can relax the migration constraints by acquiring the current -CPUs lock with lg_local_lock_cpu, remember the cpu, and release that -lock at the end of the critical section even if migrated. This should -give most of the performance benefits without inhibiting migration -though needs careful considerations for nesting of lglocks and -consideration of deadlocks with lg_global_lock. - -The lg_global_lock/unlock locks all underlying spinlocks of all -possible CPUs (including those off-line). The preemption disable/enable -are needed in the non-RT kernels to prevent deadlocks like: - - on cpu 1 - - task A task B - lg_global_lock - got cpu 0 lock - <<<< preempt <<<< - lg_local_lock_cpu for cpu 0 - spin on cpu 0 lock - -On -RT this deadlock scenario is resolved by the arch_spin_locks in the -lglocks being replaced by rt_mutexes which resolve the above deadlock -by boosting the lock-holder. - - -Implementation: ---------------- - -The initial lglock implementation from Nick Piggin used some complex -macros to generate the lglock/brlock in lglock.h - they were later -turned into a set of functions by Andi Kleen [7]. The change to functions -was motivated by the presence of multiple lock users and also by them -being easier to maintain than the generating macros. This change to -functions is also the basis to eliminated the restriction of not -being initializeable in kernel modules (the remaining problem is that -locks are not explicitly initialized - see lockdep-design.txt) - -Declaration and initialization: -------------------------------- - - #include <linux/lglock.h> - - DEFINE_LGLOCK(name) - or: - DEFINE_STATIC_LGLOCK(name); - - lg_lock_init(&name, "lockdep_name_string"); - - on UP this is mapped to DEFINE_SPINLOCK(name) in both cases, note - also that as of 3.18-rc6 all declaration in use are of the _STATIC_ - variant (and it seems that the non-static was never in use). - lg_lock_init is initializing the lockdep map only. - -Usage: ------- - -From the locking semantics it is a spinlock. It could be called a -locality aware spinlock. lg_local_* behaves like a per_cpu -spinlock and lg_global_* like a global spinlock. -No surprises in the API. - - lg_local_lock(*lglock); - access to protected per_cpu object on this CPU - lg_local_unlock(*lglock); - - lg_local_lock_cpu(*lglock, cpu); - access to protected per_cpu object on other CPU cpu - lg_local_unlock_cpu(*lglock, cpu); - - lg_global_lock(*lglock); - access all protected per_cpu objects on all CPUs - lg_global_unlock(*lglock); - - There are no _trylock variants of the lglocks. - -Note that the lg_global_lock/unlock has to iterate over all possible -CPUs rather than the actually present CPUs or a CPU could go off-line -with a held lock [4] and that makes it very expensive. A discussion on -these issues can be found at [5] - -Constraints: ------------- - - * currently the declaration of lglocks in kernel modules is not - possible, though this should be doable with little change. - * lglocks are not recursive. - * suitable for code that can do most operations on the CPU local - data and will very rarely need the global lock - * lg_global_lock/unlock is *very* expensive and does not scale - * on UP systems all lg_* primitives are simply spinlocks - * in PREEMPT_RT the spinlock becomes an rt-mutex and can sleep but - does not change the tasks state while sleeping [6]. - * in PREEMPT_RT the preempt_disable/enable in lg_local_lock/unlock - is downgraded to a migrate_disable/enable, the other - preempt_disable/enable are downgraded to barriers [6]. - The deadlock noted for non-RT above is resolved due to rt_mutexes - boosting the lock-holder in this case which arch_spin_locks do - not do. - -lglocks were designed for very specific problems in the VFS and probably -only are the right answer in these corner cases. Any new user that looks -at lglocks probably wants to look at the seqlock and RCU alternatives as -her first choice. There are also efforts to resolve the RCU issues that -currently prevent using RCU in place of view remaining lglocks. - -Note on brlock history: ------------------------ - -The 'Big Reader' read-write spinlocks were originally introduced by -Ingo Molnar in 2000 (2.4/2.5 kernel series) and removed in 2003. They -later were introduced by the VFS scalability patch set in 2.6 series -again as the "big reader lock" brlock [2] variant of lglock which has -been replaced by seqlock primitives or by RCU based primitives in the -3.13 kernel series as was suggested in [3] in 2003. The brlock was -entirely removed in the 3.13 kernel series. - -Link: 1 http://lkml.org/lkml/2010/8/2/81 -Link: 2 http://lwn.net/Articles/401738/ -Link: 3 http://lkml.org/lkml/2003/3/9/205 -Link: 4 https://lkml.org/lkml/2011/8/24/185 -Link: 5 http://lkml.org/lkml/2011/12/18/189 -Link: 6 https://www.kernel.org/pub/linux/kernel/projects/rt/ - patch series - lglocks-rt.patch.patch -Link: 7 http://lkml.org/lkml/2012/3/5/26 diff --git a/Documentation/memory-barriers.txt b/Documentation/memory-barriers.txt index a4d0a99de04d..ba818ecce6f9 100644 --- a/Documentation/memory-barriers.txt +++ b/Documentation/memory-barriers.txt @@ -609,7 +609,7 @@ A data-dependency barrier must also order against dependent writes: The data-dependency barrier must order the read into Q with the store into *Q. This prohibits this outcome: - (Q == B) && (B == 4) + (Q == &B) && (B == 4) Please note that this pattern should be rare. After all, the whole point of dependency ordering is to -prevent- writes to the data structure, along @@ -1928,6 +1928,7 @@ There are some more advanced barrier functions: See Documentation/DMA-API.txt for more information on consistent memory. + MMIO WRITE BARRIER ------------------ @@ -2075,7 +2076,7 @@ systems, and so cannot be counted on in such a situation to actually achieve anything at all - especially with respect to I/O accesses - unless combined with interrupt disabling operations. -See also the section on "Inter-CPU locking barrier effects". +See also the section on "Inter-CPU acquiring barrier effects". As an example, consider the following: diff --git a/arch/x86/Kconfig b/arch/x86/Kconfig index 2a1f0ce7c59a..0cc8811af4e0 100644 --- a/arch/x86/Kconfig +++ b/arch/x86/Kconfig @@ -705,7 +705,6 @@ config PARAVIRT_DEBUG config PARAVIRT_SPINLOCKS bool "Paravirtualization layer for spinlocks" depends on PARAVIRT && SMP - select UNINLINE_SPIN_UNLOCK if !QUEUED_SPINLOCKS ---help--- Paravirtualized spinlocks allow a pvops backend to replace the spinlock implementation with something virtualization-friendly @@ -718,7 +717,7 @@ config PARAVIRT_SPINLOCKS config QUEUED_LOCK_STAT bool "Paravirt queued spinlock statistics" - depends on PARAVIRT_SPINLOCKS && DEBUG_FS && QUEUED_SPINLOCKS + depends on PARAVIRT_SPINLOCKS && DEBUG_FS ---help--- Enable the collection of statistical data on the slowpath behavior of paravirtualized queued spinlocks and report diff --git a/arch/x86/include/asm/cmpxchg.h b/arch/x86/include/asm/cmpxchg.h index 9733361fed6f..97848cdfcb1a 100644 --- a/arch/x86/include/asm/cmpxchg.h +++ b/arch/x86/include/asm/cmpxchg.h @@ -158,53 +158,9 @@ extern void __add_wrong_size(void) * value of "*ptr". * * xadd() is locked when multiple CPUs are online - * xadd_sync() is always locked - * xadd_local() is never locked */ #define __xadd(ptr, inc, lock) __xchg_op((ptr), (inc), xadd, lock) #define xadd(ptr, inc) __xadd((ptr), (inc), LOCK_PREFIX) -#define xadd_sync(ptr, inc) __xadd((ptr), (inc), "lock; ") -#define xadd_local(ptr, inc) __xadd((ptr), (inc), "") - -#define __add(ptr, inc, lock) \ - ({ \ - __typeof__ (*(ptr)) __ret = (inc); \ - switch (sizeof(*(ptr))) { \ - case __X86_CASE_B: \ - asm volatile (lock "addb %b1, %0\n" \ - : "+m" (*(ptr)) : "qi" (inc) \ - : "memory", "cc"); \ - break; \ - case __X86_CASE_W: \ - asm volatile (lock "addw %w1, %0\n" \ - : "+m" (*(ptr)) : "ri" (inc) \ - : "memory", "cc"); \ - break; \ - case __X86_CASE_L: \ - asm volatile (lock "addl %1, %0\n" \ - : "+m" (*(ptr)) : "ri" (inc) \ - : "memory", "cc"); \ - break; \ - case __X86_CASE_Q: \ - asm volatile (lock "addq %1, %0\n" \ - : "+m" (*(ptr)) : "ri" (inc) \ - : "memory", "cc"); \ - break; \ - default: \ - __add_wrong_size(); \ - } \ - __ret; \ - }) - -/* - * add_*() adds "inc" to "*ptr" - * - * __add() takes a lock prefix - * add_smp() is locked when multiple CPUs are online - * add_sync() is always locked - */ -#define add_smp(ptr, inc) __add((ptr), (inc), LOCK_PREFIX) -#define add_sync(ptr, inc) __add((ptr), (inc), "lock; ") #define __cmpxchg_double(pfx, p1, p2, o1, o2, n1, n2) \ ({ \ diff --git a/arch/x86/include/asm/paravirt.h b/arch/x86/include/asm/paravirt.h index 2970d22d7766..4cd8db05301f 100644 --- a/arch/x86/include/asm/paravirt.h +++ b/arch/x86/include/asm/paravirt.h @@ -661,8 +661,6 @@ static inline void __set_fixmap(unsigned /* enum fixed_addresses */ idx, #if defined(CONFIG_SMP) && defined(CONFIG_PARAVIRT_SPINLOCKS) -#ifdef CONFIG_QUEUED_SPINLOCKS - static __always_inline void pv_queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) { @@ -684,22 +682,6 @@ static __always_inline void pv_kick(int cpu) PVOP_VCALL1(pv_lock_ops.kick, cpu); } -#else /* !CONFIG_QUEUED_SPINLOCKS */ - -static __always_inline void __ticket_lock_spinning(struct arch_spinlock *lock, - __ticket_t ticket) -{ - PVOP_VCALLEE2(pv_lock_ops.lock_spinning, lock, ticket); -} - -static __always_inline void __ticket_unlock_kick(struct arch_spinlock *lock, - __ticket_t ticket) -{ - PVOP_VCALL2(pv_lock_ops.unlock_kick, lock, ticket); -} - -#endif /* CONFIG_QUEUED_SPINLOCKS */ - #endif /* SMP && PARAVIRT_SPINLOCKS */ #ifdef CONFIG_X86_32 diff --git a/arch/x86/include/asm/paravirt_types.h b/arch/x86/include/asm/paravirt_types.h index 7fa9e7740ba3..60aac60ba25f 100644 --- a/arch/x86/include/asm/paravirt_types.h +++ b/arch/x86/include/asm/paravirt_types.h @@ -301,23 +301,16 @@ struct pv_mmu_ops { struct arch_spinlock; #ifdef CONFIG_SMP #include <asm/spinlock_types.h> -#else -typedef u16 __ticket_t; #endif struct qspinlock; struct pv_lock_ops { -#ifdef CONFIG_QUEUED_SPINLOCKS void (*queued_spin_lock_slowpath)(struct qspinlock *lock, u32 val); struct paravirt_callee_save queued_spin_unlock; void (*wait)(u8 *ptr, u8 val); void (*kick)(int cpu); -#else /* !CONFIG_QUEUED_SPINLOCKS */ - struct paravirt_callee_save lock_spinning; - void (*unlock_kick)(struct arch_spinlock *lock, __ticket_t ticket); -#endif /* !CONFIG_QUEUED_SPINLOCKS */ }; /* This contains all the paravirt structures: we get a convenient diff --git a/arch/x86/include/asm/rwsem.h b/arch/x86/include/asm/rwsem.h index 8dbc762ad132..3d33a719f5c1 100644 --- a/arch/x86/include/asm/rwsem.h +++ b/arch/x86/include/asm/rwsem.h @@ -154,7 +154,7 @@ static inline bool __down_write_trylock(struct rw_semaphore *sem) : "+m" (sem->count), "=&a" (tmp0), "=&r" (tmp1), CC_OUT(e) (result) : "er" (RWSEM_ACTIVE_WRITE_BIAS) - : "memory", "cc"); + : "memory"); return result; } diff --git a/arch/x86/include/asm/spinlock.h b/arch/x86/include/asm/spinlock.h index be0a05913b91..921bea7a2708 100644 --- a/arch/x86/include/asm/spinlock.h +++ b/arch/x86/include/asm/spinlock.h @@ -20,187 +20,13 @@ * (the type definitions are in asm/spinlock_types.h) */ -#ifdef CONFIG_X86_32 -# define LOCK_PTR_REG "a" -#else -# define LOCK_PTR_REG "D" -#endif - -#if defined(CONFIG_X86_32) && (defined(CONFIG_X86_PPRO_FENCE)) -/* - * On PPro SMP, we use a locked operation to unlock - * (PPro errata 66, 92) - */ -# define UNLOCK_LOCK_PREFIX LOCK_PREFIX -#else -# define UNLOCK_LOCK_PREFIX -#endif - /* How long a lock should spin before we consider blocking */ #define SPIN_THRESHOLD (1 << 15) extern struct static_key paravirt_ticketlocks_enabled; static __always_inline bool static_key_false(struct static_key *key); -#ifdef CONFIG_QUEUED_SPINLOCKS #include <asm/qspinlock.h> -#else - -#ifdef CONFIG_PARAVIRT_SPINLOCKS - -static inline void __ticket_enter_slowpath(arch_spinlock_t *lock) -{ - set_bit(0, (volatile unsigned long *)&lock->tickets.head); -} - -#else /* !CONFIG_PARAVIRT_SPINLOCKS */ -static __always_inline void __ticket_lock_spinning(arch_spinlock_t *lock, - __ticket_t ticket) -{ -} -static inline void __ticket_unlock_kick(arch_spinlock_t *lock, - __ticket_t ticket) -{ -} - -#endif /* CONFIG_PARAVIRT_SPINLOCKS */ -static inline int __tickets_equal(__ticket_t one, __ticket_t two) -{ - return !((one ^ two) & ~TICKET_SLOWPATH_FLAG); -} - -static inline void __ticket_check_and_clear_slowpath(arch_spinlock_t *lock, - __ticket_t head) -{ - if (head & TICKET_SLOWPATH_FLAG) { - arch_spinlock_t old, new; - - old.tickets.head = head; - new.tickets.head = head & ~TICKET_SLOWPATH_FLAG; - old.tickets.tail = new.tickets.head + TICKET_LOCK_INC; - new.tickets.tail = old.tickets.tail; - - /* try to clear slowpath flag when there are no contenders */ - cmpxchg(&lock->head_tail, old.head_tail, new.head_tail); - } -} - -static __always_inline int arch_spin_value_unlocked(arch_spinlock_t lock) -{ - return __tickets_equal(lock.tickets.head, lock.tickets.tail); -} - -/* - * Ticket locks are conceptually two parts, one indicating the current head of - * the queue, and the other indicating the current tail. The lock is acquired - * by atomically noting the tail and incrementing it by one (thus adding - * ourself to the queue and noting our position), then waiting until the head - * becomes equal to the the initial value of the tail. - * - * We use an xadd covering *both* parts of the lock, to increment the tail and - * also load the position of the head, which takes care of memory ordering - * issues and should be optimal for the uncontended case. Note the tail must be - * in the high part, because a wide xadd increment of the low part would carry - * up and contaminate the high part. - */ -static __always_inline void arch_spin_lock(arch_spinlock_t *lock) -{ - register struct __raw_tickets inc = { .tail = TICKET_LOCK_INC }; - - inc = xadd(&lock->tickets, inc); - if (likely(inc.head == inc.tail)) - goto out; - - for (;;) { - unsigned count = SPIN_THRESHOLD; - - do { - inc.head = READ_ONCE(lock->tickets.head); - if (__tickets_equal(inc.head, inc.tail)) - goto clear_slowpath; - cpu_relax(); - } while (--count); - __ticket_lock_spinning(lock, inc.tail); - } -clear_slowpath: - __ticket_check_and_clear_slowpath(lock, inc.head); -out: - barrier(); /* make sure nothing creeps before the lock is taken */ -} - -static __always_inline int arch_spin_trylock(arch_spinlock_t *lock) -{ - arch_spinlock_t old, new; - - old.tickets = READ_ONCE(lock->tickets); - if (!__tickets_equal(old.tickets.head, old.tickets.tail)) - return 0; - - new.head_tail = old.head_tail + (TICKET_LOCK_INC << TICKET_SHIFT); - new.head_tail &= ~TICKET_SLOWPATH_FLAG; - - /* cmpxchg is a full barrier, so nothing can move before it */ - return cmpxchg(&lock->head_tail, old.head_tail, new.head_tail) == old.head_tail; -} - -static __always_inline void arch_spin_unlock(arch_spinlock_t *lock) -{ - if (TICKET_SLOWPATH_FLAG && - static_key_false(¶virt_ticketlocks_enabled)) { - __ticket_t head; - - BUILD_BUG_ON(((__ticket_t)NR_CPUS) != NR_CPUS); - - head = xadd(&lock->tickets.head, TICKET_LOCK_INC); - - if (unlikely(head & TICKET_SLOWPATH_FLAG)) { - head &= ~TICKET_SLOWPATH_FLAG; - __ticket_unlock_kick(lock, (head + TICKET_LOCK_INC)); - } - } else - __add(&lock->tickets.head, TICKET_LOCK_INC, UNLOCK_LOCK_PREFIX); -} - -static inline int arch_spin_is_locked(arch_spinlock_t *lock) -{ - struct __raw_tickets tmp = READ_ONCE(lock->tickets); - - return !__tickets_equal(tmp.tail, tmp.head); -} - -static inline int arch_spin_is_contended(arch_spinlock_t *lock) -{ - struct __raw_tickets tmp = READ_ONCE(lock->tickets); - - tmp.head &= ~TICKET_SLOWPATH_FLAG; - return (__ticket_t)(tmp.tail - tmp.head) > TICKET_LOCK_INC; -} -#define arch_spin_is_contended arch_spin_is_contended - -static __always_inline void arch_spin_lock_flags(arch_spinlock_t *lock, - unsigned long flags) -{ - arch_spin_lock(lock); -} - -static inline void arch_spin_unlock_wait(arch_spinlock_t *lock) -{ - __ticket_t head = READ_ONCE(lock->tickets.head); - - for (;;) { - struct __raw_tickets tmp = READ_ONCE(lock->tickets); - /* - * We need to check "unlocked" in a loop, tmp.head == head - * can be false positive because of overflow. - */ - if (__tickets_equal(tmp.head, tmp.tail) || - !__tickets_equal(tmp.head, head)) - break; - - cpu_relax(); - } -} -#endif /* CONFIG_QUEUED_SPINLOCKS */ /* * Read-write spinlocks, allowing multiple readers diff --git a/arch/x86/include/asm/spinlock_types.h b/arch/x86/include/asm/spinlock_types.h index 65c3e37f879a..25311ebb446c 100644 --- a/arch/x86/include/asm/spinlock_types.h +++ b/arch/x86/include/asm/spinlock_types.h @@ -23,20 +23,7 @@ typedef u32 __ticketpair_t; #define TICKET_SHIFT (sizeof(__ticket_t) * 8) -#ifdef CONFIG_QUEUED_SPINLOCKS #include <asm-generic/qspinlock_types.h> -#else -typedef struct arch_spinlock { - union { - __ticketpair_t head_tail; - struct __raw_tickets { - __ticket_t head, tail; - } tickets; - }; -} arch_spinlock_t; - -#define __ARCH_SPIN_LOCK_UNLOCKED { { 0 } } -#endif /* CONFIG_QUEUED_SPINLOCKS */ #include <asm-generic/qrwlock_types.h> diff --git a/arch/x86/kernel/kvm.c b/arch/x86/kernel/kvm.c index 1726c4c12336..865058d087ac 100644 --- a/arch/x86/kernel/kvm.c +++ b/arch/x86/kernel/kvm.c @@ -575,9 +575,6 @@ static void kvm_kick_cpu(int cpu) kvm_hypercall2(KVM_HC_KICK_CPU, flags, apicid); } - -#ifdef CONFIG_QUEUED_SPINLOCKS - #include <asm/qspinlock.h> static void kvm_wait(u8 *ptr, u8 val) @@ -606,243 +603,6 @@ out: local_irq_restore(flags); } -#else /* !CONFIG_QUEUED_SPINLOCKS */ - -enum kvm_contention_stat { - TAKEN_SLOW, - TAKEN_SLOW_PICKUP, - RELEASED_SLOW, - RELEASED_SLOW_KICKED, - NR_CONTENTION_STATS -}; - -#ifdef CONFIG_KVM_DEBUG_FS -#define HISTO_BUCKETS 30 - -static struct kvm_spinlock_stats -{ - u32 contention_stats[NR_CONTENTION_STATS]; - u32 histo_spin_blocked[HISTO_BUCKETS+1]; - u64 time_blocked; -} spinlock_stats; - -static u8 zero_stats; - -static inline void check_zero(void) -{ - u8 ret; - u8 old; - - old = READ_ONCE(zero_stats); - if (unlikely(old)) { - ret = cmpxchg(&zero_stats, old, 0); - /* This ensures only one fellow resets the stat */ - if (ret == old) - memset(&spinlock_stats, 0, sizeof(spinlock_stats)); - } -} - -static inline void add_stats(enum kvm_contention_stat var, u32 val) -{ - check_zero(); - spinlock_stats.contention_stats[var] += val; -} - - -static inline u64 spin_time_start(void) -{ - return sched_clock(); -} - -static void __spin_time_accum(u64 delta, u32 *array) -{ - unsigned index; - - index = ilog2(delta); - check_zero(); - - if (index < HISTO_BUCKETS) - array[index]++; - else - array[HISTO_BUCKETS]++; -} - -static inline void spin_time_accum_blocked(u64 start) -{ - u32 delta; - - delta = sched_clock() - start; - __spin_time_accum(delta, spinlock_stats.histo_spin_blocked); - spinlock_stats.time_blocked += delta; -} - -static struct dentry *d_spin_debug; -static struct dentry *d_kvm_debug; - -static struct dentry *kvm_init_debugfs(void) -{ - d_kvm_debug = debugfs_create_dir("kvm-guest", NULL); - if (!d_kvm_debug) - printk(KERN_WARNING "Could not create 'kvm' debugfs directory\n"); - - return d_kvm_debug; -} - -static int __init kvm_spinlock_debugfs(void) -{ - struct dentry *d_kvm; - - d_kvm = kvm_init_debugfs(); - if (d_kvm == NULL) - return -ENOMEM; - - d_spin_debug = debugfs_create_dir("spinlocks", d_kvm); - - debugfs_create_u8("zero_stats", 0644, d_spin_debug, &zero_stats); - - debugfs_create_u32("taken_slow", 0444, d_spin_debug, - &spinlock_stats.contention_stats[TAKEN_SLOW]); - debugfs_create_u32("taken_slow_pickup", 0444, d_spin_debug, - &spinlock_stats.contention_stats[TAKEN_SLOW_PICKUP]); - - debugfs_create_u32("released_slow", 0444, d_spin_debug, - &spinlock_stats.contention_stats[RELEASED_SLOW]); - debugfs_create_u32("released_slow_kicked", 0444, d_spin_debug, - &spinlock_stats.contention_stats[RELEASED_SLOW_KICKED]); - - debugfs_create_u64("time_blocked", 0444, d_spin_debug, - &spinlock_stats.time_blocked); - - debugfs_create_u32_array("histo_blocked", 0444, d_spin_debug, - spinlock_stats.histo_spin_blocked, HISTO_BUCKETS + 1); - - return 0; -} -fs_initcall(kvm_spinlock_debugfs); -#else /* !CONFIG_KVM_DEBUG_FS */ -static inline void add_stats(enum kvm_contention_stat var, u32 val) -{ -} - -static inline u64 spin_time_start(void) -{ - return 0; -} - -static inline void spin_time_accum_blocked(u64 start) -{ -} -#endif /* CONFIG_KVM_DEBUG_FS */ - -struct kvm_lock_waiting { - struct arch_spinlock *lock; - __ticket_t want; -}; - -/* cpus 'waiting' on a spinlock to become available */ -static cpumask_t waiting_cpus; - -/* Track spinlock on which a cpu is waiting */ -static DEFINE_PER_CPU(struct kvm_lock_waiting, klock_waiting); - -__visible void kvm_lock_spinning(struct arch_spinlock *lock, __ticket_t want) -{ - struct kvm_lock_waiting *w; - int cpu; - u64 start; - unsigned long flags; - __ticket_t head; - - if (in_nmi()) - return; - - w = this_cpu_ptr(&klock_waiting); - cpu = smp_processor_id(); - start = spin_time_start(); - - /* - * Make sure an interrupt handler can't upset things in a - * partially setup state. - */ - local_irq_save(flags); - - /* - * The ordering protocol on this is that the "lock" pointer - * may only be set non-NULL if the "want" ticket is correct. - * If we're updating "want", we must first clear "lock". - */ - w->lock = NULL; - smp_wmb(); - w->want = want; - smp_wmb(); - w->lock = lock; - - add_stats(TAKEN_SLOW, 1); - - /* - * This uses set_bit, which is atomic but we should not rely on its - * reordering gurantees. So barrier is needed after this call. - */ - cpumask_set_cpu(cpu, &waiting_cpus); - - barrier(); - - /* - * Mark entry to slowpath before doing the pickup test to make - * sure we don't deadlock with an unlocker. - */ - __ticket_enter_slowpath(lock); - - /* make sure enter_slowpath, which is atomic does not cross the read */ - smp_mb__after_atomic(); - - /* - * check again make sure it didn't become free while - * we weren't looking. - */ - head = READ_ONCE(lock->tickets.head); - if (__tickets_equal(head, want)) { - add_stats(TAKEN_SLOW_PICKUP, 1); - goto out; - } - - /* - * halt until it's our turn and kicked. Note that we do safe halt - * for irq enabled case to avoid hang when lock info is overwritten - * in irq spinlock slowpath and no spurious interrupt occur to save us. - */ - if (arch_irqs_disabled_flags(flags)) - halt(); - else - safe_halt(); - -out: - cpumask_clear_cpu(cpu, &waiting_cpus); - w->lock = NULL; - local_irq_restore(flags); - spin_time_accum_blocked(start); -} -PV_CALLEE_SAVE_REGS_THUNK(kvm_lock_spinning); - -/* Kick vcpu waiting on @lock->head to reach value @ticket */ -static void kvm_unlock_kick(struct arch_spinlock *lock, __ticket_t ticket) -{ - int cpu; - - add_stats(RELEASED_SLOW, 1); - for_each_cpu(cpu, &waiting_cpus) { - const struct kvm_lock_waiting *w = &per_cpu(klock_waiting, cpu); - if (READ_ONCE(w->lock) == lock && - READ_ONCE(w->want) == ticket) { - add_stats(RELEASED_SLOW_KICKED, 1); - kvm_kick_cpu(cpu); - break; - } - } -} - -#endif /* !CONFIG_QUEUED_SPINLOCKS */ - /* * Setup pv_lock_ops to exploit KVM_FEATURE_PV_UNHALT if present. */ @@ -854,16 +614,11 @@ void __init kvm_spinlock_init(void) if (!kvm_para_has_feature(KVM_FEATURE_PV_UNHALT)) return; -#ifdef CONFIG_QUEUED_SPINLOCKS __pv_init_lock_hash(); pv_lock_ops.queued_spin_lock_slowpath = __pv_queued_spin_lock_slowpath; pv_lock_ops.queued_spin_unlock = PV_CALLEE_SAVE(__pv_queued_spin_unlock); pv_lock_ops.wait = kvm_wait; pv_lock_ops.kick = kvm_kick_cpu; -#else /* !CONFIG_QUEUED_SPINLOCKS */ - pv_lock_ops.lock_spinning = PV_CALLEE_SAVE(kvm_lock_spinning); - pv_lock_ops.unlock_kick = kvm_unlock_kick; -#endif } static __init int kvm_spinlock_init_jump(void) diff --git a/arch/x86/kernel/paravirt-spinlocks.c b/arch/x86/kernel/paravirt-spinlocks.c index 1939a0269377..2c55a003b793 100644 --- a/arch/x86/kernel/paravirt-spinlocks.c +++ b/arch/x86/kernel/paravirt-spinlocks.c @@ -8,7 +8,6 @@ #include <asm/paravirt.h> -#ifdef CONFIG_QUEUED_SPINLOCKS __visible void __native_queued_spin_unlock(struct qspinlock *lock) { native_queued_spin_unlock(lock); @@ -21,19 +20,13 @@ bool pv_is_native_spin_unlock(void) return pv_lock_ops.queued_spin_unlock.func == __raw_callee_save___native_queued_spin_unlock; } -#endif struct pv_lock_ops pv_lock_ops = { #ifdef CONFIG_SMP -#ifdef CONFIG_QUEUED_SPINLOCKS .queued_spin_lock_slowpath = native_queued_spin_lock_slowpath, .queued_spin_unlock = PV_CALLEE_SAVE(__native_queued_spin_unlock), .wait = paravirt_nop, .kick = paravirt_nop, -#else /* !CONFIG_QUEUED_SPINLOCKS */ - .lock_spinning = __PV_IS_CALLEE_SAVE(paravirt_nop), - .unlock_kick = paravirt_nop, -#endif /* !CONFIG_QUEUED_SPINLOCKS */ #endif /* SMP */ }; EXPORT_SYMBOL(pv_lock_ops); diff --git a/arch/x86/kernel/paravirt_patch_32.c b/arch/x86/kernel/paravirt_patch_32.c index 158dc0650d5d..920c6ae08592 100644 --- a/arch/x86/kernel/paravirt_patch_32.c +++ b/arch/x86/kernel/paravirt_patch_32.c @@ -10,7 +10,7 @@ DEF_NATIVE(pv_mmu_ops, write_cr3, "mov %eax, %cr3"); DEF_NATIVE(pv_mmu_ops, read_cr3, "mov %cr3, %eax"); DEF_NATIVE(pv_cpu_ops, clts, "clts"); -#if defined(CONFIG_PARAVIRT_SPINLOCKS) && defined(CONFIG_QUEUED_SPINLOCKS) +#if defined(CONFIG_PARAVIRT_SPINLOCKS) DEF_NATIVE(pv_lock_ops, queued_spin_unlock, "movb $0, (%eax)"); #endif @@ -49,7 +49,7 @@ unsigned native_patch(u8 type, u16 clobbers, void *ibuf, PATCH_SITE(pv_mmu_ops, read_cr3); PATCH_SITE(pv_mmu_ops, write_cr3); PATCH_SITE(pv_cpu_ops, clts); -#if defined(CONFIG_PARAVIRT_SPINLOCKS) && defined(CONFIG_QUEUED_SPINLOCKS) +#if defined(CONFIG_PARAVIRT_SPINLOCKS) case PARAVIRT_PATCH(pv_lock_ops.queued_spin_unlock): if (pv_is_native_spin_unlock()) { start = start_pv_lock_ops_queued_spin_unlock; diff --git a/arch/x86/kernel/paravirt_patch_64.c b/arch/x86/kernel/paravirt_patch_64.c index e70087a04cc8..bb3840cedb4f 100644 --- a/arch/x86/kernel/paravirt_patch_64.c +++ b/arch/x86/kernel/paravirt_patch_64.c @@ -19,7 +19,7 @@ DEF_NATIVE(pv_cpu_ops, swapgs, "swapgs"); DEF_NATIVE(, mov32, "mov %edi, %eax"); DEF_NATIVE(, mov64, "mov %rdi, %rax"); -#if defined(CONFIG_PARAVIRT_SPINLOCKS) && defined(CONFIG_QUEUED_SPINLOCKS) +#if defined(CONFIG_PARAVIRT_SPINLOCKS) DEF_NATIVE(pv_lock_ops, queued_spin_unlock, "movb $0, (%rdi)"); #endif @@ -61,7 +61,7 @@ unsigned native_patch(u8 type, u16 clobbers, void *ibuf, PATCH_SITE(pv_cpu_ops, clts); PATCH_SITE(pv_mmu_ops, flush_tlb_single); PATCH_SITE(pv_cpu_ops, wbinvd); -#if defined(CONFIG_PARAVIRT_SPINLOCKS) && defined(CONFIG_QUEUED_SPINLOCKS) +#if defined(CONFIG_PARAVIRT_SPINLOCKS) case PARAVIRT_PATCH(pv_lock_ops.queued_spin_unlock): if (pv_is_native_spin_unlock()) { start = start_pv_lock_ops_queued_spin_unlock; diff --git a/arch/x86/xen/spinlock.c b/arch/x86/xen/spinlock.c index f42e78de1e10..3d6e0064cbfc 100644 --- a/arch/x86/xen/spinlock.c +++ b/arch/x86/xen/spinlock.c @@ -21,8 +21,6 @@ static DEFINE_PER_CPU(int, lock_kicker_irq) = -1; static DEFINE_PER_CPU(char *, irq_name); static bool xen_pvspin = true; -#ifdef CONFIG_QUEUED_SPINLOCKS - #include <asm/qspinlock.h> static void xen_qlock_kick(int cpu) @@ -71,207 +69,6 @@ static void xen_qlock_wait(u8 *byte, u8 val) xen_poll_irq(irq); } -#else /* CONFIG_QUEUED_SPINLOCKS */ - -enum xen_contention_stat { - TAKEN_SLOW, - TAKEN_SLOW_PICKUP, - TAKEN_SLOW_SPURIOUS, - RELEASED_SLOW, - RELEASED_SLOW_KICKED, - NR_CONTENTION_STATS -}; - - -#ifdef CONFIG_XEN_DEBUG_FS -#define HISTO_BUCKETS 30 -static struct xen_spinlock_stats -{ - u32 contention_stats[NR_CONTENTION_STATS]; - u32 histo_spin_blocked[HISTO_BUCKETS+1]; - u64 time_blocked; -} spinlock_stats; - -static u8 zero_stats; - -static inline void check_zero(void) -{ - u8 ret; - u8 old = READ_ONCE(zero_stats); - if (unlikely(old)) { - ret = cmpxchg(&zero_stats, old, 0); - /* This ensures only one fellow resets the stat */ - if (ret == old) - memset(&spinlock_stats, 0, sizeof(spinlock_stats)); - } -} - -static inline void add_stats(enum xen_contention_stat var, u32 val) -{ - check_zero(); - spinlock_stats.contention_stats[var] += val; -} - -static inline u64 spin_time_start(void) -{ - return xen_clocksource_read(); -} - -static void __spin_time_accum(u64 delta, u32 *array) -{ - unsigned index = ilog2(delta); - - check_zero(); - - if (index < HISTO_BUCKETS) - array[index]++; - else - array[HISTO_BUCKETS]++; -} - -static inline void spin_time_accum_blocked(u64 start) -{ - u32 delta = xen_clocksource_read() - start; - - __spin_time_accum(delta, spinlock_stats.histo_spin_blocked); - spinlock_stats.time_blocked += delta; -} -#else /* !CONFIG_XEN_DEBUG_FS */ -static inline void add_stats(enum xen_contention_stat var, u32 val) -{ -} - -static inline u64 spin_time_start(void) -{ - return 0; -} - -static inline void spin_time_accum_blocked(u64 start) -{ -} -#endif /* CONFIG_XEN_DEBUG_FS */ - -struct xen_lock_waiting { - struct arch_spinlock *lock; - __ticket_t want; -}; - -static DEFINE_PER_CPU(struct xen_lock_waiting, lock_waiting); -static cpumask_t waiting_cpus; - -__visible void xen_lock_spinning(struct arch_spinlock *lock, __ticket_t want) -{ - int irq = __this_cpu_read(lock_kicker_irq); - struct xen_lock_waiting *w = this_cpu_ptr(&lock_waiting); - int cpu = smp_processor_id(); - u64 start; - __ticket_t head; - unsigned long flags; - - /* If kicker interrupts not initialized yet, just spin */ - if (irq == -1) - return; - - start = spin_time_start(); - - /* - * Make sure an interrupt handler can't upset things in a - * partially setup state. - */ - local_irq_save(flags); - /* - * We don't really care if we're overwriting some other - * (lock,want) pair, as that would mean that we're currently - * in an interrupt context, and the outer context had - * interrupts enabled. That has already kicked the VCPU out - * of xen_poll_irq(), so it will just return spuriously and - * retry with newly setup (lock,want). - * - * The ordering protocol on this is that the "lock" pointer - * may only be set non-NULL if the "want" ticket is correct. - * If we're updating "want", we must first clear "lock". - */ - w->lock = NULL; - smp_wmb(); - w->want = want; - smp_wmb(); - w->lock = lock; - - /* This uses set_bit, which atomic and therefore a barrier */ - cpumask_set_cpu(cpu, &waiting_cpus); - add_stats(TAKEN_SLOW, 1); - - /* clear pending */ - xen_clear_irq_pending(irq); - - /* Only check lock once pending cleared */ - barrier(); - - /* - * Mark entry to slowpath before doing the pickup test to make - * sure we don't deadlock with an unlocker. - */ - __ticket_enter_slowpath(lock); - - /* make sure enter_slowpath, which is atomic does not cross the read */ - smp_mb__after_atomic(); - - /* - * check again make sure it didn't become free while - * we weren't looking - */ - head = READ_ONCE(lock->tickets.head); - if (__tickets_equal(head, want)) { - add_stats(TAKEN_SLOW_PICKUP, 1); - goto out; - } - - /* Allow interrupts while blocked */ - local_irq_restore(flags); - - /* - * If an interrupt happens here, it will leave the wakeup irq - * pending, which will cause xen_poll_irq() to return - * immediately. - */ - - /* Block until irq becomes pending (or perhaps a spurious wakeup) */ - xen_poll_irq(irq); - add_stats(TAKEN_SLOW_SPURIOUS, !xen_test_irq_pending(irq)); - - local_irq_save(flags); - - kstat_incr_irq_this_cpu(irq); -out: - cpumask_clear_cpu(cpu, &waiting_cpus); - w->lock = NULL; - - local_irq_restore(flags); - - spin_time_accum_blocked(start); -} -PV_CALLEE_SAVE_REGS_THUNK(xen_lock_spinning); - -static void xen_unlock_kick(struct arch_spinlock *lock, __ticket_t next) -{ - int cpu; - - add_stats(RELEASED_SLOW, 1); - - for_each_cpu(cpu, &waiting_cpus) { - const struct xen_lock_waiting *w = &per_cpu(lock_waiting, cpu); - - /* Make sure we read lock before want */ - if (READ_ONCE(w->lock) == lock && - READ_ONCE(w->want) == next) { - add_stats(RELEASED_SLOW_KICKED, 1); - xen_send_IPI_one(cpu, XEN_SPIN_UNLOCK_VECTOR); - break; - } - } -} -#endif /* CONFIG_QUEUED_SPINLOCKS */ - static irqreturn_t dummy_handler(int irq, void *dev_id) { BUG(); @@ -334,16 +131,12 @@ void __init xen_init_spinlocks(void) return; } printk(KERN_DEBUG "xen: PV spinlocks enabled\n"); -#ifdef CONFIG_QUEUED_SPINLOCKS + __pv_init_lock_hash(); pv_lock_ops.queued_spin_lock_slowpath = __pv_queued_spin_lock_slowpath; pv_lock_ops.queued_spin_unlock = PV_CALLEE_SAVE(__pv_queued_spin_unlock); pv_lock_ops.wait = xen_qlock_wait; pv_lock_ops.kick = xen_qlock_kick; -#else - pv_lock_ops.lock_spinning = PV_CALLEE_SAVE(xen_lock_spinning); - pv_lock_ops.unlock_kick = xen_unlock_kick; -#endif } /* @@ -372,44 +165,3 @@ static __init int xen_parse_nopvspin(char *arg) } early_param("xen_nopvspin", xen_parse_nopvspin); -#if defined(CONFIG_XEN_DEBUG_FS) && !defined(CONFIG_QUEUED_SPINLOCKS) - -static struct dentry *d_spin_debug; - -static int __init xen_spinlock_debugfs(void) -{ - struct dentry *d_xen = xen_init_debugfs(); - - if (d_xen == NULL) - return -ENOMEM; - - if (!xen_pvspin) - return 0; - - d_spin_debug = debugfs_create_dir("spinlocks", d_xen); - - debugfs_create_u8("zero_stats", 0644, d_spin_debug, &zero_stats); - - debugfs_create_u32("taken_slow", 0444, d_spin_debug, - &spinlock_stats.contention_stats[TAKEN_SLOW]); - debugfs_create_u32("taken_slow_pickup", 0444, d_spin_debug, - &spinlock_stats.contention_stats[TAKEN_SLOW_PICKUP]); - debugfs_create_u32("taken_slow_spurious", 0444, d_spin_debug, - &spinlock_stats.contention_stats[TAKEN_SLOW_SPURIOUS]); - - debugfs_create_u32("released_slow", 0444, d_spin_debug, - &spinlock_stats.contention_stats[RELEASED_SLOW]); - debugfs_create_u32("released_slow_kicked", 0444, d_spin_debug, - &spinlock_stats.contention_stats[RELEASED_SLOW_KICKED]); - - debugfs_create_u64("time_blocked", 0444, d_spin_debug, - &spinlock_stats.time_blocked); - - debugfs_create_u32_array("histo_blocked", 0444, d_spin_debug, - spinlock_stats.histo_spin_blocked, HISTO_BUCKETS + 1); - - return 0; -} -fs_initcall(xen_spinlock_debugfs); - -#endif /* CONFIG_XEN_DEBUG_FS */ diff --git a/fs/Kconfig b/fs/Kconfig index 2bc7ad775842..3ef62bad8f2b 100644 --- a/fs/Kconfig +++ b/fs/Kconfig @@ -79,6 +79,7 @@ config EXPORTFS_BLOCK_OPS config FILE_LOCKING bool "Enable POSIX file locking API" if EXPERT default y + select PERCPU_RWSEM help This option enables standard file locking support, required for filesystems like NFS and for the flock() system diff --git a/fs/locks.c b/fs/locks.c index ee1b15f6fc13..133fb2543d21 100644 --- a/fs/locks.c +++ b/fs/locks.c @@ -127,7 +127,6 @@ #include <linux/pid_namespace.h> #include <linux/hashtable.h> #include <linux/percpu.h> -#include <linux/lglock.h> #define CREATE_TRACE_POINTS #include <trace/events/filelock.h> @@ -158,12 +157,18 @@ int lease_break_time = 45; /* * The global file_lock_list is only used for displaying /proc/locks, so we - * keep a list on each CPU, with each list protected by its own spinlock via - * the file_lock_lglock. Note that alterations to the list also require that - * the relevant flc_lock is held. + * keep a list on each CPU, with each list protected by its own spinlock. + * Global serialization is done using file_rwsem. + * + * Note that alterations to the list also require that the relevant flc_lock is + * held. */ -DEFINE_STATIC_LGLOCK(file_lock_lglock); -static DEFINE_PER_CPU(struct hlist_head, file_lock_list); +struct file_lock_list_struct { + spinlock_t lock; + struct hlist_head hlist; +}; +static DEFINE_PER_CPU(struct file_lock_list_struct, file_lock_list); +DEFINE_STATIC_PERCPU_RWSEM(file_rwsem); /* * The blocked_hash is used to find POSIX lock loops for deadlock detection. @@ -587,15 +592,23 @@ static int posix_same_owner(struct file_lock *fl1, struct file_lock *fl2) /* Must be called with the flc_lock held! */ static void locks_insert_global_locks(struct file_lock *fl) { - lg_local_lock(&file_lock_lglock); + struct file_lock_list_struct *fll = this_cpu_ptr(&file_lock_list); + + percpu_rwsem_assert_held(&file_rwsem); + + spin_lock(&fll->lock); fl->fl_link_cpu = smp_processor_id(); - hlist_add_head(&fl->fl_link, this_cpu_ptr(&file_lock_list)); - lg_local_unlock(&file_lock_lglock); + hlist_add_head(&fl->fl_link, &fll->hlist); + spin_unlock(&fll->lock); } /* Must be called with the flc_lock held! */ static void locks_delete_global_locks(struct file_lock *fl) { + struct file_lock_list_struct *fll; + + percpu_rwsem_assert_held(&file_rwsem); + /* * Avoid taking lock if already unhashed. This is safe since this check * is done while holding the flc_lock, and new insertions into the list @@ -603,9 +616,11 @@ static void locks_delete_global_locks(struct file_lock *fl) */ if (hlist_unhashed(&fl->fl_link)) return; - lg_local_lock_cpu(&file_lock_lglock, fl->fl_link_cpu); + + fll = per_cpu_ptr(&file_lock_list, fl->fl_link_cpu); + spin_lock(&fll->lock); hlist_del_init(&fl->fl_link); - lg_local_unlock_cpu(&file_lock_lglock, fl->fl_link_cpu); + spin_unlock(&fll->lock); } static unsigned long @@ -915,6 +930,7 @@ static int flock_lock_inode(struct inode *inode, struct file_lock *request) return -ENOMEM; } + percpu_down_read_preempt_disable(&file_rwsem); spin_lock(&ctx->flc_lock); if (request->fl_flags & FL_ACCESS) goto find_conflict; @@ -955,6 +971,7 @@ find_conflict: out: spin_unlock(&ctx->flc_lock); + percpu_up_read_preempt_enable(&file_rwsem); if (new_fl) locks_free_lock(new_fl); locks_dispose_list(&dispose); @@ -991,6 +1008,7 @@ static int posix_lock_inode(struct inode *inode, struct file_lock *request, new_fl2 = locks_alloc_lock(); } + percpu_down_read_preempt_disable(&file_rwsem); spin_lock(&ctx->flc_lock); /* * New lock request. Walk all POSIX locks and look for conflicts. If @@ -1162,6 +1180,7 @@ static int posix_lock_inode(struct inode *inode, struct file_lock *request, } out: spin_unlock(&ctx->flc_lock); + percpu_up_read_preempt_enable(&file_rwsem); /* * Free any unused locks. */ @@ -1436,6 +1455,7 @@ int __break_lease(struct inode *inode, unsigned int mode, unsigned int type) return error; } + percpu_down_read_preempt_disable(&file_rwsem); spin_lock(&ctx->flc_lock); time_out_leases(inode, &dispose); @@ -1487,9 +1507,13 @@ restart: locks_insert_block(fl, new_fl); trace_break_lease_block(inode, new_fl); spin_unlock(&ctx->flc_lock); + percpu_up_read_preempt_enable(&file_rwsem); + locks_dispose_list(&dispose); error = wait_event_interruptible_timeout(new_fl->fl_wait, !new_fl->fl_next, break_time); + + percpu_down_read_preempt_disable(&file_rwsem); spin_lock(&ctx->flc_lock); trace_break_lease_unblock(inode, new_fl); locks_delete_block(new_fl); @@ -1506,6 +1530,7 @@ restart: } out: spin_unlock(&ctx->flc_lock); + percpu_up_read_preempt_enable(&file_rwsem); locks_dispose_list(&dispose); locks_free_lock(new_fl); return error; @@ -1660,6 +1685,7 @@ generic_add_lease(struct file *filp, long arg, struct file_lock **flp, void **pr return -EINVAL; } + percpu_down_read_preempt_disable(&file_rwsem); spin_lock(&ctx->flc_lock); time_out_leases(inode, &dispose); error = check_conflicting_open(dentry, arg, lease->fl_flags); @@ -1730,6 +1756,7 @@ out_setup: lease->fl_lmops->lm_setup(lease, priv); out: spin_unlock(&ctx->flc_lock); + percpu_up_read_preempt_enable(&file_rwsem); locks_dispose_list(&dispose); if (is_deleg) inode_unlock(inode); @@ -1752,6 +1779,7 @@ static int generic_delete_lease(struct file *filp, void *owner) return error; } + percpu_down_read_preempt_disable(&file_rwsem); spin_lock(&ctx->flc_lock); list_for_each_entry(fl, &ctx->flc_lease, fl_list) { if (fl->fl_file == filp && @@ -1764,6 +1792,7 @@ static int generic_delete_lease(struct file *filp, void *owner) if (victim) error = fl->fl_lmops->lm_change(victim, F_UNLCK, &dispose); spin_unlock(&ctx->flc_lock); + percpu_up_read_preempt_enable(&file_rwsem); locks_dispose_list(&dispose); return error; } @@ -2703,9 +2732,9 @@ static void *locks_start(struct seq_file *f, loff_t *pos) struct locks_iterator *iter = f->private; iter->li_pos = *pos + 1; - lg_global_lock(&file_lock_lglock); + percpu_down_write(&file_rwsem); spin_lock(&blocked_lock_lock); - return seq_hlist_start_percpu(&file_lock_list, &iter->li_cpu, *pos); + return seq_hlist_start_percpu(&file_lock_list.hlist, &iter->li_cpu, *pos); } static void *locks_next(struct seq_file *f, void *v, loff_t *pos) @@ -2713,14 +2742,14 @@ static void *locks_next(struct seq_file *f, void *v, loff_t *pos) struct locks_iterator *iter = f->private; ++iter->li_pos; - return seq_hlist_next_percpu(v, &file_lock_list, &iter->li_cpu, pos); + return seq_hlist_next_percpu(v, &file_lock_list.hlist, &iter->li_cpu, pos); } static void locks_stop(struct seq_file *f, void *v) __releases(&blocked_lock_lock) { spin_unlock(&blocked_lock_lock); - lg_global_unlock(&file_lock_lglock); + percpu_up_write(&file_rwsem); } static const struct seq_operations locks_seq_operations = { @@ -2761,10 +2790,13 @@ static int __init filelock_init(void) filelock_cache = kmem_cache_create("file_lock_cache", sizeof(struct file_lock), 0, SLAB_PANIC, NULL); - lg_lock_init(&file_lock_lglock, "file_lock_lglock"); - for_each_possible_cpu(i) - INIT_HLIST_HEAD(per_cpu_ptr(&file_lock_list, i)); + for_each_possible_cpu(i) { + struct file_lock_list_struct *fll = per_cpu_ptr(&file_lock_list, i); + + spin_lock_init(&fll->lock); + INIT_HLIST_HEAD(&fll->hlist); + } return 0; } diff --git a/include/linux/lglock.h b/include/linux/lglock.h deleted file mode 100644 index c92ebd100d9b..000000000000 --- a/include/linux/lglock.h +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Specialised local-global spinlock. Can only be declared as global variables - * to avoid overhead and keep things simple (and we don't want to start using - * these inside dynamically allocated structures). - * - * "local/global locks" (lglocks) can be used to: - * - * - Provide fast exclusive access to per-CPU data, with exclusive access to - * another CPU's data allowed but possibly subject to contention, and to - * provide very slow exclusive access to all per-CPU data. - * - Or to provide very fast and scalable read serialisation, and to provide - * very slow exclusive serialisation of data (not necessarily per-CPU data). - * - * Brlocks are also implemented as a short-hand notation for the latter use - * case. - * - * Copyright 2009, 2010, Nick Piggin, Novell Inc. - */ -#ifndef __LINUX_LGLOCK_H -#define __LINUX_LGLOCK_H - -#include <linux/spinlock.h> -#include <linux/lockdep.h> -#include <linux/percpu.h> -#include <linux/cpu.h> -#include <linux/notifier.h> - -#ifdef CONFIG_SMP - -#ifdef CONFIG_DEBUG_LOCK_ALLOC -#define LOCKDEP_INIT_MAP lockdep_init_map -#else -#define LOCKDEP_INIT_MAP(a, b, c, d) -#endif - -struct lglock { - arch_spinlock_t __percpu *lock; -#ifdef CONFIG_DEBUG_LOCK_ALLOC - struct lock_class_key lock_key; - struct lockdep_map lock_dep_map; -#endif -}; - -#define DEFINE_LGLOCK(name) \ - static DEFINE_PER_CPU(arch_spinlock_t, name ## _lock) \ - = __ARCH_SPIN_LOCK_UNLOCKED; \ - struct lglock name = { .lock = &name ## _lock } - -#define DEFINE_STATIC_LGLOCK(name) \ - static DEFINE_PER_CPU(arch_spinlock_t, name ## _lock) \ - = __ARCH_SPIN_LOCK_UNLOCKED; \ - static struct lglock name = { .lock = &name ## _lock } - -void lg_lock_init(struct lglock *lg, char *name); - -void lg_local_lock(struct lglock *lg); -void lg_local_unlock(struct lglock *lg); -void lg_local_lock_cpu(struct lglock *lg, int cpu); -void lg_local_unlock_cpu(struct lglock *lg, int cpu); - -void lg_double_lock(struct lglock *lg, int cpu1, int cpu2); -void lg_double_unlock(struct lglock *lg, int cpu1, int cpu2); - -void lg_global_lock(struct lglock *lg); -void lg_global_unlock(struct lglock *lg); - -#else -/* When !CONFIG_SMP, map lglock to spinlock */ -#define lglock spinlock -#define DEFINE_LGLOCK(name) DEFINE_SPINLOCK(name) -#define DEFINE_STATIC_LGLOCK(name) static DEFINE_SPINLOCK(name) -#define lg_lock_init(lg, name) spin_lock_init(lg) -#define lg_local_lock spin_lock -#define lg_local_unlock spin_unlock -#define lg_local_lock_cpu(lg, cpu) spin_lock(lg) -#define lg_local_unlock_cpu(lg, cpu) spin_unlock(lg) -#define lg_global_lock spin_lock -#define lg_global_unlock spin_unlock -#endif - -#endif diff --git a/include/linux/percpu-rwsem.h b/include/linux/percpu-rwsem.h index c2fa3ecb0dce..5b2e6159b744 100644 --- a/include/linux/percpu-rwsem.h +++ b/include/linux/percpu-rwsem.h @@ -10,32 +10,122 @@ struct percpu_rw_semaphore { struct rcu_sync rss; - unsigned int __percpu *fast_read_ctr; + unsigned int __percpu *read_count; struct rw_semaphore rw_sem; - atomic_t slow_read_ctr; - wait_queue_head_t write_waitq; + wait_queue_head_t writer; + int readers_block; }; -extern void percpu_down_read(struct percpu_rw_semaphore *); -extern int percpu_down_read_trylock(struct percpu_rw_semaphore *); -extern void percpu_up_read(struct percpu_rw_semaphore *); +#define DEFINE_STATIC_PERCPU_RWSEM(name) \ +static DEFINE_PER_CPU(unsigned int, __percpu_rwsem_rc_##name); \ +static struct percpu_rw_semaphore name = { \ + .rss = __RCU_SYNC_INITIALIZER(name.rss, RCU_SCHED_SYNC), \ + .read_count = &__percpu_rwsem_rc_##name, \ + .rw_sem = __RWSEM_INITIALIZER(name.rw_sem), \ + .writer = __WAIT_QUEUE_HEAD_INITIALIZER(name.writer), \ +} + +extern int __percpu_down_read(struct percpu_rw_semaphore *, int); +extern void __percpu_up_read(struct percpu_rw_semaphore *); + +static inline void percpu_down_read_preempt_disable(struct percpu_rw_semaphore *sem) +{ + might_sleep(); + + rwsem_acquire_read(&sem->rw_sem.dep_map, 0, 0, _RET_IP_); + + preempt_disable(); + /* + * We are in an RCU-sched read-side critical section, so the writer + * cannot both change sem->state from readers_fast and start checking + * counters while we are here. So if we see !sem->state, we know that + * the writer won't be checking until we're past the preempt_enable() + * and that one the synchronize_sched() is done, the writer will see + * anything we did within this RCU-sched read-size critical section. + */ + __this_cpu_inc(*sem->read_count); + if (unlikely(!rcu_sync_is_idle(&sem->rss))) + __percpu_down_read(sem, false); /* Unconditional memory barrier */ + barrier(); + /* + * The barrier() prevents the compiler from + * bleeding the critical section out. + */ +} + +static inline void percpu_down_read(struct percpu_rw_semaphore *sem) +{ + percpu_down_read_preempt_disable(sem); + preempt_enable(); +} + +static inline int percpu_down_read_trylock(struct percpu_rw_semaphore *sem) +{ + int ret = 1; + + preempt_disable(); + /* + * Same as in percpu_down_read(). + */ + __this_cpu_inc(*sem->read_count); + if (unlikely(!rcu_sync_is_idle(&sem->rss))) + ret = __percpu_down_read(sem, true); /* Unconditional memory barrier */ + preempt_enable(); + /* + * The barrier() from preempt_enable() prevents the compiler from + * bleeding the critical section out. + */ + + if (ret) + rwsem_acquire_read(&sem->rw_sem.dep_map, 0, 1, _RET_IP_); + + return ret; +} + +static inline void percpu_up_read_preempt_enable(struct percpu_rw_semaphore *sem) +{ + /* + * The barrier() prevents the compiler from + * bleeding the critical section out. + */ + barrier(); + /* + * Same as in percpu_down_read(). + */ + if (likely(rcu_sync_is_idle(&sem->rss))) + __this_cpu_dec(*sem->read_count); + else + __percpu_up_read(sem); /* Unconditional memory barrier */ + preempt_enable(); + + rwsem_release(&sem->rw_sem.dep_map, 1, _RET_IP_); +} + +static inline void percpu_up_read(struct percpu_rw_semaphore *sem) +{ + preempt_disable(); + percpu_up_read_preempt_enable(sem); +} extern void percpu_down_write(struct percpu_rw_semaphore *); extern void percpu_up_write(struct percpu_rw_semaphore *); extern int __percpu_init_rwsem(struct percpu_rw_semaphore *, const char *, struct lock_class_key *); + extern void percpu_free_rwsem(struct percpu_rw_semaphore *); -#define percpu_init_rwsem(brw) \ +#define percpu_init_rwsem(sem) \ ({ \ static struct lock_class_key rwsem_key; \ - __percpu_init_rwsem(brw, #brw, &rwsem_key); \ + __percpu_init_rwsem(sem, #sem, &rwsem_key); \ }) - #define percpu_rwsem_is_held(sem) lockdep_is_held(&(sem)->rw_sem) +#define percpu_rwsem_assert_held(sem) \ + lockdep_assert_held(&(sem)->rw_sem) + static inline void percpu_rwsem_release(struct percpu_rw_semaphore *sem, bool read, unsigned long ip) { diff --git a/include/linux/rcu_sync.h b/include/linux/rcu_sync.h index a63a33e6196e..ece7ed9a4a70 100644 --- a/include/linux/rcu_sync.h +++ b/include/linux/rcu_sync.h @@ -59,6 +59,7 @@ static inline bool rcu_sync_is_idle(struct rcu_sync *rsp) } extern void rcu_sync_init(struct rcu_sync *, enum rcu_sync_type); +extern void rcu_sync_enter_start(struct rcu_sync *); extern void rcu_sync_enter(struct rcu_sync *); extern void rcu_sync_exit(struct rcu_sync *); extern void rcu_sync_dtor(struct rcu_sync *); diff --git a/kernel/cgroup.c b/kernel/cgroup.c index d6b729beba49..9ba28310eab6 100644 --- a/kernel/cgroup.c +++ b/kernel/cgroup.c @@ -5627,6 +5627,12 @@ int __init cgroup_init(void) BUG_ON(cgroup_init_cftypes(NULL, cgroup_dfl_base_files)); BUG_ON(cgroup_init_cftypes(NULL, cgroup_legacy_base_files)); + /* + * The latency of the synchronize_sched() is too high for cgroups, + * avoid it at the cost of forcing all readers into the slow path. + */ + rcu_sync_enter_start(&cgroup_threadgroup_rwsem.rss); + get_user_ns(init_cgroup_ns.user_ns); mutex_lock(&cgroup_mutex); diff --git a/kernel/futex.c b/kernel/futex.c index 46cb3a301bc1..2c4be467fecd 100644 --- a/kernel/futex.c +++ b/kernel/futex.c @@ -381,8 +381,12 @@ static inline int hb_waiters_pending(struct futex_hash_bucket *hb) #endif } -/* - * We hash on the keys returned from get_futex_key (see below). +/** + * hash_futex - Return the hash bucket in the global hash + * @key: Pointer to the futex key for which the hash is calculated + * + * We hash on the keys returned from get_futex_key (see below) and return the + * corresponding hash bucket in the global hash. */ static struct futex_hash_bucket *hash_futex(union futex_key *key) { @@ -392,7 +396,12 @@ static struct futex_hash_bucket *hash_futex(union futex_key *key) return &futex_queues[hash & (futex_hashsize - 1)]; } -/* + +/** + * match_futex - Check whether two futex keys are equal + * @key1: Pointer to key1 + * @key2: Pointer to key2 + * * Return 1 if two futex_keys are equal, 0 otherwise. */ static inline int match_futex(union futex_key *key1, union futex_key *key2) diff --git a/kernel/hung_task.c b/kernel/hung_task.c index d234022805dc..432c3d71d195 100644 --- a/kernel/hung_task.c +++ b/kernel/hung_task.c @@ -117,7 +117,7 @@ static void check_hung_task(struct task_struct *t, unsigned long timeout) pr_err("\"echo 0 > /proc/sys/kernel/hung_task_timeout_secs\"" " disables this message.\n"); sched_show_task(t); - debug_show_held_locks(t); + debug_show_all_locks(); touch_nmi_watchdog(); diff --git a/kernel/locking/Makefile b/kernel/locking/Makefile index 31322a4275cd..6f88e352cd4f 100644 --- a/kernel/locking/Makefile +++ b/kernel/locking/Makefile @@ -18,7 +18,6 @@ obj-$(CONFIG_LOCKDEP) += lockdep_proc.o endif obj-$(CONFIG_SMP) += spinlock.o obj-$(CONFIG_LOCK_SPIN_ON_OWNER) += osq_lock.o -obj-$(CONFIG_SMP) += lglock.o obj-$(CONFIG_PROVE_LOCKING) += spinlock.o obj-$(CONFIG_QUEUED_SPINLOCKS) += qspinlock.o obj-$(CONFIG_RT_MUTEXES) += rtmutex.o diff --git a/kernel/locking/lglock.c b/kernel/locking/lglock.c deleted file mode 100644 index 951cfcd10b4a..000000000000 --- a/kernel/locking/lglock.c +++ /dev/null @@ -1,111 +0,0 @@ -/* See include/linux/lglock.h for description */ -#include <linux/module.h> -#include <linux/lglock.h> -#include <linux/cpu.h> -#include <linux/string.h> - -/* - * Note there is no uninit, so lglocks cannot be defined in - * modules (but it's fine to use them from there) - * Could be added though, just undo lg_lock_init - */ - -void lg_lock_init(struct lglock *lg, char *name) -{ - LOCKDEP_INIT_MAP(&lg->lock_dep_map, name, &lg->lock_key, 0); -} -EXPORT_SYMBOL(lg_lock_init); - -void lg_local_lock(struct lglock *lg) -{ - arch_spinlock_t *lock; - - preempt_disable(); - lock_acquire_shared(&lg->lock_dep_map, 0, 0, NULL, _RET_IP_); - lock = this_cpu_ptr(lg->lock); - arch_spin_lock(lock); -} -EXPORT_SYMBOL(lg_local_lock); - -void lg_local_unlock(struct lglock *lg) -{ - arch_spinlock_t *lock; - - lock_release(&lg->lock_dep_map, 1, _RET_IP_); - lock = this_cpu_ptr(lg->lock); - arch_spin_unlock(lock); - preempt_enable(); -} -EXPORT_SYMBOL(lg_local_unlock); - -void lg_local_lock_cpu(struct lglock *lg, int cpu) -{ - arch_spinlock_t *lock; - - preempt_disable(); - lock_acquire_shared(&lg->lock_dep_map, 0, 0, NULL, _RET_IP_); - lock = per_cpu_ptr(lg->lock, cpu); - arch_spin_lock(lock); -} -EXPORT_SYMBOL(lg_local_lock_cpu); - -void lg_local_unlock_cpu(struct lglock *lg, int cpu) -{ - arch_spinlock_t *lock; - - lock_release(&lg->lock_dep_map, 1, _RET_IP_); - lock = per_cpu_ptr(lg->lock, cpu); - arch_spin_unlock(lock); - preempt_enable(); -} -EXPORT_SYMBOL(lg_local_unlock_cpu); - -void lg_double_lock(struct lglock *lg, int cpu1, int cpu2) -{ - BUG_ON(cpu1 == cpu2); - - /* lock in cpu order, just like lg_global_lock */ - if (cpu2 < cpu1) - swap(cpu1, cpu2); - - preempt_disable(); - lock_acquire_shared(&lg->lock_dep_map, 0, 0, NULL, _RET_IP_); - arch_spin_lock(per_cpu_ptr(lg->lock, cpu1)); - arch_spin_lock(per_cpu_ptr(lg->lock, cpu2)); -} - -void lg_double_unlock(struct lglock *lg, int cpu1, int cpu2) -{ - lock_release(&lg->lock_dep_map, 1, _RET_IP_); - arch_spin_unlock(per_cpu_ptr(lg->lock, cpu1)); - arch_spin_unlock(per_cpu_ptr(lg->lock, cpu2)); - preempt_enable(); -} - -void lg_global_lock(struct lglock *lg) -{ - int i; - - preempt_disable(); - lock_acquire_exclusive(&lg->lock_dep_map, 0, 0, NULL, _RET_IP_); - for_each_possible_cpu(i) { - arch_spinlock_t *lock; - lock = per_cpu_ptr(lg->lock, i); - arch_spin_lock(lock); - } -} -EXPORT_SYMBOL(lg_global_lock); - -void lg_global_unlock(struct lglock *lg) -{ - int i; - - lock_release(&lg->lock_dep_map, 1, _RET_IP_); - for_each_possible_cpu(i) { - arch_spinlock_t *lock; - lock = per_cpu_ptr(lg->lock, i); - arch_spin_unlock(lock); - } - preempt_enable(); -} -EXPORT_SYMBOL(lg_global_unlock); diff --git a/kernel/locking/percpu-rwsem.c b/kernel/locking/percpu-rwsem.c index bec0b647f9cc..ce182599cf2e 100644 --- a/kernel/locking/percpu-rwsem.c +++ b/kernel/locking/percpu-rwsem.c @@ -8,152 +8,186 @@ #include <linux/sched.h> #include <linux/errno.h> -int __percpu_init_rwsem(struct percpu_rw_semaphore *brw, +int __percpu_init_rwsem(struct percpu_rw_semaphore *sem, const char *name, struct lock_class_key *rwsem_key) { - brw->fast_read_ctr = alloc_percpu(int); - if (unlikely(!brw->fast_read_ctr)) + sem->read_count = alloc_percpu(int); + if (unlikely(!sem->read_count)) return -ENOMEM; /* ->rw_sem represents the whole percpu_rw_semaphore for lockdep */ - __init_rwsem(&brw->rw_sem, name, rwsem_key); - rcu_sync_init(&brw->rss, RCU_SCHED_SYNC); - atomic_set(&brw->slow_read_ctr, 0); - init_waitqueue_head(&brw->write_waitq); + rcu_sync_init(&sem->rss, RCU_SCHED_SYNC); + __init_rwsem(&sem->rw_sem, name, rwsem_key); + init_waitqueue_head(&sem->writer); + sem->readers_block = 0; return 0; } EXPORT_SYMBOL_GPL(__percpu_init_rwsem); -void percpu_free_rwsem(struct percpu_rw_semaphore *brw) +void percpu_free_rwsem(struct percpu_rw_semaphore *sem) { /* * XXX: temporary kludge. The error path in alloc_super() * assumes that percpu_free_rwsem() is safe after kzalloc(). */ - if (!brw->fast_read_ctr) + if (!sem->read_count) return; - rcu_sync_dtor(&brw->rss); - free_percpu(brw->fast_read_ctr); - brw->fast_read_ctr = NULL; /* catch use after free bugs */ + rcu_sync_dtor(&sem->rss); + free_percpu(sem->read_count); + sem->read_count = NULL; /* catch use after free bugs */ } EXPORT_SYMBOL_GPL(percpu_free_rwsem); -/* - * This is the fast-path for down_read/up_read. If it succeeds we rely - * on the barriers provided by rcu_sync_enter/exit; see the comments in - * percpu_down_write() and percpu_up_write(). - * - * If this helper fails the callers rely on the normal rw_semaphore and - * atomic_dec_and_test(), so in this case we have the necessary barriers. - */ -static bool update_fast_ctr(struct percpu_rw_semaphore *brw, unsigned int val) +int __percpu_down_read(struct percpu_rw_semaphore *sem, int try) { - bool success; + /* + * Due to having preemption disabled the decrement happens on + * the same CPU as the increment, avoiding the + * increment-on-one-CPU-and-decrement-on-another problem. + * + * If the reader misses the writer's assignment of readers_block, then + * the writer is guaranteed to see the reader's increment. + * + * Conversely, any readers that increment their sem->read_count after + * the writer looks are guaranteed to see the readers_block value, + * which in turn means that they are guaranteed to immediately + * decrement their sem->read_count, so that it doesn't matter that the + * writer missed them. + */ - preempt_disable(); - success = rcu_sync_is_idle(&brw->rss); - if (likely(success)) - __this_cpu_add(*brw->fast_read_ctr, val); - preempt_enable(); + smp_mb(); /* A matches D */ - return success; -} + /* + * If !readers_block the critical section starts here, matched by the + * release in percpu_up_write(). + */ + if (likely(!smp_load_acquire(&sem->readers_block))) + return 1; -/* - * Like the normal down_read() this is not recursive, the writer can - * come after the first percpu_down_read() and create the deadlock. - * - * Note: returns with lock_is_held(brw->rw_sem) == T for lockdep, - * percpu_up_read() does rwsem_release(). This pairs with the usage - * of ->rw_sem in percpu_down/up_write(). - */ -void percpu_down_read(struct percpu_rw_semaphore *brw) -{ - might_sleep(); - rwsem_acquire_read(&brw->rw_sem.dep_map, 0, 0, _RET_IP_); + /* + * Per the above comment; we still have preemption disabled and + * will thus decrement on the same CPU as we incremented. + */ + __percpu_up_read(sem); - if (likely(update_fast_ctr(brw, +1))) - return; + if (try) + return 0; - /* Avoid rwsem_acquire_read() and rwsem_release() */ - __down_read(&brw->rw_sem); - atomic_inc(&brw->slow_read_ctr); - __up_read(&brw->rw_sem); -} -EXPORT_SYMBOL_GPL(percpu_down_read); + /* + * We either call schedule() in the wait, or we'll fall through + * and reschedule on the preempt_enable() in percpu_down_read(). + */ + preempt_enable_no_resched(); -int percpu_down_read_trylock(struct percpu_rw_semaphore *brw) -{ - if (unlikely(!update_fast_ctr(brw, +1))) { - if (!__down_read_trylock(&brw->rw_sem)) - return 0; - atomic_inc(&brw->slow_read_ctr); - __up_read(&brw->rw_sem); - } - - rwsem_acquire_read(&brw->rw_sem.dep_map, 0, 1, _RET_IP_); + /* + * Avoid lockdep for the down/up_read() we already have them. + */ + __down_read(&sem->rw_sem); + this_cpu_inc(*sem->read_count); + __up_read(&sem->rw_sem); + + preempt_disable(); return 1; } +EXPORT_SYMBOL_GPL(__percpu_down_read); -void percpu_up_read(struct percpu_rw_semaphore *brw) +void __percpu_up_read(struct percpu_rw_semaphore *sem) { - rwsem_release(&brw->rw_sem.dep_map, 1, _RET_IP_); - - if (likely(update_fast_ctr(brw, -1))) - return; + smp_mb(); /* B matches C */ + /* + * In other words, if they see our decrement (presumably to aggregate + * zero, as that is the only time it matters) they will also see our + * critical section. + */ + __this_cpu_dec(*sem->read_count); - /* false-positive is possible but harmless */ - if (atomic_dec_and_test(&brw->slow_read_ctr)) - wake_up_all(&brw->write_waitq); + /* Prod writer to recheck readers_active */ + wake_up(&sem->writer); } -EXPORT_SYMBOL_GPL(percpu_up_read); +EXPORT_SYMBOL_GPL(__percpu_up_read); + +#define per_cpu_sum(var) \ +({ \ + typeof(var) __sum = 0; \ + int cpu; \ + compiletime_assert_atomic_type(__sum); \ + for_each_possible_cpu(cpu) \ + __sum += per_cpu(var, cpu); \ + __sum; \ +}) -static int clear_fast_ctr(struct percpu_rw_semaphore *brw) +/* + * Return true if the modular sum of the sem->read_count per-CPU variable is + * zero. If this sum is zero, then it is stable due to the fact that if any + * newly arriving readers increment a given counter, they will immediately + * decrement that same counter. + */ +static bool readers_active_check(struct percpu_rw_semaphore *sem) { - unsigned int sum = 0; - int cpu; + if (per_cpu_sum(*sem->read_count) != 0) + return false; + + /* + * If we observed the decrement; ensure we see the entire critical + * section. + */ - for_each_possible_cpu(cpu) { - sum += per_cpu(*brw->fast_read_ctr, cpu); - per_cpu(*brw->fast_read_ctr, cpu) = 0; - } + smp_mb(); /* C matches B */ - return sum; + return true; } -void percpu_down_write(struct percpu_rw_semaphore *brw) +void percpu_down_write(struct percpu_rw_semaphore *sem) { + /* Notify readers to take the slow path. */ + rcu_sync_enter(&sem->rss); + + down_write(&sem->rw_sem); + /* - * Make rcu_sync_is_idle() == F and thus disable the fast-path in - * percpu_down_read() and percpu_up_read(), and wait for gp pass. - * - * The latter synchronises us with the preceding readers which used - * the fast-past, so we can not miss the result of __this_cpu_add() - * or anything else inside their criticial sections. + * Notify new readers to block; up until now, and thus throughout the + * longish rcu_sync_enter() above, new readers could still come in. */ - rcu_sync_enter(&brw->rss); + WRITE_ONCE(sem->readers_block, 1); - /* exclude other writers, and block the new readers completely */ - down_write(&brw->rw_sem); + smp_mb(); /* D matches A */ - /* nobody can use fast_read_ctr, move its sum into slow_read_ctr */ - atomic_add(clear_fast_ctr(brw), &brw->slow_read_ctr); + /* + * If they don't see our writer of readers_block, then we are + * guaranteed to see their sem->read_count increment, and therefore + * will wait for them. + */ - /* wait for all readers to complete their percpu_up_read() */ - wait_event(brw->write_waitq, !atomic_read(&brw->slow_read_ctr)); + /* Wait for all now active readers to complete. */ + wait_event(sem->writer, readers_active_check(sem)); } EXPORT_SYMBOL_GPL(percpu_down_write); -void percpu_up_write(struct percpu_rw_semaphore *brw) +void percpu_up_write(struct percpu_rw_semaphore *sem) { - /* release the lock, but the readers can't use the fast-path */ - up_write(&brw->rw_sem); /* - * Enable the fast-path in percpu_down_read() and percpu_up_read() - * but only after another gp pass; this adds the necessary barrier - * to ensure the reader can't miss the changes done by us. + * Signal the writer is done, no fast path yet. + * + * One reason that we cannot just immediately flip to readers_fast is + * that new readers might fail to see the results of this writer's + * critical section. + * + * Therefore we force it through the slow path which guarantees an + * acquire and thereby guarantees the critical section's consistency. + */ + smp_store_release(&sem->readers_block, 0); + + /* + * Release the write lock, this will allow readers back in the game. + */ + up_write(&sem->rw_sem); + + /* + * Once this completes (at least one RCU-sched grace period hence) the + * reader fast path will be available again. Safe to use outside the + * exclusive write lock because its counting. */ - rcu_sync_exit(&brw->rss); + rcu_sync_exit(&sem->rss); } EXPORT_SYMBOL_GPL(percpu_up_write); diff --git a/kernel/locking/qspinlock_paravirt.h b/kernel/locking/qspinlock_paravirt.h index 8a99abf58080..e3b5520005db 100644 --- a/kernel/locking/qspinlock_paravirt.h +++ b/kernel/locking/qspinlock_paravirt.h @@ -70,11 +70,14 @@ struct pv_node { static inline bool pv_queued_spin_steal_lock(struct qspinlock *lock) { struct __qspinlock *l = (void *)lock; - int ret = !(atomic_read(&lock->val) & _Q_LOCKED_PENDING_MASK) && - (cmpxchg(&l->locked, 0, _Q_LOCKED_VAL) == 0); - qstat_inc(qstat_pv_lock_stealing, ret); - return ret; + if (!(atomic_read(&lock->val) & _Q_LOCKED_PENDING_MASK) && + (cmpxchg(&l->locked, 0, _Q_LOCKED_VAL) == 0)) { + qstat_inc(qstat_pv_lock_stealing, true); + return true; + } + + return false; } /* @@ -257,7 +260,6 @@ static struct pv_node *pv_unhash(struct qspinlock *lock) static inline bool pv_wait_early(struct pv_node *prev, int loop) { - if ((loop & PV_PREV_CHECK_MASK) != 0) return false; @@ -286,12 +288,10 @@ static void pv_wait_node(struct mcs_spinlock *node, struct mcs_spinlock *prev) { struct pv_node *pn = (struct pv_node *)node; struct pv_node *pp = (struct pv_node *)prev; - int waitcnt = 0; int loop; bool wait_early; - /* waitcnt processing will be compiled out if !QUEUED_LOCK_STAT */ - for (;; waitcnt++) { + for (;;) { for (wait_early = false, loop = SPIN_THRESHOLD; loop; loop--) { if (READ_ONCE(node->locked)) return; @@ -315,7 +315,6 @@ static void pv_wait_node(struct mcs_spinlock *node, struct mcs_spinlock *prev) if (!READ_ONCE(node->locked)) { qstat_inc(qstat_pv_wait_node, true); - qstat_inc(qstat_pv_wait_again, waitcnt); qstat_inc(qstat_pv_wait_early, wait_early); pv_wait(&pn->state, vcpu_halted); } @@ -456,12 +455,9 @@ pv_wait_head_or_lock(struct qspinlock *lock, struct mcs_spinlock *node) pv_wait(&l->locked, _Q_SLOW_VAL); /* - * The unlocker should have freed the lock before kicking the - * CPU. So if the lock is still not free, it is a spurious - * wakeup or another vCPU has stolen the lock. The current - * vCPU should spin again. + * Because of lock stealing, the queue head vCPU may not be + * able to acquire the lock before it has to wait again. */ - qstat_inc(qstat_pv_spurious_wakeup, READ_ONCE(l->locked)); } /* @@ -544,7 +540,7 @@ __visible void __pv_queued_spin_unlock(struct qspinlock *lock) * unhash. Otherwise it would be possible to have multiple @lock * entries, which would be BAD. */ - locked = cmpxchg(&l->locked, _Q_LOCKED_VAL, 0); + locked = cmpxchg_release(&l->locked, _Q_LOCKED_VAL, 0); if (likely(locked == _Q_LOCKED_VAL)) return; diff --git a/kernel/locking/qspinlock_stat.h b/kernel/locking/qspinlock_stat.h index b9d031516254..eb0a599fcf58 100644 --- a/kernel/locking/qspinlock_stat.h +++ b/kernel/locking/qspinlock_stat.h @@ -24,8 +24,8 @@ * pv_latency_wake - average latency (ns) from vCPU kick to wakeup * pv_lock_slowpath - # of locking operations via the slowpath * pv_lock_stealing - # of lock stealing operations - * pv_spurious_wakeup - # of spurious wakeups - * pv_wait_again - # of vCPU wait's that happened after a vCPU kick + * pv_spurious_wakeup - # of spurious wakeups in non-head vCPUs + * pv_wait_again - # of wait's after a queue head vCPU kick * pv_wait_early - # of early vCPU wait's * pv_wait_head - # of vCPU wait's at the queue head * pv_wait_node - # of vCPU wait's at a non-head queue node diff --git a/kernel/locking/rwsem-xadd.c b/kernel/locking/rwsem-xadd.c index 447e08de1fab..2337b4bb2366 100644 --- a/kernel/locking/rwsem-xadd.c +++ b/kernel/locking/rwsem-xadd.c @@ -121,16 +121,19 @@ enum rwsem_wake_type { * - woken process blocks are discarded from the list after having task zeroed * - writers are only marked woken if downgrading is false */ -static struct rw_semaphore * -__rwsem_mark_wake(struct rw_semaphore *sem, - enum rwsem_wake_type wake_type, struct wake_q_head *wake_q) +static void __rwsem_mark_wake(struct rw_semaphore *sem, + enum rwsem_wake_type wake_type, + struct wake_q_head *wake_q) { - struct rwsem_waiter *waiter; - struct task_struct *tsk; - struct list_head *next; - long oldcount, woken, loop, adjustment; + struct rwsem_waiter *waiter, *tmp; + long oldcount, woken = 0, adjustment = 0; + + /* + * Take a peek at the queue head waiter such that we can determine + * the wakeup(s) to perform. + */ + waiter = list_first_entry(&sem->wait_list, struct rwsem_waiter, list); - waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list); if (waiter->type == RWSEM_WAITING_FOR_WRITE) { if (wake_type == RWSEM_WAKE_ANY) { /* @@ -142,19 +145,19 @@ __rwsem_mark_wake(struct rw_semaphore *sem, */ wake_q_add(wake_q, waiter->task); } - goto out; + + return; } - /* Writers might steal the lock before we grant it to the next reader. + /* + * Writers might steal the lock before we grant it to the next reader. * We prefer to do the first reader grant before counting readers * so we can bail out early if a writer stole the lock. */ - adjustment = 0; if (wake_type != RWSEM_WAKE_READ_OWNED) { adjustment = RWSEM_ACTIVE_READ_BIAS; try_reader_grant: oldcount = atomic_long_fetch_add(adjustment, &sem->count); - if (unlikely(oldcount < RWSEM_WAITING_BIAS)) { /* * If the count is still less than RWSEM_WAITING_BIAS @@ -164,7 +167,8 @@ __rwsem_mark_wake(struct rw_semaphore *sem, */ if (atomic_long_add_return(-adjustment, &sem->count) < RWSEM_WAITING_BIAS) - goto out; + return; + /* Last active locker left. Retry waking readers. */ goto try_reader_grant; } @@ -176,38 +180,23 @@ __rwsem_mark_wake(struct rw_semaphore *sem, rwsem_set_reader_owned(sem); } - /* Grant an infinite number of read locks to the readers at the front - * of the queue. Note we increment the 'active part' of the count by - * the number of readers before waking any processes up. + /* + * Grant an infinite number of read locks to the readers at the front + * of the queue. We know that woken will be at least 1 as we accounted + * for above. Note we increment the 'active part' of the count by the + * number of readers before waking any processes up. */ - woken = 0; - do { - woken++; + list_for_each_entry_safe(waiter, tmp, &sem->wait_list, list) { + struct task_struct *tsk; - if (waiter->list.next == &sem->wait_list) + if (waiter->type == RWSEM_WAITING_FOR_WRITE) break; - waiter = list_entry(waiter->list.next, - struct rwsem_waiter, list); - - } while (waiter->type != RWSEM_WAITING_FOR_WRITE); - - adjustment = woken * RWSEM_ACTIVE_READ_BIAS - adjustment; - if (waiter->type != RWSEM_WAITING_FOR_WRITE) - /* hit end of list above */ - adjustment -= RWSEM_WAITING_BIAS; - - if (adjustment) - atomic_long_add(adjustment, &sem->count); - - next = sem->wait_list.next; - loop = woken; - do { - waiter = list_entry(next, struct rwsem_waiter, list); - next = waiter->list.next; + woken++; tsk = waiter->task; wake_q_add(wake_q, tsk); + list_del(&waiter->list); /* * Ensure that the last operation is setting the reader * waiter to nil such that rwsem_down_read_failed() cannot @@ -215,13 +204,16 @@ __rwsem_mark_wake(struct rw_semaphore *sem, * to the task to wakeup. */ smp_store_release(&waiter->task, NULL); - } while (--loop); + } - sem->wait_list.next = next; - next->prev = &sem->wait_list; + adjustment = woken * RWSEM_ACTIVE_READ_BIAS - adjustment; + if (list_empty(&sem->wait_list)) { + /* hit end of list above */ + adjustment -= RWSEM_WAITING_BIAS; + } - out: - return sem; + if (adjustment) + atomic_long_add(adjustment, &sem->count); } /* @@ -235,7 +227,6 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem) struct task_struct *tsk = current; WAKE_Q(wake_q); - /* set up my own style of waitqueue */ waiter.task = tsk; waiter.type = RWSEM_WAITING_FOR_READ; @@ -247,7 +238,8 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem) /* we're now waiting on the lock, but no longer actively locking */ count = atomic_long_add_return(adjustment, &sem->count); - /* If there are no active locks, wake the front queued process(es). + /* + * If there are no active locks, wake the front queued process(es). * * If there are no writers and we are first in the queue, * wake our own waiter to join the existing active readers ! @@ -255,7 +247,7 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem) if (count == RWSEM_WAITING_BIAS || (count > RWSEM_WAITING_BIAS && adjustment != -RWSEM_ACTIVE_READ_BIAS)) - sem = __rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q); + __rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q); raw_spin_unlock_irq(&sem->wait_lock); wake_up_q(&wake_q); @@ -505,7 +497,7 @@ __rwsem_down_write_failed_common(struct rw_semaphore *sem, int state) if (count > RWSEM_WAITING_BIAS) { WAKE_Q(wake_q); - sem = __rwsem_mark_wake(sem, RWSEM_WAKE_READERS, &wake_q); + __rwsem_mark_wake(sem, RWSEM_WAKE_READERS, &wake_q); /* * The wakeup is normally called _after_ the wait_lock * is released, but given that we are proactively waking @@ -614,9 +606,8 @@ struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem) raw_spin_lock_irqsave(&sem->wait_lock, flags); locked: - /* do nothing if list empty */ if (!list_empty(&sem->wait_list)) - sem = __rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q); + __rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q); raw_spin_unlock_irqrestore(&sem->wait_lock, flags); wake_up_q(&wake_q); @@ -638,9 +629,8 @@ struct rw_semaphore *rwsem_downgrade_wake(struct rw_semaphore *sem) raw_spin_lock_irqsave(&sem->wait_lock, flags); - /* do nothing if list empty */ if (!list_empty(&sem->wait_list)) - sem = __rwsem_mark_wake(sem, RWSEM_WAKE_READ_OWNED, &wake_q); + __rwsem_mark_wake(sem, RWSEM_WAKE_READ_OWNED, &wake_q); raw_spin_unlock_irqrestore(&sem->wait_lock, flags); wake_up_q(&wake_q); diff --git a/kernel/rcu/sync.c b/kernel/rcu/sync.c index be922c9f3d37..50d1861f7759 100644 --- a/kernel/rcu/sync.c +++ b/kernel/rcu/sync.c @@ -68,6 +68,8 @@ void rcu_sync_lockdep_assert(struct rcu_sync *rsp) RCU_LOCKDEP_WARN(!gp_ops[rsp->gp_type].held(), "suspicious rcu_sync_is_idle() usage"); } + +EXPORT_SYMBOL_GPL(rcu_sync_lockdep_assert); #endif /** @@ -83,6 +85,18 @@ void rcu_sync_init(struct rcu_sync *rsp, enum rcu_sync_type type) } /** + * Must be called after rcu_sync_init() and before first use. + * + * Ensures rcu_sync_is_idle() returns false and rcu_sync_{enter,exit}() + * pairs turn into NO-OPs. + */ +void rcu_sync_enter_start(struct rcu_sync *rsp) +{ + rsp->gp_count++; + rsp->gp_state = GP_PASSED; +} + +/** * rcu_sync_enter() - Force readers onto slowpath * @rsp: Pointer to rcu_sync structure to use for synchronization * diff --git a/kernel/stop_machine.c b/kernel/stop_machine.c index 4a1ca5f6da7e..ae6f41fb9cba 100644 --- a/kernel/stop_machine.c +++ b/kernel/stop_machine.c @@ -20,7 +20,6 @@ #include <linux/kallsyms.h> #include <linux/smpboot.h> #include <linux/atomic.h> -#include <linux/lglock.h> #include <linux/nmi.h> /* @@ -47,13 +46,9 @@ struct cpu_stopper { static DEFINE_PER_CPU(struct cpu_stopper, cpu_stopper); static bool stop_machine_initialized = false; -/* - * Avoids a race between stop_two_cpus and global stop_cpus, where - * the stoppers could get queued up in reverse order, leading to - * system deadlock. Using an lglock means stop_two_cpus remains - * relatively cheap. - */ -DEFINE_STATIC_LGLOCK(stop_cpus_lock); +/* static data for stop_cpus */ +static DEFINE_MUTEX(stop_cpus_mutex); +static bool stop_cpus_in_progress; static void cpu_stop_init_done(struct cpu_stop_done *done, unsigned int nr_todo) { @@ -230,14 +225,26 @@ static int cpu_stop_queue_two_works(int cpu1, struct cpu_stop_work *work1, struct cpu_stopper *stopper1 = per_cpu_ptr(&cpu_stopper, cpu1); struct cpu_stopper *stopper2 = per_cpu_ptr(&cpu_stopper, cpu2); int err; - - lg_double_lock(&stop_cpus_lock, cpu1, cpu2); +retry: spin_lock_irq(&stopper1->lock); spin_lock_nested(&stopper2->lock, SINGLE_DEPTH_NESTING); err = -ENOENT; if (!stopper1->enabled || !stopper2->enabled) goto unlock; + /* + * Ensure that if we race with __stop_cpus() the stoppers won't get + * queued up in reverse order leading to system deadlock. + * + * We can't miss stop_cpus_in_progress if queue_stop_cpus_work() has + * queued a work on cpu1 but not on cpu2, we hold both locks. + * + * It can be falsely true but it is safe to spin until it is cleared, + * queue_stop_cpus_work() does everything under preempt_disable(). + */ + err = -EDEADLK; + if (unlikely(stop_cpus_in_progress)) + goto unlock; err = 0; __cpu_stop_queue_work(stopper1, work1); @@ -245,8 +252,12 @@ static int cpu_stop_queue_two_works(int cpu1, struct cpu_stop_work *work1, unlock: spin_unlock(&stopper2->lock); spin_unlock_irq(&stopper1->lock); - lg_double_unlock(&stop_cpus_lock, cpu1, cpu2); + if (unlikely(err == -EDEADLK)) { + while (stop_cpus_in_progress) + cpu_relax(); + goto retry; + } return err; } /** @@ -316,9 +327,6 @@ bool stop_one_cpu_nowait(unsigned int cpu, cpu_stop_fn_t fn, void *arg, return cpu_stop_queue_work(cpu, work_buf); } -/* static data for stop_cpus */ -static DEFINE_MUTEX(stop_cpus_mutex); - static bool queue_stop_cpus_work(const struct cpumask *cpumask, cpu_stop_fn_t fn, void *arg, struct cpu_stop_done *done) @@ -332,7 +340,8 @@ static bool queue_stop_cpus_work(const struct cpumask *cpumask, * preempted by a stopper which might wait for other stoppers * to enter @fn which can lead to deadlock. */ - lg_global_lock(&stop_cpus_lock); + preempt_disable(); + stop_cpus_in_progress = true; for_each_cpu(cpu, cpumask) { work = &per_cpu(cpu_stopper.stop_work, cpu); work->fn = fn; @@ -341,7 +350,8 @@ static bool queue_stop_cpus_work(const struct cpumask *cpumask, if (cpu_stop_queue_work(cpu, work)) queued = true; } - lg_global_unlock(&stop_cpus_lock); + stop_cpus_in_progress = false; + preempt_enable(); return queued; } |