Skip to content

Commit acf80a7

Browse files
committed
[Monolog] Added ElasticsearchLogstashHandler
1 parent 25f1804 commit acf80a7

File tree

2 files changed

+150
-0
lines changed

2 files changed

+150
-0
lines changed

src/Symfony/Bridge/Monolog/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
-----
66

77
* The `RouteProcessor` class has been made final
8+
* Added `ElasticsearchLogstashHandler`
89

910
4.3.0
1011
-----
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Bridge\Monolog\Handler;
13+
14+
use Monolog\Formatter\FormatterInterface;
15+
use Monolog\Formatter\LogstashFormatter;
16+
use Monolog\Handler\AbstractHandler;
17+
use Monolog\Logger;
18+
use Symfony\Component\HttpClient\HttpClient;
19+
use Symfony\Contracts\HttpClient\Exception\ExceptionInterface;
20+
use Symfony\Contracts\HttpClient\HttpClientInterface;
21+
22+
/**
23+
* Push logs directly to Elasticsearch and format them according to Logstash specification.
24+
*
25+
* This handler dials directly with the HTTP interface of Elasticsearch. This
26+
* means it will slow down your application if Elasticsearch takes times to
27+
* answer. Even if all HTTP calls are done asynchronously.
28+
*
29+
* In a development environment, it's fine to keep the default configuration:
30+
* for each log, an HTTP request will be made to push the log to Elasticsearch.
31+
*
32+
* In a production environment, it's highly recommended to wrap this handler
33+
* in a handler with buffering capabilities (like the FingersCrossedHandler, or
34+
* BufferHandler) in order to call Elasticsearch only once with a bulk push. For
35+
* even better performance and fault tolerance, a proper ELK (https://www.elastic.co/what-is/elk-stack)
36+
* stack is recommended.
37+
*
38+
* @author Grégoire Pineau <lyrixx@lyrixx.info>
39+
*/
40+
class ElasticsearchLogstashHandler extends AbstractHandler
41+
{
42+
private $endpoint;
43+
private $index;
44+
private $client;
45+
private $responses;
46+
47+
public function __construct(string $endpoint = 'http://127.0.0.1:9200', string $index = 'monolog', HttpClientInterface $client = null, int $level = Logger::DEBUG, bool $bubble = true)
48+
{
49+
if (!interface_exists(HttpClientInterface::class)) {
50+
throw new \LogicException(sprintf('The %s handler needs an HTTP client. Try running "composer require symfony/http-client".', __CLASS__));
51+
}
52+
53+
parent::__construct($level, $bubble);
54+
$this->endpoint = $endpoint;
55+
$this->index = $index;
56+
$this->client = $client ?: HttpClient::create(['timeout' => 1]);
57+
$this->responses = new \SplObjectStorage();
58+
}
59+
60+
public function handle(array $record): bool
61+
{
62+
if (!$this->isHandling($record)) {
63+
return false;
64+
}
65+
66+
$this->sendToElasticsearch([$record]);
67+
68+
return !$this->bubble;
69+
}
70+
71+
public function handleBatch(array $records): void
72+
{
73+
$records = array_filter($records, [$this, 'isHandling']);
74+
75+
if ($records) {
76+
$this->sendToElasticsearch($records);
77+
}
78+
}
79+
80+
protected function getDefaultFormatter(): FormatterInterface
81+
{
82+
return new LogstashFormatter('application', null, null, 'ctxt_', LogstashFormatter::V1);
83+
}
84+
85+
private function sendToElasticsearch(array $records)
86+
{
87+
$formatter = $this->getFormatter();
88+
89+
$body = '';
90+
foreach ($records as $record) {
91+
if ($this->processors) {
92+
foreach ($this->processors as $processor) {
93+
$record = \call_user_func($processor, $record);
94+
}
95+
}
96+
97+
$body .= json_encode([
98+
'index' => [
99+
'_index' => $this->index,
100+
'_type' => '_doc',
101+
],
102+
]);
103+
$body .= "\n";
104+
$body .= $formatter->format($record);
105+
$body .= "\n";
106+
}
107+
108+
$response = $this->client->request('POST', $this->endpoint.'/_bulk', [
109+
'body' => $body,
110+
'headers' => [
111+
'Content-Type' => 'application/json',
112+
],
113+
]);
114+
115+
$this->responses->attach($response);
116+
117+
$this->wait(false);
118+
}
119+
120+
public function __destruct()
121+
{
122+
$this->wait(true);
123+
}
124+
125+
private function wait(bool $errorOnTimeout)
126+
{
127+
$e = null;
128+
129+
foreach ($this->client->stream($this->responses) as $response => $chunk) {
130+
try {
131+
if ($chunk->isTimeout() && !$errorOnTimeout) {
132+
continue;
133+
}
134+
if (!$chunk->isFirst() && !$chunk->isLast()) {
135+
continue;
136+
}
137+
if ($chunk->isLast()) {
138+
$this->responses->detach($response);
139+
}
140+
} catch (ExceptionInterface $e) {
141+
$this->responses->detach($response);
142+
}
143+
}
144+
145+
if ($e) {
146+
error_log(sprintf("Could not push logs to Elasticsearch:\n%s", (string) $e));
147+
}
148+
}
149+
}

0 commit comments

Comments
 (0)