Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • she/theodolite
1 result
Show changes
Showing with 68 additions and 54 deletions
......@@ -15,3 +15,7 @@ systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.bootstrap.servers=localhost:9092
systems.kafka.producer.bootstrap.servers=localhost:9092
systems.kafka.default.stream.replication.factor=1
# Configure JMX metrics exporter
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
metrics.reporters=jmx
......@@ -6,4 +6,11 @@ ENV ENABLE_METRICS=true
ADD build/distributions/uc3-beam-samza.tar /
ADD samza-standalone.properties /
CMD /uc3-beam-samza/bin/uc3-beam-samza --configFilePath=samza-standalone.properties --samzaExecutionEnvironment=STANDALONE --maxSourceParallelism=$MAX_SOURCE_PARALLELISM --enableMetrics=$ENABLE_METRICS --configOverride="{\"job.coordinator.zk.connect\":\"$SAMZA_JOB_COORDINATOR_ZK_CONNECT\"}"
CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \
/uc3-beam-samza/bin/uc3-beam-samza \
--configFilePath=samza-standalone.properties \
--samzaExecutionEnvironment=STANDALONE \
--maxSourceParallelism=$MAX_SOURCE_PARALLELISM \
--enableMetrics=$ENABLE_METRICS \
--configOverride="{\"job.coordinator.zk.connect\":\"$SAMZA_JOB_COORDINATOR_ZK_CONNECT\"}"
......@@ -15,3 +15,7 @@ systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.bootstrap.servers=localhost:9092
systems.kafka.producer.bootstrap.servers=localhost:9092
systems.kafka.default.stream.replication.factor=1
# Configure JMX metrics exporter
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
metrics.reporters=jmx
......@@ -6,4 +6,10 @@ ENV ENABLE_METRICS=true
ADD build/distributions/uc4-beam-samza.tar /
ADD samza-standalone.properties /
CMD /uc4-beam-samza/bin/uc4-beam-samza --configFilePath=samza-standalone.properties --samzaExecutionEnvironment=STANDALONE --maxSourceParallelism=$MAX_SOURCE_PARALLELISM --enableMetrics=$ENABLE_METRICS --configOverride="{\"job.coordinator.zk.connect\":\"$SAMZA_JOB_COORDINATOR_ZK_CONNECT\"}"
CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \
/uc4-beam-samza/bin/uc4-beam-samza \
--configFilePath=samza-standalone.properties \
--samzaExecutionEnvironment=STANDALONE \
--maxSourceParallelism=$MAX_SOURCE_PARALLELISM \
--enableMetrics=$ENABLE_METRICS \
--configOverride="{\"job.coordinator.zk.connect\":\"$SAMZA_JOB_COORDINATOR_ZK_CONNECT\"}"
......@@ -16,6 +16,10 @@ systems.kafka.consumer.bootstrap.servers=localhost:9092
systems.kafka.producer.bootstrap.servers=localhost:9092
systems.kafka.default.stream.replication.factor=1
# Configure JMX metrics exporter
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
metrics.reporters=jmx
# Configure serialization and stores
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
stores.my-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
......
......@@ -28,12 +28,12 @@ abstract class AbstractResourcePatcher(
when (resource) {
is Deployment -> {
resource.spec.template.spec.containers.filter { it.name == container }.forEach {
setLimits(it, value)
setValues(it, value)
}
}
is StatefulSet -> {
resource.spec.template.spec.containers.filter { it.name == container }.forEach {
setLimits(it, value)
setValues(it, value)
}
}
else -> {
......@@ -43,7 +43,7 @@ abstract class AbstractResourcePatcher(
return resource
}
private fun setLimits(container: Container, value: String) {
private fun setValues(container: Container, value: String) {
val quantity = if (this.format != null || this.factor != null) {
val amountAsInt = value.toIntOrNull()?.times(this.factor ?: 1)
if (amountAsInt == null) {
......@@ -56,8 +56,8 @@ abstract class AbstractResourcePatcher(
Quantity(value)
}
setLimits(container, quantity)
setValues(container, quantity)
}
abstract fun setLimits(container: Container, quantity: Quantity)
abstract fun setValues(container: Container, quantity: Quantity)
}
\ No newline at end of file
......@@ -12,86 +12,76 @@ class PatcherFactory {
* Create patcher based on the given [PatcherDefinition] and
* the list of KubernetesResources.
*
* @param patcherDefinition The [PatcherDefinition] for which are
* [Patcher] should be created.
* @param k8sResources List of all available Kubernetes resources.
* This is a list of pairs<String, KubernetesResource>:
* The frist corresponds to the filename where the resource is defined.
* The second corresponds to the concrete [KubernetesResource] that should be patched.
* @param patcher The [PatcherDefinition] for which are [Patcher] should be created.
* @return The created [Patcher].
* @throws IllegalArgumentException if no patcher can be created.
*/
fun createPatcher(
patcherDefinition: PatcherDefinition,
): Patcher {
return try {
when (patcherDefinition.type) {
fun createPatcher(patcher: PatcherDefinition): Patcher {
return when (patcher.type) {
"ReplicaPatcher" -> ReplicaPatcher(
)
"NumNestedGroupsLoadGeneratorReplicaPatcher" -> NumNestedGroupsLoadGeneratorReplicaPatcher(
loadGenMaxRecords = patcherDefinition.properties["loadGenMaxRecords"]!!,
numSensors = patcherDefinition.properties["numSensors"]!!
loadGenMaxRecords = patcher.properties["loadGenMaxRecords"] ?: throwInvalid(patcher),
numSensors = patcher.properties["numSensors"] ?: throwInvalid(patcher)
)
"NumSensorsLoadGeneratorReplicaPatcher" -> NumSensorsLoadGeneratorReplicaPatcher(
loadGenMaxRecords = patcherDefinition.properties["loadGenMaxRecords"]!!
loadGenMaxRecords = patcher.properties["loadGenMaxRecords"] ?: throwInvalid(patcher)
)
"DataVolumeLoadGeneratorReplicaPatcher" -> DataVolumeLoadGeneratorReplicaPatcher(
maxVolume = patcherDefinition.properties["maxVolume"]!!.toInt(),
container = patcherDefinition.properties["container"]!!,
variableName = patcherDefinition.properties["variableName"]!!
maxVolume = (patcher.properties["maxVolume"] ?: throwInvalid(patcher)).toInt(),
container = patcher.properties["container"] ?: throwInvalid(patcher),
variableName = patcher.properties["variableName"] ?: throwInvalid(patcher)
)
"EnvVarPatcher" -> EnvVarPatcher(
container = patcherDefinition.properties["container"]!!,
variableName = patcherDefinition.properties["variableName"]!!
container = patcher.properties["container"] ?: throwInvalid(patcher),
variableName = patcher.properties["variableName"] ?: throwInvalid(patcher)
)
"NodeSelectorPatcher" -> NodeSelectorPatcher(
variableName = patcherDefinition.properties["variableName"]!!
variableName = patcher.properties["variableName"] ?: throwInvalid(patcher)
)
"ResourceLimitPatcher" -> ResourceLimitPatcher(
container = patcherDefinition.properties["container"]!!,
limitedResource = patcherDefinition.properties["limitedResource"]!!,
format = patcherDefinition.properties["format"],
factor = patcherDefinition.properties["factor"]?.toInt()
container = patcher.properties["container"] ?: throwInvalid(patcher),
limitedResource = patcher.properties["limitedResource"] ?: throwInvalid(patcher),
format = patcher.properties["format"],
factor = patcher.properties["factor"]?.toInt()
)
"ResourceRequestPatcher" -> ResourceRequestPatcher(
container = patcherDefinition.properties["container"]!!,
requestedResource = patcherDefinition.properties["requestedResource"]!!,
format = patcherDefinition.properties["format"],
factor = patcherDefinition.properties["factor"]?.toInt()
container = patcher.properties["container"] ?: throwInvalid(patcher),
requestedResource = patcher.properties["requestedResource"] ?: throwInvalid(patcher),
format = patcher.properties["format"],
factor = patcher.properties["factor"]?.toInt()
)
"SchedulerNamePatcher" -> SchedulerNamePatcher()
"LabelPatcher" -> LabelPatcher(
variableName = patcherDefinition.properties["variableName"]!!
variableName = patcher.properties["variableName"] ?: throwInvalid(patcher)
)
"MatchLabelPatcher" -> MatchLabelPatcher(
variableName = patcherDefinition.properties["variableName"]!!
variableName = patcher.properties["variableName"] ?: throwInvalid(patcher)
)
"TemplateLabelPatcher" -> TemplateLabelPatcher(
variableName = patcherDefinition.properties["variableName"]!!
variableName = patcher.properties["variableName"] ?: throwInvalid(patcher)
)
"ImagePatcher" -> ImagePatcher(
container = patcherDefinition.properties["container"]!!
container = patcher.properties["container"] ?: throwInvalid(patcher)
)
"ConfigMapYamlPatcher" -> ConfigMapYamlPatcher(
fileName = patcherDefinition.properties["fileName"]!!,
variableName = patcherDefinition.properties["variableName"]!!
fileName = patcher.properties["fileName"] ?: throwInvalid(patcher),
variableName = patcher.properties["variableName"] ?: throwInvalid(patcher)
)
"NamePatcher" -> NamePatcher()
"ServiceSelectorPatcher" -> ServiceSelectorPatcher(
variableName = patcherDefinition.properties["label"]!!
variableName = patcher.properties["label"] ?: throwInvalid(patcher)
)
"VolumesConfigMapPatcher" -> VolumesConfigMapPatcher(
volumeName = patcherDefinition.properties["volumeName"]!!
volumeName = patcher.properties["volumeName"] ?: throwInvalid(patcher)
)
else -> throw InvalidPatcherConfigurationException("Patcher type ${patcherDefinition.type} not found.")
else -> throw InvalidPatcherConfigurationException("Patcher type ${patcher.type} not found.")
}
} catch (e: NullPointerException) {
throw InvalidPatcherConfigurationException(
"Could not create patcher with type ${patcherDefinition.type}" +
" Probably a required patcher argument was not specified.", e
)
}
}
private fun throwInvalid(patcher: PatcherDefinition): String {
throw InvalidPatcherConfigurationException("Could not create patcher with type ${patcher.type}. Probably a required patcher argument was not specified.")
}
}
}
......@@ -24,7 +24,7 @@ class ResourceLimitPatcher(
format = format,
factor = factor
) {
override fun setLimits(container: Container, quantity: Quantity) {
override fun setValues(container: Container, quantity: Quantity) {
when {
container.resources == null -> {
val resource = ResourceRequirements()
......
......@@ -4,7 +4,6 @@ import io.fabric8.kubernetes.api.model.Container
import io.fabric8.kubernetes.api.model.Quantity
import io.fabric8.kubernetes.api.model.ResourceRequirements
/**
* The Resource request [Patcher] sets resource requests for Deployments and StatefulSets.
*
......@@ -25,7 +24,7 @@ class ResourceRequestPatcher(
factor = factor
) {
override fun setLimits(container: Container, quantity: Quantity) {
override fun setValues(container: Container, quantity: Quantity) {
when {
container.resources == null -> {
val resource = ResourceRequirements()
......