Feature Request: Add new source type cloud-logging for querying Google Cloud logs
Prerequisites
- [x] Search the current open issues
What are you trying to do that currently feels hard or impossible?
Add a new source type called `cloud-logging` to the MCP Toolbox that enables agents and tools to directly query and analyze logs from Google Cloud Logging (formerly Stackdriver).
This source should allow users to:
- Query logs using Cloud Logging’s advanced filters.
- Stream or batch-retrieve log entries for a given resource, project, or time window.
- Support integration with analysis or monitoring tools in MCP.
Suggested Solution(s)
```yaml
sources:
  gcp-logs:
    kind: cloud-logging
    project: my-gcp-project
    resource_type: "gce_instance"  # optional
    filter: "severity>=ERROR"
    time_range: "1h"               # e.g., last 1 hour
    mode: "query"                  # options: query | stream
    credentials:
      use_default_credentials: true
```
Alternatives Considered
No response
Additional Details
- The MCP Toolbox should recognize `kind: cloud-logging` as a valid source type.
- It should authenticate via Application Default Credentials or a provided service account JSON key.
- Enable:
  - Query mode: fetch log entries matching the filter and time range.
  - Stream mode: continuously stream new logs in real time.
- Return log entries in a structured format (JSON with timestamp, resource, severity, message).
@vksinha10 Thanks for opening the FR. We would absolutely be happy to accept contributions to the project. Do you think you could share a design doc/issue that we can review?
CC: @kurtisvg
@anubhav756
Proposal
Source: `gcloud-logging`. Will support auth (Application Default Credentials, OAuth for user-delegated access).
Tools (using the official Google Cloud Logging Go SDK)
- gcloud-logging-query
- gcloud-logging-stream
- gcloud-logging-list-log-names (helps in discovery of what to query)
- gcloud-logging-list-resources (helps build filters)
- gcloud-logging-count (Useful for fast triage without pulling full logs)
Deliverables
- Source -> `source.go` & `source_test.go`
- Tools (for each tool) -> `tool.go` & `tool_test.go`
- Docs update (index, source, tools)
- `prebuiltconfigs`
- Integration tests for the source and tools
- Updated CI
If this looks good to you, I can start work on this and follow up with a PR shortly.
Thanks for outlining the proposal. This looks like a great starting point.
To move forward, could you please provide a more detailed API specification for the proposed tools? Specifically, it would be helpful to understand the expected parameters for each of the gcloud-logging tools (query, stream, list-log-names, list-resources, and count).
A few other points to clarify:
- The proposal mentions support for ADC and OAuth. Are there any other specific auth requirements or considerations we need to account for?
- Could you elaborate on the configuration settings at the source level? Would it be feasible to have a phased approach, where Phase 1 includes all inputs at the tool level, and we introduce source-level configurations in a later phase?
- Your proposal includes tools for both querying and streaming. Could you elaborate on the expected behavior and use cases for each?
- Your proposal mentions using the official Google Cloud Logging Go SDK. Are there also plans to call the REST API directly?
Once we have a clearer picture of these details, we can move forward with a more formal design and implementation plan.
CC: @averikitsch
@anubhav756 sure, I'll be providing details shortly.
Source
> Your proposal mentions using the official Google Cloud Logging Go SDK. Are there also plans to call the REST API directly?
`logadmin` exposes only list/query operations; `logging/apiv2` exposes the streaming RPC. These handle auth, retries, pagination, etc. I think the use case can be fulfilled well by utilising the above, and I don't see a need to call the REST API directly; in case I am missing anything, please give your input on this.
> The proposal mentions support for ADC and OAuth. Are there any other specific auth requirements or considerations we need to account for?
For auth, ADC and OAuth seem sufficient, but if we look at the client initialisation layer, the clients also support optional environment-driven options like endpoint override, scope override, and quota-project. These become necessary when transport or permission controls are needed in restricted networks or constrained credential setups:
- Endpoint override -> public Google endpoints are blocked
- Scope override -> issued credentials lack the required permissions
- Quota-project -> billing must be redirected
This is the minimal configuration required for both the query and stream clients.
```yaml
sources:
  cloud-logging:
    kind: cloud-logging
    name: logs
    defaultProject: my-project
    useClientOAuth: false  # ADC vs user-token
    endpoint: ""           # optional override
    scopes: []             # optional override
    quotaProject: ""       # optional override
```
Note: There will be two separate source types: one initializes a `logadmin` client, the other initializes a `logging/apiv2` client.
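For illustration, here is a minimal sketch of how the two clients might be initialized from a source config like the one above. The `Config` struct, its field names, and `newClients` are assumptions mirroring the YAML, not final Toolbox types:

```go
package cloudlogging

import (
	"context"

	logging "cloud.google.com/go/logging/apiv2"
	"cloud.google.com/go/logging/logadmin"
	"google.golang.org/api/option"
)

// Config mirrors the YAML fields above; illustrative only.
type Config struct {
	DefaultProject string
	Endpoint       string
	Scopes         []string
	QuotaProject   string
}

// newClients builds the logadmin client (list/query) and the logging/apiv2
// client (TailLogEntries streaming) with the optional overrides applied.
func newClients(ctx context.Context, cfg Config) (*logadmin.Client, *logging.Client, error) {
	var opts []option.ClientOption
	if cfg.Endpoint != "" {
		opts = append(opts, option.WithEndpoint(cfg.Endpoint)) // restricted networks
	}
	if len(cfg.Scopes) > 0 {
		opts = append(opts, option.WithScopes(cfg.Scopes...)) // constrained credentials
	}
	if cfg.QuotaProject != "" {
		opts = append(opts, option.WithQuotaProject(cfg.QuotaProject)) // billing redirection
	}
	admin, err := logadmin.NewClient(ctx, cfg.DefaultProject, opts...)
	if err != nil {
		return nil, nil, err
	}
	stream, err := logging.NewClient(ctx, opts...)
	if err != nil {
		return nil, nil, err
	}
	return admin, stream, nil
}
```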
> Could you elaborate on the configuration settings at the source level? Would it be feasible to have a phased approach, where Phase 1 includes all inputs at the tool level, and we can introduce source-level configurations in a later phase?
Source-level inputs: project, auth, endpoint override, scope override, quota-project.
Tool-level inputs: per-operation parameters only (filters, limits, time ranges, stream commands).
A phased approach is feasible IMO. Phase 1: we can put everything operational in the tools and keep the source minimal.
Tools
> A more detailed API specification for the proposed tools: expected parameters for each of the gcloud-logging tools (query, stream, list-log-names, list-resources, and count).
- `query`

```go
// Input
type QueryInput struct {
	Filter    string  `json:"filter"`
	Limit     int     `json:"limit"`
	Order     string  `json:"order,omitempty"`
	StartTime *string `json:"startTime,omitempty"`
	EndTime   *string `json:"endTime,omitempty"`
}

// Output
type QueryOutput struct {
	Entries []QueryEntry `json:"entries"`
}

type QueryEntry struct {
	LogName   string         `json:"logName"`
	Timestamp string         `json:"timestamp"`
	Severity  string         `json:"severity"`
	Resource  QueryResource  `json:"resource"`
	JSON      map[string]any `json:"jsonPayload"`
	Text      string         `json:"textPayload"`
}

type QueryResource struct {
	Type   string            `json:"type"`
	Labels map[string]string `json:"labels"`
}
```
- `list-log-names`

```go
// Input
type ListLogNamesInput struct {
	Filter *string `json:"filter,omitempty"`
}

// Output
type ListLogNamesOutput struct {
	LogNames []string `json:"logNames"`
}
```
- `list-resources`

```go
// Input
type ListResourcesInput struct{}

// Output
type ListResourcesOutput struct {
	Resources []ResourceDescriptor `json:"resources"`
}

type ResourceDescriptor struct {
	Type   string            `json:"type"`
	Labels map[string]string `json:"labels"`
}
```
- `count`

```go
// Input
type CountInput struct {
	Filter    string  `json:"filter"`
	StartTime *string `json:"startTime,omitempty"`
	EndTime   *string `json:"endTime,omitempty"`
}

// Output
type CountOutput struct {
	Count int `json:"count"`
}
```
- `stream`

```go
// Input will handle start, read and stop
type StreamInput struct {
	Command string  `json:"command"`          // "start" | "read" | "stop"
	Filter  *string `json:"filter,omitempty"` // only for start
	Cursor  *int    `json:"cursor,omitempty"` // only for read
}

// Output
type StreamOutput struct {
	Status     string        `json:"status,omitempty"`     // start/stop
	NextCursor *int          `json:"nextCursor,omitempty"` // start/read
	Entries    []StreamEntry `json:"entries,omitempty"`    // read
}

type StreamEntry struct {
	Seq       int            `json:"seq"`
	LogName   string         `json:"logName"`
	Timestamp string         `json:"timestamp"`
	Severity  string         `json:"severity"`
	Resource  StreamResource `json:"resource"`
	JSON      map[string]any `json:"jsonPayload"`
	Text      string         `json:"textPayload"`
}

type StreamResource struct {
	Type   string            `json:"type"`
	Labels map[string]string `json:"labels"`
}
```
Note: As per my understanding, streaming is stateful and long-lived, so we cannot implement a tool that starts streaming when the project initialises and only stops when the project stops.
A single tool with `start`, `read`, `stop` provides the bridge:
- `start` will open the `TailLogEntries` stream in a background goroutine owned by the source.
- `read` will return buffered entries since the last cursor.
- `stop` will cancel the stream context and terminate the background goroutine, preventing ongoing billable reads.
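A rough sketch of what `start` could do with the `logging/apiv2` tail stream; the buffer channel and error handling are simplified placeholders, and the function name is illustrative:

```go
// Assumed imports:
//   logging   "cloud.google.com/go/logging/apiv2"
//   loggingpb "cloud.google.com/go/logging/apiv2/loggingpb"

// startTail opens a TailLogEntries stream and copies entries into buf until
// ctx is cancelled (which is what "stop" would do).
func startTail(ctx context.Context, client *logging.Client, project, filter string, buf chan<- *loggingpb.LogEntry) error {
	stream, err := client.TailLogEntries(ctx)
	if err != nil {
		return err
	}
	req := &loggingpb.TailLogEntriesRequest{
		ResourceNames: []string{"projects/" + project},
		Filter:        filter,
	}
	if err := stream.Send(req); err != nil {
		return err
	}
	go func() {
		defer close(buf)
		for {
			resp, err := stream.Recv()
			if err != nil { // io.EOF, ctx cancellation from "stop", or a transport error
				return
			}
			for _, e := range resp.GetEntries() {
				buf <- e // "read" drains this buffer and advances the cursor
			}
		}
	}()
	return nil
}
```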
> Your proposal includes tools for both querying and streaming. Could you elaborate on the expected behavior and use cases for each?
Query: pulls a set of logs based on the filter and other inputs. Useful when the agent needs a point-in-time inspection, such as analysing recent errors or extracting patterns from a fixed slice of logs.
Stream: can be used by the agent to monitor events as they happen (deployments, crashes, scaling events) and trigger follow-up actions in response to those operational signals.
@anubhav756 @averikitsch Please review this whenever you see fit; I am looking forward to your input on this.
@anubhav756 It's been a while; can you review this and give feedback?
Thanks for the detailed proposal! The phased approach for configuration makes a lot of sense.
I have a few suggestions:
- Ensure we follow the Tool Naming Conventions:
  - In your configuration snippet, you correctly used source kind `cloud-logging`, although in the initial message you mentioned `gcloud-logging`. Please ensure `cloud-logging` is used consistently as the source kind to match our other sources.
  - Make sure the tool kinds are also consistent. So instead of `gcloud-logging-query`, we should use `cloud-logging-query`.
  - Our tool naming guidelines suggest using underscores and omitting the product name since it's redundant. So the tool name should be `list_log_names` or even `list_logs` (vs `list-log-names`).
  - The directory could be `internal/sources/cloudlogging/`, matching `internal/sources/cloudmonitoring/`, etc.
- For the actual implementation, can you use the `internal/util/parameters` package to define the tool inputs for Toolbox to correctly parse and validate arguments from the LLM?
- The `start`/`read`/`stop` pattern for the streaming tool is interesting. A couple of questions on the implementation details:
  - How will we handle multiple concurrent streams? If `start` is called twice, does it spawn two background goroutines?
  - Since the Source instance is long-lived, we might also need a mechanism to map a specific "session" or "cursor" to the correct background stream if we want to support multiple concurrent users/agents. Thoughts?
  - For the `read` command, do we have an eviction policy or a limit for the buffer to prevent memory issues if `read` isn't called frequently enough?
- Is it possible to ensure the client initialization respects `util.UserAgentFromContext(ctx)` so we can track usage correctly?
Overall, this looks good to me. Please feel free to start with the source and the query tools. :)
CC: @averikitsch @Yuan325
@anubhav756 Thanks for the review and feedback, I'll start implementing this promptly!
- Noted, I'll keep the tool naming conventions in mind; I might have mixed up the naming in the two proposals 😄
- Regarding `internal/util/parameters`: I was aware of this, but I added the struct tags in my proposal just for better understanding.
- `util.UserAgentFromContext(ctx)`: yes, the implementation will respect it.
Lastly, regarding the streaming tool:
Stream ownership:
- `start` -> allocates its own goroutine + buffer + context and returns a unique `sessionId`.
- `read`/`stop` -> resolved only through that `sessionId`; this prevents cross-stream collisions.
Question: what if the agent loses the `sessionId`? This would result in an orphaned stream, and we cannot rely on the client/agent side to call `stop` every time.
As per my understanding, we can introduce an idle timeout: if a read operation happens before the idle timeout, the session continues; if no read arrives before the timeout window, the session self-terminates, releasing the goroutine and buffer.
The idle timeout will have a default value, and I think it could also be overridden by passing it as a parameter when `start` is called. What do you think?
How it would work:

```
Source and session
  Source -> map[sessionId]*Session
  Session (per active stream)
    ctx + cancel
    goroutine (TailLogEntries)
    ringBuffer (fixed size)
    seqCounter

start -> returns sessionId
  Source.sessions[sessionId] = &Session{...}
  Session.goroutine starts
  Session.lastReadTime = now

read + sessionId
  lookup = Source.sessions[sessionId]
  validate cursor against lookup.ringBuffer
  return entries
  lookup.lastReadTime = now

stop + sessionId
  lookup = Source.sessions[sessionId]
  lookup.cancel()                      // terminates goroutine
  delete(Source.sessions, sessionId)   // frees buffer + state

idle-timeout watcher
  for each session in Source.sessions:
    if now - session.lastReadTime > idleTimeout:
      session.cancel()
      delete(Source.sessions, sessionId)
```
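To make the ownership concrete, a compact Go sketch of the session registry and idle-timeout watcher described above; locking is simplified, the ring buffer is a plain slice, and all names (`Session`, `idleTimeout`, `reapIdle`) are illustrative:

```go
// Assumed imports: "context", "sync", "time",
//   loggingpb "cloud.google.com/go/logging/apiv2/loggingpb"

type Session struct {
	cancel       context.CancelFunc    // cancels the TailLogEntries goroutine
	buffer       []*loggingpb.LogEntry // stand-in for the fixed-size ring buffer
	lastReadTime time.Time
}

type Source struct {
	mu          sync.Mutex
	sessions    map[string]*Session
	idleTimeout time.Duration
}

// reapIdle is called periodically (e.g. from a ticker goroutine) and closes
// any session that has not been read within idleTimeout.
func (s *Source) reapIdle(now time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for id, sess := range s.sessions {
		if now.Sub(sess.lastReadTime) > s.idleTimeout {
			sess.cancel()          // terminates the goroutine
			delete(s.sessions, id) // frees buffer + state
		}
	}
}
```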
Does an implementation like this resolve your concerns around ownership and memory management?
Thanks for the clarifications @pkalsi97. This sounds great to me!
Yes, if we can be sure there is a default timeout that avoids orphan sessions, then that should be good.
I think the plan looks good to start with the implementation 🙂
CC: @Yuan325 @averikitsch @kurtisvg
@anubhav756 Thanks! Working on it.
Thanks @pkalsi97 A couple of thoughts:
- For the first milestone I would suggest no additional endpoints, unless we have specific requirements:
  - Endpoint override -> public Google endpoints are blocked
  - Scope override -> when issued credentials lack required permissions
  - Quota-project -> when billing must be redirected
- We've seen more MCP clients don't support streaming right now so for the first milestone I would omit this functionality.
- Could you outline the tools and the associated API requests?
Sure, for the first milestone I'll follow the suggestions. Based on testing the implementation locally, I have settled on the following approach.
Overview:
Source: `cloud-logging-admin` (using the `logadmin` SDK)
I am considering implementing the source in line with the BigQuery implementation:
- If ADC, initialise the client in the source itself.
- If OAuth, use a client-per-token approach (ClientCreator, ClientCache, RestCache).
In every tool call:
- ADC: `t.Client` is ready at init.
- OAuth: `t.ClientCreator(token)` creates a client per request (cached).
After that we can use the same SDK calls for both; a sketch follows below.
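As an illustration only, per-tool-call client resolution might look like this. The `Tool` fields below are assumptions modeled on the BigQuery-style split described above, not the actual Toolbox types:

```go
// Assumed imports: "context", "cloud.google.com/go/logging/logadmin"

// Tool is a hypothetical shape for a cloud-logging-admin tool.
type Tool struct {
	UseClientOAuth bool
	Client         *logadmin.Client // ADC: created once at source init
	ClientCreator  func(ctx context.Context, accessToken string) (*logadmin.Client, error)
}

// logadminClient returns the shared ADC client, or a per-token (cached)
// client when user OAuth is enabled; the SDK calls afterwards are identical.
func (t *Tool) logadminClient(ctx context.Context, accessToken string) (*logadmin.Client, error) {
	if !t.UseClientOAuth {
		return t.Client, nil
	}
	return t.ClientCreator(ctx, accessToken)
}
```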
Tools:
`list_resource_types`
Returns: `[]string` of resource types
Get an iterator using `client.ResourceDescriptors(ctx)`:

```
resourceTypes = []
loop:
    d, err = iterator.Next()
    if err == Done → exit loop
    if err → fail
    append(resourceTypes, d.Type)
return resourceTypes
```
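A runnable Go counterpart of the pseudocode above, assuming the `logadmin` client; the same `iterator.Done` loop shape applies to the other logadmin-backed tools:

```go
// Assumed imports: "context", "cloud.google.com/go/logging/logadmin",
//   "google.golang.org/api/iterator"

func listResourceTypes(ctx context.Context, client *logadmin.Client) ([]string, error) {
	it := client.ResourceDescriptors(ctx)
	var resourceTypes []string
	for {
		d, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil, err
		}
		resourceTypes = append(resourceTypes, d.Type)
	}
	return resourceTypes, nil
}
```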
`list_log_names_by_filter`
Returns: `[]string` of log names
Inputs:
- resourceType → optional string
- prefix → optional string
- startTime → optional time
- max → int

```
// Prepare filter
filter = join(all non-empty filter parts) with AND
iterator = client.Entries(ctx, logadmin.Filter(filter), logadmin.NewestFirst())
seen = {}
result = []
while result.size < max:
    entry, err = iterator.Next()
    if err == Done → break
    if err → fail
    if entry.LogName not in seen:
        add to seen
        append to result
if result empty → fail
return result
```
`count_log_entries`
Returns: `int` count for a given log name
Inputs:
- logName → string (required)
- inputTimeWindow → duration (optional; defaults to 30 days if ≤ 0)

```
// Prepare filter
filter = `logName="<logName>" AND timestamp >= "<startTime>"`
iterator = client.Entries(ctx, logadmin.Filter(filter))
count = 0
loop:
    entry, err = iterator.Next()
    if err == Done → break
    if err → fail
    count++
return count
```
`query_logs_by_filter`
I am thinking of using a `summary_fields` parameter as an optional input; this would allow callers to whitelist specific fields in the response (e.g., timestamp, severity, payload). Let me know what you think.
Inputs:
- filter → optional string
- limit → int
- summaryFields → optional []string

```
iterator = client.Entries(ctx, opts...)
entries = []
loop:
    if limit > 0 and len(entries) >= limit: break
    entry, err = iterator.Next()
    if err == Done: break
    if err: fail with err
    append entry to entries
return entries
```
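To illustrate the proposed `summary_fields` whitelist, here is a hedged sketch of the query tool in Go; the projection only knows a few well-known field names and is an assumption for discussion, not a spec:

```go
// Assumed imports: "context", "cloud.google.com/go/logging/logadmin",
//   "google.golang.org/api/iterator"

func queryLogsByFilter(ctx context.Context, client *logadmin.Client, filter string, limit int, summaryFields []string) ([]map[string]any, error) {
	var opts []logadmin.EntriesOption
	if filter != "" {
		opts = append(opts, logadmin.Filter(filter))
	}
	it := client.Entries(ctx, opts...)
	var out []map[string]any
	for {
		if limit > 0 && len(out) >= limit {
			break
		}
		e, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil, err
		}
		row := map[string]any{
			"logName":   e.LogName,
			"timestamp": e.Timestamp,
			"severity":  e.Severity.String(),
			"payload":   e.Payload,
		}
		if len(summaryFields) > 0 { // whitelist: keep only requested fields
			projected := make(map[string]any, len(summaryFields))
			for _, f := range summaryFields {
				if v, ok := row[f]; ok {
					projected[f] = v
				}
			}
			row = projected
		}
		out = append(out, row)
	}
	return out, nil
}
```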
@averikitsch cc: @anubhav756