Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • she/theodolite
1 result
Show changes
Showing with 68 additions and 54 deletions
...@@ -15,3 +15,7 @@ systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory ...@@ -15,3 +15,7 @@ systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.bootstrap.servers=localhost:9092 systems.kafka.consumer.bootstrap.servers=localhost:9092
systems.kafka.producer.bootstrap.servers=localhost:9092 systems.kafka.producer.bootstrap.servers=localhost:9092
systems.kafka.default.stream.replication.factor=1 systems.kafka.default.stream.replication.factor=1
# Configure JMX metrics exporter
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
metrics.reporters=jmx
...@@ -6,4 +6,11 @@ ENV ENABLE_METRICS=true ...@@ -6,4 +6,11 @@ ENV ENABLE_METRICS=true
ADD build/distributions/uc3-beam-samza.tar / ADD build/distributions/uc3-beam-samza.tar /
ADD samza-standalone.properties / ADD samza-standalone.properties /
CMD /uc3-beam-samza/bin/uc3-beam-samza --configFilePath=samza-standalone.properties --samzaExecutionEnvironment=STANDALONE --maxSourceParallelism=$MAX_SOURCE_PARALLELISM --enableMetrics=$ENABLE_METRICS --configOverride="{\"job.coordinator.zk.connect\":\"$SAMZA_JOB_COORDINATOR_ZK_CONNECT\"}"
CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \
/uc3-beam-samza/bin/uc3-beam-samza \
--configFilePath=samza-standalone.properties \
--samzaExecutionEnvironment=STANDALONE \
--maxSourceParallelism=$MAX_SOURCE_PARALLELISM \
--enableMetrics=$ENABLE_METRICS \
--configOverride="{\"job.coordinator.zk.connect\":\"$SAMZA_JOB_COORDINATOR_ZK_CONNECT\"}"
...@@ -15,3 +15,7 @@ systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory ...@@ -15,3 +15,7 @@ systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.bootstrap.servers=localhost:9092 systems.kafka.consumer.bootstrap.servers=localhost:9092
systems.kafka.producer.bootstrap.servers=localhost:9092 systems.kafka.producer.bootstrap.servers=localhost:9092
systems.kafka.default.stream.replication.factor=1 systems.kafka.default.stream.replication.factor=1
# Configure JMX metrics exporter
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
metrics.reporters=jmx
...@@ -6,4 +6,10 @@ ENV ENABLE_METRICS=true ...@@ -6,4 +6,10 @@ ENV ENABLE_METRICS=true
ADD build/distributions/uc4-beam-samza.tar / ADD build/distributions/uc4-beam-samza.tar /
ADD samza-standalone.properties / ADD samza-standalone.properties /
CMD /uc4-beam-samza/bin/uc4-beam-samza --configFilePath=samza-standalone.properties --samzaExecutionEnvironment=STANDALONE --maxSourceParallelism=$MAX_SOURCE_PARALLELISM --enableMetrics=$ENABLE_METRICS --configOverride="{\"job.coordinator.zk.connect\":\"$SAMZA_JOB_COORDINATOR_ZK_CONNECT\"}" CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \
/uc4-beam-samza/bin/uc4-beam-samza \
--configFilePath=samza-standalone.properties \
--samzaExecutionEnvironment=STANDALONE \
--maxSourceParallelism=$MAX_SOURCE_PARALLELISM \
--enableMetrics=$ENABLE_METRICS \
--configOverride="{\"job.coordinator.zk.connect\":\"$SAMZA_JOB_COORDINATOR_ZK_CONNECT\"}"
...@@ -16,6 +16,10 @@ systems.kafka.consumer.bootstrap.servers=localhost:9092 ...@@ -16,6 +16,10 @@ systems.kafka.consumer.bootstrap.servers=localhost:9092
systems.kafka.producer.bootstrap.servers=localhost:9092 systems.kafka.producer.bootstrap.servers=localhost:9092
systems.kafka.default.stream.replication.factor=1 systems.kafka.default.stream.replication.factor=1
# Configure JMX metrics exporter
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
metrics.reporters=jmx
# Configure serialization and stores # Configure serialization and stores
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
stores.my-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory stores.my-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
......
...@@ -28,12 +28,12 @@ abstract class AbstractResourcePatcher( ...@@ -28,12 +28,12 @@ abstract class AbstractResourcePatcher(
when (resource) { when (resource) {
is Deployment -> { is Deployment -> {
resource.spec.template.spec.containers.filter { it.name == container }.forEach { resource.spec.template.spec.containers.filter { it.name == container }.forEach {
setLimits(it, value) setValues(it, value)
} }
} }
is StatefulSet -> { is StatefulSet -> {
resource.spec.template.spec.containers.filter { it.name == container }.forEach { resource.spec.template.spec.containers.filter { it.name == container }.forEach {
setLimits(it, value) setValues(it, value)
} }
} }
else -> { else -> {
...@@ -43,7 +43,7 @@ abstract class AbstractResourcePatcher( ...@@ -43,7 +43,7 @@ abstract class AbstractResourcePatcher(
return resource return resource
} }
private fun setLimits(container: Container, value: String) { private fun setValues(container: Container, value: String) {
val quantity = if (this.format != null || this.factor != null) { val quantity = if (this.format != null || this.factor != null) {
val amountAsInt = value.toIntOrNull()?.times(this.factor ?: 1) val amountAsInt = value.toIntOrNull()?.times(this.factor ?: 1)
if (amountAsInt == null) { if (amountAsInt == null) {
...@@ -56,8 +56,8 @@ abstract class AbstractResourcePatcher( ...@@ -56,8 +56,8 @@ abstract class AbstractResourcePatcher(
Quantity(value) Quantity(value)
} }
setLimits(container, quantity) setValues(container, quantity)
} }
abstract fun setLimits(container: Container, quantity: Quantity) abstract fun setValues(container: Container, quantity: Quantity)
} }
\ No newline at end of file
...@@ -12,86 +12,76 @@ class PatcherFactory { ...@@ -12,86 +12,76 @@ class PatcherFactory {
* Create patcher based on the given [PatcherDefinition] and * Create patcher based on the given [PatcherDefinition] and
* the list of KubernetesResources. * the list of KubernetesResources.
* *
* @param patcherDefinition The [PatcherDefinition] for which are * @param patcher The [PatcherDefinition] for which are [Patcher] should be created.
* [Patcher] should be created.
* @param k8sResources List of all available Kubernetes resources.
* This is a list of pairs<String, KubernetesResource>:
* The frist corresponds to the filename where the resource is defined.
* The second corresponds to the concrete [KubernetesResource] that should be patched.
* @return The created [Patcher]. * @return The created [Patcher].
* @throws IllegalArgumentException if no patcher can be created. * @throws IllegalArgumentException if no patcher can be created.
*/ */
fun createPatcher( fun createPatcher(patcher: PatcherDefinition): Patcher {
patcherDefinition: PatcherDefinition, return when (patcher.type) {
): Patcher {
return try {
when (patcherDefinition.type) {
"ReplicaPatcher" -> ReplicaPatcher( "ReplicaPatcher" -> ReplicaPatcher(
) )
"NumNestedGroupsLoadGeneratorReplicaPatcher" -> NumNestedGroupsLoadGeneratorReplicaPatcher( "NumNestedGroupsLoadGeneratorReplicaPatcher" -> NumNestedGroupsLoadGeneratorReplicaPatcher(
loadGenMaxRecords = patcherDefinition.properties["loadGenMaxRecords"]!!, loadGenMaxRecords = patcher.properties["loadGenMaxRecords"] ?: throwInvalid(patcher),
numSensors = patcherDefinition.properties["numSensors"]!! numSensors = patcher.properties["numSensors"] ?: throwInvalid(patcher)
) )
"NumSensorsLoadGeneratorReplicaPatcher" -> NumSensorsLoadGeneratorReplicaPatcher( "NumSensorsLoadGeneratorReplicaPatcher" -> NumSensorsLoadGeneratorReplicaPatcher(
loadGenMaxRecords = patcherDefinition.properties["loadGenMaxRecords"]!! loadGenMaxRecords = patcher.properties["loadGenMaxRecords"] ?: throwInvalid(patcher)
) )
"DataVolumeLoadGeneratorReplicaPatcher" -> DataVolumeLoadGeneratorReplicaPatcher( "DataVolumeLoadGeneratorReplicaPatcher" -> DataVolumeLoadGeneratorReplicaPatcher(
maxVolume = patcherDefinition.properties["maxVolume"]!!.toInt(), maxVolume = (patcher.properties["maxVolume"] ?: throwInvalid(patcher)).toInt(),
container = patcherDefinition.properties["container"]!!, container = patcher.properties["container"] ?: throwInvalid(patcher),
variableName = patcherDefinition.properties["variableName"]!! variableName = patcher.properties["variableName"] ?: throwInvalid(patcher)
) )
"EnvVarPatcher" -> EnvVarPatcher( "EnvVarPatcher" -> EnvVarPatcher(
container = patcherDefinition.properties["container"]!!, container = patcher.properties["container"] ?: throwInvalid(patcher),
variableName = patcherDefinition.properties["variableName"]!! variableName = patcher.properties["variableName"] ?: throwInvalid(patcher)
) )
"NodeSelectorPatcher" -> NodeSelectorPatcher( "NodeSelectorPatcher" -> NodeSelectorPatcher(
variableName = patcherDefinition.properties["variableName"]!! variableName = patcher.properties["variableName"] ?: throwInvalid(patcher)
) )
"ResourceLimitPatcher" -> ResourceLimitPatcher( "ResourceLimitPatcher" -> ResourceLimitPatcher(
container = patcherDefinition.properties["container"]!!, container = patcher.properties["container"] ?: throwInvalid(patcher),
limitedResource = patcherDefinition.properties["limitedResource"]!!, limitedResource = patcher.properties["limitedResource"] ?: throwInvalid(patcher),
format = patcherDefinition.properties["format"], format = patcher.properties["format"],
factor = patcherDefinition.properties["factor"]?.toInt() factor = patcher.properties["factor"]?.toInt()
) )
"ResourceRequestPatcher" -> ResourceRequestPatcher( "ResourceRequestPatcher" -> ResourceRequestPatcher(
container = patcherDefinition.properties["container"]!!, container = patcher.properties["container"] ?: throwInvalid(patcher),
requestedResource = patcherDefinition.properties["requestedResource"]!!, requestedResource = patcher.properties["requestedResource"] ?: throwInvalid(patcher),
format = patcherDefinition.properties["format"], format = patcher.properties["format"],
factor = patcherDefinition.properties["factor"]?.toInt() factor = patcher.properties["factor"]?.toInt()
) )
"SchedulerNamePatcher" -> SchedulerNamePatcher() "SchedulerNamePatcher" -> SchedulerNamePatcher()
"LabelPatcher" -> LabelPatcher( "LabelPatcher" -> LabelPatcher(
variableName = patcherDefinition.properties["variableName"]!! variableName = patcher.properties["variableName"] ?: throwInvalid(patcher)
) )
"MatchLabelPatcher" -> MatchLabelPatcher( "MatchLabelPatcher" -> MatchLabelPatcher(
variableName = patcherDefinition.properties["variableName"]!! variableName = patcher.properties["variableName"] ?: throwInvalid(patcher)
) )
"TemplateLabelPatcher" -> TemplateLabelPatcher( "TemplateLabelPatcher" -> TemplateLabelPatcher(
variableName = patcherDefinition.properties["variableName"]!! variableName = patcher.properties["variableName"] ?: throwInvalid(patcher)
) )
"ImagePatcher" -> ImagePatcher( "ImagePatcher" -> ImagePatcher(
container = patcherDefinition.properties["container"]!! container = patcher.properties["container"] ?: throwInvalid(patcher)
) )
"ConfigMapYamlPatcher" -> ConfigMapYamlPatcher( "ConfigMapYamlPatcher" -> ConfigMapYamlPatcher(
fileName = patcherDefinition.properties["fileName"]!!, fileName = patcher.properties["fileName"] ?: throwInvalid(patcher),
variableName = patcherDefinition.properties["variableName"]!! variableName = patcher.properties["variableName"] ?: throwInvalid(patcher)
) )
"NamePatcher" -> NamePatcher() "NamePatcher" -> NamePatcher()
"ServiceSelectorPatcher" -> ServiceSelectorPatcher( "ServiceSelectorPatcher" -> ServiceSelectorPatcher(
variableName = patcherDefinition.properties["label"]!! variableName = patcher.properties["label"] ?: throwInvalid(patcher)
) )
"VolumesConfigMapPatcher" -> VolumesConfigMapPatcher( "VolumesConfigMapPatcher" -> VolumesConfigMapPatcher(
volumeName = patcherDefinition.properties["volumeName"]!! volumeName = patcher.properties["volumeName"] ?: throwInvalid(patcher)
) )
else -> throw InvalidPatcherConfigurationException("Patcher type ${patcherDefinition.type} not found.") else -> throw InvalidPatcherConfigurationException("Patcher type ${patcher.type} not found.")
} }
} catch (e: NullPointerException) { }
throw InvalidPatcherConfigurationException(
"Could not create patcher with type ${patcherDefinition.type}" + private fun throwInvalid(patcher: PatcherDefinition): String {
" Probably a required patcher argument was not specified.", e throw InvalidPatcherConfigurationException("Could not create patcher with type ${patcher.type}. Probably a required patcher argument was not specified.")
)
}
} }
} }
} }
...@@ -24,7 +24,7 @@ class ResourceLimitPatcher( ...@@ -24,7 +24,7 @@ class ResourceLimitPatcher(
format = format, format = format,
factor = factor factor = factor
) { ) {
override fun setLimits(container: Container, quantity: Quantity) { override fun setValues(container: Container, quantity: Quantity) {
when { when {
container.resources == null -> { container.resources == null -> {
val resource = ResourceRequirements() val resource = ResourceRequirements()
......
...@@ -4,7 +4,6 @@ import io.fabric8.kubernetes.api.model.Container ...@@ -4,7 +4,6 @@ import io.fabric8.kubernetes.api.model.Container
import io.fabric8.kubernetes.api.model.Quantity import io.fabric8.kubernetes.api.model.Quantity
import io.fabric8.kubernetes.api.model.ResourceRequirements import io.fabric8.kubernetes.api.model.ResourceRequirements
/** /**
* The Resource request [Patcher] sets resource requests for Deployments and StatefulSets. * The Resource request [Patcher] sets resource requests for Deployments and StatefulSets.
* *
...@@ -25,7 +24,7 @@ class ResourceRequestPatcher( ...@@ -25,7 +24,7 @@ class ResourceRequestPatcher(
factor = factor factor = factor
) { ) {
override fun setLimits(container: Container, quantity: Quantity) { override fun setValues(container: Container, quantity: Quantity) {
when { when {
container.resources == null -> { container.resources == null -> {
val resource = ResourceRequirements() val resource = ResourceRequirements()
......