
Feature Request: Add new source type cloud-logging for querying Google Cloud logs

Open vksinha10 opened this issue 2 months ago • 7 comments

Prerequisites

What are you trying to do that currently feels hard or impossible?

Add a new source type called cloud-logging to the MCP Toolbox that enables agents and tools to directly query and analyze logs from Google Cloud Logging (formerly Stackdriver).

This source should allow users to:

Query logs using Cloud Logging’s advanced filters.

Stream or batch retrieve log entries for a given resource, project, or time window.

Support integration with analysis or monitoring tools in MCP.

Suggested Solution(s)

sources:
  gcp-logs:
    kind: cloud-logging
    project: my-gcp-project
    resource_type: "gce_instance"   # optional
    filter: "severity>=ERROR"
    time_range: "1h"                # e.g., last 1 hour
    mode: "query"                   # options: query | stream
    credentials:
      use_default_credentials: true

Alternatives Considered

No response

Additional Details

The MCP Toolbox should recognize kind: cloud-logging as a valid source type.

It should authenticate via Application Default Credentials or provided service account JSON key.

Enable:

Query mode: fetch log entries matching the filter and time range.

Stream mode: continuously stream new logs in real-time.

Return log entries in structured format (JSON with timestamp, resource, severity, message).
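For illustration, a minimal sketch of what "query mode" could look like with the official Go client (cloud.google.com/go/logging/logadmin); the project ID and filter are placeholders, and authentication falls back to ADC:

// Minimal sketch (assumes ADC and a placeholder project ID): fetch
// ERROR-or-worse entries from the last hour and print structured fields.
package main

import (
	"context"
	"fmt"
	"time"

	"cloud.google.com/go/logging/logadmin"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()

	// No explicit credentials passed: Application Default Credentials are used.
	client, err := logadmin.NewClient(ctx, "projects/my-gcp-project")
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Severity filter plus a one-hour time window, as in the example config above.
	filter := fmt.Sprintf(`severity>=ERROR AND timestamp>=%q`,
		time.Now().Add(-time.Hour).Format(time.RFC3339))

	it := client.Entries(ctx, logadmin.Filter(filter), logadmin.NewestFirst())
	for {
		entry, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			panic(err)
		}
		fmt.Println(entry.Timestamp, entry.Severity, entry.LogName)
	}
}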

vksinha10 avatar Oct 23 '25 12:10 vksinha10

@vksinha10 Thanks for opening the FR. We would absolutely be happy to accept contributions to the project. Do you think you could share a design doc/issue that we can review?

CC: @kurtisvg

anubhav756 avatar Nov 03 '25 16:11 anubhav756

@anubhav756

Proposal

Source: gcloud-logging. It will support auth via Application Default Credentials and OAuth for user-delegated access.

Tools (Use official Google Cloud Logging Go SDK)

  1. gcloud-logging-query
  2. gcloud-logging-stream
  3. gcloud-logging-list-log-names (helps in discovery of what to query)
  4. gcloud-logging-list-resources (helps build filters)
  5. gcloud-logging-count (Useful for fast triage without pulling full logs)

Deliverables

  1. Source -> source.go & source_test.go
  2. tools (for each tool) -> tool.go & tool_test.go
  3. Docs Update (index, source, tools)
  4. prebuiltconfigs
  5. Integration tests for the source and tools
  6. Updated CI

If this looks good to you, I can start work on this and follow up with a PR shortly.

pkalsi97 avatar Nov 11 '25 06:11 pkalsi97

Thanks for outlining the proposal. This looks like a great starting point.

To move forward, could you please provide a more detailed API specification for the proposed tools? Specifically, it would be helpful to understand the expected parameters for each of the gcloud-logging tools (query, stream, list-log-names, list-resources, and count).

A few other points to clarify:

  • The proposal mentions support for ADC and OAuth. Are there any other specific auth requirements or considerations we need to account for?
  • Could you elaborate on the configuration settings at the source level? Would it be feasible to have a phased approach, where Phase 1 includes all inputs at the tool level, and we introduce source-level configurations in a later phase?
  • Your proposal includes tools for both querying and streaming. Could you elaborate on the expected behavior and use cases for each?
  • Your proposal mentions using the official Google Cloud Logging Go SDK. Are there also plans to call the REST API directly?

Once we have a clearer picture of these details, we can move forward with a more formal design and implementation plan.

CC: @averikitsch

anubhav756 avatar Nov 12 '25 10:11 anubhav756

@anubhav756 sure, I'll be providing details shortly.

pkalsi97 avatar Nov 12 '25 14:11 pkalsi97

Source

Your proposal mentions using the official Google Cloud Logging Go SDK. Are there also plans to call the REST API directly?

logadmin exposes only list/query operations, while logging/apiv2 exposes the streaming RPC. These handle auth, retries, pagination, etc. I think the use case can be fulfilled well by using the above, and I don't see a need to call the REST API directly; in case I am missing anything, please give your input on this.
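To make the split concrete, a rough sketch of the two client layers (package paths as in the current official SDK; the function and its wiring into the source are illustrative only):

// Sketch only: constructing the two clients the source would own.
package cloudlogging

import (
	"context"

	loggingapi "cloud.google.com/go/logging/apiv2" // TailLogEntries streaming RPC
	"cloud.google.com/go/logging/logadmin"         // list/query operations
)

func newClients(ctx context.Context, project string) (*logadmin.Client, *loggingapi.Client, error) {
	// Query/list client: logadmin handles auth, retries and pagination.
	adminClient, err := logadmin.NewClient(ctx, "projects/"+project)
	if err != nil {
		return nil, nil, err
	}

	// Streaming client: logging/apiv2 exposes the TailLogEntries bidi stream.
	tailClient, err := loggingapi.NewClient(ctx)
	if err != nil {
		adminClient.Close()
		return nil, nil, err
	}
	return adminClient, tailClient, nil
}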

The proposal mentions support for ADC and OAuth. Are there any other specific auth requirements or considerations we need to account for?

For auth, ADC and OAuth seem sufficient, but if we take a look at the client initialisation layer, the clients also support optional environment-driven options such as an endpoint override, a scope override, and a quota project. These will be necessary when transport and permission controls are required in restricted networks or constrained credential setups.

  • Endpoint override -> public Google endpoints are blocked
  • Scope override -> when issued credentials lack required permissions
  • Quota-project -> when billing must be redirected
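These three overrides map onto standard client options; a hedged sketch (the endpoint, scope, and project values are placeholders):

// Sketch only: wiring the overrides via google.golang.org/api/option.
package cloudlogging

import (
	"context"

	"cloud.google.com/go/logging/logadmin"
	"google.golang.org/api/option"
)

func newRestrictedClient(ctx context.Context) (*logadmin.Client, error) {
	return logadmin.NewClient(ctx, "projects/my-project",
		option.WithEndpoint("logging.example-private-endpoint:443"),       // endpoint override
		option.WithScopes("https://www.googleapis.com/auth/logging.read"), // scope override
		option.WithQuotaProject("my-billing-project"),                     // quota-project override
	)
}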

This is the minimal configuration required for both the query and stream clients.

sources:
  cloud-logging:
    kind: cloud-logging
    name: logs
    defaultProject: my-project

    useClientOAuth: false            # ADC vs user-token
    endpoint: ""                     # optional override
    scopes: []                       # optional override
    quotaProject: ""                 # optional override

Note: There will be two separate source types: one initializes a logadmin client, the other a logging/apiv2 client.

Could you elaborate on the configuration settings at the source level? Would it be feasible to have a phased approach, where Phase 1 includes all inputs at the tool level, and we can introduce source-level configurations in a later phase.

Source-level inputs: project, auth, endpoint override, scope override, quota project.
Tool-level inputs: per-operation parameters only (filters, limits, time ranges, stream commands).

A phased approach is feasible IMO. Phase 1: we can put everything operational in the tools and keep the source minimal.

Tools

A more detailed API specification for the proposed tools, expected parameters for each of the gcloud-logging tools (query, stream, list-log-names, list-resources, and count)

  1. query
// Input 
type QueryInput struct {
    Filter    string  `json:"filter"`
    Limit     int     `json:"limit"`
    Order     string  `json:"order,omitempty"`     
    StartTime *string `json:"startTime,omitempty"` 
    EndTime   *string `json:"endTime,omitempty"`   
}

// Output
type QueryOutput struct {
    Entries []QueryEntry `json:"entries"`
}

type QueryEntry struct {
    LogName   string                 `json:"logName"`
    Timestamp string                 `json:"timestamp"`
    Severity  string                 `json:"severity"`
    Resource  QueryResource          `json:"resource"`
    JSON      map[string]any         `json:"jsonPayload"`
    Text      string                 `json:"textPayload"`
}

type QueryResource struct {
    Type   string            `json:"type"`
    Labels map[string]string `json:"labels"`
}


  2. list-log-names
// Input
type ListLogNamesInput struct {
    Filter *string `json:"filter,omitempty"`
}

// Output
type ListLogNamesOutput struct {
    LogNames []string `json:"logNames"`
}
  3. list-resources
// Input
type ListResourcesInput struct{}

// Output
type ListResourcesOutput struct {
    Resources []ResourceDescriptor `json:"resources"`
}

type ResourceDescriptor struct {
    Type   string            `json:"type"`
    Labels map[string]string `json:"labels"`
}
  4. count
type CountInput struct {
    Filter    string  `json:"filter"`
    StartTime *string `json:"startTime,omitempty"`
    EndTime   *string `json:"endTime,omitempty"`
}

type CountOutput struct {
    Count int `json:"count"`
}
  5. stream
// Input will handle start, read and stop
type StreamInput struct {
    Command string  `json:"command"`            // "start" | "read" | "stop"
    Filter  *string `json:"filter,omitempty"`   // only for start
    Cursor  *int    `json:"cursor,omitempty"`   // only for read
}

// Output
type StreamOutput struct {
    Status     string         `json:"status,omitempty"`     // start/stop
    NextCursor *int           `json:"nextCursor,omitempty"` // start/read
    Entries    []StreamEntry  `json:"entries,omitempty"`    // read
}

type StreamEntry struct {
    Seq       int                 `json:"seq"`
    LogName   string              `json:"logName"`
    Timestamp string              `json:"timestamp"`
    Severity  string              `json:"severity"`
    Resource  StreamResource      `json:"resource"`
    JSON      map[string]any      `json:"jsonPayload"`
    Text      string              `json:"textPayload"`
}

type StreamResource struct {
    Type   string            `json:"type"`
    Labels map[string]string `json:"labels"`
}

Note: As per my understanding, streaming is stateful and long-lived, so we cannot implement a tool that starts streaming when the project initialises and only stops when the project stops.

A single tool with start, read, stop provides the bridge (a minimal sketch follows this list):

  • start will open the TailLogEntries stream in a background goroutine owned by the source.
  • read will return buffered entries since the last cursor
  • stop will cancel the stream context and terminate the background goroutine, preventing ongoing billable reads.
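
A minimal sketch of the start step, assuming the logging/apiv2 client; the function name, channel buffer, and wiring are illustrative (a ring buffer with cursors would replace the channel in practice):

// Sketch only: open TailLogEntries in a goroutine owned by the source and
// push entries into a buffer until the stream's context is cancelled.
// Note: the loggingpb import path may differ for older SDK versions.
package cloudlogging

import (
	"context"

	loggingapi "cloud.google.com/go/logging/apiv2"
	"cloud.google.com/go/logging/apiv2/loggingpb"
)

func startTail(ctx context.Context, client *loggingapi.Client, project, filter string, buf chan<- *loggingpb.LogEntry) error {
	stream, err := client.TailLogEntries(ctx)
	if err != nil {
		return err
	}
	if err := stream.Send(&loggingpb.TailLogEntriesRequest{
		ResourceNames: []string{"projects/" + project},
		Filter:        filter,
	}); err != nil {
		return err
	}

	go func() {
		defer close(buf)
		for {
			resp, err := stream.Recv() // returns an error once ctx is cancelled
			if err != nil {
				return
			}
			for _, e := range resp.GetEntries() {
				select {
				case buf <- e:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return nil
}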

Your proposal includes tools for both querying and streaming. Could you elaborate on the expected behavior and use cases for each?

Query: will pull a set of logs based on the filter and other inputs. It is useful when the agent needs a point-in-time inspection, such as analysing recent errors or extracting patterns from a fixed slice of logs.
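For the query path, one concrete detail is folding the tool's optional time bounds into the filter before calling the client; a small sketch (helper name illustrative, times assumed to be RFC3339 strings):

package cloudlogging

import (
	"fmt"
	"strings"
)

// buildFilter combines QueryInput's filter with optional RFC3339 time bounds.
func buildFilter(filter string, startTime, endTime *string) string {
	parts := []string{}
	if filter != "" {
		parts = append(parts, filter)
	}
	if startTime != nil {
		parts = append(parts, fmt.Sprintf("timestamp>=%q", *startTime))
	}
	if endTime != nil {
		parts = append(parts, fmt.Sprintf("timestamp<=%q", *endTime))
	}
	return strings.Join(parts, " AND ")
}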

Stream: can be used by the agent to monitor events as they happen (deployments, crashes, scaling events) and to trigger follow-up actions in response to those operational signals.

@anubhav756 @averikitsch Please review this whenever you see fit; I am looking forward to your input on this.

pkalsi97 avatar Nov 13 '25 08:11 pkalsi97

@anubhav756 It's been a while, can you review this and give feedback?

pkalsi97 avatar Nov 25 '25 05:11 pkalsi97

Thanks for the detailed proposal! The phased approach for configuration makes a lot of sense.

I have a few suggestions:

  • Ensure we follow the Tool Naming Conventions
    • In your configuration snippet, you correctly used source kind: cloud-logging, although in the initial message you mentioned gcloud-logging. Please ensure cloud-logging is used consistently as the source kind to match our other sources.
    • Make sure the tool kinds are also consistent. So instead of gcloud-logging-query, we should use cloud-logging-query.
    • Our tool naming guidelines suggest using underscores and omitting the product name since it's redundant. So the tool name should be list_log_names or even list_logs (vs list-log-names).
    • The directory could be internal/sources/cloudlogging/, matching internal/sources/cloudmonitoring/, etc.
  • For the actual implementation, can you use the internal/util/parameters package to define the tool inputs for Toolbox to correctly parse and validate arguments from the LLM?
  • The start/read/stop pattern for the streaming tool is interesting. A couple of questions on the implementation details:
    • How will we handle multiple concurrent streams? If start is called twice, does it spawn two background goroutines?
    • Since the Source instance is long-lived, we might also need a mechanism to map a specific "session" or "cursor" to the correct background stream if we want to support multiple concurrent users/agents. Thoughts?
    • For the read command, do we have an eviction policy or a limit for the buffer to prevent memory issues if read isn't called frequently enough?
  • Is it possible to ensure the client initialization respects util.UserAgentFromContext(ctx) so we can track usage correctly?

Overall, this looks good to me. Please feel free to start with the source and the query tools. :)

CC: @averikitsch @Yuan325

anubhav756 avatar Dec 02 '25 11:12 anubhav756

@anubhav756 Thanks for the review and feedback, I'll start implementing this promptly!

  1. Noted, I'll keep the tool naming conventions in mind; I might have mixed up the naming in the two proposals 😄
  2. Regarding internal/util/parameters -> I was aware of this, but I added tags in my proposal just for better understanding.
  3. util.UserAgentFromContext(ctx): yes, the implementation will respect it.

Lastly regarding streaming tool:

Stream ownership:

  1. start → allocates its own goroutine + buffer + context and returns a unique sessionId.
  2. read/stop → resolved only through that sessionId; this prevents cross-stream collisions.

Question: what if the agent loses the sessionId? This would result in an orphan, and we cannot rely on the client/agent side to call stop every time.

As per my understanding, we can introduce an idle timeout: if a read operation arrives before the idle timeout, the session resumes; if no read arrives within the timeout window, the session self-terminates, releasing the goroutine and buffer.

The idle timeout will definitely have a default value, and I think it could also be overridden by passing it as a parameter when start is called. What do you think?

Working

  1. Source and session
Source -> map[sessionId]*Session

Session (per active stream)
    ctx + cancel
    goroutine (TailLogEntries)
    ringBuffer (fixed size)
    seqCounter
  2. Start -> returns sessionId
Source.sessions[sessionId] = &Session{...}
Session.goroutine starts
Session.lastReadTime = now
  3. Read + sessionId
lookup = Source.sessions[sessionId]
validate cursor against lookup.ringBuffer
return entries
lookup.lastReadTime = now
  4. stop + sessionId
lookup = Source.sessions[sessionId]
lookup.cancel()                      // terminates goroutine
delete Source.sessions[sessionId]    // frees buffer + state
  5. idle-timeout watcher
for each session in Source.sessions:
    if now - session.lastReadTime > idleTimeout:
        session.cancel()
        delete Source.sessions[sessionId]
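
Putting the outline above together, a hypothetical Go sketch of the session registry and idle-timeout reaper (all names are illustrative; the TailLogEntries goroutine and ring buffer are elided):

package cloudlogging

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type session struct {
	cancel   context.CancelFunc
	lastRead time.Time
	// ring buffer + seq counter elided
}

type sessionRegistry struct {
	mu       sync.Mutex
	next     int
	sessions map[string]*session
}

// start allocates a per-session context and returns a unique sessionId.
func (r *sessionRegistry) start(parent context.Context) string {
	_, cancel := context.WithCancel(parent) // ctx would be handed to the tail goroutine
	r.mu.Lock()
	defer r.mu.Unlock()
	r.next++
	id := fmt.Sprintf("session-%d", r.next)
	r.sessions[id] = &session{cancel: cancel, lastRead: time.Now()}
	return id
}

// stop cancels the stream's context and frees the session state.
func (r *sessionRegistry) stop(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if s, ok := r.sessions[id]; ok {
		s.cancel()
		delete(r.sessions, id)
	}
}

// reap terminates sessions that have not been read within idleTimeout,
// so a lost sessionId cannot leak a goroutine or buffer forever.
func (r *sessionRegistry) reap(idleTimeout time.Duration) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for id, s := range r.sessions {
		if time.Since(s.lastRead) > idleTimeout {
			s.cancel()
			delete(r.sessions, id)
		}
	}
}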

Does an implementation like this resolve your concerns around ownership and memory management?

pkalsi97 avatar Dec 02 '25 12:12 pkalsi97

Thanks for the clarifications @pkalsi97. This sounds great to me!

Yes, if we can be sure that there is a default timeout that avoids orphan sessions, then that should be good.

I think the plan looks good to start with the implementation 🙂

CC: @Yuan325 @averikitsch @kurtisvg

anubhav756 avatar Dec 03 '25 07:12 anubhav756

@anubhav756 Thanks! Working on it.

pkalsi97 avatar Dec 03 '25 07:12 pkalsi97

Thanks @pkalsi97 A couple of thoughts:

  1. For the first milestone I would suggest no additional endpoints, unless we have specific requirements:
    • Endpoint override -> public Google endpoints are blocked
    • Scope override -> when issued credentials lack required permissions
    • Quota-project -> when billing must be redirected
  2. We've seen more MCP clients don't support streaming right now so for the first milestone I would omit this functionality.
  3. Could you outline the tools and the associated API requests?

averikitsch avatar Dec 04 '25 20:12 averikitsch

Sure, for the first milestone I'll follow the suggestions above. Based on testing the implementation locally, I have settled on the following approach.

Overview:

Source: cloud-logging-admin (using the logadmin SDK). I am considering implementing the source in line with the BigQuery implementation:

  • If ADC, initialise the client in the source itself.
  • If OAuth, use a client per token (ClientCreator, ClientCache, RestCache).

In every tool call:

  • ADC: t.Client is ready at init
  • OAuth: t.ClientCreator(token) → creates a client per request (cached)

After that we can use the same SDK calls for both (a small sketch follows).
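
A hedged sketch of the two initialisation paths (the ClientCreator/ClientCache plumbing is elided; function names are illustrative):

package cloudlogging

import (
	"context"

	"cloud.google.com/go/logging/logadmin"
	"golang.org/x/oauth2"
	"google.golang.org/api/option"
)

// ADC path: one shared client, created when the source is initialised.
func newADCClient(ctx context.Context, project string) (*logadmin.Client, error) {
	return logadmin.NewClient(ctx, "projects/"+project)
}

// OAuth path: a client per user access token, to be cached by the source.
func newOAuthClient(ctx context.Context, project, accessToken string) (*logadmin.Client, error) {
	ts := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: accessToken})
	return logadmin.NewClient(ctx, "projects/"+project, option.WithTokenSource(ts))
}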

Tools:

  1. list_resources_types

Returns : []string of resource types

Get Iterator using -> client.ResourceDescriptors(ctx)

resourceType := []string
loop:
    d, err = iterator.Next()
    if err == Done → exit loop
    if err → fail
    append(resourceType, d.Type)
return resourceType
  2. list_log_names_by_filter

Returns : []string of log names

inputs:
    resourceType → optional string
    prefix       → optional string
    startTime    → optional time
    max          → int

// Prepare filter
filter = join(all non-empty filter parts) with AND

iterator := client.Entries(ctx, logadmin.Filter(filter), logadmin.NewestFirst())

seen = {}
result = []

while result.size < max:
    entry, err = iterator.Next()
    if err == Done → break
    if err → fail

    if entry.LogName not in seen:
        add to seen
        append to result

if result empty → fail

return result

  3. count_log_entries

Returns : int count for a given log name

inputs:
    logName        → string (required)
    inputTimeWindow → duration (optional; defaults to 30 days if ≤ 0)

// Prepare filter
filter = `logName="<logName>" AND timestamp >= "<startTime>"`

iterator = client.Entries(ctx, logadmin.Filter(filter))

count = 0
loop:
    entry, err = iterator.Next()
    if err == Done → break
    if err → fail
    count++

return count
  4. query_logs_by_filter

I am thinking of using a summary_fields parameter as an optional input; this would allow callers to whitelist specific fields in the response (e.g., timestamp, severity, payload). Let me know what you think. A Go sketch of this tool follows the pseudocode below.
inputs:
    filter   → optional string
    limit    → int
    summaryFields → optional []string 


    iterator = client.Entries(ctx, opts...)

    entries = []
    loop:
        if limit > 0 and len(entries) >= limit: break
        entry, err = iterator.Next()
        if err == Done: break
        if err: fail with err
        append entry to entries

    return entries
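
For reference, a hedged Go sketch of the query_logs_by_filter flow above (summaryFields projection omitted; the function name is illustrative); the same iterator pattern backs the list and count tools:

package cloudlogging

import (
	"context"

	"cloud.google.com/go/logging"
	"cloud.google.com/go/logging/logadmin"
	"google.golang.org/api/iterator"
)

// queryLogsByFilter returns up to `limit` entries matching the filter.
func queryLogsByFilter(ctx context.Context, client *logadmin.Client, filter string, limit int) ([]*logging.Entry, error) {
	opts := []logadmin.EntriesOption{logadmin.NewestFirst()}
	if filter != "" {
		opts = append(opts, logadmin.Filter(filter))
	}

	var entries []*logging.Entry
	it := client.Entries(ctx, opts...)
	for {
		if limit > 0 && len(entries) >= limit {
			break
		}
		entry, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil, err
		}
		entries = append(entries, entry)
	}
	return entries, nil
}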

@averikitsch cc: @anubhav756

pkalsi97 avatar Dec 05 '25 07:12 pkalsi97