1 of 50

2 of 50

Padrões de Concorrência

Cássio Botaro

Engenheiro de Software

10/05/2024

3 of 50

Sobre mim

Olá, Mundo

Geradores

Trabalhador

Grupo de Trabalhadores

Pipelines

01

02

03

04

05

CONTEÚDO

4 of 50

Fan-In

06

Fan-Out

Janela Deslizante

Processamento em Lote

Sistema de Ticket

07

08

09

10

CONTEÚDO

5 of 50

Engenheiro de Yaml. Às vezes me colocam no YouTube e na maior parte do tempo escrevo coisas no Github.

— GopherCon Brasil 2024.

SOBRE MIM

6 of 50

github.com/cassiobotaro/concorrencia-go

7 of 50

Olá, Mundo

<SEÇÃO 01>

8 of 50

func main() {

canal := make(chan string)

go func() {

canal <- "Olá, mundo!"

}()

fmt.Println(<-canal)

}

O canal é a comunicação entre a gorrotina principal e a que foi lançada.

Olá, Mundo

9 of 50

“Olá, Mundo!”

“Olá, Mundo!”

“Olá, Mundo!”

Principal

Gorrotina

10 of 50

Geradores

<SEÇÃO 02>

11 of 50

Geradores

Geradores são funções que iniciam uma gorrotina para escrever uma lista de valores em um canal que é retornado para quem acionou a função.

func sequenciaNumeros(inicial, final int) <-chan int {

saida := make(chan int)

go func() {

for i := inicial; i <= final; i++ {

saida <- i

}

// após gerar todos os valores, fecha o canal

close(saida)

}()

return saida

}

12 of 50

Geradores

O programa itera sobre os valores gerados até o gerador ser completamente consumido.

func main() {

valores := sequenciaNumeros(1, 1000)

for valor := range valores {

fmt.Printf("valor: %v\n", valor)

}

}

13 of 50

Principal

Gerador

1

2

valor: 1

3

Valor: 2

14 of 50

Trabalhador

<SEÇÃO 03>

15 of 50

Trabalhador

Um trabalhador é uma gorrotina que recebe valores de um canal e os processa.

func trabalhador(entrada <-chan int) {

for valor := range entrada {

fmt.Printf("valor: %v\n", valor)

}

}

16 of 50

Trabalhador

No exemplo valores inteiros são enviados pela função principal (main) através do canal de entrada e processados por um trabalhador.

func main() {

entrada := make(chan int)

go trabalhador(entrada)

for i := 0; i < 10; i++ {

entrada <- i

}

close(entrada) // avisando ao trabalhador que terminou

}

17 of 50

Principal

Trabalhador

1

2

valor: 1

3

Valor: 2

18 of 50

Grupo de Trabalhadores

<SEÇÃO 04>

19 of 50

Grupo de Trabalhadores

Uma coleção de gorrotinas que ficam esperando tarefas serem atribuídas a elas.

for i := 0; i < nTrabalhadores; i++ {

go trabalhador(i+1, entrada, saida, &wg)

}

wg.Add(nTrabalhadores)

go func() {

wg.Wait()

close(saida)

}()

return saida

20 of 50

Grupo de Trabalhadores

func main() {

entrada := sequenciaNumeros(1, 10)

saida := grupoDeTrabalhadores(entrada, 2)

for s := range saida {

fmt.Println(s)

}

}

21 of 50

Grupo de Trabalhadores

Gerador

Principal

1

2

3

1⚙️

2

2⚙️

2

3⚙️

4

2

4

6

4

6

6

22 of 50

Pipeline

<SEÇÃO 05>

23 of 50

Pipeline

Um pipeline trabalha recebendo valores de um canal e escrevendo em outro canal, normalmente após realizar alguma transformação no valor.

saida := make(chan int)

go func() {

for valor := range entrada {

saida <- valor * 2

}

close(saida)

}()

return saida

24 of 50

Pipeline

Vários pipelines poderiam ser encadeados para realizar múltiplas transformações.

func main() {

for valor := range dobro(dobro(sequenciaNumeros(1, 10))) {

fmt.Printf("valor: %v\n", valor)

}

}

25 of 50

Gerador

Principal

Pipeline

Pipeline

1

1

2

2

4

2

2

4

4

3

3

Valor: 4

26 of 50

Fan-In

<SEÇÃO 06>

27 of 50

Fan-In

Um fan-in copia dados de múltiplos canais de entrada e escreve em um único canal de saída.

enviarSaida := func(c <-chan int) {

for n := range c {

saida <- n

}

wg.Done()

}

wg.Add(len(entradas))

for _, c := range entradas {

go enviarSaida(c)

}

28 of 50

Fan-In

canal := fanin(

sequenciaNumeros(1, 10),

sequenciaNumeros(11, 20),

sequenciaNumeros(21, 30),

)

