Padrões de Concorrência
Cássio Botaro
Engenheiro de Software
10/05/2024
Sobre mim
Olá, Mundo
Geradores
Trabalhador
Grupo de Trabalhadores
Pipelines
01
02
03
04
05
CONTEÚDO
Fan-In
06
Fan-Out
Janela Deslizante
Processamento em Lote
Sistema de Ticket
07
08
09
10
CONTEÚDO
Engenheiro de Yaml. Às vezes me colocam no YouTube e na maior parte do tempo escrevo coisas no Github.
— GopherCon Brasil 2024.
SOBRE MIM
github.com/cassiobotaro/concorrencia-go
Olá, Mundo
<SEÇÃO 01>
func main() {
canal := make(chan string)
go func() {
canal <- "Olá, mundo!"
}()
fmt.Println(<-canal)
}
O canal é a comunicação entre a gorrotina principal e a que foi lançada.
Olá, Mundo
“Olá, Mundo!”
“Olá, Mundo!”
“Olá, Mundo!”
Principal
Gorrotina
Geradores
<SEÇÃO 02>
Geradores
Geradores são funções que iniciam uma gorrotina para escrever uma lista de valores em um canal que é retornado para quem acionou a função.
func sequenciaNumeros(inicial, final int) <-chan int {
saida := make(chan int)
go func() {
for i := inicial; i <= final; i++ {
saida <- i
}
// após gerar todos os valores, fecha o canal
close(saida)
}()
return saida
}
Geradores
O programa itera sobre os valores gerados até o gerador ser completamente consumido.
func main() {
valores := sequenciaNumeros(1, 1000)
for valor := range valores {
fmt.Printf("valor: %v\n", valor)
}
}
Principal
Gerador
1
2
valor: 1
3
Valor: 2
Trabalhador
<SEÇÃO 03>
Trabalhador
Um trabalhador é uma gorrotina que recebe valores de um canal e os processa.
func trabalhador(entrada <-chan int) {
for valor := range entrada {
fmt.Printf("valor: %v\n", valor)
}
}
Trabalhador
No exemplo valores inteiros são enviados pela função principal (main) através do canal de entrada e processados por um trabalhador.
func main() {
entrada := make(chan int)
go trabalhador(entrada)
for i := 0; i < 10; i++ {
entrada <- i
}
close(entrada) // avisando ao trabalhador que terminou
}
Principal
Trabalhador
1
2
valor: 1
3
Valor: 2
Grupo de Trabalhadores
<SEÇÃO 04>
Grupo de Trabalhadores
Uma coleção de gorrotinas que ficam esperando tarefas serem atribuídas a elas.
for i := 0; i < nTrabalhadores; i++ {
go trabalhador(i+1, entrada, saida, &wg)
}
wg.Add(nTrabalhadores)
go func() {
wg.Wait()
close(saida)
}()
return saida
Grupo de Trabalhadores
func main() {
entrada := sequenciaNumeros(1, 10)
saida := grupoDeTrabalhadores(entrada, 2)
for s := range saida {
fmt.Println(s)
}
}
Grupo de Trabalhadores
Gerador
Principal
1
2
3
1⚙️
2
2⚙️
2
3⚙️
4
2
4
6
4
6
6
Pipeline
<SEÇÃO 05>
Pipeline
Um pipeline trabalha recebendo valores de um canal e escrevendo em outro canal, normalmente após realizar alguma transformação no valor.
saida := make(chan int)
go func() {
for valor := range entrada {
saida <- valor * 2
}
close(saida)
}()
return saida
Pipeline
Vários pipelines poderiam ser encadeados para realizar múltiplas transformações.
func main() {
for valor := range dobro(dobro(sequenciaNumeros(1, 10))) {
fmt.Printf("valor: %v\n", valor)
}
}
Gerador
Principal
Pipeline
Pipeline
1
1
2
2
4
2
2
4
4
3
3
Valor: 4
Fan-In
<SEÇÃO 06>
Fan-In
Um fan-in copia dados de múltiplos canais de entrada e escreve em um único canal de saída.
enviarSaida := func(c <-chan int) {
for n := range c {
saida <- n
}
wg.Done()
}
wg.Add(len(entradas))
for _, c := range entradas {
go enviarSaida(c)
}
Fan-In
canal := fanin(
sequenciaNumeros(1, 10),
sequenciaNumeros(11, 20),
sequenciaNumeros(21, 30),
)
for valor := range canal {
fmt.Printf("valor: %v\n", valor)
}
Gerador
Gerador
Gerador
Fan-In
Principal
1
11
21
21
22
Valor: 21
2
1
Fan-Out
<SEÇÃO 07>
Fan-Out
Um fan-out copia dados de um canal de entrada para múltiplos canais de saída.
var wg sync.WaitGroup
for valor := range entrada {
wg.Add(len(saidas))
for _, saida := range saidas {
go publicar(saida, valor, &wg)
}
wg.Wait()
}
Fan-Out
saida1 := make(chan int)
saida2 := make(chan int)
var wg sync.WaitGroup
wg.Add(2)
go trabalhador(saida1, 1, &wg)
go trabalhador(saida2, 2, &wg)
fanout(sequenciaNumeros(1, 10), saida1, saida2)
wg.Wait()
Gerador
Fan-Out
1
Trabalhador
Trabalhador
2
1
3
2
Valor: 1
Valor: 1
Janela Deslizante
<SEÇÃO 08>
Janela Deslizante
select {
case saida <- buffer.Front().Value: // consumidor lê o dado
buffer.Remove(buffer.Front()) // remove primeiro item
// …
Uma janela deslizante (sliding window) é utilizada para prevenir que um leitor lento trave um escritor rápido.
Janela Deslizante
case val := <-entrada: // recebeu nova entrada
if val == nil {
entrada = nil // invalida entrada
continue // continua já que podemos ter dados no buffer
}
if buffer.Len() == tamanho {
buffer.Remove(buffer.Front())
}
buffer.PushBack(val) // adiciona novo dado no buffer
Janela Deslizante
func main() {
valores := sequenciaNumeros(1, 10)
saida := make(chan interface{})
go leitorLento(saida)
janelaDeslizante(saida, valores, 3)
}
Gerador
Janela Deslizante
Trabalhador
| | |
1
1
2
2
3
3
4
2
3
4
5
Valor: 2
3
4
6
5
Processamento em Lote
<SEÇÃO 09>
Processamento em Lote
Um processamento em lote (batch processing) é usado quando uma gorrotina gera itens um-por-um mas o consumidor deseja processar os itens em blocos.
select {
case r := <-entrada:
if r == valorZero {
fechado = true // fecha no valor zero
continue
}
buf = append(buf, r) // Adiciona o item no buffer
deveDescarregar = len(buf) == tamanhoLote
// ...
Processamento em Lote
// ...
if deveDescarregar {
saida <- buf
buf = make([]req, 0, tamanhoLote)
}
Processamento em Lote
entrada := make(chan req)
saida := processamentoLotes(entrada, descarga, 3)
pronto := processadorLotes(saida)
entrada <- req{valor: 1}
entrada <- req{valor: 2}
entrada <- req{valor: 3}
entrada <- req{valor: 6}
entrada <- req{valor: 7}
Gerador
Processamento em lote
Trabalhador
| | |
1
1
2
2
3
3
4
processando lote com valores: [{1} {2} {3}]
Sistema de Ticket
<SEÇÃO 10>
Sistema de ticket
Um sistema de ticket é usado para controlar quando um determinado trabalho pode ser executado, normalmente é utilizado para limitar o uso de um recurso sobre um período de tempo.
func trabalhador(tickets <-chan ticket, work <-chan Trabalho) {
for w := range work {
<-tickets // espera por um ticket
w() // executa um trabalho
}
}
Sistema de ticket
func bilheteria(tickets chan<- ticket, timeout time.Duration, nTickets int) {
for {
for i := 0; i < nTickets; i++ {
tickets <- ticket(i)
}
<-time.After(timeout)
}
}
Sistema de ticket
Enviamos através de um canal 30 processamentos a serem feitos, mas o sistema de ticket garante que apenas 10 processamentos sejam executados por segundo.
tickets := make(chan ticket)
trabalhos := make(chan Trabalho)
go bilheteria(tickets, 1*time.Second, 10)
go trabalhador(tickets, trabalhos)
for i := 0; i <= 30; i++ {
trabalhos <- func() { fmt.Println("processando ticket")
}
fmt.Println("trabalho ", i, " enviado")
}
Bilheteria
Principal
🎫
Trabalhador
Trabalhador
Trabalhador
📦
📦
📦
🎫
🎫
📦
📦
🎫
🎫
🎫
📦
📦
📦
ROB PIKE
Don't communicate by sharing memory, share memory by communicating.
{
CASSIOBOTARO
CASSIOBOTARO
CASSIOBOTARO
CONTATo
CONECTE-SE COMIGO!