From 089e5520b5f0241754f0773da243682a2d290da3 Mon Sep 17 00:00:00 2001 From: Plamen Totev Date: Mon, 19 Dec 2016 21:29:41 +0200 Subject: [PATCH] Fix setRecompressAddedZips has no effect on the resulting archive To check if a given entry is a zip file or not, the first four bytes should be read. There is an issue when entries are being added asynchronously. Because it's faster to submit entries than to compress them, input streams are opened faster than they are closed. Eventually this could lead to a "too many open files" error. There was a workaround for this issue: a wrapper around the `InputStreamSupplier`, so that the first four bytes are read when the entry is compressed and not when it's submitted. That solves the "too many open files" problem, but unfortunately has no effect on the resulting archive. The compression method is determined when the entry is submitted, so changing it afterwards has no effect. Use the newly introduced `ZipArchiveEntryRequestSupplier` to supply the whole `ZipArchiveEntryRequest` (including the compression method) when the entry is compressed. 
--- .../plexus/archiver/jar/JarArchiver.java | 3 +- .../archiver/zip/AbstractZipArchiver.java | 48 +------ .../archiver/zip/ConcurrentJarCreator.java | 124 +++++++++++++++++- .../plexus/archiver/zip/ZipArchiverTest.java | 49 +++++++ 4 files changed, 175 insertions(+), 49 deletions(-) diff --git a/src/main/java/org/codehaus/plexus/archiver/jar/JarArchiver.java b/src/main/java/org/codehaus/plexus/archiver/jar/JarArchiver.java index 90f321563..fa0775e5c 100644 --- a/src/main/java/org/codehaus/plexus/archiver/jar/JarArchiver.java +++ b/src/main/java/org/codehaus/plexus/archiver/jar/JarArchiver.java @@ -569,7 +569,8 @@ protected boolean createEmptyZip( File zipFile ) { zipArchiveOutputStream.setMethod( ZipArchiveOutputStream.STORED ); } - ConcurrentJarCreator ps = new ConcurrentJarCreator( Runtime.getRuntime().availableProcessors() ); + ConcurrentJarCreator ps = + new ConcurrentJarCreator( isRecompressAddedZips(), Runtime.getRuntime().availableProcessors() ); initZipOutputStream( ps ); finalizeZipOutputStream( ps ); } diff --git a/src/main/java/org/codehaus/plexus/archiver/zip/AbstractZipArchiver.java b/src/main/java/org/codehaus/plexus/archiver/zip/AbstractZipArchiver.java index c69f69c2c..7eb664b0e 100755 --- a/src/main/java/org/codehaus/plexus/archiver/zip/AbstractZipArchiver.java +++ b/src/main/java/org/codehaus/plexus/archiver/zip/AbstractZipArchiver.java @@ -322,7 +322,7 @@ private void createArchiveMain() zipArchiveOutputStream.setMethod( doCompress ? 
ZipArchiveOutputStream.DEFLATED : ZipArchiveOutputStream.STORED ); - zOut = new ConcurrentJarCreator( Runtime.getRuntime().availableProcessors() ); + zOut = new ConcurrentJarCreator( recompressAddedZips, Runtime.getRuntime().availableProcessors() ); } initZipOutputStream( zOut ); @@ -508,21 +508,11 @@ protected void zipFile( InputStreamSupplier in, ConcurrentJarCreator zOut, Strin } else { - zOut.addArchiveEntry( ze, wrappedRecompressor( ze, in ), addInParallel ); + zOut.addArchiveEntry( ze, in, addInParallel ); } } } - private InputStream maybeSequence( byte[] header, int hdrBytes, InputStream in ) - { - return hdrBytes > 0 ? new SequenceInputStream( new ByteArrayInputStream( header, 0, hdrBytes ), in ) : in; - } - - private boolean isZipHeader( byte[] header ) - { - return header[0] == 0x50 && header[1] == 0x4b && header[2] == 3 && header[3] == 4; - } - /** * Method that gets called when adding from java.io.File instances. *

@@ -661,40 +651,6 @@ protected void zipDir( PlexusIoResource dir, ConcurrentJarCreator zOut, String v } } - private InputStreamSupplier wrappedRecompressor( final ZipArchiveEntry ze, final InputStreamSupplier other ) - { - - return new InputStreamSupplier() - { - - @Override - public InputStream get() - { - InputStream is = other.get(); - byte[] header = new byte[ 4 ]; - try - { - int read = is.read( header ); - boolean compressThis = doCompress; - if ( !recompressAddedZips && isZipHeader( header ) ) - { - compressThis = false; - } - - ze.setMethod( compressThis ? ZipArchiveEntry.DEFLATED : ZipArchiveEntry.STORED ); - - return maybeSequence( header, read, is ); - } - catch ( IOException e ) - { - throw new RuntimeException( e ); - } - - } - - }; - } - protected InputStreamSupplier createInputStreamSupplier( final InputStream inputStream ) { return new InputStreamSupplier() diff --git a/src/main/java/org/codehaus/plexus/archiver/zip/ConcurrentJarCreator.java b/src/main/java/org/codehaus/plexus/archiver/zip/ConcurrentJarCreator.java index b3220ce7f..f33a42540 100644 --- a/src/main/java/org/codehaus/plexus/archiver/zip/ConcurrentJarCreator.java +++ b/src/main/java/org/codehaus/plexus/archiver/zip/ConcurrentJarCreator.java @@ -20,6 +20,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.SequenceInputStream; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.zip.Deflater; @@ -28,15 +29,21 @@ import org.apache.commons.compress.archivers.zip.ScatterZipOutputStream; import org.apache.commons.compress.archivers.zip.StreamCompressor; import org.apache.commons.compress.archivers.zip.ZipArchiveEntry; +import org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest; +import org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequestSupplier; import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream; import 
org.apache.commons.compress.parallel.InputStreamSupplier; import org.apache.commons.compress.parallel.ScatterGatherBackingStore; import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier; +import org.codehaus.plexus.util.IOUtil; + import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest; public class ConcurrentJarCreator { + private final boolean compressAddedZips; + private final ScatterZipOutputStream directories; private final ScatterZipOutputStream metaInfDir; @@ -77,8 +84,44 @@ public static ScatterZipOutputStream createDeferred( return new ScatterZipOutputStream( bs, sc ); } + /** + * Creates a new {@code ConcurrentJarCreator} instance. + *

+ * {@code ConcurrentJarCreator} creates zip files using several concurrent threads. + *

+ * This constructor has the same effect as + * {@link #ConcurrentJarCreator(boolean, int) ConcurrentJarCreator(true, nThreads) } + * + * @param nThreads The number of concurrent thread used to create the archive + * + * @throws IOException + */ public ConcurrentJarCreator( int nThreads ) throws IOException { + this( true, nThreads ); + } + + /** + * Creates a new {@code ConcurrentJarCreator} instance. + *

+ * {@code ConcurrentJarCreator} creates zip files using several concurrent threads. + * Entries that are already zip file could be just stored or compressed again. + * + * @param compressAddedZips Indicates if entries that are zip files should be compressed. + * If set to {@code false} entries that are zip files will be added using + * {@link ZipEntry#STORED} method. + * If set to {@code true} entries that are zip files will be added using + * the compression method indicated by the {@code ZipArchiveEntry} passed + * to {@link #addArchiveEntry(ZipArchiveEntry, InputStreamSupplier, boolean)}. + * The compression method for all entries that are not zip files will not be changed + * regardless of the value of this parameter + * @param nThreads The number of concurrent thread used to create the archive + * + * @throws IOException + */ + public ConcurrentJarCreator( boolean compressAddedZips, int nThreads ) throws IOException + { + this.compressAddedZips = compressAddedZips; ScatterGatherBackingStoreSupplier defaultSupplier = new DeferredSupplier( 100000000 / nThreads ); directories = createDeferred( defaultSupplier ); manifest = createDeferred( defaultSupplier ); @@ -146,11 +189,11 @@ else if ( "META-INF/MANIFEST.MF".equals( zipArchiveEntry.getName() ) ) } else if ( addInParallel ) { - parallelScatterZipCreator.addArchiveEntry( zipArchiveEntry, source ); + parallelScatterZipCreator.addArchiveEntry( createEntrySupplier( zipArchiveEntry, source ) ); } else { - synchronousEntries.addArchiveEntry( createZipArchiveEntryRequest( zipArchiveEntry, source ) ); + synchronousEntries.addArchiveEntry( createEntry( zipArchiveEntry, source ) ); } } @@ -195,4 +238,81 @@ public String getStatisticsMessage() return parallelScatterZipCreator.getStatisticsMessage() + " Zip Close: " + zipCloseElapsed + "ms"; } + private ZipArchiveEntryRequestSupplier createEntrySupplier( final ZipArchiveEntry zipArchiveEntry, + final InputStreamSupplier inputStreamSupplier ) + { + + return new 
ZipArchiveEntryRequestSupplier() + { + + @Override + public ZipArchiveEntryRequest get() + { + try + { + return createEntry( zipArchiveEntry, inputStreamSupplier ); + } + catch ( IOException e ) + { + throw new RuntimeException( e ); + } + } + + }; + } + + private ZipArchiveEntryRequest createEntry( final ZipArchiveEntry zipArchiveEntry, + final InputStreamSupplier inputStreamSupplier ) throws IOException + { + // if we re-compress the zip files there is no need to look at the input stream + + if ( compressAddedZips ) + { + return createZipArchiveEntryRequest( zipArchiveEntry, inputStreamSupplier ); + } + + // otherwise we should inspect the first four bites to see if the input stream is zip file or not + + InputStream is = inputStreamSupplier.get(); + byte[] header = new byte[4]; + try + { + int read = is.read( header ); + int compressionMethod = zipArchiveEntry.getMethod(); + if ( isZipHeader( header ) ) { + compressionMethod = ZipEntry.STORED; + } + + zipArchiveEntry.setMethod( compressionMethod ); + + return createZipArchiveEntryRequest( zipArchiveEntry, prependBytesToStream( header, read, is ) ); + } + catch ( IOException e ) + { + IOUtil.close( is ); + throw e; + } + } + + private boolean isZipHeader( byte[] header ) + { + return header[0] == 0x50 && header[1] == 0x4b && header[2] == 3 && header[3] == 4; + } + + private InputStreamSupplier prependBytesToStream( final byte[] bytes, final int len, final InputStream stream ) + { + return new InputStreamSupplier() { + + @Override + public InputStream get() + { + return len > 0 + ? 
new SequenceInputStream( new ByteArrayInputStream( bytes, 0, len ), stream ) + : stream; + } + + }; + + } + } diff --git a/src/test/java/org/codehaus/plexus/archiver/zip/ZipArchiverTest.java b/src/test/java/org/codehaus/plexus/archiver/zip/ZipArchiverTest.java index 197c5afe5..42a12c62a 100644 --- a/src/test/java/org/codehaus/plexus/archiver/zip/ZipArchiverTest.java +++ b/src/test/java/org/codehaus/plexus/archiver/zip/ZipArchiverTest.java @@ -26,6 +26,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.FileInputStream; import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; @@ -341,6 +342,54 @@ public void testCreateArchive() createArchive( archiver ); } + public void testRecompressAddedZips() throws Exception + { + // check that by default the zip archives are re-compressed + + final File zipFileRecompress = getTestFile( "target/output/recompress-added-zips.zip" ); + final ZipArchiver zipArchiverRecompress = getZipArchiver( zipFileRecompress ); + zipArchiverRecompress.addDirectory( getTestFile( "src/test/jars" ) ); + FileUtils.removePath( zipFileRecompress.getPath() ); + zipArchiverRecompress.createArchive(); + + final ZipFile zfRecompress = new ZipFile( zipFileRecompress ); + assertEquals( ZipEntry.DEFLATED, zfRecompress.getEntry( "test.zip" ).getMethod() ); + assertEquals( ZipEntry.DEFLATED, zfRecompress.getEntry( "test.jar" ).getMethod() ); + assertEquals( ZipEntry.DEFLATED, zfRecompress.getEntry( "test.rar" ).getMethod() ); + assertEquals( ZipEntry.DEFLATED, zfRecompress.getEntry( "test.tar.gz" ).getMethod() ); + zfRecompress.close(); + + // make sure the zip files are not re-compressed when recompressAddedZips is set to false + + final File zipFileDontRecompress = getTestFile( "target/output/dont-recompress-added-zips.zip" ); + ZipArchiver zipArchiver = getZipArchiver( zipFileDontRecompress ); + zipArchiver.addDirectory( getTestFile( "src/test/jars" ) ); + 
zipArchiver.setRecompressAddedZips( false ); + FileUtils.removePath( zipFileDontRecompress.getPath() ); + zipArchiver.createArchive(); + + final ZipFile zfDontRecompress = new ZipFile( zipFileDontRecompress ); + final ZipArchiveEntry zipEntry = zfDontRecompress.getEntry( "test.zip" ); + final ZipArchiveEntry jarEntry = zfDontRecompress.getEntry( "test.jar" ); + final ZipArchiveEntry rarEntry = zfDontRecompress.getEntry( "test.rar" ); + final ZipArchiveEntry tarEntry = zfDontRecompress.getEntry( "test.tar.gz" ); + // check if only zip files are not compressed... + assertEquals( ZipEntry.STORED, zipEntry.getMethod() ); + assertEquals( ZipEntry.STORED, jarEntry.getMethod() ); + assertEquals( ZipEntry.STORED, rarEntry.getMethod() ); + assertEquals( ZipEntry.DEFLATED, tarEntry.getMethod() ); + // ...and no file is corrupted in the process + assertTrue( IOUtil.contentEquals( new FileInputStream( getTestFile( "src/test/jars/test.zip" ) ), + zfDontRecompress.getInputStream( zipEntry ) ) ); + assertTrue( IOUtil.contentEquals( new FileInputStream( getTestFile( "src/test/jars/test.jar" ) ), + zfDontRecompress.getInputStream( jarEntry ) ) ); + assertTrue( IOUtil.contentEquals( new FileInputStream( getTestFile( "src/test/jars/test.rar" ) ), + zfDontRecompress.getInputStream( rarEntry ) ) ); + assertTrue( IOUtil.contentEquals( new FileInputStream( getTestFile( "src/test/jars/test.tar.gz" ) ), + zfDontRecompress.getInputStream( tarEntry ) ) ); + zfDontRecompress.close(); + } + public void testAddArchivedFileSet() throws Exception {