Skip to content

Commit

Permalink
Merge pull request #74 from mesos/broker_stickiness
Browse files Browse the repository at this point in the history
broker_stickiness
  • Loading branch information
joestein committed Jul 13, 2015
2 parents 9575a1f + a724e6e commit 6a893d9
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 24 deletions.
30 changes: 23 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ broker:
state: stopped
resources: cpus:1.00, mem:2048, heap:1024, port:auto
options: log.dirs=/mnt/array1/broker0
failover: delay:10s, max-delay:60s
failover: delay:1m, max-delay:10m
stickiness: period:10m, hostname:slave0, expires:2015-07-10 15:51:43+03
# ./kafka-mesos.sh start 0
Broker 0 started
Expand All @@ -226,19 +227,22 @@ brokers:
active: false
state: stopped
resources: cpus:1.00, mem:2048, heap:1024, port:auto
failover: delay:10s, max-delay:60s
failover: delay:1m, max-delay:10m
stickiness: period:10m, hostname:slave0, expires:2015-07-10 15:51:43+03
id: 1
active: false
state: stopped
resources: cpus:1.00, mem:2048, heap:1024, port:auto
failover: delay:10s, max-delay:60s
failover: delay:1m, max-delay:10m
stickiness: period:10m, hostname:slave1, expires:2015-07-10 15:51:43+03
id: 2
active: false
state: stopped
resources: cpus:1.00, mem:2048, heap:1024, port:auto
failover: delay:10s, max-delay:60s
failover: delay:1m, max-delay:10m
stickiness: period:10m, hostname:slave2, expires:2015-07-10 15:51:43+03
#./kafka-mesos.sh start 0
Broker 0 started
Expand All @@ -259,18 +263,28 @@ clusterStorage=zk:/kafka-mesos
Failed Broker Recovery
------------------------
When the broker fails, kafka mesos scheduler assumes that the failure is recoverable. Scheduler will try
to restart broker on any matched slave after waiting failover-delay (i.e. 30s, 2m). Initially waiting
delay is equal to failover-delay setting. After each serial failure it doubles until it reaches failover-max-delay value.
to restart broker after waiting failover-delay (i.e. 30s, 2m). Initially waiting delay is equal to failover-delay setting.
After each serial failure it doubles until it reaches failover-max-delay value.

If failover-max-tries is defined and serial failure count exceeds it, broker will be deactivated.

The following failover settings exist:
```
--failover-delay - initial failover delay to wait after failure, required
--failover-delay - initial failover delay to wait after failure, required
--failover-max-delay - max failover delay, required
--failover-max-tries - max failover tries to deactivate broker, optional
```

Broker Placement Stickiness
---------------------------
If a broker is started within the stickiness-period after its stop time, the scheduler will place the broker on the same node
it ran on during its last successful start. This applies both to failovers and to manual restarts.

The following stickiness settings exist:
```
--stickiness-period - period of time during which the broker will be restarted on the same node
```

Navigating the CLI
==================

Expand Down Expand Up @@ -300,6 +314,7 @@ Option Description
log.dirs=/tmp/kafka/$id,num.io.threads=16
file:server.properties
--port port or range (31092, 31090..31100). Default - auto
--stickiness-period stickiness period to preserve same node for broker (5m, 10m, 1h)
Generic Options
Option Description
Expand Down Expand Up @@ -350,6 +365,7 @@ Option Description
log.dirs=/tmp/kafka/$id,num.io.threads=16
file:server.properties
--port port or range (31092, 31090..31100). Default - auto
--stickiness-period stickiness period to preserve same node for broker (5m, 10m, 1h)
Generic Options
Option Description
Expand Down
62 changes: 59 additions & 3 deletions src/scala/ly/stealth/mesos/kafka/Broker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.util.parsing.json.JSONObject
import scala.collection
import org.apache.mesos.Protos.{Resource, Offer}
import java.util.{TimeZone, Collections, Date, UUID}
import ly.stealth.mesos.kafka.Broker.Failover
import ly.stealth.mesos.kafka.Broker.{Stickiness, Failover}
import ly.stealth.mesos.kafka.Util.{BindAddress, Period, Range, Str}
import java.text.SimpleDateFormat

