Add basic telemetry support

This patch adds support for Opentelemetry. If
OTEL_EXPORTER_OTLP_ENDPOINT env variable is defined, it will send traces
there. Otherwise there is no change.

The whole compose is wrapped in a single span. Nested under that are
spans for operations that involve a remote server.

* Talking to CTS
* Sending API requests to Koji
* Any git repo clone

Signed-off-by: Lubomír Sedlář <lsedlar@redhat.com>

(cherry picked from commit c15ddbc946cc6a820dfb2f0bbacb72ca118100ba)
This commit is contained in:
Lubomír Sedlář 2023-09-04 15:37:57 +03:00 committed by Stepan Oksanichenko
parent 49a3e6cd12
commit feffd284a4
22 changed files with 314 additions and 67 deletions

View File

@ -50,6 +50,7 @@ from pungi.util import (
translate_path_raw,
)
from pungi.metadata import compose_to_composeinfo
from pungi.otel import tracing
try:
# This is available since productmd >= 1.18
@ -130,6 +131,7 @@ def cts_auth(pungi_conf):
cts_oidc_client_id = os.environ.get(
"CTS_OIDC_CLIENT_ID", ""
) or pungi_conf.get("cts_oidc_client_id", "")
with tracing.span("obtain-oidc-token"):
token = retry_request(
"post",
cts_oidc_token_url,
@ -194,6 +196,7 @@ def get_compose_info(
"parent_compose_ids": parent_compose_ids,
"respin_of": respin_of,
}
with tracing.span("create-compose-in-cts"):
with cts_auth(conf) as authentication:
rv = retry_request("post", url, json_data=data, auth=authentication)
@ -231,6 +234,7 @@ def update_compose_url(compose_id, compose_dir, conf):
"action": "set_url",
"compose_url": compose_url,
}
with tracing.span("update-compose-url"):
with cts_auth(conf) as authentication:
return retry_request("patch", url, json_data=data, auth=authentication)
@ -373,6 +377,7 @@ class Compose(kobo.log.LoggingBase):
self.ci_base.load(
os.path.join(self.paths.work.topdir(arch="global"), "composeinfo-base.json")
)
tracing.set_attribute("compose_id", self.compose_id)
self.supported = supported
if (
@ -557,6 +562,7 @@ class Compose(kobo.log.LoggingBase):
old_status = self.get_status()
if stat_msg == old_status:
return
tracing.set_attribute("compose_status", stat_msg)
if old_status == "FINISHED":
msg = "Could not modify a FINISHED compose: %s" % self.topdir
self.log_error(msg)

199
pungi/otel.py Normal file
View File

@ -0,0 +1,199 @@
import itertools
import os
from contextlib import contextmanager
"""
This module contains two classes with the same interface. An instance of one of
them is available as `tracing`. Which class is instantiated is selected
depending on whether environment variables configuring OTel are configured.
"""
class DummyTracing:
"""A dummy tracing module that doesn't actually do anything."""
@contextmanager
def span(self, *args, **kwargs):
yield
def set_attribute(self, name, value):
pass
def force_flush(self):
pass
def instrument_xmlrpc_proxy(self, proxy):
return proxy
def get_traceparent(self):
return None
def set_context(self, traceparent):
pass
class OtelTracing:
"""This class implements the actual integration with opentelemetry."""
def __init__(self):
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
ConsoleSpanExporter,
)
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter,
)
from opentelemetry.instrumentation.requests import RequestsInstrumentor
otel_endpoint = os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"]
provider = TracerProvider(
resource=Resource(attributes={"service.name": "pungi"})
)
if "console" == otel_endpoint:
# This is for debugging the tracing locally.
self.processor = BatchSpanProcessor(ConsoleSpanExporter())
else:
self.processor = BatchSpanProcessor(OTLPSpanExporter())
provider.add_span_processor(self.processor)
trace.set_tracer_provider(provider)
self.tracer = trace.get_tracer(__name__)
traceparent = os.environ.get("TRACEPARENT")
if traceparent:
self.set_context(traceparent)
RequestsInstrumentor().instrument()
@contextmanager
def span(self, name, **attributes):
"""Create a new span as a child of the current one. Attributes can be
passed via kwargs."""
with self.tracer.start_as_current_span(name, attributes=attributes) as span:
yield span
def get_traceparent(self):
from opentelemetry.trace.propagation.tracecontext import (
TraceContextTextMapPropagator,
)
carrier = {}
TraceContextTextMapPropagator().inject(carrier)
return carrier["traceparent"]
def set_attribute(self, name, value):
"""Set an attribute on the current span."""
from opentelemetry import trace
span = trace.get_current_span()
span.set_attribute(name, value)
def force_flush(self):
"""Ensure all spans and traces are sent out. Call this before the
process exits."""
self.processor.force_flush()
def instrument_xmlrpc_proxy(self, proxy):
return InstrumentedClientSession(proxy)
def set_context(self, traceparent):
"""Configure current context to match the given traceparent."""
from opentelemetry import context
from opentelemetry.trace.propagation.tracecontext import (
TraceContextTextMapPropagator,
)
ctx = TraceContextTextMapPropagator().extract(
carrier={"traceparent": traceparent}
)
context.attach(ctx)
class InstrumentedClientSession:
"""Wrapper around koji.ClientSession that creates spans for each API call.
RequestsInstrumentor can create spans at the HTTP requests level, but since
those all go the same XML-RPC endpoint, they are not very informative.
Multicall is not handled very well here. The spans will only have a
`multicall` boolean attribute, but they don't carry any additional data
that could group them.
Koji ClientSession supports three ways of making multicalls, but Pungi only
uses one, and that one is supported here.
Supported:
c.multicall = True
c.getBuild(1)
c.getBuild(2)
results = c.multiCall()
Not supported:
with c.multicall() as m:
r1 = m.getBuild(1)
r2 = m.getBuild(2)
Also not supported:
m = c.multicall()
r1 = m.getBuild(1)
r2 = m.getBuild(2)
m.call_all()
"""
def __init__(self, session):
self.session = session
def _name(self, name):
"""Helper for generating span names."""
return "%s.%s" % (self.session.__class__.__name__, name)
@property
def system(self):
"""This is only ever used to get list of available API calls. It is
rather awkward though. Ideally we wouldn't really trace this at all,
but there's the underlying POST request to the hub, which is quite
confusing in the trace if there is no additional context."""
return self.session.system
@property
def multicall(self):
return self.session.multicall
@multicall.setter
def multicall(self, value):
self.session.multicall = value
def __getattr__(self, name):
return self._instrument_method(name, getattr(self.session, name))
def _instrument_method(self, name, callable):
def wrapper(*args, **kwargs):
with tracing.span(self._name(name)) as span:
span.set_attribute("arguments", _format_args(args, kwargs))
if self.session.multicall:
tracing.set_attribute("multicall", True)
return callable(*args, **kwargs)
return wrapper
def _format_args(args, kwargs):
"""Turn args+kwargs into a single string. OTel could choke on more
complicated data."""
return ", ".join(
itertools.chain(
(repr(arg) for arg in args),
(f"{key}={value!r}" for key, value in kwargs.items()),
)
)
if "OTEL_EXPORTER_OTLP_ENDPOINT" in os.environ:
tracing = OtelTracing()
else:
tracing = DummyTracing()

