Bagaimana cara menunggu semua goroutine selesai tanpa menggunakan waktu.

108

Kode ini memilih semua file xml dalam folder yang sama, sebagai executable yang dipanggil dan secara asinkron menerapkan pemrosesan ke setiap hasil dalam metode callback (dalam contoh di bawah, hanya nama file yang dicetak).

Bagaimana cara menghindari penggunaan metode tidur agar metode utama tidak keluar? Saya memiliki masalah dalam membungkus kepala saya di sekitar saluran (saya berasumsi itulah yang diperlukan, untuk menyinkronkan hasil) sehingga bantuan apa pun sangat dihargai!

package main

import (
    "fmt"
    "io/ioutil"
    "path"
    "path/filepath"
    "os"
    "runtime"
    "time"
)

func eachFile(extension string, callback func(file string)) {
    exeDir := filepath.Dir(os.Args[0])
    files, _ := ioutil.ReadDir(exeDir)
    for _, f := range files {
            fileName := f.Name()
            if extension == path.Ext(fileName) {
                go callback(fileName)
            }
    }
}


func main() {
    maxProcs := runtime.NumCPU()
    runtime.GOMAXPROCS(maxProcs)

    eachFile(".xml", func(fileName string) {
                // Custom logic goes in here
                fmt.Println(fileName)
            })

    // This is what i want to get rid of
    time.Sleep(100 * time.Millisecond)
}
Dante
sumber

Jawaban:

173

Anda dapat menggunakan sync.WaitGroup . Mengutip contoh terkait:

package main

import (
        "net/http"
        "sync"
)

func main() {
        var wg sync.WaitGroup
        var urls = []string{
                "http://www.golang.org/",
                "http://www.google.com/",
                "http://www.somestupidname.com/",
        }
        for _, url := range urls {
                // Increment the WaitGroup counter.
                wg.Add(1)
                // Launch a goroutine to fetch the URL.
                go func(url string) {
                        // Decrement the counter when the goroutine completes.
                        defer wg.Done()
                        // Fetch the URL.
                        http.Get(url)
                }(url)
        }
        // Wait for all HTTP fetches to complete.
        wg.Wait()
}
zzzz
sumber
11
Ada alasan Anda harus melakukan wg.Add (1) di luar rutinitas go? Bisakah kita melakukannya di dalam sebelum penundaan wg.Done ()?
duduk
18
sat, ya, ada alasannya, itu dijelaskan di sync.WaitGroup.Add dokumen: Note that calls with positive delta must happen before the call to Wait, or else Wait may wait for too small a group. Typically this means the calls to Add should execute before the statement creating the goroutine or other event to be waited for. See the WaitGroup example.
wobmene
15
Mengadaptasi kode ini menyebabkan saya sesi debugging yang lama karena goroutine saya adalah fungsi bernama dan meneruskan WaitGroup sebagai nilai akan menyalinnya dan membuat wg.Done () tidak efektif. Meskipun ini bisa diperbaiki dengan meneruskan pointer & wg, cara yang lebih baik untuk mencegah kesalahan tersebut adalah dengan mendeklarasikan variabel WaitGroup sebagai pointer di tempat pertama: wg := new(sync.WaitGroup)daripada var wg sync.WaitGroup.
Robert Jack Will
Saya kira itu valid untuk menulis wg.Add(len(urls))tepat di atas baris for _, url := range urls, saya percaya lebih baik karena Anda menggunakan Add hanya sekali.
Victor
@RobertJackWill: Catatan bagus! BTW, ini tercakup dalam dokumen : "WaitGroup tidak boleh disalin setelah penggunaan pertama. Sayang sekali Go tidak memiliki cara untuk memaksakan ini . Sebenarnya, bagaimanapun, go vetmendeteksi kasus ini dan memperingatkan dengan" func pass lock by value : sync.WaitGroup berisi sync.noCopy ".
Brent Bradburn
56

WaitGroups jelas merupakan cara kanonik untuk melakukan ini. Hanya demi kelengkapan, berikut solusi yang biasa digunakan sebelum WaitGroups diperkenalkan. Ide dasarnya adalah menggunakan saluran untuk mengatakan "Saya sudah selesai", dan minta goroutine utama menunggu hingga setiap rutinitas pemijahan melaporkan penyelesaiannya.

func main() {
    c := make(chan struct{}) // We don't need any data to be passed, so use an empty struct
    for i := 0; i < 100; i++ {
        go func() {
            doSomething()
            c <- struct{}{} // signal that the routine has completed
        }()
    }

    // Since we spawned 100 routines, receive 100 messages.
    for i := 0; i < 100; i++ {
        <- c
    }
}
joshlf
sumber
9
Senang melihat solusi dengan saluran biasa. Bonus tambahan: jika doSomething()mengembalikan beberapa hasil, daripada Anda dapat meletakkannya di saluran, dan Anda dapat mengumpulkan dan memproses hasil di putaran kedua (segera setelah siap)
andras
4
Ini hanya berfungsi jika Anda sudah mengetahui jumlah gorutine yang ingin Anda mulai. Bagaimana jika Anda menulis semacam crawler html dan memulai gorutines secara rekursif untuk setiap link di halaman?
shinydev
Bagaimanapun juga, Anda harus melacaknya. Dengan WaitGroups, ini sedikit lebih mudah karena setiap kali Anda menelurkan goroutine baru, Anda dapat melakukannya terlebih dahulu wg.Add(1)dan karenanya akan terus melacaknya. Dengan saluran, ini akan lebih sulit.
joshlf
c akan memblokir karena semua rutinitas go akan mencoba mengaksesnya, dan itu tidak disangga
Edwin Ikechukwu Okonkwo
Jika dengan "blok", yang Anda maksud program akan buntu, itu tidak benar. Anda dapat mencoba menjalankannya sendiri. Alasannya adalah bahwa satu-satunya goroutine yang melakukan penulisan cberbeda dari goroutine utama yang membaca c. Jadi, goroutine utama selalu tersedia untuk membaca nilai dari saluran, yang akan terjadi jika salah satu goroutine tersedia untuk menulis nilai ke saluran. Anda benar bahwa jika kode ini tidak memunculkan goroutine tetapi menjalankan semuanya dalam satu goroutine, itu akan menemui jalan buntu.
joshlf
8

