Skip to content

Add asynchronous flush support #82

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ build
out
.settings
.temp
bin
26 changes: 25 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
Generate CloudWatch metrics embedded within structured log events. The embedded metrics will be extracted so that you can visualize and alarm on them for real-time incident detection. This allows you to monitor aggregated values while preserving the detailed log event context that generates them.
- [Use Cases](#use-cases)
- [Usage](#usage)
- [Graceful Shutdown](#graceful-shutdown)
- [API](#api)
- [Examples](#examples)
- [Development](#development)
Expand All @@ -25,7 +26,6 @@ Generate CloudWatch metrics embedded within structured log events. The embedded

To use a metric logger, you need to manually create and flush the logger.


```java
import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger;
import software.amazon.cloudwatchlogs.emf.model.DimensionSet;
Expand All @@ -44,6 +44,30 @@ class Example {

You can find the artifact location and examples of how to include it in your project at [Maven Central](https://search.maven.org/artifact/software.amazon.cloudwatchlogs/aws-embedded-metrics)

## Graceful Shutdown

**Since:** 2.0.0-beta-1

In any environment, other than AWS Lambda, we recommend running an out-of-process agent (the CloudWatch Agent or
FireLens / Fluent-Bit) to collect the EMF events. When using an out-of-process agent, this package will buffer the data
asynchronously in process to handle any transient communication issues with the agent. This means that when the `MetricsLogger`
gets flushed, data may not be safely persisted yet. To gracefully shutdown the environment, you can call shutdown on the
environment's sink. A full example can be found in the [`examples`](examples) directory.

```java
// create an environment singleton, this should be re-used across loggers
DefaultEnvironment environment = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig());

MetricsLogger logger = new MetricsLogger(environment);
logger.setDimensions(DimensionSet.of("Operation", "ProcessRecords"));
logger.putMetric("ExampleMetric", 100, Unit.MILLISECONDS);
logger.putProperty("RequestId", "422b1569-16f6-4a03-b8f0-fe3fd9b100f8");
logger.flush();

// flush the sink, waiting up to 10s before giving up
environment.getSink().shutdown().orTimeout(10_000L, TimeUnit.MILLISECONDS);
```

## API

### MetricsLogger
Expand Down
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ allprojects {
targetCompatibility = '1.8'
}

version = '1.0.4'
version = '2.0.0-beta-1'
}

java {
Expand Down Expand Up @@ -64,6 +64,7 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-annotations:2.11.1'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.11.1'
implementation 'org.slf4j:slf4j-api:1.7.30'
implementation 'org.javatuples:javatuples:1.2'

// Use JUnit test framework
testImplementation 'software.amazon.awssdk:cloudwatch:2.13.54'
Expand Down
2 changes: 1 addition & 1 deletion buildspecs/buildspec.canary.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ env:
phases:
install:
runtime-versions:
java: corretto8
java: corretto11
commands:
# start docker
# https://docs.aws.amazon.com/codebuild/latest/userguide/sample-docker-custom-image.html#sample-docker-custom-image-files
Expand Down
2 changes: 1 addition & 1 deletion buildspecs/buildspec.release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ env:
phases:
install:
runtime-versions:
java: corretto8
java: corretto11
build:
commands:
- ./gradlew publish
2 changes: 1 addition & 1 deletion buildspecs/buildspec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ env:
phases:
install:
runtime-versions:
java: corretto8
java: corretto11
commands:
# start docker
# https://docs.aws.amazon.com/codebuild/latest/userguide/sample-docker-custom-image.html#sample-docker-custom-image-files
Expand Down
17 changes: 15 additions & 2 deletions examples/agent/src/main/java/agent/App.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,27 @@
package agent;

import software.amazon.cloudwatchlogs.emf.config.EnvironmentConfigurationProvider;
import software.amazon.cloudwatchlogs.emf.environment.DefaultEnvironment;
import software.amazon.cloudwatchlogs.emf.environment.Environment;
import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger;
import software.amazon.cloudwatchlogs.emf.model.DimensionSet;
import software.amazon.cloudwatchlogs.emf.model.Unit;

import java.util.concurrent.TimeUnit;

public class App {

public static void main(String[] args) {
MetricsLogger logger = new MetricsLogger();
logger.putDimensions(DimensionSet.of("Operation", "Agent"));
DefaultEnvironment environment = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig());
emitMetric(environment);
emitMetric(environment);
emitMetric(environment);
environment.getSink().shutdown().orTimeout(360_000L, TimeUnit.MILLISECONDS);
}

private static void emitMetric(Environment environment) {
MetricsLogger logger = new MetricsLogger(environment);
logger.setDimensions(DimensionSet.of("Operation", "Agent"));
logger.putMetric("ExampleMetric", 100, Unit.MILLISECONDS);
logger.putProperty("RequestId", "422b1569-16f6-4a03-b8f0-fe3fd9b100f8");
logger.flush();
Expand Down
16 changes: 16 additions & 0 deletions examples/ecs-firelens/src/main/java/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,25 @@
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import software.amazon.cloudwatchlogs.emf.config.EnvironmentConfigurationProvider;
import software.amazon.cloudwatchlogs.emf.environment.ECSEnvironment;
import software.amazon.cloudwatchlogs.emf.environment.Environment;
import software.amazon.cloudwatchlogs.emf.environment.EnvironmentProvider;
import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger;
import software.amazon.cloudwatchlogs.emf.model.Unit;
import sun.misc.Signal;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

public class App {

private static final Environment env = new ECSEnvironment(EnvironmentConfigurationProvider.getConfig());

public static void main(String[] args) throws Exception {
registerShutdownHook();

int portNumber = 8000;
HttpServer server = HttpServer.create(new InetSocketAddress(8000), 0);
Expand All @@ -37,6 +45,14 @@ public static void main(String[] args) throws Exception {
server.start();
}

private static void registerShutdownHook() {
// https://aws.amazon.com/blogs/containers/graceful-shutdowns-with-ecs/
Signal.handle(new Signal("TERM"), sig -> {
env.getSink().shutdown().orTimeout(1_000L, TimeUnit.MILLISECONDS);
System.exit(0);
});
}

static class SimpleHandler implements HttpHandler {
@Override
public void handle(HttpExchange he) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
import software.amazon.awssdk.services.cloudwatch.model.Statistic;
import software.amazon.cloudwatchlogs.emf.config.Configuration;
import software.amazon.cloudwatchlogs.emf.config.EnvironmentConfigurationProvider;
import software.amazon.cloudwatchlogs.emf.environment.EnvironmentProvider;
import software.amazon.cloudwatchlogs.emf.environment.DefaultEnvironment;
import software.amazon.cloudwatchlogs.emf.environment.Environment;
import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger;
import software.amazon.cloudwatchlogs.emf.model.DimensionSet;
import software.amazon.cloudwatchlogs.emf.model.Unit;
Expand All @@ -56,56 +57,64 @@ public void setUp() {

@Test(timeout = 120_000)
public void testSingleFlushOverTCP() throws InterruptedException {
Environment env = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig());
String metricName = "TCP-SingleFlush";
int expectedSamples = 1;
config.setAgentEndpoint("tcp://127.0.0.1:25888");

logMetric(metricName);
logMetric(env, metricName);
env.getSink().shutdown().join();

assertTrue(retryUntilSucceed(() -> buildRequest(metricName), expectedSamples));
}

@Test(timeout = 300_000)
public void testMultipleFlushesOverTCP() throws InterruptedException {
Environment env = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig());
String metricName = "TCP-MultipleFlushes";
int expectedSamples = 3;
config.setAgentEndpoint("tcp://127.0.0.1:25888");

logMetric(metricName);
logMetric(metricName);
logMetric(env, metricName);
logMetric(env, metricName);
Thread.sleep(500);
logMetric(metricName);
logMetric(env, metricName);
env.getSink().shutdown().join();

assertTrue(retryUntilSucceed(() -> buildRequest(metricName), expectedSamples));
}

@Test(timeout = 120_000)
public void testSingleFlushOverUDP() throws InterruptedException {
Environment env = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig());
String metricName = "UDP-SingleFlush";
int expectedSamples = 1;
config.setAgentEndpoint("udp://127.0.0.1:25888");

logMetric(metricName);
logMetric(env, metricName);
env.getSink().shutdown().join();

assertTrue(retryUntilSucceed(() -> buildRequest(metricName), expectedSamples));
}

@Test(timeout = 300_000)
public void testMultipleFlushOverUDP() throws InterruptedException {
Environment env = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig());
String metricName = "UDP-MultipleFlush";
int expectedSamples = 3;
config.setAgentEndpoint("udp://127.0.0.1:25888");

logMetric(metricName);
logMetric(metricName);
logMetric(env, metricName);
logMetric(env, metricName);
Thread.sleep(500);
logMetric(metricName);
logMetric(env, metricName);
env.getSink().shutdown().join();

assertTrue(retryUntilSucceed(() -> buildRequest(metricName), expectedSamples));
}

private void logMetric(String metricName) {
MetricsLogger logger = new MetricsLogger(new EnvironmentProvider());
private void logMetric(Environment env, String metricName) {
MetricsLogger logger = new MetricsLogger(env);
logger.putDimensions(dimensions);
logger.putMetric(metricName, 100, Unit.MILLISECONDS);
logger.flush();
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/software/amazon/cloudwatchlogs/emf/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,27 @@ public class Constants {
public static final int MAX_METRICS_PER_EVENT = 100;

public static final int MAX_DATAPOINTS_PER_METRIC = 100;

/**
* The max number of messages to hold in memory in case of transient socket errors. The maximum
* message size is 256 KB meaning the maximum size of this buffer would be 25,600 MB
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

25,600 KB?

*/
public static final int DEFAULT_ASYNC_BUFFER_SIZE = 100;

/**
* How many times to retry an individual message. We eventually give up vs. retrying
* indefinitely in case there is something inherent to the message that is causing the failures.
* Giving up results in data loss, but also helps us reduce the risk of a poison pill blocking
* all process telemetry.
*/
public static final int MAX_ATTEMPTS_PER_MESSAGE = 100;

/** Starting backoff millis when a transient socket failure is encountered. */
public static final int MIN_BACKOFF_MILLIS = 50;

/** Max backoff millis when a transient socket failure is encountered. */
public static final int MAX_BACKOFF_MILLIS = 2000;

/** Maximum amount of random jitter to apply to retries */
public static final int MAX_BACKOFF_JITTER = 20;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import java.util.Optional;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import software.amazon.cloudwatchlogs.emf.Constants;
import software.amazon.cloudwatchlogs.emf.environment.Environments;
import software.amazon.cloudwatchlogs.emf.util.StringUtils;

Expand Down Expand Up @@ -54,6 +56,9 @@ public class Configuration {
*/
@Setter Environments environmentOverride;

/** Queue length for asynchronous sinks. */
@Setter @Getter int asyncBufferSize = Constants.DEFAULT_ASYNC_BUFFER_SIZE;

public Optional<String> getServiceName() {
return getStringOptional(serviceName);
}
Expand Down Expand Up @@ -85,6 +90,6 @@ private Optional<String> getStringOptional(String value) {
if (StringUtils.isNullOrEmpty(value)) {
return Optional.empty();
}
return Optional.ofNullable(value);
return Optional.of(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ public class ConfigurationKeys {
public static final String LOG_STREAM_NAME = "LOG_STREAM_NAME";
public static final String AGENT_ENDPOINT = "AGENT_ENDPOINT";
public static final String ENVIRONMENT_OVERRIDE = "ENVIRONMENT";
public static final String ASYNC_BUFFER_SIZE = "ASYNC_BUFFER_SIZE";
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package software.amazon.cloudwatchlogs.emf.config;

import software.amazon.cloudwatchlogs.emf.Constants;
import software.amazon.cloudwatchlogs.emf.environment.Environments;
import software.amazon.cloudwatchlogs.emf.util.StringUtils;

Expand All @@ -27,21 +28,21 @@ protected EnvironmentConfigurationProvider() {}

public static Configuration getConfig() {
if (config == null) {
config =
new Configuration(
getEnvVar(ConfigurationKeys.SERVICE_NAME),
getEnvVar(ConfigurationKeys.SERVICE_TYPE),
getEnvVar(ConfigurationKeys.LOG_GROUP_NAME),
getEnvVar(ConfigurationKeys.LOG_STREAM_NAME),
getEnvVar(ConfigurationKeys.AGENT_ENDPOINT),
getEnvironmentOverride());
config = createConfig();
}
return config;
}

private static String getEnvVar(String key) {
String name = String.join("", ConfigurationKeys.ENV_VAR_PREFIX, "_", key);
return getEnv(name);
static Configuration createConfig() {
return new Configuration(
getEnvVar(ConfigurationKeys.SERVICE_NAME),
getEnvVar(ConfigurationKeys.SERVICE_TYPE),
getEnvVar(ConfigurationKeys.LOG_GROUP_NAME),
getEnvVar(ConfigurationKeys.LOG_STREAM_NAME),
getEnvVar(ConfigurationKeys.AGENT_ENDPOINT),
getEnvironmentOverride(),
getIntOrDefault(
ConfigurationKeys.ASYNC_BUFFER_SIZE, Constants.DEFAULT_ASYNC_BUFFER_SIZE));
}

private static Environments getEnvironmentOverride() {
Expand All @@ -57,6 +58,24 @@ private static Environments getEnvironmentOverride() {
}
}

private static int getIntOrDefault(String key, int defaultValue) {
String value = getEnvVar(key);
if (StringUtils.isNullOrEmpty(value)) {
return defaultValue;
}

try {
return Integer.parseInt(value);
} catch (NumberFormatException e) {
return defaultValue;
}
}

private static String getEnvVar(String key) {
String name = String.join("", ConfigurationKeys.ENV_VAR_PREFIX, "_", key);
return getEnv(name);
}

private static String getEnv(String name) {
return SystemWrapper.getenv(name);
}
Expand Down
Loading