Синхронізація в Go: горутини, тести, варіанти
Привіт, мене звати Ярослав, займаюсь розробкою в компанії Evrius. Ця стаття про синхронізацію результатів від паралельно виконаних підзадач, призначена для спеціалістів-початківців та тих, хто планує перейти на Go.
На початку 2019 року, маючи досвід з Go, шукав нову роботу. Під час більшості співбесід ставили запитання, як розпаралелити виконання завдання. Приблизний опис завдання: є список посилань, треба за ними перейти, отримати результат та синхронізувати. Вирішення було достатньо для проходження технічної частини в пару аутсорс-компаній.
Вартість горутини
Кожен розробник, який використовує Go, знає, що горутини дешеві. Трохи менше знають, що розмір мінімального стека горутини змінювали в ранніх версіях Go, у версії 1.13.
_StackMin = 2048
А щоб перевірити горутини на швидкодію, напишемо тест, у якому запустимо N горутин з простим завданням, дочекаємося завершення й подивимося результати:
package benchmarks
import (
"sync"
"sync/atomic"
"testing"
)
func BenchmarkGoroutineCost(b *testing.B) {
var value uint32
var wg sync.WaitGroup
wg.Add(b.N)
for i := 0; i < b.N; i++ {
go func() {
atomic.AddUint32(&value, 1)
wg.Done()
}()
}
wg.Wait()
if value != uint32(b.N) {
b.Errorf("expected %d, got %d", b.N, value)
}
}
go version
go version go1.13.3 linux/amd64
go test./benchmarks/... -v -bench=BenchmarkGoroutineCost -benchmem
BenchmarkGoroutineCost-4 3437931 351 ns/op 0 B/op 0 allocs/op
PASS
ok gitlab.com/go-yp/go-sync/benchmarks 1.563s
~350 наносекунд на створення, виконання, завершення горутини.
Спершу я в це повірив, але потім вирішив написати ще один тест, щоб перевірити відсутність оптимізації такої простої операції, а також те, що чисельність горутин під час виконання збільшується.
package benchmarks
import (
"runtime"
"sync"
"sync/atomic"
"testing"
)
type goRuntimeMaxCount struct {
mu sync.Mutex
value int
}
func (c *goRuntimeMaxCount) update() {
var value = runtime.NumGoroutine()
c.mu.Lock()
if value > c.value {
c.value = value
}
c.mu.Unlock()
}
func (c *goRuntimeMaxCount) get() int {
return c.value
}
func BenchmarkGoroutineCostDump(b *testing.B) {
var (
value = uint32(0)
wg = new(sync.WaitGroup)
goRuntimeCount = new(goRuntimeMaxCount)
)
b.Logf("before goroutine count %d", goRuntimeCount.get())
wg.Add(b.N)
for i := 0; i < b.N; i++ {
go func() {
atomic.AddUint32(&value, 1)
goRuntimeCount.update()
wg.Done()
}()
}
wg.Wait()
if value != uint32(b.N) {
b.Errorf("expected %d, got %d", b.N, value)
}
b.Logf("after goroutine count %d for b.N = %d", goRuntimeCount.get(), b.N)
}
go test ./benchmarks/... -v -bench=BenchmarkGoroutineCostDump -benchmem
BenchmarkGoroutineCostDump-4 3223651 366 ns/op 0 B/op 0 allocs/op
before goroutine count 0
after goroutine count 904 for b.N = 1000000
before goroutine count 0
after goroutine count 3160 for b.N = 3223651
PASS
ok gitlab.com/go-yp/go-sync/benchmarks 1.562s
Тест показав, що горутини справді створюються. А тепер змінимо тест так, щоб на кожній ітерації циклу горутина створювалася і завершувалася:
func BenchmarkGoroutineCostOne(b *testing.B) {
var value uint32
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {
wg.Add(1)
go func() {
atomic.AddUint32(&value, 1)
wg.Done()
}()
wg.Wait()
}
if value != uint32(b.N) {
b.Errorf("expected %d, got %d", b.N, value)
}
}
func BenchmarkGoroutineCostOneOverhead(b *testing.B) {
var value uint32
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {
wg.Add(1)
atomic.AddUint32(&value, 1)
wg.Done()
wg.Wait()
}
if value != uint32(b.N) {
b.Errorf("expected %d, got %d", b.N, value)
}
}
go test ./benchmarks/... -v -bench=BenchmarkGoroutineCostOne -benchmem
BenchmarkGoroutineCostOne-4 1488328 841 ns/op 0 B/op 0 allocs/op
BenchmarkGoroutineCostOneOverhead-4 32435746 34.6 ns/op 0 B/op 0 allocs/op
PASS
ok gitlab.com/go-yp/go-sync/benchmarks 3.229s
Тести проводив на домашньому Intel® Core™ i5-4210U CPU @ 1.70GHz, що показали вартість горутини ~800 наносекунд.
Для порівняння: знаходження максимального елементу в масиві з 1024 елементів ~1400 наносекунд.
Атомарність
Ми вже використовували пакет atomic, потрібний для паралельних операцій, щоб гарантувати їхню успішність.
Якщо з попередніх тестів забрати atomic і використовувати просте додавання:
func TestParallelPureIncrement(t *testing.T) {
const n = 1000000
var (
value uint32
wg = new(sync.WaitGroup)
)
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
value++ // same sa "value = value + 1"
wg.Done()
}()
}
wg.Wait()
if value != n {
t.Errorf("expected %d, got %d", n, value)
}
}
Отримаємо:
=== RUN TestParallelPureIncrement
--- FAIL: TestParallelPureIncrement (0.35s)
expected 1000000, got 851804
FAIL
FAIL gitlab.com/go-yp/go-sync/benchmarks 0.353s
Очікували 1 000 000, отримали 851 804, через відсутність синхронізації між горутинами.
На основі пакета atomic ґрунтуються інші структури в Go, що використовують для синхронізації. Так, у тесті замість WaitGroup.Wait, ми можемо використати циклічну перевірку:
func TestParallelPureAtomic(t *testing.T) {
const n = 1000000
var value uint32
for i := 0; i < n; i++ {
go func() {
atomic.AddUint32(&value, 1)
}()
}
for atomic.LoadUint32(&value) < n {
// NOP
}
}
=== RUN TestParallelPureAtomic
--- PASS: TestParallelPureAtomic (0.42s)
PASS
ok gitlab.com/go-yp/go-sync/benchmarks 0.425s
і додати перемикання на інші горутини runtime.Gosched()
for atomic.LoadUint32(&value) < n {
runtime.Gosched()
}
Проте ліпше використовуйте sync.WaitGroup.
Синхронізація результатів від горутин і перегони даних (data race)
Якщо ви початківець у Go, а вам треба розпаралелити завдання, то ліпше напишіть тест для цього завдання, розпаралельте й запустіть з флагом -race, щоб перевірити наявність data race.
Data race — помилка проектування, стан програми, коли один або більше потоків змінюють дані без блокування й один або більше читають ці дані без блокування, у результаті програма працює інакше, ніж очікуємо.
Знайдемо функціонал, що розробили в однопотоковому варіанті й зі збільшенням завдань — розпаралелили.
Наприклад, візьмемо сторінку bestofjs.org/projects, де навпроти кожного проекту бачимо число зірок на GitHub-і.
Раз на день ці числа треба оновлювати, а в базі всього 20 репозиторіїв.
Є функція, щоб отримати число зірок за ID репозиторію:
func fetchRepositoryStarByID(id int) int {
// emulate slow http request by sleep
time.Sleep(100 * time.Millisecond)
// emulate response
var stars = id % 32
return stars
}
і програма, яку запускають раз на день, щоб оновити число зірочок для кожного репозиторію:
package main
import (
"fmt"
"time"
)
const repositoryCount = 20
type repository struct {
id int
starCount int
}
func main() {
var startTime = time.Now()
var ids = getRepositoryIDs()
var repositories = fetchRepositoryStarsByIDs(ids)
updateRepositoryStars(repositories)
var duration = time.Since(startTime)
fmt.Printf("fetch %d from %d repositories by %d \n", len(repositories), repositoryCount, duration)
}
func getRepositoryIDs() []int {
return make([]int, repositoryCount)
}
func fetchRepositoryStarsByIDs(ids []int) []repository {
var result = make([]repository, 0, len(ids))
for _, id := range ids {
result = append(result, repository{
id: id,
starCount: fetchRepositoryStarByID(id),
})
}
return result
}
func fetchRepositoryStarByID(id int) int {
// emulate slow http request by sleep
time.Sleep(100 * time.Millisecond)
// emulate response
var stars = id % 32
return stars
}
func updateRepositoryStars(repositories []repository) {
// NOP
}
Програма виконується за 2 секунди.
Опублікували React, Vue, Svelte й розширення до них, тепер у базі 100 репозиторіїв і програма виконується за 10 секунд.
З цим треба щось робити, бо чекаємо, що в проекті буде ще більше репозиторіїв, а отже, оновлення займатиме ще більше часу.
Вирішили розпаралелити з використанням горутин і WaitGroup.
Тепер fetchRepositoryStarsByIDs має такий вигляд:
package main
import (
"fmt"
"sync"
"time"
)
const repositoryCount = 100
// ... same
func fetchRepositoryStarsByIDs(ids []int) []repository {
var (
length = len(ids)
result = make([]repository, 0, length)
wg = new(sync.WaitGroup)
)
wg.Add(length)
for _, id := range ids {
go func() {
result = append(result, repository{
id: id,
starCount: fetchRepositoryStarByID(id),
})
wg.Done()
}()
}
wg.Wait()
return result
}
виконується за 100 мілісекунд, але в консолі бачимо, що тільки частину репозиторіїв оновлено:
fetch 81 from 100 repositories by 112999451
Запустимо з флагом -race
go run -race main.go
й отримаємо повідомлення, у яких рядках коду є помилки (вивів тільки частину повідомлення):
WARNING: DATA RACE
Read at 0×00c00009a020 by goroutine 8:
main.fetchRepositoryStarsByIDs.func1()
gitlab.com/go-yp/go-sync/main.go:41 +0×91
У цьому коді відразу дві помилки з data race.
Перша помилка data race: змінна id змінюється в основній горутині, де виконується цикл for і читається зі створених горутин.
Ось приклад, що покаже цю помилку:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var ids = []int{1, 2, 3}
var wg = new(sync.WaitGroup)
wg.Add(3)
for _, id := range ids {
go func() {
time.Sleep(time.Millisecond)
fmt.Printf("id is %d\n", id)
wg.Done()
}()
}
wg.Wait()
}
id is 3
id is 3
id is 3
Варіанти розв’язання:
for _, id := range ids {
go func(id int) {
time.Sleep(time.Millisecond)
fmt.Printf("id is %d\n", id)
wg.Done()
}(id)
}
for _, id := range ids {
id := id
go func() {
time.Sleep(time.Millisecond)
fmt.Printf("id is %d\n", id)
wg.Done()
}()
}
Друга помилка data race: це append, що змінює SliceHeader через append з багатьох горутин.
Є три відомі мені варіанти розв’язання проблеми data race під час збереження результатів від горутин.
У першому: ми ініціалізуємо slice, і кожна горутина пише у свій індекс:
func fetchRepositoryStarsByIDs(ids []int) []repository {
var (
length = len(ids)
result = make([]repository, length)
wg = new(sync.WaitGroup)
)
wg.Add(length)
for i, id := range ids {
go func(i, id int) {
result[i] = repository{
id: id,
starCount: fetchRepositoryStarByID(id),
}
wg.Done()
}(i, id)
}
wg.Wait()
return result
}
Після запуску отримаємо очікуваний результат 100 зі 100 й без помилки data race.
fetch 100 from 100 repositories by 113026518
Якщо переглянути документацію — кожний елемент масиву як окрема змінна і в цьому прикладі з fetchRepositoryStarsByIDs кожна горутина працює зі своїм індексом (змінною), а отже, немає data race:
Structured variables of array, slice, and struct types have elements and fields that may be addressed individually. Each such element acts like a variable.
Це саме питання про запис у різні індекси є на stackoverflow.
Другий: обернути append в sync.Mutex:
package main
import (
"fmt"
"sync"
"time"
)
const repositoryCount = 100
// ... same
func fetchRepositoryStarsByIDs(ids []int) []repository {
var (
length = len(ids)
result = make([]repository, 0, length)
wg = new(sync.WaitGroup)
mu = new(sync.Mutex)
)
wg.Add(length)
for _, id := range ids {
go func(id int) {
var starCount = fetchRepositoryStarByID(id)
mu.Lock()
result = append(result, repository{
id: id,
starCount: starCount,
})
mu.Unlock()
wg.Done()
}(id)
}
wg.Wait()
return result
}
Після запуску отримаємо очікуваний результат 100 зі 100 й без помилки data race (так само).
Жодної переваги перед першим варіантом.
Діє таке саме правило, що й у циклах:
- Якщо дію можна винести за цикл, так ліпше й зробити.
- Якщо дію можна винести за mutex, так ліпше й зробити.
Наприклад, написавши такий код:
go func(id int) {
mu.Lock()
var starCount = fetchRepositoryStarByID(id)
result = append(result, repository{
id: id,
starCount: starCount,
})
mu.Unlock()
wg.Done()
}(id)
Програма буде заблокована під час виконання важкої операції fetchRepositoryStarByID і стане послідовною з часом виконання 10 секунд.
Третій: писати результати в канал, це стандартне рішення, бо канали створені для можливості писання й читання з багатьох горутин, без помилки data race:
func fetchRepositoryStarsByIDs(ids []int) []repository {
var length = len(ids)
// can also use channel with length, in this case result will be same
// var resultChan = make(chan repository, length)
var resultChan = make(chan repository)
var wg = new(sync.WaitGroup)
wg.Add(length)
for _, id := range ids {
go func(id int) {
resultChan <- repository{
id: id,
starCount: fetchRepositoryStarByID(id),
}
wg.Done()
}(id)
}
go func() {
wg.Wait()
// close chan and break read loop
close(resultChan)
}()
var repositories = make([]repository, 0, length)
// read loop, while resultChan is open
for result := range resultChan {
repositories = append(repositories, result)
}
return repositories
}
Перевага цього варіанта в тому, що замість збереження в slice ми можемо відразу починати опрацьовувати результати й навіть розпаралелити читання, якщо потрібно.
Є ще один варіант — комбінація першого й другого:
func fetchRepositoryStarsByIDs(ids []int) []repository {
var (
length = len(ids)
result = make([]repository, length)
wg = new(sync.WaitGroup)
index = int32(-1)
)
wg.Add(length)
for _, id := range ids {
go func(id int) {
newIndex := atomic.AddInt32(&index, 1)
result[newIndex] = repository{
id: id,
starCount: fetchRepositoryStarByID(id),
}
wg.Done()
}(id)
}
wg.Wait()
return result
}
Throttling (rate limiting)
Проект із зірочками зростає і вже має 1000 репозиторіїв.
GitHub (або інший сервіс) починає повертати HTTP status 429 (too many requests) або HTTP Timeout замість зірок, коли є багато одночасних запитів.
У нашому прикладі ми додамо додатковий time.Sleep(time.Second) у fetchRepositoryStarByID, щоб емулювати затримку HTTP Timeout:
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
const repositoryCount = 1000
// ... same
func fetchRepositoryStarByID(id int) int {
// emulate timeout
if runtime.NumGoroutine() > 250 {
time.Sleep(time.Second)
}
// emulate slow http request by sleep
time.Sleep(100 * time.Millisecond)
// emulate response
var stars = id % 32
return stars
}
Запустимо:
go run main.go
fetch 1000 from 1000 repositories by 1232180912
Як бачимо, тепер код виконується за секунду.
Тепер обмежмо число одночасних запитів за раз до 200.
Розгляньмо теж 3 варіанти (якщо комбінувати з попередніми, то, звісно, буде більше).
Найпростіший варіант — через канали (можна також пошукати за словом semaphore):
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
const (
repositoryCount = 1000
requestLimit = 200
)
// ... same
func fetchRepositoryStarsByIDs(ids []int) []repository {
var (
length = len(ids)
result = make([]repository, length)
wg = new(sync.WaitGroup)
// WE ADD THIS
throttler = make(chan struct{}, requestLimit)
)
wg.Add(length)
for i, id := range ids {
// WE ADD THIS
throttler <- struct{}{}
go func(i, id int) {
result[i] = repository{
id: id,
starCount: fetchRepositoryStarByID(id),
}
// WE ADD THIS
<-throttler
wg.Done()
}(i, id)
}
wg.Wait()
close(throttler)
return result
}
go run -race main.go
fetch 1000 from 1000 repositories by 535724238
За півсекунди.
Ми пишемо в канал 200 разів і запускаємо 200 горутин, запустити наступну зможемо тоді, коли хтось прочитає з каналу.
Варіант через запуск воркерів (workers).
Запускаємо N воркерів (де N — наша константа request Limit), що з одного каналу читають завдання, а в інший пишуть результат:
package main
// ... same
func workerFetchRepositoryStarByID(wg *sync.WaitGroup, requestIdChannel <-chan int, responseRepositoryChannel chan<- repository) {
// read while requestIdChannel open
for id := range requestIdChannel {
var starCount = fetchRepositoryStarByID(id)
responseRepositoryChannel <- repository{
id: id,
starCount: starCount,
}
}
wg.Done()
}
func fetchRepositoryStarsByIDs(ids []int) []repository {
var (
length = len(ids)
result = make([]repository, 0, length)
workerCount = requestLimit
)
if workerCount > length {
workerCount = length
}
var (
requestIdChannel = make(chan int, workerCount)
responseRepositoryChannel = make(chan repository, workerCount)
workerComplete = new(sync.WaitGroup)
readComplete = new(sync.WaitGroup)
)
workerComplete.Add(workerCount)
for i := 0; i < workerCount; i++ {
go workerFetchRepositoryStarByID(workerComplete, requestIdChannel, responseRepositoryChannel)
}
readComplete.Add(1)
go func() {
for responseRepository := range responseRepositoryChannel {
result = append(result, responseRepository)
}
readComplete.Done()
}()
for _, id := range ids {
requestIdChannel <- id
}
close(requestIdChannel)
workerComplete.Wait()
close(responseRepositoryChannel)
readComplete.Wait()
return result
}
go run -race main.go
fetch 1000 from 1000 repositories by 540738968
Так багато коду для синхронізації, бо ми визначили розміри каналів, що менше загальної чисельності репозиторіїв.
Цей код працюватиме навіть з каналами без буфера:
var ( requestIdChannel = make(chan int, 0) responseRepositoryChannel = make(chan repository, 0) )
Якщо ж використовувати ресурси пам’яті сповна, то код матиме простіший вигляд:
func fetchRepositoryStarsByIDs(ids []int) []repository {
var (
length = len(ids)
result = make([]repository, 0, length)
workerCount = requestLimit
)
if workerCount > length {
workerCount = length
}
var (
requestIdChannel = make(chan int, length)
responseRepositoryChannel = make(chan repository, length)
workerComplete = new(sync.WaitGroup)
)
workerComplete.Add(workerCount)
for i := 0; i < workerCount; i++ {
go workerFetchRepositoryStarByID(workerComplete, requestIdChannel, responseRepositoryChannel)
}
for _, id := range ids {
requestIdChannel <- id
}
close(requestIdChannel)
workerComplete.Wait()
close(responseRepositoryChannel)
for responseRepository := range responseRepositoryChannel {
result = append(result, responseRepository)
}
return result
}
і якщо потім захочемо змінити:
var ( requestIdChannel = make(chan int, 0) responseRepositoryChannel = make(chan repository, 0) )
то заблокуємо виконання назавжди, коли воркери почнуть писати в responseRepositoryChannel, а читання вже після завершення.
Останній варіант — розділити початковий slice на пачки й для кожної розпаралелити виконання.
Ми використаємо зовнішній пакет, що поділить на діапазони:
package main
import (
"fmt"
"github.com/gopereza/packer"
"runtime"
"sync"
"time"
)
const (
repositoryCount = 1000
requestLimit = 200
)
// ... same
func fetchRepositoryStarsByIDs(ids []int) []repository {
var (
length = len(ids)
result = make([]repository, length)
wg = new(sync.WaitGroup)
)
var packs = packer.Pack(length, requestLimit)
for _, pack := range packs {
for i := pack.From; i < pack.To; i++ {
wg.Add(1)
go func(i int) {
var id = ids[i]
result[i] = repository{
id: id,
starCount: fetchRepositoryStarByID(id),
}
wg.Done()
}(i)
}
wg.Wait()
}
return result
}
Схожий приклад бачив у реальному проекті.
Недолік такого рішення — одна тривала операція затримає виконання всієї пачки, тому переписував на варіант з воркерами.
Епілог
Ми в проекті вибрали варіант з воркерами, що відправляють пачками.
Репозиторій, у якому тестував варіанти.
Далі буде.