phantom
phantom copied to clipboard
Integrating Phantom with Opentracing
I'm looking to see how to integrate the java-cassandra-driver for opentracing with Phantom.
The Java API expects to wrap Cluster, and takes an Initializer:
// Obtain the OpenTracing tracer (its construction is elided in this snippet).
Tracer tracer = ...
// Configure a driver Cluster.Builder with contact point 127.0.0.1 on port 9142.
Cluster.Builder builder = Cluster.builder().addContactPoints("127.0.0.1").withPort(9142);
// Wrap the builder and the tracer in a TracingCluster, which is used here as a plain Cluster.
Cluster cluster = new TracingCluster(builder, tracer);
https://github.com/opentracing-contrib/java-cassandra-driver#usage
But there doesn't seem to be an accessible way to do this from Phantom -- the closest I've found is to extend the SessionProvider:
/**
 * Sketch of a session provider that swaps the default cluster for a tracing one.
 *
 * It forwards `space` and `builder` to `DefaultSessionProvider` and overrides
 * only `cluster`.
 *
 * NOTE(review): `tracer` is not declared in this snippet; presumably it is
 * supplied by the `Traceable` mixin -- confirm.
 */
class OpentracingSessionProvider(space: KeySpace, builder: ClusterBuilder) extends DefaultSessionProvider(space, builder) with Traceable {
// Apply the user-supplied builder to a fresh Cluster.Builder and hand the
// result, together with the tracer, to TracingCluster.
override lazy val cluster: Cluster = new TracingCluster(builder(Cluster.builder), tracer)
}
But from there, it doesn't seem practical to swap out the SessionProvider instantiation. I can't map it through the Cluster.Builder as the API expects the end result.
So where I am right now is...
package sbux.ucp.simple.user.cassandra
import com.datastax.driver.core.{Cluster, Session, _}
import com.outworkers.phantom.connectors.{
ClusterBuilder,
KeySpaceCQLQuery,
SessionAugmenterImplicits,
SessionProvider
}
import com.outworkers.phantom.dsl._
import io.opentracing.Tracer
import org.slf4j.LoggerFactory
import scala.concurrent.blocking
import scala.util.control.{NoStackTrace, NonFatal}
import scala.util.{Failure, Success, Try}
/**
 * A `SessionProvider` that builds its own `Cluster`, connects a `Session`
 * and, when `autoinit` is set, creates the keyspace before exposing the session.
 *
 * NOTE(review): `tracer` is accepted but never referenced inside this class, so
 * the cluster built below is a plain, untraced `Cluster`. To actually emit
 * spans the cluster presumably has to be wrapped, e.g.
 * `new TracingCluster(builder(Cluster.builder), tracer)` from the OpenTracing
 * Cassandra driver -- TODO confirm and wire in once that class is imported.
 *
 * @param tracer        The OpenTracing tracer (currently unused, see note above).
 * @param space         The keyspace this provider is bound to.
 * @param builder       Function applied to a fresh `Cluster.builder` to configure the cluster.
 * @param autoinit      Whether the keyspace creation query is executed right after connecting.
 * @param keyspaceQuery Optional custom keyspace creation query; when absent,
 *                      `defaultKeyspaceCreationQuery` is used.
 * @param errorHandler  Maps any failure raised while creating the session to the
 *                      exception that is finally thrown.
 */
class OpentracingSessionProvider(tracer: Tracer,
                                 val space: KeySpace,
                                 builder: ClusterBuilder,
                                 autoinit: Boolean = true,
                                 keyspaceQuery: Option[KeySpaceCQLQuery] = None,
                                 errorHandler: Throwable => Throwable = identity)
    extends SessionProvider {

  val logger = LoggerFactory.getLogger(this.getClass)

  val cluster: Cluster = builder(Cluster.builder).build

  /**
   * The keyspace creation statement used when no custom query was supplied.
   *
   * @param session  Not used by this implementation.
   * @param keySpace The name of the keyspace to create.
   * @return A `CREATE KEYSPACE IF NOT EXISTS` statement using `SimpleStrategy`
   *         with a replication factor of 1.
   */
  def defaultKeyspaceCreationQuery(session: Session, keySpace: String): String = {
    s"CREATE KEYSPACE IF NOT EXISTS $keySpace WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};"
  }

  /**
   * Initializes the keySpace with the given name on
   * the specified Session.
   *
   * @param session The session the creation query is executed on.
   * @param space   The name of the keyspace to create.
   * @return The same session that was passed in.
   */
  protected[this] def initKeySpace(session: Session, space: String): Session = blocking {
    val query =
      keyspaceQuery.map(_.queryString).getOrElse(defaultKeyspaceCreationQuery(session, space))
    logger.info(s"Automatically initialising keyspace $space with query $query")
    session.execute(query)
    session
  }

  /**
   * Creates a new Session for the specified keySpace.
   *
   * Any non-fatal failure is passed through `errorHandler` and the result is
   * thrown. If the keyspace initialisation fails, the freshly opened session
   * is closed first so that it does not leak.
   *
   * @param keySpace The name of the keyspace to initialise when `autoinit` is set.
   * @return The connected session.
   */
  protected[this] def createSession(keySpace: String): Session = {
    Try {
      val session = blocking {
        cluster.connect
      }

      if (autoinit) {
        try {
          initKeySpace(session, keySpace)
        } catch {
          case NonFatal(err) =>
            // Best-effort cleanup: a failure while closing must not mask the
            // original initialisation error.
            Try(session.close())
            throw err
        }
      } else {
        logger.info(s"Auto-init set to false, keyspace $keySpace is not being auto-created.")
        session
      }
    } match {
      case Success(value) => value
      // `Try` only captures non-fatal throwables, so every failure is handled here.
      case Failure(err) => throw errorHandler(err)
    }
  }

  val session: Session = createSession(space.name)
}
/**
 * Bundles a lazily created `OpentracingSessionProvider` for one keyspace
 * together with the `Connector` trait that tables mix in.
 *
 * @param tracer         The tracer forwarded to the session provider.
 * @param name           The name of the keyspace.
 * @param clusterBuilder Function used to configure the underlying cluster.
 * @param autoinit       Whether the keyspace is created when the session is opened.
 * @param keyspaceFn     Optional custom keyspace creation query.
 * @param errorHandler   Maps session creation failures to the exception that is thrown.
 */