sync.WaitGroup dapat membantu Anda di sini.

package main

import (
    "fmt"
    "sync"
    "time"
)


func wait(seconds int, wg * sync.WaitGroup) {
    defer wg.Done()

    time.Sleep(time.Duration(seconds) * time.Second)
    fmt.Println("Slept ", seconds, " seconds ..")
}


func main() {
    var wg sync.WaitGroup

    for i := 0; i <= 5; i++ {
        wg.Add(1)   
        go wait(i, &wg)
    }
    wg.Wait()
}
dimmg
sumber
1

Meskipun sync.waitGroup(wg) adalah cara kanonik maju, itu mengharuskan Anda melakukan setidaknya beberapa wg.Addpanggilan Anda sebelum Anda wg.Waituntuk menyelesaikan semua. Ini mungkin tidak dapat dilakukan untuk hal-hal sederhana seperti web crawler, di mana Anda tidak mengetahui jumlah panggilan rekursif sebelumnya dan perlu beberapa saat untuk mengambil data yang mendorong wg.Addpanggilan tersebut. Lagi pula, Anda perlu memuat dan mengurai halaman pertama sebelum Anda mengetahui ukuran batch pertama halaman anak.

Saya menulis solusi menggunakan saluran, menghindari waitGroupdalam solusi saya Tur Wisata Go - latihan perayap web . Setiap kali satu atau lebih go-rutin dimulai, Anda mengirimkan nomor tersebut ke childrensaluran. Setiap kali rutinitas pergi akan selesai, Anda mengirim 1ke donesaluran. Ketika jumlah anak sama dengan jumlah selesai, kita selesai.

Satu-satunya kekhawatiran saya yang tersisa adalah ukuran resultssaluran yang di -hardcode , tetapi itu adalah batasan Go (saat ini).


// recursionController is a data structure with three channels to control our Crawl recursion.
// Tried to use sync.waitGroup in a previous version, but I was unhappy with the mandatory sleep.
// The idea is to have three channels, counting the outstanding calls (children), completed calls 
// (done) and results (results).  Once outstanding calls == completed calls we are done (if you are
// sufficiently careful to signal any new children before closing your current one, as you may be the last one).
//
type recursionController struct {
    results  chan string
    children chan int
    done     chan int
}

// instead of instantiating one instance, as we did above, use a more idiomatic Go solution
func NewRecursionController() recursionController {
    // we buffer results to 1000, so we cannot crawl more pages than that.  
    return recursionController{make(chan string, 1000), make(chan int), make(chan int)}
}

// recursionController.Add: convenience function to add children to controller (similar to waitGroup)
func (rc recursionController) Add(children int) {
    rc.children <- children
}

// recursionController.Done: convenience function to remove a child from controller (similar to waitGroup)
func (rc recursionController) Done() {
    rc.done <- 1
}

// recursionController.Wait will wait until all children are done
func (rc recursionController) Wait() {
    fmt.Println("Controller waiting...")
    var children, done int
    for {
        select {
        case childrenDelta := <-rc.children:
            children += childrenDelta
            // fmt.Printf("children found %v total %v\n", childrenDelta, children)
        case <-rc.done:
            done += 1
            // fmt.Println("done found", done)
        default:
            if done > 0 && children == done {
                fmt.Printf("Controller exiting, done = %v, children =  %v\n", done, children)
                close(rc.results)
                return
            }
        }
    }
}

Kode sumber lengkap untuk solusinya

dirkjot.dll
sumber
1

Berikut adalah solusi yang menggunakan WaitGroup.

Pertama, tentukan 2 metode utilitas:

package util

import (
    "sync"
)

var allNodesWaitGroup sync.WaitGroup

func GoNode(f func()) {
    allNodesWaitGroup.Add(1)
    go func() {
        defer allNodesWaitGroup.Done()
        f()
    }()
}

func WaitForAllNodes() {
    allNodesWaitGroup.Wait()
}

Kemudian, ganti pemanggilan callback:

go callback(fileName)

Dengan panggilan ke fungsi utilitas Anda:

util.GoNode(func() { callback(fileName) })

Langkah terakhir, tambahkan baris ini di akhir Anda main, bukan di sleep. Ini akan memastikan thread utama menunggu semua rutinitas selesai sebelum program dapat berhenti.

func main() {
  // ...
  util.WaitForAllNodes()
}
gamliela
sumber