Skip to content

fix(ourlogs): Add a class which batches groups of logs together. #4229

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Apr 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
14246d6
Rename import to sentry_sdk.logger, export Log type
colin-sentry Apr 1, 2025
a2eaa8a
fix test
colin-sentry Apr 1, 2025
db66c57
feat(ourlogs): Batch logs into a single envelope
colin-sentry Apr 1, 2025
1e9b617
null gate
colin-sentry Apr 1, 2025
1d2253b
improve typing in test
colin-sentry Apr 1, 2025
cd5ddd5
fix more tests
colin-sentry Apr 1, 2025
7564711
ci
colin-sentry Apr 2, 2025
edde92a
rename before_emit_log to before_send_log
colin-sentry Apr 2, 2025
20c90f5
Preventing flushing again right away after first flush
antonpirker Apr 2, 2025
8bc0de8
using safe_repr
antonpirker Apr 2, 2025
5318850
more usages of safe_repr
colin-sentry Apr 2, 2025
709d504
Merge branch 'master' into log_batcher
antonpirker Apr 2, 2025
e589594
comment
antonpirker Apr 2, 2025
8a482c4
Merge branch 'log_batcher' of github.com:getsentry/sentry-python into…
antonpirker Apr 2, 2025
45104df
Rename import to sentry_sdk.logger, export Log type
colin-sentry Apr 1, 2025
a7de23a
fix test
colin-sentry Apr 1, 2025
4b01d3c
feat(ourlogs): Batch logs into a single envelope
colin-sentry Apr 1, 2025
3d2c650
null gate
colin-sentry Apr 1, 2025
00be0a2
improve typing in test
colin-sentry Apr 1, 2025
36bc4db
fix more tests
colin-sentry Apr 1, 2025
a20c826
ci
colin-sentry Apr 2, 2025
ef651bc
rename before_emit_log to before_send_log
colin-sentry Apr 2, 2025
9a3f887
Preventing flushing again right away after first flush
antonpirker Apr 2, 2025
43617ba
using safe_repr
antonpirker Apr 2, 2025
2bb23a2
comment
antonpirker Apr 2, 2025
1a029e3
more usages of safe_repr
colin-sentry Apr 2, 2025
945dc46
Merge branch 'log_batcher' of github.com:getsentry/sentry-python into…
antonpirker Apr 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sentry_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
"start_transaction",
"trace",
"monitor",
"_experimental_logger",
"logger",
]

# Initialize the debug support after everything is loaded
Expand Down
142 changes: 142 additions & 0 deletions sentry_sdk/_log_batcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import os
import random
import threading
from datetime import datetime, timezone
from typing import Optional, List, Callable, TYPE_CHECKING, Any

from sentry_sdk.utils import format_timestamp, safe_repr
from sentry_sdk.envelope import Envelope

if TYPE_CHECKING:
from sentry_sdk._types import Log