class OpentracingCassandraConnection(tracer: Tracer,
                                     name: String,
                                     clusterBuilder: ClusterBuilder,
                                     autoinit: Boolean,
                                     keyspaceFn: Option[KeySpaceCQLQuery] = None,
                                     errorHandler: Throwable => Throwable = identity) { outer =>

  import scala.collection.JavaConverters._

  lazy val provider = new OpentracingSessionProvider(
    tracer,
    KeySpace(name),
    clusterBuilder,
    autoinit,
    keyspaceFn,
    errorHandler
  )

  /**
   * The Session associated with this keySpace.
   */
  lazy val session: Session = provider.session

  /**
   * The distinct Cassandra versions reported by all hosts in the cluster metadata.
   */
  def cassandraVersions: Set[VersionNumber] = {
    session.getCluster.getMetadata.getAllHosts.asScala
      .map(_.getCassandraVersion)
      .toSet[VersionNumber]
  }

  /**
   * The single Cassandra version the whole cluster is running.
   *
   * @return The version, wrapped in `Some`, when all hosts report the same one.
   * @throws RuntimeException if no version could be read, or if the hosts
   *                          report more than one distinct version.
   */
  def cassandraVersion: Option[VersionNumber] = {
    // Take a single snapshot so that the size check and the returned value
    // cannot come from two different metadata reads.
    val versions = cassandraVersions

    // `versions` is a Set, so more than one element means at least two
    // distinct versions are present.
    versions.size match {
      case 0 =>
        throw new RuntimeException(
          "Could not extract any versions from the cluster, versions were empty")
      case 1 =>
        versions.headOption
      case _ =>
        throw new RuntimeException(
          "Illegal single version comparison. You are connected to clusters of different versions. " +
            s"Available versions are: ${versions.mkString(", ")}"
        ) with NoStackTrace
    }
  }

  /**
   * Trait that can be mixed into `CassandraTable`
   * instances.
   *
   * Every member delegates to the enclosing connection.
   */
  trait Connector
      extends com.outworkers.phantom.connectors.Connector
      with SessionAugmenterImplicits {

    lazy val provider: OpentracingSessionProvider = outer.provider

    lazy val keySpace: String = outer.name

    implicit val space: KeySpace = KeySpace(outer.name)

    def cassandraVersion: Option[VersionNumber] = outer.cassandraVersion

    def cassandraVersions: Set[VersionNumber] = outer.cassandraVersions
  }
}
object OpentracingContactPoints {

  /**
   * Builds a keyspace builder that connects to the given hosts
   * without setting an explicit port.
   *
   * @param tracer The tracer handed to the resulting builder.
   * @param hosts  The contact points to register on the cluster builder.
   */
  def apply(tracer: Tracer, hosts: Seq[String]): OpentracingKeySpaceBuilder = {
    val withHosts: ClusterBuilder = underlying => underlying.addContactPoints(hosts: _*)
    new OpentracingKeySpaceBuilder(tracer, withHosts)
  }

  /**
   * Builds a keyspace builder that connects to the given hosts,
   * all on the specified port.
   *
   * @param tracer The tracer handed to the resulting builder.
   * @param hosts  The contact points to register on the cluster builder.
   * @param port   The port applied to the cluster builder.
   */
  def apply(tracer: Tracer, hosts: Seq[String], port: Int): OpentracingKeySpaceBuilder = {
    val withHostsAndPort: ClusterBuilder =
      underlying => underlying.addContactPoints(hosts: _*).withPort(port)
    new OpentracingKeySpaceBuilder(tracer, withHostsAndPort)
  }
}
class OpentracingKeySpaceBuilder(tracer: Tracer, clusterBuilder: ClusterBuilder) {

