Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 78 additions & 1 deletion sdk/go/trainloop-llm-logging/go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,82 @@
module github.com/trainloop/evals/sdk/go/trainloop-llm-logging

go 1.20
go 1.24

toolchain go1.24.2

require gopkg.in/yaml.v3 v3.0.1

require (
cel.dev/expr v0.24.0 // indirect
cloud.google.com/go v0.121.4 // indirect
cloud.google.com/go/auth v0.16.3 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect
cloud.google.com/go/compute/metadata v0.7.0 // indirect
cloud.google.com/go/iam v1.5.2 // indirect
cloud.google.com/go/monitoring v1.24.2 // indirect
cloud.google.com/go/storage v1.55.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.29.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.53.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0 // indirect
github.com/aws/aws-sdk-go v1.55.7 // indirect
github.com/aws/aws-sdk-go-v2 v1.36.5 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.11 // indirect
github.com/aws/aws-sdk-go-v2/config v1.29.17 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.70 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.32 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.84 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.36 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.36 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.36 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.17 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.84.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.25.5 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.34.0 // indirect
github.com/aws/smithy-go v1.22.4 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 // indirect
github.com/envoyproxy/go-control-plane/envoy v1.32.4 // indirect
github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-jose/go-jose/v4 v4.1.1 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/s2a-go v0.1.9 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/google/wire v0.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
github.com/googleapis/gax-go/v2 v2.15.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
github.com/spiffe/go-spiffe/v2 v2.5.0 // indirect
github.com/zeebo/errs v1.4.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/detectors/gcp v1.37.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.62.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 // indirect
go.opentelemetry.io/otel v1.37.0 // indirect
go.opentelemetry.io/otel/metric v1.37.0 // indirect
go.opentelemetry.io/otel/sdk v1.37.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.37.0 // indirect
go.opentelemetry.io/otel/trace v1.37.0 // indirect
gocloud.dev v0.43.0 // indirect
golang.org/x/crypto v0.40.0 // indirect
golang.org/x/net v0.42.0 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sync v0.16.0 // indirect
golang.org/x/sys v0.34.0 // indirect
golang.org/x/text v0.27.0 // indirect
golang.org/x/time v0.12.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
google.golang.org/api v0.242.0 // indirect
google.golang.org/genproto v0.0.0-20250715232539-7130f93afb79 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250715232539-7130f93afb79 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250715232539-7130f93afb79 // indirect
google.golang.org/grpc v1.73.0 // indirect
google.golang.org/protobuf v1.36.6 // indirect
)
203 changes: 203 additions & 0 deletions sdk/go/trainloop-llm-logging/go.sum

Large diffs are not rendered by default.

248 changes: 248 additions & 0 deletions sdk/go/trainloop-llm-logging/internal/storage/cloud_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
package storage

import (
"context"
"encoding/json"
"fmt"
"path/filepath"
"sort"
"strconv"
"strings"
"time"

"gocloud.dev/blob"
_ "gocloud.dev/blob/fileblob"
_ "gocloud.dev/blob/gcsblob"
_ "gocloud.dev/blob/s3blob"

"github.com/trainloop/evals/sdk/go/trainloop-llm-logging/internal/logger"
"github.com/trainloop/evals/sdk/go/trainloop-llm-logging/internal/types"
)

var log = logger.CreateLogger("trainloop-cloud-storage")

// openBucket opens a blob bucket for the given path
func openBucket(ctx context.Context, dir string) (*blob.Bucket, error) {
switch {
case strings.HasPrefix(dir, "s3://"),
strings.HasPrefix(dir, "gs://"),
strings.HasPrefix(dir, "gcp://"),
strings.HasPrefix(dir, "file://"):
return blob.OpenBucket(ctx, dir)
default:
// bare local path ⇒ turn it into file:// URL
abs, err := filepath.Abs(dir)
if err != nil {
return nil, fmt.Errorf("failed to get absolute path: %w", err)
}
abs = filepath.ToSlash(abs)
return blob.OpenBucket(ctx, "file://"+abs)
}
}

// nowISO returns current time in ISO 8601 format
func nowISO() string {
return time.Now().UTC().Format(time.RFC3339)
}