class LogBatcher:
    """Buffer logs in memory and ship them in batches as a single envelope.

    Logs handed to :meth:`add` are appended to an internal list. A daemon
    thread running :meth:`_flush_loop` drains that list whenever the flush
    event is set (the buffer reached ``MAX_LOGS_BEFORE_FLUSH`` entries, or
    :meth:`kill` was called) or the timed wait expires, and passes the
    resulting envelope to ``capture_func``.
    """

    # Buffer length at which ``add`` wakes the flusher thread early.
    MAX_LOGS_BEFORE_FLUSH = 100
    # Base wait in seconds between flushes; ``random.random()`` is added
    # to it on every iteration of the flush loop.
    FLUSH_WAIT_TIME = 5.0

    def __init__(
        self,
        capture_func,  # type: Callable[[Envelope], None]
    ):
        # type: (...) -> None
        """Create an idle batcher; no thread is started until ``add`` is called.

        :param capture_func: callable invoked with each non-empty envelope.
        """
        self._capture_func = capture_func
        self._log_buffer = []  # type: List[Log]
        self._lock = threading.Lock()
        self._running = True

        self._flush_event = threading.Event()  # type: threading.Event

        # The pid is recorded next to the thread so a change of process id
        # can be detected in ``_ensure_thread``.
        self._flusher_pid = None  # type: Optional[int]
        self._flusher = None  # type: Optional[threading.Thread]

    def _ensure_thread(self):
        # type: (...) -> bool
        """Make sure a flusher thread was started for the current process.

        When the process id differs from the recorded one (e.g. after a
        fork) a new thread is started.

        :returns: ``True`` if a flusher was already recorded for this pid or
            was started now; ``False`` if the batcher is not running or the
            thread could not be started.
        """
        if not self._running:
            return False

        current_pid = os.getpid()
        if self._flusher_pid == current_pid:
            return True

        with self._lock:
            # Check again under the lock: another thread may have started
            # the flusher while we were waiting to acquire it.
            if self._flusher_pid != current_pid:
                self._flusher_pid = current_pid

                worker = self._flusher = threading.Thread(target=self._flush_loop)
                worker.daemon = True

                try:
                    worker.start()
                except RuntimeError:
                    # The interpreter no longer allows spawning threads, so
                    # the batcher is disabled for good.
                    self._running = False
                    return False

        return True

    def _flush_loop(self):
        # type: (...) -> None
        """Thread target: wait for the event or the timeout, then flush."""
        while self._running:
            timeout = self.FLUSH_WAIT_TIME + random.random()
            self._flush_event.wait(timeout)
            self._flush_event.clear()
            self._flush()

    def add(
        self,
        log,  # type: Log
    ):
        # type: (...) -> None
        """Queue ``log`` for the next batch.

        The log is silently dropped when no flusher thread is available.
        Reaching ``MAX_LOGS_BEFORE_FLUSH`` buffered logs sets the flush event.
        """
        thread_ready = self._ensure_thread()
        if not (thread_ready and self._flusher is not None):
            return None

        with self._lock:
            self._log_buffer.append(log)
            buffer_full = len(self._log_buffer) >= self.MAX_LOGS_BEFORE_FLUSH
            if buffer_full:
                self._flush_event.set()

    def kill(self):
        # type: (...) -> None
        """Stop the flush loop; does nothing if no flusher was ever started."""
        if self._flusher is not None:
            self._running = False
            # Set the event so a loop blocked in its timed wait returns and
            # re-checks ``_running``.
            self._flush_event.set()
            self._flusher = None

    def flush(self):
        # type: (...) -> None
        """Send whatever is currently buffered, on the calling thread."""
        self._flush()

    @staticmethod
    def _log_to_otel(log):
        # type: (Log) -> Any
        """Convert a log dict into the camelCase dict added to the envelope.

        Reads ``severity_text``, ``severity_number``, ``body``,
        ``time_unix_nano`` and ``attributes`` from ``log``; ``trace_id`` is
        copied to ``traceId`` only when present.
        """

        def format_attribute(key, val):
            # type: (str, int | float | str | bool) -> Any
            # bool must be tested before int because bool subclasses int.
            if isinstance(val, bool):
                typed_value = {"boolValue": val}  # type: Any
            elif isinstance(val, int):
                # Integers are emitted as decimal strings.
                typed_value = {"intValue": str(val)}
            elif isinstance(val, float):
                typed_value = {"doubleValue": val}
            elif isinstance(val, str):
                typed_value = {"stringValue": val}
            else:
                # Any other type is represented by its safe repr.
                typed_value = {"stringValue": safe_repr(val)}
            return {"key": key, "value": typed_value}

        # Keys are filled in the same order as they are read from ``log``.
        otel_log = {}  # type: Any
        otel_log["severityText"] = log["severity_text"]
        otel_log["severityNumber"] = log["severity_number"]
        otel_log["body"] = {"stringValue": log["body"]}
        otel_log["timeUnixNano"] = str(log["time_unix_nano"])
        otel_log["attributes"] = [
            format_attribute(name, value)
            for name, value in log["attributes"].items()
        ]

        if "trace_id" in log:
            otel_log["traceId"] = log["trace_id"]

        return otel_log

    def _flush(self):
        # type: (...) -> Optional[Envelope]
        """Drain the buffer into one envelope and pass it to ``capture_func``.

        :returns: the envelope that was captured, or ``None`` when the
            buffer was empty and nothing was sent.
        """
        sent_at = format_timestamp(datetime.now(timezone.utc))
        envelope = Envelope(headers={"sent_at": sent_at})

        # Convert and clear under the lock so a concurrent ``add`` cannot
        # slip a log in between the iteration and the clear.
        with self._lock:
            for buffered_log in self._log_buffer:
                envelope.add_log(self._log_to_otel(buffered_log))
            self._log_buffer.clear()

        if not envelope.items:
            return None

        self._capture_func(envelope)
        return envelope
