
feat(dsm): implement avro schemas for avsc package #4726

Merged: 41 commits, Oct 4, 2024

Commits
eb12e15
add dsm schema tracking
wconti27 Sep 13, 2024
87c5e5a
add protobuf schemas support for DSM
wconti27 Sep 18, 2024
4b187c4
Merge branch 'master' into conti/support-protobuf-schemas
wconti27 Sep 19, 2024
6b8b0b3
Merge branch 'master' into conti/support-protobuf-schemas
wconti27 Sep 19, 2024
a9cf1fc
revert fetch change
wconti27 Sep 19, 2024
38c1419
fix tests
wconti27 Sep 19, 2024
214a679
fix test
wconti27 Sep 19, 2024
c5e65b2
more testing
wconti27 Sep 19, 2024
be68bd5
clean up tests
wconti27 Sep 19, 2024
8186fbd
ensure schemas only added if dsm is enabled
wconti27 Sep 19, 2024
7f56ce0
more tests
wconti27 Sep 23, 2024
40a2fe8
increase schema sampler cache
wconti27 Sep 23, 2024
4fa3e81
fix sampling
wconti27 Sep 24, 2024
79e4945
fix sampling
wconti27 Sep 24, 2024
6028c36
Merge branch 'master' into conti/support-protobuf-schemas
wconti27 Sep 24, 2024
af12edb
fix protobuf google-api
wconti27 Sep 24, 2024
48fa7ff
using tracing channel and abstract schemaPlugin
wconti27 Sep 25, 2024
d61cdbc
implement avsc schemas
wconti27 Sep 25, 2024
0cf5246
Merge branch 'master' into conti/implement-avro-schemas
wconti27 Sep 25, 2024
2748d35
Merge branch 'master' into conti/support-protobuf-schemas
wconti27 Sep 26, 2024
237c316
only use specific trace channels
wconti27 Sep 27, 2024
6b01e44
Merge branch 'master' into conti/support-protobuf-schemas
wconti27 Sep 27, 2024
3709f7a
merge
wconti27 Sep 27, 2024
3f07fe0
Merge branch 'conti/support-protobuf-schemas' into conti/implement-av…
wconti27 Sep 27, 2024
1ec86fe
revert tracingChannel change
wconti27 Sep 27, 2024
1f42aab
fix channels
wconti27 Sep 27, 2024
da15fc1
fix proto lint
wconti27 Oct 3, 2024
8836d37
Merge branch 'conti/support-protobuf-schemas' into conti/implement-av…
wconti27 Oct 3, 2024
dc264a7
clean up files
wconti27 Oct 3, 2024
7505359
use normal channel not tracing channel
wconti27 Oct 3, 2024
b294d0e
more cleanup
wconti27 Oct 3, 2024
0882cf8
Merge branch 'conti/support-protobuf-schemas' into conti/implement-av…
wconti27 Oct 3, 2024
a376885
with versions
wconti27 Oct 3, 2024
2adeb42
fix protobuf tests and use version range for testing
wconti27 Oct 3, 2024
3f1d5fd
Merge branch 'conti/support-protobuf-schemas' into conti/implement-av…
wconti27 Oct 3, 2024
4424307
fix avsc tests
wconti27 Oct 3, 2024
af18e69
add necessary docs
wconti27 Oct 3, 2024
00c4ade
Merge branch 'conti/support-protobuf-schemas' into conti/implement-av…
wconti27 Oct 3, 2024
37c94d4
add avsc docs
wconti27 Oct 3, 2024
0343bca
Merge branch 'master' into conti/implement-avro-schemas
wconti27 Oct 4, 2024
9c23e18
dont use try catch and instead log schema failure to extract field
wconti27 Oct 4, 2024
9 changes: 9 additions & 0 deletions .github/workflows/plugins.yml
@@ -136,6 +136,15 @@ jobs:
- uses: actions/checkout@v4
- uses: ./.github/actions/plugins/test-and-upstream

avsc:
runs-on: ubuntu-latest
env:
PLUGINS: avsc
DD_DATA_STREAMS_ENABLED: true
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/plugins/test-and-upstream

aws-sdk:
strategy:
matrix:
2 changes: 2 additions & 0 deletions docs/API.md
@@ -24,6 +24,7 @@ tracer.use('pg', {
<h5 id="amqplib"></h5>
<h5 id="amqplib-tags"></h5>
<h5 id="amqplib-config"></h5>
<h5 id="avsc"></h5>
<h5 id="aws-sdk"></h5>
<h5 id="aws-sdk-tags"></h5>
<h5 id="aws-sdk-config"></h5>
@@ -102,6 +103,7 @@ tracer.use('pg', {

* [amqp10](./interfaces/export_.plugins.amqp10.html)
* [amqplib](./interfaces/export_.plugins.amqplib.html)
* [avsc](./interfaces/export_.plugins.avsc.html)
* [aws-sdk](./interfaces/export_.plugins.aws_sdk.html)
* [bluebird](./interfaces/export_.plugins.bluebird.html)
* [couchbase](./interfaces/export_.plugins.couchbase.html)
1 change: 1 addition & 0 deletions docs/add-redirects.sh
@@ -14,6 +14,7 @@ echo "writing redirects..."
declare -a plugins=(
"amqp10"
"amqplib"
"avsc"
"aws_sdk"
"bluebird"
"couchbase"
1 change: 1 addition & 0 deletions docs/test.ts
@@ -281,6 +281,7 @@ const openSearchOptions: plugins.opensearch = {

tracer.use('amqp10');
tracer.use('amqplib');
tracer.use('avsc');
tracer.use('aws-sdk');
tracer.use('aws-sdk', awsSdkOptions);
tracer.use('bunyan');
7 changes: 7 additions & 0 deletions index.d.ts
@@ -147,6 +147,7 @@ interface Plugins {
"amqp10": tracer.plugins.amqp10;
"amqplib": tracer.plugins.amqplib;
"apollo": tracer.plugins.apollo;
"avsc": tracer.plugins.avsc;
"aws-sdk": tracer.plugins.aws_sdk;
"bunyan": tracer.plugins.bunyan;
"cassandra-driver": tracer.plugins.cassandra_driver;
@@ -1192,6 +1193,12 @@ declare namespace tracer {
signature?: boolean;
}

/**
 * This plugin automatically patches the [avsc](https://github.com/mtth/avsc) module
 * to collect Avro message schemas when Data Streams Monitoring is enabled.
*/
interface avsc extends Integration {}

/**
* This plugin automatically instruments the
* [aws-sdk](https://github.com/aws/aws-sdk-js) module.
37 changes: 37 additions & 0 deletions packages/datadog-instrumentations/src/avsc.js
@@ -0,0 +1,37 @@
const shimmer = require('../../datadog-shimmer')
const { addHook } = require('./helpers/instrument')

const dc = require('dc-polyfill')
const serializeChannel = dc.channel('apm:avsc:serialize-start')
const deserializeChannel = dc.channel('apm:avsc:deserialize-end')

function wrapSerialization (Type) {
shimmer.wrap(Type.prototype, 'toBuffer', original => function () {
if (!serializeChannel.hasSubscribers) {
return original.apply(this, arguments)
}
serializeChannel.publish({ messageClass: this })
return original.apply(this, arguments)
})
}

function wrapDeserialization (Type) {
shimmer.wrap(Type.prototype, 'fromBuffer', original => function () {
if (!deserializeChannel.hasSubscribers) {
return original.apply(this, arguments)
}
const result = original.apply(this, arguments)
deserializeChannel.publish({ messageClass: result })
return result
})
}

addHook({
name: 'avsc',
versions: ['>=5.0.0']
}, avro => {
wrapDeserialization(avro.Type)
wrapSerialization(avro.Type)

return avro
})
1 change: 1 addition & 0 deletions packages/datadog-instrumentations/src/helpers/hooks.js
@@ -27,6 +27,7 @@ module.exports = {
aerospike: () => require('../aerospike'),
amqp10: () => require('../amqp10'),
amqplib: () => require('../amqplib'),
avsc: () => require('../avsc'),
'aws-sdk': () => require('../aws-sdk'),
bluebird: () => require('../bluebird'),
'body-parser': () => require('../body-parser'),
9 changes: 9 additions & 0 deletions packages/datadog-plugin-avsc/src/index.js
@@ -0,0 +1,9 @@
const SchemaPlugin = require('../../dd-trace/src/plugins/schema')
const SchemaExtractor = require('./schema_iterator')

class AvscPlugin extends SchemaPlugin {
static get id () { return 'avsc' }
static get schemaExtractor () { return SchemaExtractor }
}

module.exports = AvscPlugin
169 changes: 169 additions & 0 deletions packages/datadog-plugin-avsc/src/schema_iterator.js
@@ -0,0 +1,169 @@
const AVRO = 'avro'
const {
SCHEMA_DEFINITION,
SCHEMA_ID,
SCHEMA_NAME,
SCHEMA_OPERATION,
SCHEMA_WEIGHT,
SCHEMA_TYPE
} = require('../../dd-trace/src/constants')
const log = require('../../dd-trace/src/log')
const {
SchemaBuilder
} = require('../../dd-trace/src/datastreams/schemas/schema_builder')

class SchemaExtractor {
constructor (schema) {
this.schema = schema
}

static getType (type) {
const typeMapping = {
string: 'string',
int: 'integer',
long: 'integer',
float: 'number',
double: 'number',
boolean: 'boolean',
bytes: 'string',
record: 'object',
enum: 'string',
array: 'array',
map: 'object',
fixed: 'string'
}
const typeName = type.typeName ?? type.name ?? type
return typeName === 'null' ? typeName : typeMapping[typeName] || 'string'
}

static extractProperty (field, schemaName, fieldName, builder, depth) {
let array = false
let type
let format
let enumValues
let description
let ref

const fieldType = field.type?.types ?? field.type?.typeName ?? field.type

if (Array.isArray(fieldType)) {
// Union Type
type = 'union[' + fieldType.map(t => SchemaExtractor.getType(t.type || t)).join(',') + ']'
} else if (fieldType === 'array') {
// Array Type
array = true
const nestedType = field.type.itemsType.typeName
type = SchemaExtractor.getType(nestedType)
} else if (fieldType === 'record') {
// Nested Record Type
type = 'object'
ref = `#/components/schemas/${field.type.name}`
if (!SchemaExtractor.extractSchema(field.type, builder, depth + 1, this)) {
return false
}
} else if (fieldType === 'enum') {
enumValues = []
let i = 0
type = 'string'
while (field.type.symbols[i]) {
enumValues.push(field.type.symbols[i])
i += 1
}
} else {
// Primitive type
type = SchemaExtractor.getType(fieldType.type || fieldType)
if (fieldType === 'bytes') {
format = 'byte'
} else if (fieldType === 'int') {
format = 'int32'
} else if (fieldType === 'long') {
format = 'int64'
} else if (fieldType === 'float') {
format = 'float'
} else if (fieldType === 'double') {
format = 'double'
}
}

return builder.addProperty(schemaName, fieldName, array, type, description, ref, format, enumValues)
}

static extractSchema (schema, builder, depth, extractor) {
depth += 1
const schemaName = schema.name
if (extractor) {
// if we already have a defined extractor, this is a nested schema. create a new extractor for the nested
// schema, ensure it is added to our schema builder's cache, and replace the builders iterator with our
// nested schema iterator / extractor. Once complete, add the new schema to our builder's schemas.
const nestedSchemaExtractor = new SchemaExtractor(schema)
builder.iterator = nestedSchemaExtractor
const nestedSchema = SchemaBuilder.getSchema(schemaName, nestedSchemaExtractor, builder)
for (const nestedSubSchemaName in nestedSchema.components.schemas) {
if (nestedSchema.components.schemas.hasOwnProperty(nestedSubSchemaName)) {
builder.schema.components.schemas[nestedSubSchemaName] = nestedSchema.components.schemas[nestedSubSchemaName]
}
}
return true
} else {
if (!builder.shouldExtractSchema(schemaName, depth)) {
return false
}
for (const field of schema.fields) {
if (!this.extractProperty(field, schemaName, field.name, builder, depth)) {
log.warn(`DSM: Unable to extract field with name: ${field.name} from Avro schema with name: ${schemaName}`)
}
}
}
return true
}

static extractSchemas (descriptor, dataStreamsProcessor) {
return dataStreamsProcessor.getSchema(descriptor.name, new SchemaExtractor(descriptor))
}

iterateOverSchema (builder) {
this.constructor.extractSchema(this.schema, builder, 0)
}

static attachSchemaOnSpan (args, span, operation, tracer) {
const { messageClass } = args
const descriptor = messageClass?.constructor?.type ?? messageClass

if (!descriptor || !span) {
return
}

if (span.context()._tags[SCHEMA_TYPE] && operation === 'serialization') {
// we have already added a schema to this span, this call is an encode of nested schema types
return
}

span.setTag(SCHEMA_TYPE, AVRO)
span.setTag(SCHEMA_NAME, descriptor.name)
span.setTag(SCHEMA_OPERATION, operation)

if (!tracer._dataStreamsProcessor.canSampleSchema(operation)) {
return
}

// if the span is unsampled, do not sample the schema
if (!tracer._prioritySampler.isSampled(span)) {
return
}

const weight = tracer._dataStreamsProcessor.trySampleSchema(operation)
if (weight === 0) {
return
}

const schemaData = SchemaBuilder.getSchemaDefinition(
this.extractSchemas(descriptor, tracer._dataStreamsProcessor)
)

span.setTag(SCHEMA_DEFINITION, schemaData.definition)
span.setTag(SCHEMA_WEIGHT, weight)
span.setTag(SCHEMA_ID, schemaData.id)
}
}

module.exports = SchemaExtractor
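The `getType` mapping above translates Avro type names into OpenAPI-style JSON schema types, with `null` passed through and unknown names defaulting to `string`. Reproduced standalone (same mapping as the diff) so the behavior can be exercised directly:

```javascript
// Avro -> OpenAPI-style type mapping, as used by SchemaExtractor.getType.
const typeMapping = {
  string: 'string',
  int: 'integer',
  long: 'integer',
  float: 'number',
  double: 'number',
  boolean: 'boolean',
  bytes: 'string',
  record: 'object',
  enum: 'string',
  array: 'array',
  map: 'object',
  fixed: 'string'
}

function getType (type) {
  // avsc exposes typeName on Type instances; plain strings come from
  // unresolved schema JSON, so accept both
  const typeName = type.typeName ?? type.name ?? type
  return typeName === 'null' ? typeName : typeMapping[typeName] || 'string'
}

console.log(getType('long')) // 'integer'
console.log(getType({ typeName: 'record' })) // 'object'
console.log(getType('null')) // 'null'
console.log(getType('duration')) // 'string' (fallback)
```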
31 changes: 31 additions & 0 deletions packages/datadog-plugin-avsc/test/helpers.js
@@ -0,0 +1,31 @@
const fs = require('fs')

async function loadMessage (avro, messageTypeName) {
if (messageTypeName === 'User') {
// Read and parse the Avro schema
const schema = JSON.parse(fs.readFileSync('packages/datadog-plugin-avsc/test/schemas/user.avsc', 'utf8'))

// Create a file and write Avro data
const filePath = 'packages/datadog-plugin-avsc/test/schemas/users.avro'

return {
schema,
path: filePath
}
} else if (messageTypeName === 'AdvancedUser') {
// Read and parse the Avro schema
const schema = JSON.parse(fs.readFileSync('packages/datadog-plugin-avsc/test/schemas/advanced_user.avsc', 'utf8'))

// Create a file and write Avro data
const filePath = 'packages/datadog-plugin-avsc/test/schemas/advanced_users.avro'

return {
schema,
path: filePath
}
}
}

module.exports = {
loadMessage
}