View File

@ -23,7 +23,7 @@ import shutil
import re
from copy import copy
from kobo.threads import ThreadPool, WorkerThread
from kobo.threads import ThreadPool
from kobo.shortcuts import run, force_list
import kobo.rpmlib
from productmd.images import Image
@ -39,6 +39,7 @@ from pungi.wrappers.scm import get_file_from_scm
from pungi.wrappers import kojiwrapper
from pungi.phases.base import PhaseBase
from pungi.runroot import Runroot, download_and_extract_archive
from pungi.threading import TelemetryWorkerThread as WorkerThread
class BuildinstallPhase(PhaseBase):

View File

@ -24,7 +24,7 @@ import json
import productmd.treeinfo
from productmd.images import Image
from kobo.threads import ThreadPool, WorkerThread
from kobo.threads import ThreadPool
from kobo.shortcuts import run, relative_path, compute_file_checksums
from pungi.wrappers import iso
@ -43,6 +43,7 @@ from pungi.util import (
from pungi.media_split import MediaSplitter, convert_media_size
from pungi.compose_metadata.discinfo import read_discinfo, write_discinfo
from pungi.runroot import Runroot
from pungi.threading import TelemetryWorkerThread as WorkerThread
from .. import createiso

View File

@ -27,7 +27,7 @@ import xml.dom.minidom
import productmd.modules
import productmd.rpms
from kobo.shortcuts import relative_path, run
from kobo.threads import ThreadPool, WorkerThread
from kobo.threads import ThreadPool
from ..module_util import Modulemd, collect_module_defaults, collect_module_obsoletes
from ..util import (
@ -38,6 +38,7 @@ from ..util import (
from ..wrappers.createrepo import CreaterepoWrapper
from ..wrappers.scm import get_dir_from_scm
from .base import PhaseBase
from ..threading import TelemetryWorkerThread as WorkerThread
CACHE_TOPDIR = "/var/cache/pungi/createrepo_c/"
createrepo_lock = threading.Lock()

View File

@ -18,7 +18,8 @@ import hashlib
import json
from kobo.shortcuts import force_list
from kobo.threads import ThreadPool, WorkerThread
from kobo.threads import ThreadPool
from pungi.threading import TelemetryWorkerThread as WorkerThread
import productmd.treeinfo
from productmd.extra_files import ExtraFiles

View File

@ -13,7 +13,8 @@ from pungi.util import as_local_file, translate_path, get_repo_urls, version_gen
from pungi.phases import base
from pungi.linker import Linker
from pungi.wrappers.kojiwrapper import KojiWrapper
from kobo.threads import ThreadPool, WorkerThread
from kobo.threads import ThreadPool
from pungi.threading import TelemetryWorkerThread as WorkerThread
from kobo.shortcuts import force_list
from productmd.images import Image
from productmd.rpms import Rpms

View File

@ -2,12 +2,13 @@
import os
import re
from kobo.threads import ThreadPool, WorkerThread
from kobo.threads import ThreadPool
from .base import ConfigGuardedPhase, PhaseLoggerMixin
from .. import util
from ..wrappers import kojiwrapper
from ..phases.osbs import add_metadata
from ..threading import TelemetryWorkerThread as WorkerThread
class ImageContainerPhase(PhaseLoggerMixin, ConfigGuardedPhase):

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
import os
from kobo.threads import ThreadPool, WorkerThread
from kobo.threads import ThreadPool
from kobo import shortcuts
from productmd.images import Image
@ -10,6 +10,7 @@ from .. import util
from ..linker import Linker
from ..wrappers import kojiwrapper
from .image_build import EXTENSIONS
from ..threading import TelemetryWorkerThread as WorkerThread
KIWIEXTENSIONS = [
("vhd-compressed", ["vhdfixed.xz"], "vhd.xz"),

View File

@ -9,8 +9,9 @@ from pungi.util import translate_path, get_repo_urls
from pungi.phases.base import ConfigGuardedPhase, ImageConfigMixin, PhaseLoggerMixin
from pungi.linker import Linker
from pungi.wrappers.kojiwrapper import KojiWrapper
from kobo.threads import ThreadPool, WorkerThread
from kobo.threads import ThreadPool
from productmd.images import Image
from pungi.threading import TelemetryWorkerThread as WorkerThread
class LiveMediaPhase(PhaseLoggerMixin, ImageConfigMixin, ConfigGuardedPhase):

View File

@ -5,7 +5,7 @@ import copy
import fnmatch
import json
import os
from kobo.threads import ThreadPool, WorkerThread
from kobo.threads import ThreadPool
from kobo import shortcuts
from productmd.rpms import Rpms
@ -13,6 +13,7 @@ from .base import ConfigGuardedPhase, PhaseLoggerMixin
from .. import util
from ..wrappers import kojiwrapper
from ..wrappers.scm import get_file_from_scm
from ..threading import TelemetryWorkerThread as WorkerThread
class OSBSPhase(PhaseLoggerMixin, ConfigGuardedPhase):

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
import os
from kobo.threads import ThreadPool, WorkerThread
from kobo.threads import ThreadPool
from kobo import shortcuts
from productmd.images import Image
@ -10,6 +10,7 @@ from .. import util
from ..linker import Linker
from ..wrappers import kojiwrapper
from .image_build import EXTENSIONS
from ..threading import TelemetryWorkerThread as WorkerThread
# copy and modify EXTENSIONS with some that osbuild produces but which
# do not exist as `koji image-build` formats

View File

@ -4,7 +4,7 @@ import copy
import json
import os
from kobo import shortcuts
from kobo.threads import ThreadPool, WorkerThread
from kobo.threads import ThreadPool
from collections import OrderedDict
from pungi.arch_utils import getBaseArch
@ -14,6 +14,7 @@ from .. import util
from ..ostree.utils import get_ref_from_treefile, get_commitid_from_commitid_file
from ..util import get_repo_dicts, translate_path
from ..wrappers import scm
from ..threading import TelemetryWorkerThread as WorkerThread
class OSTreePhase(ConfigGuardedPhase):

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
import os
from kobo.threads import ThreadPool, WorkerThread
from kobo.threads import ThreadPool
import shlex
import shutil
from productmd import images
@ -20,6 +20,7 @@ from ..util import (
)
from ..wrappers import iso, lorax, scm
from ..runroot import Runroot
from ..threading import TelemetryWorkerThread as WorkerThread
class OstreeInstallerPhase(PhaseLoggerMixin, ConfigGuardedPhase):

View File

@ -33,11 +33,12 @@ import kobo.pkgset
import kobo.rpmlib
from kobo.shortcuts import compute_file_checksums
from kobo.threads import WorkerThread, ThreadPool
from kobo.threads import ThreadPool
from pungi.util import pkg_is_srpm, copy_all
from pungi.arch import get_valid_arches, is_excluded
from pungi.errors import UnsignedPackagesError
from pungi.threading import TelemetryWorkerThread as WorkerThread
class ExtendedRpmWrapper(kobo.pkgset.SimpleRpmWrapper):
@ -547,7 +548,7 @@ class KojiPackageSet(PackageSetBase):
pathinfo = self.koji_wrapper.koji_module.pathinfo
paths = []
if "getRPMChecksums" in self.koji_proxy.system.listMethods():
if "getRPMChecksums" in self.koji_wrapper.koji_methods:
def checksum_validator(keyname, pkg_path):
checksums = self.koji_proxy.getRPMChecksums(

View File

@ -1,7 +1,9 @@
# -*- coding: utf-8 -*-
from kobo import shortcuts
from kobo.threads import ThreadPool, WorkerThread
from kobo.threads import ThreadPool
from pungi.threading import TelemetryWorkerThread as WorkerThread
class WeaverPhase(object):

View File

@ -23,6 +23,7 @@ from pungi import get_full_version, util
from pungi.errors import UnsignedPackagesError
from pungi.wrappers import kojiwrapper
from pungi.util import rmtree
from pungi.otel import tracing
# force C locales
@ -652,6 +653,7 @@ def cli_main():
signal.signal(signal.SIGINT, sigterm_handler)
signal.signal(signal.SIGTERM, sigterm_handler)
with tracing.span("run-compose"):
try:
main()
except (Exception, KeyboardInterrupt) as ex:
@ -671,3 +673,5 @@ def cli_main():
process_id = os.getpid()
directoy_to_remove = "/tmp/pungi-temp-git-repos-" + str(process_id) + "/"
rmtree(directoy_to_remove)
# Wait for all traces to be sent...
tracing.force_flush()

21
pungi/threading.py Normal file
View File

@ -0,0 +1,21 @@
from kobo.threads import WorkerThread
from .otel import tracing
class TelemetryWorkerThread(WorkerThread):
"""
Subclass of WorkerThread that captures current context when the thread is
created, and restores the context in the new thread.
A regular WorkerThread would start from an empty context, leading to any
spans created in the thread disconnected from the overall trace.
"""
def __init__(self, *args, **kwargs):
self.traceparent = tracing.get_traceparent()
super(TelemetryWorkerThread, self).__init__(*args, **kwargs)
def run(self, *args, **kwargs):
tracing.set_context(self.traceparent)
super(TelemetryWorkerThread, self).run(*args, **kwargs)

View File

@ -32,9 +32,11 @@ import functools
import kobo.conf
from kobo.shortcuts import run, force_list
from kobo.threads import WorkerThread, ThreadPool
from kobo.threads import ThreadPool
from productmd.common import get_major_version
from pungi.module_util import Modulemd
from pungi.otel import tracing
from pungi.threading import TelemetryWorkerThread as WorkerThread
# Patterns that match all names of debuginfo packages
DEBUG_PATTERNS = ["*-debuginfo", "*-debuginfo-*", "*-debugsource"]
@ -877,6 +879,7 @@ def retry(timeout=120, interval=30, wait_on=Exception):
@retry(wait_on=RuntimeError)
def git_ls_remote(baseurl, ref, credential_helper=None):
with tracing.span("git-ls-remote", baseurl=baseurl, ref=ref):
cmd = ["git"]
if credential_helper:
cmd.extend(["-c", "credential.useHttpPath=true"])

View File

@ -34,6 +34,7 @@ from datetime import timedelta
from .kojimock import KojiMock
from .. import util
from ..otel import tracing
from ..arch_utils import getBaseArch
@ -68,9 +69,11 @@ class KojiWrapper(object):
value = getattr(self.koji_module.config, key, None)
if value is not None:
session_opts[key] = value
self.koji_proxy = koji.ClientSession(
self.koji_module.config.server, session_opts
self.koji_proxy = tracing.instrument_xmlrpc_proxy(
koji.ClientSession(self.koji_module.config.server, session_opts)
)
with tracing.span("koji.system.listMethods"):
self.koji_methods = self.koji_proxy.system.listMethods()
# This retry should be removed once https://pagure.io/koji/issue/3170 is
# fixed and released.
@ -1011,6 +1014,7 @@ class KojiDownloadProxy:
os.utime(destination_file)
return destination_file
with tracing.span("download-rpm", url=url):
return self._atomic_download(url, destination_file, validator)
def get_file(self, path, validator=None):

View File

@ -28,6 +28,7 @@ import kobo.log
from kobo.shortcuts import run, force_list
from pungi.util import explode_rpm_package, makedirs, copy_all, temp_dir, retry
from .kojiwrapper import KojiWrapper
from ..otel import tracing
lock = threading.Lock()
@ -229,6 +230,7 @@ class GitWrapper(ScmBase):
tmp_dir = self.get_temp_repo_path(scm_root, scm_branch)
if not os.path.isdir(tmp_dir):
makedirs(tmp_dir)
with tracing.span("git-clone", repo=scm_root, ref=scm_branch):
self._clone(scm_root, scm_branch, tmp_dir)
self.run_process_command(tmp_dir)
return tmp_dir
@ -377,6 +379,7 @@ class ContainerImageScmWrapper(ScmBase):
self.log_debug(
"Exporting container %s to %s: %s", scm_root, target_dir, cmd
)
with tracing.span("skopeo-copy", arch=arch, image=scm_root):
self.retry_run(cmd, can_fail=False)
except RuntimeError as e:
self.log_error(

View File

@ -130,14 +130,6 @@ class PkgsetCompareMixin(object):
self.assertEqual({}, actual)
class DummySystem(object):
def __init__(self):
self.methods = ["_listapi", "Dummy", "getRPM", "getRPMChecksums"]
def listMethods(self):
return self.methods
@ddt.ddt
@mock.patch("pungi.phases.pkgset.pkgsets.ReaderPool", new=FakePool)
@mock.patch("kobo.pkgset.FileCache", new=MockFileCache)
@ -166,7 +158,7 @@ class TestKojiPkgset(PkgsetCompareMixin, helpers.PungiTestCase):
self.koji_downloader = helpers.FSKojiDownloader()
self.koji_wrapper = mock.Mock()
self.koji_wrapper.koji_proxy.listTaggedRPMS.return_value = self.tagged_rpms
self.koji_wrapper.koji_proxy.system = DummySystem()
self.koji_wrapper.koji_methods = ["getRPM", "getRPMChecksums"]
self.koji_wrapper.koji_module.pathinfo = self.path_info
def _touch_files(self, filenames):