From 5581395dfeb79931af2e76b7651d06a3c63bc0d8 Mon Sep 17 00:00:00 2001 From: Jason Zaugg Date: Tue, 25 May 2021 22:12:16 +1000 Subject: [PATCH] Deduplicate historical Java version tag and upgrade client library The new way of extracting the Java version doesn't include the internal build number, so remove that from historical data. First, upgrade the InfluxDB client library. This fixes two issues that I reported long ago with Nginx reverse proxy and basic authentication, so remove the workarounds. It also has a new feature to talk to the server in a binary protocal (Message Pack). Apart from being more efficient (which doesn't really matter much to us) it also doesn't lose track of int vs float data types when doing a bulk query (like we do in the migrator) which avoids needing to manually narrow "sampleCount" to a integral type. --- build.sbt | 2 +- .../main/java/scala/bench/DataMigrator.java | 32 ++++++++----------- .../src/main/java/scala/bench/Database.java | 15 +-------- 3 files changed, 15 insertions(+), 34 deletions(-) diff --git a/build.sbt b/build.sbt index 825a231..45c3a85 100644 --- a/build.sbt +++ b/build.sbt @@ -39,7 +39,7 @@ lazy val infrastructure = addJmh(project).settings( autoScalaLibrary := false, crossPaths := false, libraryDependencies ++= Seq( - "org.influxdb" % "influxdb-java" % "2.5", // TODO update to 2.6 when released for fix for https://github.com/influxdata/influxdb-java/issues/269 + "org.influxdb" % "influxdb-java" % "2.21", "org.eclipse.jgit" % "org.eclipse.jgit" % "4.6.0.201612231935-r", "com.google.guava" % "guava" % "21.0", "org.apache.commons" % "commons-lang3" % "3.5", diff --git a/infrastructure/src/main/java/scala/bench/DataMigrator.java b/infrastructure/src/main/java/scala/bench/DataMigrator.java index a83b455..72a5aa6 100644 --- a/infrastructure/src/main/java/scala/bench/DataMigrator.java +++ b/infrastructure/src/main/java/scala/bench/DataMigrator.java @@ -5,6 +5,7 @@ import org.influxdb.dto.Point; import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; +import org.influxdb.impl.TimeUtil; import java.time.Instant; import java.util.*; @@ -31,39 +32,32 @@ public static void main(String[] args) { //time written //---- ------- //0 14076 - String oldMeasure = "result_backup_20210519"; + String oldMeasure = "result_backup_20210525"; String newMeasure = "result"; - QueryResult queryResult = influxDB.query(new Query("select * from " + oldMeasure + " group by *", "scala_benchmark")); for (QueryResult.Result result : queryResult.getResults()) { for (QueryResult.Series series : result.getSeries()) { - List newFieldNames = new ArrayList<>(series.getColumns()); - int javaVersionIndex = newFieldNames.indexOf(JAVA_VERSION_TAG_NAME); - newFieldNames.remove(javaVersionIndex); - assert (newFieldNames.get(0).equals("time")); - newFieldNames.remove(0); Point.Builder builder = Point.measurement(newMeasure); Map newTags = new HashMap<>(series.getTags()); + String javaVersion = newTags.get(JAVA_VERSION_TAG_NAME); + if (javaVersion.equals("1.8.0_131-b11")) { + newTags.put(JAVA_VERSION_TAG_NAME, "1.8.0_131"); + } + + assert (series.getValues().size() == 1); List newValues = new ArrayList<>(series.getValues().get(0)); - Object removed = newValues.remove(javaVersionIndex); - String time = (String) newValues.remove(0); - newTags.put(JAVA_VERSION_TAG_NAME, (String) removed); - newTags.entrySet().removeIf(x -> x.getValue() == null || x.getValue().equals("")); builder.tag(newTags); + + List newFieldNames = new ArrayList<>(series.getColumns()); LinkedHashMap newFieldsMap = new LinkedHashMap<>(); assert (newFieldNames.size() == newValues.size()); for (int i = 0; i < newFieldNames.size(); i++) { String fieldName = newFieldNames.get(i); - boolean isLong = fieldName.equals("sampleCount"); - if (isLong) { - newFieldsMap.put(fieldName, ((Number) newValues.get(i)).longValue()); - } else { - newFieldsMap.put(fieldName, newValues.get(i)); - } + newFieldsMap.put(fieldName, newValues.get(i)); } builder.fields(newFieldsMap); - Instant parse = Instant.parse(time); - builder.time(parse.toEpochMilli(), TimeUnit.MILLISECONDS); + long epochMillis = (long) newValues.remove(0) / 1000L / 1000L; + builder.time(epochMillis, TimeUnit.MILLISECONDS); Point point = builder.build(); batchPoints.point(point); } diff --git a/infrastructure/src/main/java/scala/bench/Database.java b/infrastructure/src/main/java/scala/bench/Database.java index fef36d5..8929656 100644 --- a/infrastructure/src/main/java/scala/bench/Database.java +++ b/infrastructure/src/main/java/scala/bench/Database.java @@ -30,20 +30,7 @@ public static InfluxDB connectDb() { client.connectTimeout(10, TimeUnit.SECONDS); client.readTimeout(120, TimeUnit.SECONDS); client.writeTimeout(120, TimeUnit.SECONDS); - - // workaround https://github.com/influxdata/influxdb-java/issues/268 - client.addNetworkInterceptor(chain -> { - HttpUrl.Builder fixedUrl = chain.request().url().newBuilder().encodedPath("/influx/" + chain.request().url().encodedPath().replaceFirst("/influxdb", "")); - return chain.proceed(chain.request().newBuilder().url(fixedUrl.build()).build()); - }); - - client.authenticator((route, response) -> { - String credential = Credentials.basic(influxUser, influxPassword); - return response.request().newBuilder() - .header("Authorization", credential) - .build(); - }); - InfluxDB influxDB = InfluxDBFactory.connect(influxUrl, influxUser, influxPassword, client); + InfluxDB influxDB = InfluxDBFactory.connect(influxUrl, influxUser, influxPassword, client, InfluxDB.ResponseFormat.MSGPACK); // influxDB.setLogLevel(InfluxDB.LogLevel.FULL); return influxDB; }