// UpdateRegistry updates the _registry.json file using cloud storage
func UpdateRegistry(ctx context.Context, dataDir string, loc types.LLMCallLocation, tag string) error {
log.Debug("Updating registry with cloud storage: %s", dataDir)

bucket, err := openBucket(ctx, dataDir)
if err != nil {
return fmt.Errorf("failed to open bucket: %w", err)
}
defer bucket.Close()

registryKey := "_registry.json"

// Load existing registry
var reg types.Registry
exists, err := bucket.Exists(ctx, registryKey)
if err != nil {
log.Warn("Error checking if registry exists: %v", err)
}

if exists {
data, err := bucket.ReadAll(ctx, registryKey)
if err != nil {
log.Warn("Failed to read existing registry: %v", err)
} else if len(data) == 0 || string(data) == "{}" {
log.Debug("Registry file is empty or '{}', initializing new registry.")
reg.Schema = 1
reg.Files = make(map[string]map[string]types.RegistryEntry)
} else if err := json.Unmarshal(data, &reg); err != nil {
log.Error("Corrupt registry file - recreating. Error: %v", err)
reg.Schema = 1
reg.Files = make(map[string]map[string]types.RegistryEntry)
} else if reg.Files == nil {
reg.Files = make(map[string]map[string]types.RegistryEntry)
}
} else {
log.Debug("Registry file not found, creating new registry.")
reg.Schema = 1
reg.Files = make(map[string]map[string]types.RegistryEntry)
}

// Sanitize location
if loc.File == "" {
loc.File = "unknown"
}
if loc.LineNumber == "" {
loc.LineNumber = "0"
}

// Update registry
if _, ok := reg.Files[loc.File]; !ok {
reg.Files[loc.File] = make(map[string]types.RegistryEntry)
}

fileEntries := reg.Files[loc.File]
now := nowISO()

entry, exists := fileEntries[loc.LineNumber]
if exists {
if entry.Tag != tag {
entry.Tag = tag
}
entry.LastSeen = now
entry.Count++
} else {
entry = types.RegistryEntry{
LineNumber: loc.LineNumber,
Tag: tag,
FirstSeen: now,
LastSeen: now,
Count: 1,
}
}
fileEntries[loc.LineNumber] = entry

// Write registry back to storage
data, err := json.MarshalIndent(reg, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal registry: %w", err)
}

w, err := bucket.NewWriter(ctx, registryKey, nil)
if err != nil {
return fmt.Errorf("failed to create writer for registry: %w", err)
}

if _, err := w.Write(data); err != nil {
w.Close()
return fmt.Errorf("failed to write registry data: %w", err)
}

if err := w.Close(); err != nil {
return fmt.Errorf("failed to close registry writer: %w", err)
}

log.Debug("Registry written to cloud storage - %s:%s = %s (count=%d)", loc.File, loc.LineNumber, entry.Tag, entry.Count)
return nil
}

// SaveSamples appends samples to a JSONL event file using cloud storage
func SaveSamples(ctx context.Context, dataDir string, samples []types.CollectedSample) error {
if len(samples) == 0 {
return nil
}

log.Debug("Saving %d samples to cloud storage: %s", len(samples), dataDir)

bucket, err := openBucket(ctx, dataDir)
if err != nil {
return fmt.Errorf("failed to open bucket: %w", err)
}
defer bucket.Close()

nowMs := time.Now().UnixNano() / 1e6
windowMs := int64(10 * 60 * 1000) // 10 minutes in milliseconds

// Find the most recent timestamped file
var latestTimestampMs int64 = 0

// List files in events/ directory
eventPrefix := "events/"
iter := bucket.List(&blob.ListOptions{Prefix: eventPrefix})
var timestamps []int64

for {
obj, err := iter.Next(ctx)
if err != nil {
break // End of list or error
}

if strings.HasSuffix(obj.Key, ".jsonl") {
filename := strings.TrimPrefix(obj.Key, eventPrefix)
tsStr := strings.TrimSuffix(filename, ".jsonl")
ts, err := strconv.ParseInt(tsStr, 10, 64)
if err == nil {
timestamps = append(timestamps, ts)
}
}
}

if len(timestamps) > 0 {
sort.Slice(timestamps, func(i, j int) bool { return timestamps[i] > timestamps[j] })
latestTimestampMs = timestamps[0]
}

targetTimestampMs := nowMs
if latestTimestampMs > 0 && (nowMs-latestTimestampMs) < windowMs {
targetTimestampMs = latestTimestampMs
}

// Prepare JSONL content
var lines []string
for _, s := range samples {
jsonData, err := json.Marshal(s)
if err != nil {
log.Warn("Failed to marshal sample to JSON: %v, Sample: %+v", err, s)
continue
}
lines = append(lines, string(jsonData))
}

if len(lines) == 0 {
return nil
}

content := strings.Join(lines, "\n") + "\n"
eventKey := fmt.Sprintf("events/%d.jsonl", targetTimestampMs)

// Check if file exists, if so append to it
exists, err := bucket.Exists(ctx, eventKey)
if err != nil {
log.Warn("Error checking if event file exists: %v", err)
}

if exists {
// Read existing content and append
existingData, err := bucket.ReadAll(ctx, eventKey)
if err != nil {
log.Warn("Failed to read existing event file: %v", err)
} else {
content = string(existingData) + content
}
}

// Write the content
w, err := bucket.NewWriter(ctx, eventKey, nil)
if err != nil {
return fmt.Errorf("failed to create writer for event file: %w", err)
}

if _, err := w.Write([]byte(content)); err != nil {
w.Close()
return fmt.Errorf("failed to write event data: %w", err)
}

if err := w.Close(); err != nil {
return fmt.Errorf("failed to close event writer: %w", err)
}

log.Debug("Saved %d samples to cloud storage: %s", len(samples), eventKey)
return nil
}
Loading
Loading