close
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
914b098
suggested improvements for instrument tracing
birschick-bq May 14, 2025
b905383
correct lint error
birschick-bq May 14, 2025
6e55063
Merge branch 'main' into dev/birschick-bq/go-otel-instrumentation
birschick-bq May 20, 2025
f74fbea
Merge branch 'main' into dev/birschick-bq/go-otel-instrumentation
birschick-bq May 28, 2025
5fef80d
more instrumentation
birschick-bq May 28, 2025
89df674
more instrumentation
birschick-bq May 28, 2025
dbf395d
Merge branch 'main' into dev/birschick-bq/go-otel-instrumentation
birschick-bq May 28, 2025
aad451c
correct event name text
birschick-bq May 28, 2025
d19d537
correct event name text
birschick-bq May 28, 2025
1d04237
revert unused changes
birschick-bq May 29, 2025
f933c2b
revert unused changes 2
birschick-bq May 29, 2025
3e0680e
code review changes to flatten instrumentation
birschick-bq May 29, 2025
67ab733
correct lint issue
birschick-bq May 29, 2025
a08e550
improvements
birschick-bq May 30, 2025
9586687
improvements 2
birschick-bq May 30, 2025
374e176
code review - semantic convention keys
birschick-bq May 30, 2025
686803e
remove duplicate code
birschick-bq May 30, 2025
1048915
moved StartSpan/EndSpan to driver/internal
birschick-bq May 30, 2025
88d05c4
removed unused change
birschick-bq May 30, 2025
cc81119
improve error handling
birschick-bq Jun 2, 2025
935ee6e
improve error handling 2
birschick-bq Jun 2, 2025
fe221e4
update github.com/snowflakedb/gosnowflake to version v1.14.1 - add in…
birschick-bq Jun 4, 2025
f3078c0
revise handling of setting trace attributes for driver info
birschick-bq Jun 4, 2025
aafc4bf
remove ToLower
birschick-bq Jun 5, 2025
0b02e3e
Merge branch 'main' into dev/birschick-bq/go-otel-instrumentation
birschick-bq Jun 5, 2025
3039d05
PR comment changes for attribute.Key and error handling
birschick-bq Jun 6, 2025
aaccb98
Merge branch 'main' into dev/birschick-bq/go-otel-instrumentation
birschick-bq Jun 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions go/adbc/driver/internal/driverbase/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ import (
"strings"

"github.com/apache/arrow-adbc/go/adbc"
"github.com/apache/arrow-adbc/go/adbc/driver/internal"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.30.0"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -148,7 +151,9 @@ func (base *ConnectionImplBase) Rollback(context.Context) error {
return base.ErrorHelper.Errorf(adbc.StatusNotImplemented, "Rollback")
}

func (base *ConnectionImplBase) GetInfo(ctx context.Context, infoCodes []adbc.InfoCode) (array.RecordReader, error) {
func (base *ConnectionImplBase) GetInfo(ctx context.Context, infoCodes []adbc.InfoCode) (reader array.RecordReader, err error) {
_, span := internal.StartSpan(ctx, "ConnectionImplBase.GetInfo", base)
defer internal.EndSpan(span, err)

if len(infoCodes) == 0 {
infoCodes = base.DriverInfo.InfoSupportedCodes()
Expand Down Expand Up @@ -198,13 +203,16 @@ func (base *ConnectionImplBase) GetInfo(ctx context.Context, infoCodes []adbc.In
boolInfoBldr.AppendNull()
}
default:
return nil, fmt.Errorf("no defined type code for info_value of type %T", v)
err = fmt.Errorf("no defined type code for info_value of type %T", v)
return nil, err
}
}

final := bldr.NewRecord()
defer final.Release()
return array.NewRecordReader(adbc.GetInfoSchema, []arrow.Record{final})

reader, err = array.NewRecordReader(adbc.GetInfoSchema, []arrow.Record{final})
return reader, err
}

