Skip to content

#50 implement self-repairing mechanism for cases where binary data is corrupted #56

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,20 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;

import javax.sql.DataSource;

Expand Down Expand Up @@ -681,19 +684,31 @@ private static String getArchitecture()
* @param stream A stream with the postgres binaries.
* @param targetDir The directory to extract the content to.
*/
private static void extractTxz(InputStream stream, String targetDir) throws IOException {
private static void extractTxz(InputStream stream, File targetDir) throws IOException {
try (
XZInputStream xzIn = new XZInputStream(stream);
TarArchiveInputStream tarIn = new TarArchiveInputStream(xzIn)
) {
final Set<File> dirsToUpdate = new HashSet<>();
final Phaser phaser = new Phaser(1);
TarArchiveEntry entry;

while ((entry = tarIn.getNextTarEntry()) != null) {
final String individualFile = entry.getName();
final File fsObject = new File(targetDir, individualFile);

if (entry.isSymbolicLink() || entry.isLink()) {
if (fsObject.exists()) {
fsObject.setLastModified(System.currentTimeMillis());

File parentDir = fsObject.getParentFile();
while (parentDir != null) {
dirsToUpdate.add(parentDir);
if (targetDir.equals(parentDir)) {
break;
}
parentDir = parentDir.getParentFile();
}
} else if (entry.isSymbolicLink() || entry.isLink()) {
Path target = FileSystems.getDefault().getPath(entry.getLinkName());
Files.createSymbolicLink(fsObject.toPath(), target);
} else if (entry.isFile()) {
Expand Down Expand Up @@ -743,6 +758,10 @@ private void closeChannel(Channel channel) {
}
}

for (File updatedDir : dirsToUpdate) {
updatedDir.setLastModified(System.currentTimeMillis());
}

phaser.arriveAndAwaitAdvance();
}
}
Expand Down Expand Up @@ -786,31 +805,28 @@ private static File prepareBinaries(PgBinaryResolver pgBinaryResolver, File over
final File unpackLockFile = new File(pgDir, LOCK_FILE_NAME);
final File pgDirExists = new File(pgDir, ".exists");

if (!pgDirExists.exists()) {
if (!isPgBinReady(pgDirExists)) {
try (FileOutputStream lockStream = new FileOutputStream(unpackLockFile);
FileLock unpackLock = lockStream.getChannel().tryLock()) {
if (unpackLock != null) {
try {
if (pgDirExists.exists()) {
throw new IllegalStateException("unpack lock acquired but .exists file is present " + pgDirExists);
}
LOG.info("Extracting Postgres...");
try (ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray())) {
extractTxz(bais, pgDir.getPath());
extractTxz(bais, pgDir);
}
if (!pgDirExists.createNewFile()) {
throw new IllegalStateException("couldn't make .exists file " + pgDirExists);
pgDirExists.setLastModified(System.currentTimeMillis());
}
} catch (Exception e) {
LOG.error("while unpacking Postgres", e);
}
} else {
// the other guy is unpacking for us.
int maxAttempts = 60;
while (!pgDirExists.exists() && --maxAttempts > 0) {
while (!isPgBinReady(pgDirExists) && --maxAttempts > 0) {
Thread.sleep(1000L);
}
if (!pgDirExists.exists()) {
if (!isPgBinReady(pgDirExists)) {
throw new IllegalStateException("Waited 60 seconds for postgres to be unpacked but it never finished!");
}
}
Expand All @@ -834,6 +850,18 @@ private static File prepareBinaries(PgBinaryResolver pgBinaryResolver, File over
}
}

public static boolean isPgBinReady(File pgDirExists) {
if (!pgDirExists.exists()) {
return false;
}

File parentDir = pgDirExists.getParentFile();
File[] otherFiles = Optional.ofNullable(parentDir.listFiles(file -> !file.equals(pgDirExists))).orElseGet(() -> new File[0]);

long contentLastModified = Stream.of(otherFiles).mapToLong(File::lastModified).max().orElse(Long.MAX_VALUE);
return parentDir.lastModified() <= pgDirExists.lastModified() && contentLastModified <= pgDirExists.lastModified();
}

@Override
public String toString()
{
Expand Down
1 change: 1 addition & 0 deletions src/test/resources/simplelogger.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.slf4j.simpleLogger.showDateTime=true