62 changes: 19 additions & 43 deletions sentry_sdk/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
import os
import uuid
import random
Expand Down Expand Up @@ -65,6 +64,7 @@
from sentry_sdk.session import Session
from sentry_sdk.spotlight import SpotlightClient
from sentry_sdk.transport import Transport
from sentry_sdk._log_batcher import LogBatcher

I = TypeVar("I", bound=Integration) # noqa: E741

Expand Down Expand Up @@ -178,6 +178,7 @@ def __init__(self, options=None):
self.transport = None # type: Optional[Transport]
self.monitor = None # type: Optional[Monitor]
self.metrics_aggregator = None # type: Optional[MetricsAggregator]
self.log_batcher = None # type: Optional[LogBatcher]

def __getstate__(self, *args, **kwargs):
# type: (*Any, **Any) -> Any
Expand Down Expand Up @@ -375,6 +376,12 @@ def _capture_envelope(envelope):
"Metrics not supported on Python 3.6 and lower with gevent."
)

self.log_batcher = None
if experiments.get("enable_logs", False):
from sentry_sdk._log_batcher import LogBatcher

self.log_batcher = LogBatcher(capture_func=_capture_envelope)

max_request_body_size = ("always", "never", "small", "medium")
if self.options["max_request_body_size"] not in max_request_body_size:
raise ValueError(
Expand Down Expand Up @@ -451,6 +458,7 @@ def _capture_envelope(envelope):
if (
self.monitor
or self.metrics_aggregator
or self.log_batcher
or has_profiling_enabled(self.options)
or isinstance(self.transport, BaseHttpTransport)
):
Expand Down Expand Up @@ -868,15 +876,11 @@ def capture_event(

def _capture_experimental_log(self, current_scope, log):
# type: (Scope, Log) -> None
logs_enabled = self.options["_experiments"].get("enable_sentry_logs", False)
logs_enabled = self.options["_experiments"].get("enable_logs", False)
if not logs_enabled:
return
isolation_scope = current_scope.get_isolation_scope()

headers = {
"sent_at": format_timestamp(datetime.now(timezone.utc)),
} # type: dict[str, object]

environment = self.options.get("environment")
if environment is not None and "sentry.environment" not in log["attributes"]:
log["attributes"]["sentry.environment"] = environment
Expand Down Expand Up @@ -913,46 +917,14 @@ def _capture_experimental_log(self, current_scope, log):
f'[Sentry Logs] {log["body"]}',
)

envelope = Envelope(headers=headers)

before_emit_log = self.options["_experiments"].get("before_emit_log")
if before_emit_log is not None:
log = before_emit_log(log, {})
before_send_log = self.options["_experiments"].get("before_send_log")
if before_send_log is not None:
log = before_send_log(log, {})
if log is None:
return

def format_attribute(key, val):
# type: (str, int | float | str | bool) -> Any
if isinstance(val, bool):
return {"key": key, "value": {"boolValue": val}}
if isinstance(val, int):
return {"key": key, "value": {"intValue": str(val)}}
if isinstance(val, float):
return {"key": key, "value": {"doubleValue": val}}
if isinstance(val, str):
return {"key": key, "value": {"stringValue": val}}
return {"key": key, "value": {"stringValue": json.dumps(val)}}

otel_log = {
"severityText": log["severity_text"],
"severityNumber": log["severity_number"],
"body": {"stringValue": log["body"]},
"timeUnixNano": str(log["time_unix_nano"]),
"attributes": [
format_attribute(k, v) for (k, v) in log["attributes"].items()
],
}

if "trace_id" in log:
otel_log["traceId"] = log["trace_id"]

envelope.add_log(otel_log) # TODO: batch these

if self.spotlight:
self.spotlight.capture_envelope(envelope)

if self.transport is not None:
self.transport.capture_envelope(envelope)
if self.log_batcher:
self.log_batcher.add(log)

def capture_session(
self, session # type: Session
Expand Down Expand Up @@ -1006,6 +978,8 @@ def close(
self.session_flusher.kill()
if self.metrics_aggregator is not None:
self.metrics_aggregator.kill()
if self.log_batcher is not None:
self.log_batcher.kill()
if self.monitor:
self.monitor.kill()
self.transport.kill()
Expand All @@ -1030,6 +1004,8 @@ def flush(
self.session_flusher.flush()
if self.metrics_aggregator is not None:
self.metrics_aggregator.flush()
if self.log_batcher is not None:
self.log_batcher.flush()
self.transport.flush(timeout=timeout, callback=callback)

def __enter__(self):
Expand Down
2 changes: 1 addition & 1 deletion sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class CompressionAlgo(Enum):
Callable[[str, MetricValue, MeasurementUnit, MetricTags], bool]
],
"metric_code_locations": Optional[bool],
"enable_sentry_logs": Optional[bool],
"enable_logs": Optional[bool],
},
total=False,
)
Expand Down
9 changes: 7 additions & 2 deletions sentry_sdk/integrations/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ def emit(self, record):
if not client.is_active():
return

if not client.options["_experiments"].get("enable_sentry_logs", False):
if not client.options["_experiments"].get("enable_logs", False):
return

SentryLogsHandler._capture_log_from_record(client, record)
Expand All @@ -365,7 +365,12 @@ def _capture_log_from_record(client, record):
if isinstance(record.args, tuple):
for i, arg in enumerate(record.args):
attrs[f"sentry.message.parameters.{i}"] = (
arg if isinstance(arg, str) else safe_repr(arg)
arg
if isinstance(arg, str)
or isinstance(arg, float)
or isinstance(arg, int)
or isinstance(arg, bool)
else safe_repr(arg)
)
if record.lineno:
attrs["code.line.number"] = record.lineno
Expand Down
17 changes: 16 additions & 1 deletion sentry_sdk/_experimental_logger.py → sentry_sdk/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any

from sentry_sdk import get_client, get_current_scope
from sentry_sdk.utils import safe_repr


def _capture_log(severity_text, severity_number, template, **kwargs):
Expand All @@ -19,6 +20,20 @@ def _capture_log(severity_text, severity_number, template, **kwargs):
for k, v in kwargs.items():
attrs[f"sentry.message.parameters.{k}"] = v

attrs = {
k: (
v
if (
isinstance(v, str)
or isinstance(v, int)
or isinstance(v, bool)
or isinstance(v, float)
)
else safe_repr(v)
)
for (k, v) in attrs.items()
}

# noinspection PyProtectedMember
client._capture_experimental_log(
scope,
Expand All @@ -36,6 +51,6 @@ def _capture_log(severity_text, severity_number, template, **kwargs):
trace = functools.partial(_capture_log, "trace", 1)
debug = functools.partial(_capture_log, "debug", 5)
info = functools.partial(_capture_log, "info", 9)
warn = functools.partial(_capture_log, "warn", 13)
warning = functools.partial(_capture_log, "warning", 13)
error = functools.partial(_capture_log, "error", 17)
fatal = functools.partial(_capture_log, "fatal", 21)
5 changes: 3 additions & 2 deletions sentry_sdk/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from sentry_sdk._types import Event, EventDataCategory, Hint
from sentry_sdk._types import Event, EventDataCategory, Hint, Log
else:
from typing import Any

Expand All @@ -20,5 +20,6 @@
Event = Any
EventDataCategory = Any
Hint = Any
Log = Any

__all__ = ("Event", "EventDataCategory", "Hint")
__all__ = ("Event", "EventDataCategory", "Hint", "Log")
Loading
Loading