func (base *ConnectionImplBase) Close() error {
Expand Down Expand Up @@ -365,6 +373,23 @@ func (cnxn *ConnectionImplBase) StartSpan(
return cnxn.Tracer.Start(ctx, spanName, opts...)
}

func (cnxn *ConnectionImplBase) GetInitialSpanAttributes() []attribute.KeyValue {
return getInitialSpanAttributes(cnxn.DriverInfo)
}

func getInitialSpanAttributes(driverInfo *DriverInfo) []attribute.KeyValue {
var attrs []attribute.KeyValue
var systemName = driverInfo.GetName()
if value, ok := driverInfo.GetInfoForInfoCode(adbc.InfoVendorName); ok {
if s, ok := value.(string); ok {
systemName = s
}
}
attrs = append(attrs, semconv.DBSystemNameKey.String(systemName))

return attrs
}

// GetObjects implements Connection.
func (cnxn *connection) GetObjects(ctx context.Context, depth adbc.ObjectDepth, catalog *string, dbSchema *string, tableName *string, columnName *string, tableType []string) (array.RecordReader, error) {
helper := cnxn.dbObjectsEnumerator
Expand Down
41 changes: 37 additions & 4 deletions go/adbc/driver/internal/driverbase/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package driverbase

import (
"context"
"errors"
"log/slog"
"os"
"strings"
Expand All @@ -28,12 +29,13 @@ import (
"github.com/apache/arrow-adbc/go/adbc"
"github.com/apache/arrow-go/v18/arrow/memory"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
semconv "go.opentelemetry.io/otel/semconv/v1.30.0"
Comment thread
birschick-bq marked this conversation as resolved.
"go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -105,6 +107,7 @@ type DatabaseImplBase struct {
Tracer trace.Tracer

tracerShutdownFunc func(context.Context) error
traceParent string
}

// NewDatabaseImplBase instantiates DatabaseImplBase.
Expand Down Expand Up @@ -184,6 +187,27 @@ func (base *DatabaseImplBase) SetOptions(options map[string]string) error {
return nil
}

func (d *DatabaseImplBase) GetInitialSpanAttributes() []attribute.KeyValue {
return getInitialSpanAttributes(d.DriverInfo)
}

func (d *DatabaseImplBase) GetTraceParent() (traceParent string) {
return d.traceParent
}

func (d *DatabaseImplBase) SetTraceParent(traceParent string) {
d.traceParent = traceParent
}

func (d *DatabaseImplBase) StartSpan(
ctx context.Context,
spanName string,
opts ...trace.SpanStartOption,
) (context.Context, trace.Span) {
ctx, _ = maybeAddTraceParent(ctx, d, nil)
return d.Tracer.Start(ctx, spanName, opts...)
}

// database is the implementation of adbc.Database.
type database struct {
DatabaseImpl
Expand Down Expand Up @@ -384,19 +408,28 @@ func newAdbcFileExporter(driverName string) (*stdouttrace.Exporter, error) {

func newTracerProvider(exporters ...sdktrace.SpanExporter) (*sdktrace.TracerProvider, error) {
// Ensure default SDK resource and the required service name are set.
mergedResource, err := resource.Merge(
tracerResource, err := resource.Merge(
resource.Default(),
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(driverNamespace),
),
)
if err != nil {
return nil, err
if errors.Is(err, resource.ErrSchemaURLConflict) {
// If unable to merge with the default resource (conflicting ShhemaURL),
// use just our resource
tracerResource = resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(driverNamespace),
)
} else {
return nil, err
}
}

opts := []sdktrace.TracerProviderOption{
sdktrace.WithResource(mergedResource),
sdktrace.WithResource(tracerResource),
}
for _, exporter := range exporters {
opts = append(opts, sdktrace.WithBatcher(exporter))
Expand Down
58 changes: 58 additions & 0 deletions go/adbc/driver/internal/driverbase/driver_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"sort"

"github.com/apache/arrow-adbc/go/adbc"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

const (
Expand All @@ -43,6 +45,39 @@ var infoValueTypeCodeForInfoCode = map[adbc.InfoCode]adbc.InfoValueTypeCode{
adbc.InfoVendorSubstraitMaxVersion: adbc.InfoValueStringType,
}

const (
// namespace prefix
otelInfoSemConv attribute.Key = "apache.arrow.adbc.info."

// The database vendor/product name (e.g. the server name) (type: utf8)
otelSemConvInfoVendorName attribute.Key = otelInfoSemConv + "vendor.name"
// The database vendor/product version (type: utf8)
otelSemConvInfoVendorVersion attribute.Key = otelInfoSemConv + "vendor.version"
// The database vendor/product Arrow library version (type: utf8)
otelSemConvInfoVendorArrowVersion attribute.Key = otelInfoSemConv + "vendor.arrow.version"
// Indicates whether SQL queries are supported (type: bool).
otelSemConvInfoVendorSql attribute.Key = otelInfoSemConv + "vendor.sql"
// The driver name (type: utf8)
otelSemConvInfoDriverName attribute.Key = otelInfoSemConv + "driver.name"
// The driver version (type: utf8)
otelSemConvInfoDriverVersion attribute.Key = otelInfoSemConv + "driver.version"
// The driver Arrow library version (type: utf8)
otelSemConvInfoDriverArrowVersion attribute.Key = otelInfoSemConv + "driver.arrow.version"
// The driver ADBC API version (type: int64)
otelSemConvInfoDriverAdbcVersion attribute.Key = otelInfoSemConv + "driver.adbc.version"
)

var otelAttrForInfoCode = map[adbc.InfoCode]attribute.Key{
adbc.InfoVendorName: otelSemConvInfoVendorName,
adbc.InfoVendorVersion: otelSemConvInfoVendorVersion,
adbc.InfoVendorArrowVersion: otelSemConvInfoVendorArrowVersion,
adbc.InfoDriverName: otelSemConvInfoDriverName,
adbc.InfoDriverVersion: otelSemConvInfoDriverVersion,
adbc.InfoDriverArrowVersion: otelSemConvInfoDriverArrowVersion,
adbc.InfoDriverADBCVersion: otelSemConvInfoDriverAdbcVersion,
adbc.InfoVendorSql: otelSemConvInfoVendorSql,
}

func DefaultDriverInfo(name string) *DriverInfo {
defaultInfoVendorName := name
defaultInfoDriverName := fmt.Sprintf("ADBC %s Driver - Go", name)
Expand Down Expand Up @@ -121,3 +156,26 @@ func (di *DriverInfo) GetInfoForInfoCode(code adbc.InfoCode) (any, bool) {
val, ok := di.info[code]
return val, ok
}

func SetOTelDriverInfoAttributes(driverInfo *DriverInfo, span trace.Span) {
attrs := []attribute.KeyValue{}
codes := driverInfo.InfoSupportedCodes()
for _, code := range codes {
if attr, ok := otelAttrForInfoCode[code]; ok {
if attrVal, ok := driverInfo.GetInfoForInfoCode(code); ok {
if attrVal == nil {
continue
}
switch v := attrVal.(type) {
case string:
attrs = append(attrs, attr.String(v))
case bool:
attrs = append(attrs, attr.Bool(v))
case int64:
attrs = append(attrs, attr.Int64(v))
}
}
}
}
span.SetAttributes(attrs...)
}
9 changes: 6 additions & 3 deletions go/adbc/driver/internal/driverbase/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"

"github.com/apache/arrow-adbc/go/adbc"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -123,8 +124,10 @@ func (st *StatementImplBase) StartSpan(
spanName string,
opts ...trace.SpanStartOption,
) (context.Context, trace.Span) {
var span trace.Span
ctx, _ = maybeAddTraceParent(ctx, st.cnxn, st)
ctx, span = st.Tracer.Start(ctx, spanName, opts...)
return ctx, span
return st.Tracer.Start(ctx, spanName, opts...)
}

func (st *StatementImplBase) GetInitialSpanAttributes() []attribute.KeyValue {
return st.cnxn.GetInitialSpanAttributes()
}
28 changes: 23 additions & 5 deletions go/adbc/driver/internal/shared_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.30.0"
"go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -745,14 +745,32 @@ func ToXdbcDataType(dt arrow.DataType) (xdbcType XdbcDataType) {
}
}

func SetErrorOnSpan(span trace.Span, err error) bool {
// Starts a trace.Span with the given spanName for the tracing object with
// the given ctx context.
func StartSpan(ctx context.Context, spanName string, tracing adbc.OTelTracing, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
if tracing == nil {
return ctx, trace.SpanFromContext(ctx)
}

attrs := tracing.GetInitialSpanAttributes()
attrs = append(attrs, semconv.DBOperationName(spanName))
opts = append(opts, trace.WithAttributes(attrs...))

return tracing.StartSpan(ctx, spanName, opts...)
}

// Ends the given span. If err is not nil, then the
// error is recorded and the status is set appropriately.
// Otherwise, the status is set to Ok.
func EndSpan(span trace.Span, err error, options ...trace.SpanEndOption) {
if err != nil {
span.RecordError(err)
if adbcError, ok := err.(adbc.Error); ok {
span.SetAttributes(attribute.String("error.type", adbcError.Code.String()))
span.SetAttributes(semconv.ErrorTypeKey.String(adbcError.Code.String()))
}
span.SetStatus(codes.Error, err.Error())
return true
} else {
span.SetStatus(codes.Ok, "")
}
return false
span.End(options...)
}
Loading
Loading