for valor := range canal {

fmt.Printf("valor: %v\n", valor)

}

29 of 50

Gerador

Gerador

Gerador

Fan-In

Principal

1

11

21

21

22

Valor: 21

2

1

30 of 50

Fan-Out

<SEÇÃO 07>

31 of 50

Fan-Out

Um fan-out copia dados de um canal de entrada para múltiplos canais de saída.

var wg sync.WaitGroup

for valor := range entrada {

wg.Add(len(saidas))

for _, saida := range saidas {

go publicar(saida, valor, &wg)

}

wg.Wait()

}

32 of 50

Fan-Out

saida1 := make(chan int)

saida2 := make(chan int)

var wg sync.WaitGroup

wg.Add(2)

go trabalhador(saida1, 1, &wg)

go trabalhador(saida2, 2, &wg)

fanout(sequenciaNumeros(1, 10), saida1, saida2)

wg.Wait()

33 of 50

Gerador

Fan-Out

1

Trabalhador

Trabalhador

2

1

3

2

Valor: 1

Valor: 1

34 of 50

Janela Deslizante

<SEÇÃO 08>

35 of 50

Janela Deslizante

select {

case saida <- buffer.Front().Value: // consumidor lê o dado

buffer.Remove(buffer.Front()) // remove primeiro item

// …

Uma janela deslizante (sliding window) é utilizada para prevenir que um leitor lento trave um escritor rápido.

36 of 50

Janela Deslizante

case val := <-entrada: // recebeu nova entrada

if val == nil {

entrada = nil // invalida entrada

continue // continua já que podemos ter dados no buffer

}

if buffer.Len() == tamanho {

buffer.Remove(buffer.Front())

}

buffer.PushBack(val) // adiciona novo dado no buffer

37 of 50

Janela Deslizante

func main() {

valores := sequenciaNumeros(1, 10)

saida := make(chan interface{})

go leitorLento(saida)

janelaDeslizante(saida, valores, 3)

}

38 of 50

Gerador

Janela Deslizante

Trabalhador

1

1

2

2

3

3

4

2

3

4

5

Valor: 2

3

4

6

5

39 of 50

Processamento em Lote

<SEÇÃO 09>

40 of 50

Processamento em Lote

Um processamento em lote (batch processing) é usado quando uma gorrotina gera itens um-por-um mas o consumidor deseja processar os itens em blocos.

select {

case r := <-entrada:

if r == valorZero {

fechado = true // fecha no valor zero

continue

}

buf = append(buf, r) // Adiciona o item no buffer

deveDescarregar = len(buf) == tamanhoLote

// ...

41 of 50

Processamento em Lote

// ...

if deveDescarregar {

saida <- buf

buf = make([]req, 0, tamanhoLote)

}

42 of 50

Processamento em Lote

entrada := make(chan req)

saida := processamentoLotes(entrada, descarga, 3)

pronto := processadorLotes(saida)

entrada <- req{valor: 1}

entrada <- req{valor: 2}

entrada <- req{valor: 3}

entrada <- req{valor: 6}

entrada <- req{valor: 7}

43 of 50

Gerador

Processamento em lote

Trabalhador

1

1

2

2

3

3

4

processando lote com valores: [{1} {2} {3}]

44 of 50

Sistema de Ticket

<SEÇÃO 10>

45 of 50

Sistema de ticket

Um sistema de ticket é usado para controlar quando um determinado trabalho pode ser executado, normalmente é utilizado para limitar o uso de um recurso sobre um período de tempo.

func trabalhador(tickets <-chan ticket, work <-chan Trabalho) {

for w := range work {

<-tickets // espera por um ticket

w() // executa um trabalho

}

}

46 of 50

Sistema de ticket

func bilheteria(tickets chan<- ticket, timeout time.Duration, nTickets int) {

for {

for i := 0; i < nTickets; i++ {

tickets <- ticket(i)

}

<-time.After(timeout)

}

}

47 of 50

Sistema de ticket

Enviamos através de um canal 30 processamentos a serem feitos, mas o sistema de ticket garante que apenas 10 processamentos sejam executados por segundo.

tickets := make(chan ticket)

trabalhos := make(chan Trabalho)

go bilheteria(tickets, 1*time.Second, 10)

go trabalhador(tickets, trabalhos)

for i := 0; i <= 30; i++ {

trabalhos <- func() { fmt.Println("processando ticket")

}

fmt.Println("trabalho ", i, " enviado")

}

48 of 50

Bilheteria

Principal

🎫

Trabalhador

Trabalhador

Trabalhador

📦

📦

📦

🎫

🎫

📦

📦

🎫

🎫

🎫

📦

📦

📦

49 of 50

ROB PIKE

Don't communicate by sharing memory, share memory by communicating.

{

50 of 50

CASSIOBOTARO

CASSIOBOTARO

CASSIOBOTARO

CONTATo

CONECTE-SE COMIGO!