microbean-kubernetes-controller
microbean-kubernetes-controller copied to clipboard
Addition events for pre-existing resources
Thanks for writing this and the accompanying 11-part blog post! I'm new to kubernetes so this was a good primer. I was confused by addition events I got upon starting up a test controller for pods that already existed prior to the controller being started. From the code it looks like that's expected? If so, what would be the canonical way to properly ignore/filter those out in an event consumer? I spent some time to see if there's a way to provide a non-empty known objects map but I don't think that's what you've intended here right?
Here's my messy throwaway code
public static void main(String ... argz) throws InterruptedException, IOException {
Config config = new ConfigBuilder().withMasterUrl("https://192.168.99.100:8443").build();
KubernetesClient client = new DefaultKubernetesClient(config);
LOG.info("current pods list {}", client.pods().list());
Map<Object,Pod> knownPodjects = new HashMap<>();
ResourceTrackingEventQueueConsumer<Pod> testConsumer = new ResourceTrackingEventQueueConsumer<Pod>(knownPodjects) {
@Override
protected void accept(AbstractEvent<? extends Pod> event) {
LOG.info("Accepting {}", event);
}
};
Controller<Pod> podController = new Controller<Pod>(client.pods().inNamespace("default"), knownPodjects, testConsumer);
podController.start();
LOG.info("Controller started");
Thread.sleep(TimeUnit.DAYS.toMillis(1));
}
and I get output like the following when calling start
2019-02-06 16:54:50.192 [pool-1-thread-1] INFO c.h.j.k.o.service.TestLauncher - Accepting ADDITION: Pod(apiVersion=v1, kind=Pod, metadata=ObjectMeta(annotations=null, clusterName=null, creationTimestamp=2019-02-06T21:04:44Z, deletionGracePeriodSeconds=null, deletionTimestamp=null, finalizers=[], generateName=hello-minikube-6fd785d459-, generation=null, initializers=null, labels={pod-template-hash=6fd785d459, run=hello-minikube}, name=hello-minikube-6fd785d459-trqq5, namespace=default, ownerReferences=[OwnerReference(apiVersion=apps/v1, blockOwnerDeletion=true, controller=true, kind=ReplicaSet, name=hello-minikube-6fd785d459, uid=d3ffc20f-2a52-11e9-af67-0800272da21b, additionalProperties={})], resourceVersion=700, selfLink=/api/v1/namespaces/default/pods/hello-minikube-6fd785d459-trqq5, uid=d405fb60-2a52-11e9-af67-0800272da21b, additionalProperties={}), spec=PodSpec(activeDeadlineSeconds=null, affinity=null, automountServiceAccountToken=null, containers=[Container(args=[], command=[], env=[], envFrom=[], image=k8s.gcr.io/echoserver:1.10, imagePullPolicy=IfNotPresent, lifecycle=null, livenessProbe=null, name=hello-minikube, ports=[ContainerPort(containerPort=8080, hostIP=null, hostPort=null, name=null, protocol=TCP, additionalProperties={})], readinessProbe=null, resources=ResourceRequirements(limits=null, requests=null, additionalProperties={}), securityContext=null, stdin=null, stdinOnce=null, terminationMessagePath=/dev/termination-log, terminationMessagePolicy=File, tty=null, volumeDevices=[], volumeMounts=[VolumeMount(mountPath=/var/run/secrets/kubernetes.io/serviceaccount, mountPropagation=null, name=default-token-s47ww, readOnly=true, subPath=null, additionalProperties={})], workingDir=null, additionalProperties={})], dnsConfig=null, dnsPolicy=ClusterFirst, enableServiceLinks=true, hostAliases=[], hostIPC=null, hostNetwork=null, hostPID=null, hostname=null, imagePullSecrets=[], initContainers=[], nodeName=minikube, nodeSelector=null, priority=0, priorityClassName=null, readinessGates=[], restartPolicy=Always, runtimeClassName=null, schedulerName=default-scheduler, securityContext=PodSecurityContext(fsGroup=null, runAsGroup=null, runAsNonRoot=null, runAsUser=null, seLinuxOptions=null, supplementalGroups=[], sysctls=[], additionalProperties={}), serviceAccount=default, serviceAccountName=default, shareProcessNamespace=null, subdomain=null, terminationGracePeriodSeconds=30, tolerations=[Toleration(effect=NoExecute, key=node.kubernetes.io/not-ready, operator=Exists, tolerationSeconds=300, value=null, additionalProperties={}), Toleration(effect=NoExecute, key=node.kubernetes.io/unreachable, operator=Exists, tolerationSeconds=300, value=null, additionalProperties={})], volumes=[Volume(awsElasticBlockStore=null, azureDisk=null, azureFile=null, cephfs=null, cinder=null, configMap=null, downwardAPI=null, emptyDir=null, fc=null, flexVolume=null, flocker=null, gcePersistentDisk=null, gitRepo=null, glusterfs=null, hostPath=null, iscsi=null, name=default-token-s47ww, nfs=null, persistentVolumeClaim=null, photonPersistentDisk=null, portworxVolume=null, projected=null, quobyte=null, rbd=null, scaleIO=null, secret=SecretVolumeSource(defaultMode=420, items=[], optional=null, secretName=default-token-s47ww, additionalProperties={}), storageos=null, vsphereVolume=null, additionalProperties={})], additionalProperties={}), status=PodStatus(conditions=[PodCondition(lastProbeTime=null, lastTransitionTime=2019-02-06T21:04:44Z, message=null, reason=null, status=True, type=Initialized, additionalProperties={}), PodCondition(lastProbeTime=null, lastTransitionTime=2019-02-06T21:04:55Z, message=null, reason=null, status=True, type=Ready, additionalProperties={}), PodCondition(lastProbeTime=null, lastTransitionTime=2019-02-06T21:04:55Z, message=null, reason=null, status=True, type=ContainersReady, additionalProperties={}), PodCondition(lastProbeTime=null, lastTransitionTime=2019-02-06T21:04:44Z, message=null, reason=null, status=True, type=PodScheduled, additionalProperties={})], containerStatuses=[ContainerStatus(containerID=docker://ad6050e94bed55c9830d7105d2a14a937b0ef739d7d75a0311dd0a8cfbf0a794, image=k8s.gcr.io/echoserver:1.10, imageID=docker-pullable://k8s.gcr.io/echoserver@sha256:cb5c1bddd1b5665e1867a7fa1b5fa843a47ee433bbb75d4293888b71def53229, lastState=ContainerState(running=null, terminated=null, waiting=null, additionalProperties={}), name=hello-minikube, ready=true, restartCount=0, state=ContainerState(running=ContainerStateRunning(startedAt=2019-02-06T21:04:54Z, additionalProperties={}), terminated=null, waiting=null, additionalProperties={}), additionalProperties={})], hostIP=10.0.2.15, initContainerStatuses=[], message=null, nominatedNodeName=null, phase=Running, podIP=172.17.0.4, qosClass=BestEffort, reason=null, startTime=2019-02-06T21:04:44Z, additionalProperties={}), additionalProperties={})
Does there need to be a way to get a controller "up to speed" with the state of the world before it starts processing net-new events as deltas upon that state?
I believe (if I remember right!) that you want to test if the AbstractEvent
is an instance of SynchronizationEvent
. If it is, then it's an event that is getting the controller "up to speed". If it's not, then it's just an Event
, and it is a true event.
For various reasons you don't want to be too reliant on the presence of Event
s to signal that something has happened if you can help it. You want to inspect the state of the cache instead (the knownObjects
map). Having said that for most situations you can probably react to the event itself directly.
The analogous Go code refers to this state as cache sync. I seem to recall I actually fire a Java beans property when this happens, but it's unclear exactly how you'd make use of that here, to be fair.
Thank you for the answer, checking for SynchonizationEvent
will help filter out some noise there.
When you say
you don't want to be too reliant on the presence of
Event
s to signal that something has happened
is the preferred pattern to just poll the cache entries periodically? If the presence of Event
s aren't reliable, why would the cache, which is being updated from events if I understand this correctly, be more reliable?
Just discovered awaitEventCacheSynchronization
method which I hadn't noticed which is handy. An analogous callback on ResourceTrackingEventQueueConsumer
might be nice so that object could manage both time and event based updates itself?
(Bear in mind the Go code is the authoritative answer; I've just tried to translate it into Java idiomatically.)
Remember that if you set up a synchronization interval, then every so often the cache gets repopulated in its entirety via a list operation.
I think the salient point here is when an event comes in, you don't just grab the data from the event, but you ask the cache for the relevant data.
Regarding awaitEventCacheSynchronization
, I'll look to see if it can be exposed elsewhere.
I was dealing with this by checking timestamps, but using the SynchronizationEvent
trait is probably better. I'll check that out.
Might be cool to get events posted when the timed synchronization has happened too?
For background on the patterns involved here, have a look at this: https://stackoverflow.com/questions/31041766/what-does-edge-based-and-level-based-mean
So Kubernetes in general is biased towards level-based. That is, rather than relying on the reception of an event (in general), Kubernetes internals always check the "level" of the system to verify if it matches what any given event or stopwatch expiration might claim it is supposed to be.
I am sure there is more work to do in this area in my framework.
For example, one of the things I'd like to do in the abstract is to tell the event broadcasting mechanism to not fire at all until awaitEventCacheSynchronization
has happened. This is perhaps less important in this project than it is in the microbean-kubernetes-controller-cdi
derivative.