  /**
   * Chains an extra configuration step after the current cluster builder.
   * This gives direct access to the Java builder API of the Cassandra driver.
   *
   * @param builder The step to run after the steps already accumulated.
   * @return A new keyspace builder carrying the combined configuration.
   */
  def withClusterBuilder(builder: ClusterBuilder): OpentracingKeySpaceBuilder = {
    val combined: ClusterBuilder = clusterBuilder.andThen(builder)
    new OpentracingKeySpaceBuilder(tracer, combined)
  }

  /**
   * Appends a step that installs fresh pooling options with the heartbeat
   * interval set to 0 seconds.
   * Intended for local connectors and test environments.
   *
   * @return A new keyspace builder carrying the additional step.
   */
  def noHeartbeat(): OpentracingKeySpaceBuilder = {
    val disableHeartbeat: ClusterBuilder = { underlying =>
      underlying.withPoolingOptions(new PoolingOptions().setHeartbeatIntervalSeconds(0))
    }
    new OpentracingKeySpaceBuilder(tracer, clusterBuilder.andThen(disableHeartbeat))
  }

  /**
   * Creates a connection for the keyspace with the given name.
   *
   * @param name         The name of the keyspace, case sensitive by default.
   * @param autoinit     Whether or not to initialise the keyspace before the session is created.
   * @param query        Optional custom query used to create the keyspace.
   * @param errorHandler Maps failures raised while creating the session to the exception thrown.
   * @return A connection bound to the named keyspace.
   */
  def keySpace(
    name: String,
    autoinit: Boolean = true,
    query: Option[KeySpaceCQLQuery] = None,
    errorHandler: Throwable => Throwable = identity
  ): OpentracingCassandraConnection = {
    new OpentracingCassandraConnection(
      tracer = tracer,
      name = name,
      clusterBuilder = clusterBuilder,
      autoinit = autoinit,
      keyspaceFn = query,
      errorHandler = errorHandler
    )
  }

  /**
   * Creates a connection for the keyspace with the given name, always
   * initialising it with the supplied query: passing an explicit query is
   * taken as clear intent that the keyspace should be created.
   *
   * @param name  The name of the keyspace, case sensitive by default.
   * @param query The query used to create the keyspace.
   * @return A connection bound to the named keyspace.
   */
  @deprecated("Simply pass in a keySpace query, the keyspace is not required", "2.8.5")
  def keySpace(
    name: String,
    query: KeySpaceCQLQuery
  ): OpentracingCassandraConnection = {
    new OpentracingCassandraConnection(
      tracer,
      name,
      clusterBuilder,
      autoinit = true,
      keyspaceFn = Some(query)
    )
  }

  /**
   * Creates a connection for the keyspace named by the query itself, always
   * initialising it with that query: passing an explicit query is taken as
   * clear intent that the keyspace should be created.
   *
   * @param query The query used to create the keyspace; its `keyspace` supplies the name.
   * @return A connection bound to the keyspace named by the query.
   */
  def keySpace(
    query: KeySpaceCQLQuery
  ): OpentracingCassandraConnection = {
    new OpentracingCassandraConnection(
      tracer,
      query.keyspace,
      clusterBuilder,
      autoinit = true,
      keyspaceFn = Some(query)
    )
  }
}
Hi @wsargent,
There's probably a far far easier way to expose this from within phantom, let us think of a way to include it in the next release, sounds like all you need is access to that session initialiser, we can de-couple that implementation and allow you to deal with it from within the phantom builder itself.
Regards,
Thanks! I know that working with a type safe builder pattern can be involved -- what I've done in the past has been to use a trait with an abstract type member Self:
https://github.com/playframework/play-ws/blob/master/play-ws-standalone/src/main/scala/play/api/libs/ws/StandaloneWSRequest.scala#L14
so implementations can return the subtype:
https://github.com/playframework/play-ws/blob/master/play-ahc-ws-standalone/src/main/scala/play/api/libs/ws/ahc/StandaloneAhcWSRequest.scala#L44
and not the super type like you'd normally get. (There was a reason I didn't use this.type, but I'm afraid I don't remember it right now).
@wsargent Where is the actual line where the tracer gets used? This has now become top priority and we can deliver the functionality as part of phantom, but I can't really tell from the above code where it's interplaying with Cassandra. I'm guessing that's inside OpentracingCassandraConnection, but that particular code is not found in the things you pasted above.
Please let me know if you can so we can push this to prod quickly for you.
The TracingCluster is part of the opentracing-cassandra-driver, so I've been mapping everything out through there. OpentracingCassandraConnection is my rendering of https://github.com/outworkers/phantom/blob/develop/phantom-connectors/src/main/scala/com/outworkers/phantom/connectors/CassandraConnection.scala
Hi @wsargent This is becoming a priority on our part soon. However, I don't really like all the open tracing code, because it's got things that work around query strings, which is necessary in the raw driver but not so much in Phantom, because we actually have complex ADTs to model all CQL data-structures and things like that. Query generation is an afterthought from the DSL structures, so we wouldn't need most of the code found in the example.
It may take a while, but if you could invest some time at least to sanity check the output in follow up PRs, I'm sure we can get this out, I know it's already been more than a while since we last discussed this.
@alexflav23 if you can point me at them I'll take a look when I have time. Fair warning, it may be a couple of weeks.