Введение в concurrency паттерны
Go предлагает множество паттернов для организации конкурентного кода. Понимание этих паттернов помогает писать эффективный и поддерживаемый код.
Fan-out, Fan-in
▸Разделение работы
1func fanOut(input <-chan int, workers int) []<-chan int {2 channels := make([]<-chan int, workers)3 for i := 0; i < workers; i++ {4 channels[i] = process(input)5 }6 return channels7}89func process(in <-chan int) <-chan int {10 out := make(chan int)11 go func() {12 defer close(out)13 for n := range in {14 out <- n * n15 }16 }()17 return out18}
▸Слияние результатов
1func fanIn(channels ...<-chan int) <-chan int {2 var wg sync.WaitGroup3 merged := make(chan int)45 for _, ch := range channels {6 wg.Add(1)7 go func(c <-chan int) {8 defer wg.Done()9 for value := range c {10 merged <- value11 }12 }(ch)13 }1415 go func() {16 wg.Wait()17 close(merged)18 }()1920 return merged21}
▸Использование
1func main() {2 input := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)34 // Fan-out на 3 воркера5 workers := fanOut(input, 3)67 // Fan-in8 for result := range fanIn(workers...) {9 fmt.Println(result)10 }11}
Pipeline
▸Последовательные этапы
1func generate(nums ...int) <-chan int {2 out := make(chan int)3 go func() {4 for _, n := range nums {5 out <- n6 }7 close(out)8 }()9 return out10}1112func square(in <-chan int) <-chan int {13 out := make(chan int)14 go func() {15 for n := range in {16 out <- n * n17 }18 close(out)19 }()20 return out21}2223func filter(in <-chan int, predicate func(int) bool) <-chan int {24 out := make(chan int)25 go func() {26 for n := range in {27 if predicate(n) {28 out <- n29 }30 }31 close(out)32 }()33 return out34}3536func main() {37 // Pipeline: generate -> filter -> square38 ch := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)39 ch = filter(ch, func(n int) bool { return n%2 == 0 })40 ch = square(ch)4142 for result := range ch {43 fmt.Println(result) // 4, 16, 36, 64, 10044 }45}
Worker Pool
▸Ограниченный параллелизм
1func workerPool(jobs <-chan Job, results chan<- Result, numWorkers int) {2 var wg sync.WaitGroup34 for i := 0; i < numWorkers; i++ {5 wg.Add(1)6 go func(workerID int) {7 defer wg.Done()8 for job := range jobs {9 result := process(job)10 results <- result11 }12 }(i)13 }1415 go func() {16 wg.Wait()17 close(results)18 }()19}
▸Использование
1func main() {2 jobs := make(chan Job, 100)3 results := make(chan Result, 100)45 // Запуск воркеров6 workerPool(jobs, results, 5)78 // Отправка задач9 for i := 0; i < 20; i++ {10 jobs <- Job{ID: i}11 }12 close(jobs)1314 // Сбор результатов15 for result := range results {16 fmt.Println(result)17 }18}
Rate Limiting
▸Ограничение частоты
1func main() {2 requests := make(chan int, 5)3 for i := 1; i <= 5; i++ {4 requests <- i5 }6 close(requests)78 // Ограничитель: 2 запроса в секунду9 limiter := time.NewTicker(200 * time.Millisecond)10 defer limiter.Stop()1112 for req := range requests {13 <-limiter.C14 fmt.Println("request", req, time.Now())15 }16}
▸Более сложный rate limiter
1type RateLimiter struct {2 ticker *time.Ticker3 quit chan struct{}4}56func NewRateLimiter(rps int) *RateLimiter {7 return &RateLimiter{8 ticker: time.NewTicker(time.Second / time.Duration(rps)),9 quit: make(chan struct{}),10 }11}1213func (rl *RateLimiter) Wait() {14 <-rl.ticker.C15}1617func (rl *RateLimiter) Stop() {18 rl.ticker.Stop()19 close(rl.quit)20}
Context паттерны
▸Отмена операций
1func longOperation(ctx context.Context) error {2 select {3 case <-time.After(5 * time.Second):4 return nil5 case <-ctx.Done():6 return ctx.Err()7 }8}910func main() {11 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)12 defer cancel()1314 err := longOperation(ctx)15 if err != nil {16 fmt.Println("Operation cancelled:", err)17 }18}
▸Передача значений
1type contextKey string23const userIDKey contextKey = "userID"45func WithUserID(ctx context.Context, userID int) context.Context {6 return context.WithValue(ctx, userIDKey, userID)7}89func GetUserID(ctx context.Context) (int, bool) {10 userID, ok := ctx.Value(userIDKey).(int)11 return userID, ok12}
Мониторинг конкурентного кода
1import "runtime"23func monitorGoroutines() {4 ticker := time.NewTicker(time.Second)5 for range ticker.C {6 fmt.Printf("Goroutines: %d\n", runtime.NumGoroutine())7 }8}910func main() {11 go monitorGoroutines()12 // Основная логика13}
Best Practices
▸Избегайте утечек goroutines
1func processWithTimeout(data []int) []int {2 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)3 defer cancel()45 results := make([]int, len(data))67 var wg sync.WaitGroup8 for i, v := range data {9 wg.Add(1)10 go func(i, v int) {11 defer wg.Done()12 select {13 case <-ctx.Done():14 return15 default:16 results[i] = expensiveCalculation(v)17 }18 }(i, v)19 }2021 wg.Wait()22 return results23}
▸Используйте WaitGroup
1func processAll(items []Item) {2 var wg sync.WaitGroup3 for _, item := range items {4 wg.Add(1)5 go func(item Item) {6 defer wg.Done()7 process(item)8 }(item)9 }10 wg.Wait() // Гарантирует завершение всех горутин11}
Заключение
Concurrency паттерны в Go — это мощные инструменты для создания масштабируемых приложений. Fan-out/fan-in, pipeline и worker pool — основные паттерны, которые стоит знать каждому Go-разработчику. На собеседовании спрашивают про выбор паттерна для конкретной задачи и типичные ошибки.