Expand All @@ -42,6 +42,7 @@ class Broker(_id: String = "0") {
var log4jOptions: util.Map[String, String] = new util.LinkedHashMap()
var jvmOptions: String = null

var stickiness: Stickiness = new Stickiness()
var failover: Failover = new Failover()

def options(defaults: util.Map[String, String] = null): util.Map[String, String] = {
Expand Down Expand Up @@ -111,9 +112,23 @@ class Broker(_id: String = "0") {
-1
}

def shouldStart(now: Date = new Date()): Boolean = active && task == null && !failover.isWaitingDelay(now)
// Whether the scheduler may launch this broker on `hostname` at time `now`:
// the broker must be active, currently have no task, be allowed on this host
// by placement stickiness, and not be inside its failover wait-delay window.
def shouldStart(hostname: String, now: Date = new Date()): Boolean =
active && task == null &&
stickiness.allowsHostname(hostname, now) && !failover.isWaitingDelay(now)

def shouldStop: Boolean = !active && task != null && !task.stopping

// Records a successful start on `hostname`: binds stickiness to that host
// and clears any accumulated failover failures.
def registerStart(hostname: String): Unit = {
stickiness.registerStart(hostname)
failover.resetFailures()
}

// Records a stop at `now`. The stickiness stop time is set on a clean stop, or
// on the FIRST failure of a failure series (failures == 0 before registration),
// so the stickiness period is measured from when the broker first went down
// rather than from the latest failed retry.
def registerStop(now: Date = new Date(), failed: Boolean = false): Unit = {
if (!failed || failover.failures == 0) stickiness.registerStop(now)

if (failed) failover.registerFailure(now)
else failover.resetFailures()
}

def state(now: Date = new Date()): String = {
if (task != null && !task.starting) return task.state
Expand Down Expand Up @@ -169,6 +184,7 @@ class Broker(_id: String = "0") {
if (node.contains("log4jOptions")) log4jOptions = Util.parseMap(node("log4jOptions").asInstanceOf[String])
if (node.contains("jvmOptions")) jvmOptions = node("jvmOptions").asInstanceOf[String]

if (node.contains("stickiness")) stickiness.fromJson(node("stickiness").asInstanceOf[Map[String, Object]])
failover.fromJson(node("failover").asInstanceOf[Map[String, Object]])

if (node.contains("task")) {
Expand All @@ -193,6 +209,7 @@ class Broker(_id: String = "0") {
if (!log4jOptions.isEmpty) obj("log4jOptions") = Util.formatMap(log4jOptions)
if (jvmOptions != null) obj("jvmOptions") = jvmOptions

obj("stickiness") = stickiness.toJson
obj("failover") = failover.toJson
if (task != null) obj("task") = task.toJson

Expand All @@ -212,7 +229,46 @@ object Broker {

def isOptionOverridable(name: String): Boolean = !List("broker.id", "port", "zookeeper.connect").contains(name)

class Failover(_delay: Period = new Period("10s"), _maxDelay: Period = new Period("60s")) {
/** Tracks the node a broker last ran on so restarts can be kept "sticky":
  * while the stickiness period has not elapsed since the last stop, placement
  * is only allowed on the previously used hostname.
  *
  * @param _period how long after a stop the broker stays bound to its last host
  */
class Stickiness(_period: Period = new Period("10m")) {
  var period: Period = _period
  // volatile: presumably read by the scheduler thread while updated from
  // task-status callbacks — confirm against Scheduler usage
  @volatile var hostname: String = null
  @volatile var stopTime: Date = null

  /** Moment the stickiness binding lapses, or null if the broker has not stopped. */
  def expires: Date =
    if (stopTime == null) null
    else new Date(stopTime.getTime + period.ms)

  /** Records a successful start: binds to `hostname` and clears the stop marker. */
  def registerStart(hostname: String): Unit = {
    this.hostname = hostname
    this.stopTime = null
  }

  /** Records a stop at `now`, starting the stickiness countdown. */
  def registerStop(now: Date = new Date()): Unit =
    this.stopTime = now

  /** Whether placement on `hostname` is permitted at time `now`: true when no
    * host is bound yet, the period has already lapsed, or the host matches.
    */
  def allowsHostname(hostname: String, now: Date = new Date()): Boolean = {
    val unbound = this.hostname == null
    val lapsed = stopTime == null || now.getTime - stopTime.getTime >= period.ms
    unbound || lapsed || this.hostname == hostname
  }

  /** Restores state from a JSON-parsed map; the "period" key is required,
    * "stopTime" and "hostname" are optional.
    */
  def fromJson(node: Map[String, Object]): Unit = {
    period = new Period(node("period").asInstanceOf[String])
    for (value <- node.get("stopTime")) stopTime = dateTimeFormat.parse(value.asInstanceOf[String])
    for (value <- node.get("hostname")) hostname = value.asInstanceOf[String]
  }

  /** Serializes state, omitting null fields and preserving insertion order. */
  def toJson: JSONObject = {
    val fields = new collection.mutable.LinkedHashMap[String, Any]()

    fields("period") = "" + period
    if (stopTime != null) fields("stopTime") = dateTimeFormat.format(stopTime)
    if (hostname != null) fields("hostname") = hostname

    new JSONObject(fields.toMap)
  }
}

class Failover(_delay: Period = new Period("1m"), _maxDelay: Period = new Period("10m")) {
var delay: Period = _delay
var maxDelay: Period = _maxDelay
var maxTries: Integer = null
Expand Down
11 changes: 10 additions & 1 deletion src/scala/ly/stealth/mesos/kafka/Cli.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import java.io._
import java.util
import scala.collection.JavaConversions._
import java.util.{Properties, Collections}
import ly.stealth.mesos.kafka.Util.Period
import ly.stealth.mesos.kafka.Util.{Str, Period}

object Cli {
var api: String = null
Expand Down Expand Up @@ -262,6 +262,7 @@ object Cli {
parser.accepts("heap", "heap amount in Mb").withRequiredArg().ofType(classOf[java.lang.Long])
parser.accepts("port", "port or range (31092, 31090..31100). Default - auto").withRequiredArg().ofType(classOf[java.lang.String])
parser.accepts("bind-address", "broker bind address (broker0, 192.168.50.*, if:eth1). Default - auto").withRequiredArg().ofType(classOf[java.lang.String])
parser.accepts("stickiness-period", "stickiness period to preserve same node for broker (5m, 10m, 1h)").withRequiredArg().ofType(classOf[String])

parser.accepts("options", "options or file. Examples:\n log.dirs=/tmp/kafka/$id,num.io.threads=16\n file:server.properties").withRequiredArg()
parser.accepts("log4j-options", "log4j options or file. Examples:\n log4j.logger.kafka=DEBUG\\, kafkaAppender\n file:log4j.properties").withRequiredArg()
Expand Down Expand Up @@ -304,6 +305,7 @@ object Cli {
val heap = options.valueOf("heap").asInstanceOf[java.lang.Long]
val port = options.valueOf("port").asInstanceOf[String]
val bindAddress = options.valueOf("bind-address").asInstanceOf[String]
val stickinessPeriod = options.valueOf("stickiness-period").asInstanceOf[String]

val constraints = options.valueOf("constraints").asInstanceOf[String]
val options_ = options.valueOf("options").asInstanceOf[String]
Expand All @@ -322,6 +324,7 @@ object Cli {
if (heap != null) params.put("heap", "" + heap)
if (port != null) params.put("port", port)
if (bindAddress != null) params.put("bindAddress", bindAddress)
if (stickinessPeriod != null) params.put("stickinessPeriod", stickinessPeriod)

if (options_ != null) params.put("options", optionsOrFile(options_))
if (constraints != null) params.put("constraints", constraints)
Expand Down Expand Up @@ -561,6 +564,12 @@ object Cli {
if (broker.failover.maxTries != null) failover += ", max-tries:" + broker.failover.maxTries
printLine(failover, indent)

var stickiness = "stickiness:"
stickiness += " period:" + broker.stickiness.period
if (broker.stickiness.hostname != null) stickiness += ", hostname:" + broker.stickiness.hostname
if (broker.stickiness.stopTime != null) stickiness += ", expires:" + Str.dateTime(broker.stickiness.expires)
printLine(stickiness, indent)

val task = broker.task
if (task != null) {
printLine("task: ", indent)
Expand Down
6 changes: 6 additions & 0 deletions src/scala/ly/stealth/mesos/kafka/HttpServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ object HttpServer {
try { new BindAddress(request.getParameter("bindAddress")) }
catch { case e: IllegalArgumentException => errors.add("Invalid bindAddress") }

var stickinessPeriod: Period = null
if (request.getParameter("stickinessPeriod") != null)
try { stickinessPeriod = new Period(request.getParameter("stickinessPeriod")) }
catch { case e: IllegalArgumentException => errors.add("Invalid stickinessPeriod") }

var options: util.Map[String, String] = null
if (request.getParameter("options") != null)
try { options = Util.parseMap(request.getParameter("options"), nullValues = false).filterKeys(Broker.isOptionOverridable).view.force }
Expand Down Expand Up @@ -229,6 +234,7 @@ object HttpServer {
if (heap != null) broker.heap = heap
if (port != null) broker.port = if (port != "") new Range(port) else null
if (bindAddress != null) broker.bindAddress = if (bindAddress != "") new BindAddress(bindAddress) else null
if (stickinessPeriod != null) broker.stickiness.period = stickinessPeriod

if (constraints != null) broker.constraints = constraints
if (options != null) broker.options = options
Expand Down
8 changes: 4 additions & 4 deletions src/scala/ly/stealth/mesos/kafka/Scheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ object Scheduler extends org.apache.mesos.Scheduler {
if (isReconciling) return "reconciling"

var reason = ""
for (broker <- cluster.getBrokers.filter(_.shouldStart())) {
for (broker <- cluster.getBrokers.filter(_.shouldStart(offer.getHostname))) {
val diff = broker.matches(offer, otherTasksAttributes)

if (diff == null) {
Expand Down Expand Up @@ -215,16 +215,16 @@ object Scheduler extends org.apache.mesos.Scheduler {
logger.info(s"Finished reconciling of broker ${broker.id}, task ${broker.task.id}")

broker.task.state = Broker.State.RUNNING
broker.failover.resetFailures()
broker.registerStart(broker.task.hostname)
}

private[kafka] def onBrokerStopped(broker: Broker, status: TaskStatus, now: Date = new Date()): Unit = {
broker.task = null

val failed = broker.active && status.getState != TaskState.TASK_FINISHED && status.getState != TaskState.TASK_KILLED
broker.registerStop(now, failed)

if (failed) {
broker.failover.registerFailure(now)

var msg = s"Broker ${broker.id} failed ${broker.failover.failures}"
if (broker.failover.maxTries != null) msg += "/" + broker.failover.maxTries

Expand Down
Loading

0 comments on commit 6a893d9

Please sign in to comment.