Goroutine in IO Wait State for Long Time

These could easily be clients that initiated a request but never completed it, slow clients, and so on.

You should configure the Read/Write timeouts of your server (server.ReadTimeout and server.WriteTimeout respectively):

s := new(http.Server)
// ...
s.ReadTimeout = 5 * time.Second
s.WriteTimeout = 5 * time.Second
// ...

Goroutines have high idle wake-up calls

You're wasting CPU here (superfluous loop):

for {
	select {
	// ...
	default:
		// High CPU usage here: with a default case the select never blocks,
		// so the loop spins.
	}
}

Try something like this:

func (socket *Socket) PubListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
	defer wg.Done()
	defer socket.Close()

	socket.OnTextMessage = func(message string, socket Socket) {
		log.Println(message)
		pubJsonDecoder(message, testing)
		//tradesParser(message);
	}

	<-ctx.Done()
	log.Println("closing public socket")
}

func (socket *Socket) PrivListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
	defer wg.Done()
	defer socket.Close()

	socket.OnTextMessage = func(message string, socket Socket) {
		log.Println(message)
	}

	<-ctx.Done()
	log.Println("closing private socket")
}

Also this may help:

https://github.com/gorilla/websocket/blob/master/examples/chat/client.go

Why is my function not waiting for the goroutines to complete?

The issue is here:

go GetPainting(re.IMG, &ed, &wg) // goroutine alters ed
...
results[i].IMG = fmt.Sprintf("data:image/jpeg;base64,%v", ed.EncodedData)

"A go statement starts the execution of a function call as an independent concurrent thread of control..." (source); you should not make assumptions as to when the goroutine will perform any action. So what might (I have not looked exactly how goroutines are currently managed) happen is something like:

  1. go GetPainting(re.IMG, &ed, &wg) - runtime schedules GetPainting to run.
  2. results[i].IMG = fmt.Sprintf("data:image/jpeg;base64,%v", ed.EncodedData) runs (ed.EncodedData has not been set yet).
  3. GetPainting runs.

You have created a data race; that is, you have one goroutine writing to ed.EncodedData and another reading from it without synchronisation. Generally it's difficult to predict what will happen when there is a race, but in this case your goroutine is performing IO (http.Get), so it's very probable that the write will occur after the read.

To help explain this (and potential solutions), let's simplify your example (playground):

func routine(wg *sync.WaitGroup, val *int) {
	defer wg.Done()
	time.Sleep(time.Microsecond)
	*val = rand.Int()
}

func main() {
	const iterations = 5
	var wg sync.WaitGroup
	wg.Add(iterations)
	r := make([]int, iterations)
	results := make([]string, iterations)
	for i := 0; i < 5; i++ {
		go routine(&wg, &r[i])
		results[i] = fmt.Sprintf("data:image/jpeg;base64,%d", r[i])
	}
	wg.Wait()
	for i := 0; i < 5; i++ {
		fmt.Println(r[i], results[i])
	}
}

As you will see, once the WaitGroup is done r (similar to your ed) is populated, but every entry in results was built from the value 0, because r[i] was read before the goroutine wrote to it. This points towards a simple solution (playground):

for i := 0; i < 5; i++ {
	go routine(&wg, &r[i])
}
wg.Wait()
results := make([]string, iterations)
for i := 0; i < 5; i++ {
	results[i] = fmt.Sprintf("data:image/jpeg;base64,%d", r[i])
}
for i := 0; i < 5; i++ {
	fmt.Println(r[i], results[i])
}

This works because you are not accessing anything that the goroutines write to before you know that they are finished (via the WaitGroup). It's fairly simple to transfer this method into your code (create a slice of utils.EncodedImage and check for errors/results after the wg.Wait()).

While the above works, it never returns before every goroutine has completed. Often that is not desirable; for instance, if receiving one error is fatal then you probably want to return a response to the user (and stop any ongoing work) as soon as the error is received.

There are a range of ways of dealing with this. Passing functions a Context is a very common means of enabling you to signal when they should stop (for your use-case see NewRequestWithContext). When it comes to handling the responses you can code this yourself (but it is easy to leak goroutines) or use something like golang.org/x/sync/errgroup. Here is an example (playground):

// Assumption: simulateErrors is not declared in the excerpt; it is shown here
// as a constant that enables the artificial failure below.
const simulateErrors = true

func routine(ctx context.Context, val *int) error {
	select {
	case <-time.After(time.Microsecond * time.Duration(rand.Intn(20))): // select will exit after a number of microseconds
	case <-ctx.Done(): // unless this is met (operation cancelled)
		fmt.Println("GoRoutine ending due to context")
		return ctx.Err()
	}
	*val = rand.Int()
	fmt.Println("generated ", *val)
	if simulateErrors && *val > (math.MaxInt/2) {
		return errors.New("Number too big")
	}
	return nil
}

func main() {
	const iterations = 5
	// In your case source context should probably come from gin.Context so the operation is cancelled if the connection drops
	g, ctx := errgroup.WithContext(context.Background())
	r := make([]int, iterations)
	for i := 0; i < iterations; i++ {
		x := &r[i]
		g.Go(func() error {
			return routine(ctx, x)
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println("Got an error!", err)
		return // Here you send error as response (you might want to send something generic to avoid leaking system detail)
	}
	// Everything has processed OK
	results := make([]string, iterations)
	for i := 0; i < iterations; i++ {
		results[i] = fmt.Sprintf("data:image/jpeg;base64,%d", r[i])
		fmt.Println(r[i], results[i])
	}
}

Note: Be careful using panic in production code. In your example you panic when an HTTP GET fails; this is something that is likely to happen at some point, and you don't really want your application to shut down when it does (return a sensible error to the end user and perhaps log the failure). It is possible to recover from panics, but it's generally best to deal with errors as they are detected.

If the Wait() method of the sync.WaitGroup type blocks, and is thus not asynchronous, why use it?

Your understanding of "blocking" is incorrect. Blocking operations such as WaitGroup.Wait() or a channel receive (when there is no value to receive) only block the execution of the goroutine; they do not (necessarily) block the OS thread that executes the goroutine's statements.

Whenever such a blocking operation is encountered, the goroutine scheduler may (and will) switch to another goroutine that is able to run. No (significant) CPU cycles are lost during a WaitGroup.Wait() call; if there are other goroutines that can continue to run, they will.

Please check related question: Number of threads used by Go runtime

Running a Go routine indefinitely

When the primary goroutine at main exits, all goroutines you might have spawned will be orphaned and eventually die.

You could keep the main goroutine running forever with an empty infinite loop (for {}), but that busy-loops and burns CPU, so you might want to block on an exit channel instead:

exit := make(chan string)

// Spawn all your worker goroutines, and send a message on exit when you're done.

for {
	select {
	case <-exit:
		os.Exit(0)
	}
}

Update: Cerise Limón pointed out that goroutines are just killed immediately when main exits. This is the specified behaviour: according to the Go language specification, when main returns the program exits and does not wait for other goroutines to complete.

Another update: The for-select works better when you have multiple ways to exit / multiple exit channels. For a single exit channel you could just do

<-exit

at the end instead of a for and select loop.

Lock when using io.Copy in a goroutine

MarcoLucidi was right, I was opening too many files at a time. I limited the number of concurrent goroutines and now it works fine.


