This repository has been archived by the owner on Jul 27, 2023. It is now read-only.

Connection Leaks :( #86

Closed
shanielh opened this issue Dec 10, 2015 · 6 comments

@shanielh

Hi,

I have a program that calls the acquire lock, release lock, and delete key operations repeatedly (about once per second).

When I run the following command:

watch -n 0.2 "netstat -t -n | grep '127.0.0.1:8500' | wc -l"

In the background, I can see that many connections to the consul-agent are left open, and the count keeps growing until it reaches my ulimit.

@rickfast added the bug label Dec 11, 2015
@rickfast
Owner

I'll take a look this weekend.

@rickfast
Owner

Which JAX-RS implementation are you using?

@shanielh
Author

I use the Apache one.

@rickfast
Owner

Fixed by be0e212

@rickfast
Owner

I had a few minutes this morning to take a look at this. We had a similar problem with the agent client. I made sure the response is explicitly closed. Please try with 0.9.15 and re-open if you have issues.
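
For readers wondering why an unclosed response shows up as a leaked connection, here is a minimal sketch of the general pattern. It is not the code from commit be0e212. `FakeResponse`, `CloseSketch`, and `withResponse` are hypothetical names that stand in for a response object holding a connection, such as a JAX-RS `Response`, which has a `close()` method. The sketch only counts open handles to show the difference between reading a response and reading it inside a `try`/`finally` that closes it.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for an HTTP response that holds a connection.
// It is NOT a consul-client or JAX-RS type; it only tracks open handles.
final class FakeResponse(openCount: AtomicInteger, val body: String) extends AutoCloseable {
  openCount.incrementAndGet()
  private var closed = false

  // Releases the handle once; repeated calls have no further effect.
  override def close(): Unit =
    if (!closed) {
      closed = true
      openCount.decrementAndGet()
    }
}

object CloseSketch {
  // Runs `use` against the response and always closes it, even if `use` throws.
  def withResponse[A](response: FakeResponse)(use: FakeResponse => A): A =
    try use(response)
    finally response.close()

  def main(args: Array[String]): Unit = {
    val open = new AtomicInteger(0)

    // Leaky pattern: the body is read but the response is never closed.
    (1 to 5).foreach(_ => new FakeResponse(open, "true").body.toBoolean)
    println(s"open after leaky calls: ${open.get}") // prints 5

    open.set(0)

    // Explicitly closed pattern: every response is released after use.
    (1 to 5).foreach(_ => withResponse(new FakeResponse(open, "true"))(_.body.toBoolean))
    println(s"open after closed calls: ${open.get}") // prints 0
  }
}
```

In a real client the counter would correspond to sockets held by the HTTP layer, which is roughly what the `netstat` count in the original report was measuring.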

@shanielh
Author

I confirm that it works 👍

My test code:

import java.util.concurrent.Executors

import com.orbitz.consul.Consul
import com.orbitz.consul.model.session.ImmutableSession

import scala.concurrent.{Future, ExecutionContext}

object Program extends App {

  def lockWorker(consul: Consul, workerIndex: Int, sessionId: String) = {
    val kvp = consul.keyValueClient()
    val key = workerIndex.toString


    while (true) {
      val lockResult = kvp.acquireLock(key, sessionId)
      if (lockResult) {
        println(s"Locked $workerIndex")

        Thread sleep 100

        val unlockResult = kvp.releaseLock(key, sessionId)
        if (unlockResult) {
          println(s"Unlocked $workerIndex")
          kvp.deleteKey(key)
        } else {
          println(s"Unable to unlock $workerIndex")
        }
      } else {
        println(s"Unable to lock $workerIndex")
      }
    }

  }


  println("Hello World")
  val consul = Consul.builder().build()

  val sessionName = java.util.UUID.randomUUID().toString
  val sessionRequest = ImmutableSession.builder().name(sessionName).behavior("delete").ttl("60s").build()

  val session = consul.sessionClient().createSession(sessionRequest).getId

  private val numberOfWorkers = 10
  val executionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numberOfWorkers))

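  // `until` excludes the upper bound, so exactly numberOfWorkers workers are started,
  // matching the size of the fixed thread pool.
  for (i <- 0 until numberOfWorkers) {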
    val workerIndex = i

    Future {
      lockWorker(consul, workerIndex, session)
    }(executionContext)
  }

  println(s"Running $numberOfWorkers in the background, Press enter to exit...")
  scala.io.StdIn.readLine()
}
