001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.commons.compress.utils;
020
021import java.io.FileOutputStream;
022import java.io.IOException;
023import java.io.OutputStream;
024import java.nio.Buffer;
025import java.nio.ByteBuffer;
026import java.nio.ByteOrder;
027import java.nio.channels.ClosedChannelException;
028import java.nio.channels.WritableByteChannel;
029import java.util.concurrent.atomic.AtomicBoolean;
030
031/**
032 * This class supports writing to an OutputStream or WritableByteChannel in fixed length blocks.
033 * <p>It can be be used to support output to devices such as tape drives that require output in this
034 * format. If the final block does not have enough content to fill an entire block, the output will
035 * be padded to a full block size.</p>
036 *
037 * <p>This class can be used to support TAR,PAX, and CPIO blocked output to character special devices.
038 * It is not recommended that this class be used unless writing to such devices, as the padding
039 * serves no useful purpose in such cases.</p>
040 *
041 * <p>This class should normally wrap a FileOutputStream or associated WritableByteChannel directly.
042 * If there is an intervening filter that modified the output, such as a CompressorOutputStream, or
043 * performs its own buffering, such as BufferedOutputStream,  output to the device may
044 * no longer be of the specified size.</p>
045 *
046 * <p>Any content written to this stream should be self-delimiting and should tolerate any padding
047 * added to fill the last block.</p>
048 *
049 * @since 1.15
050 */
051public class FixedLengthBlockOutputStream extends OutputStream implements WritableByteChannel {
052
053    private final WritableByteChannel out;
054    private final int blockSize;
055    private final ByteBuffer buffer;
056    private final AtomicBoolean closed = new AtomicBoolean(false);
057
058    /**
059     * Create a fixed length block output stream with given destination stream and block size
060     * @param os   The stream to wrap.
061     * @param blockSize The block size to use.
062     */
063    public FixedLengthBlockOutputStream(final OutputStream os, final int blockSize) {
064        if (os instanceof FileOutputStream) {
065            final FileOutputStream fileOutputStream = (FileOutputStream) os;
066            out = fileOutputStream.getChannel();
067            buffer = ByteBuffer.allocateDirect(blockSize);
068        } else {
069            out = new BufferAtATimeOutputChannel(os);
070            buffer = ByteBuffer.allocate(blockSize);
071        }
072        this.blockSize = blockSize;
073    }
074     /**
075      * Create a fixed length block output stream with given destination writable byte channel and block size
076     * @param out   The writable byte channel to wrap.
077     * @param blockSize The block size to use.
078     */
079    public FixedLengthBlockOutputStream(final WritableByteChannel out, final int blockSize) {
080        this.out = out;
081        this.blockSize = blockSize;
082        this.buffer = ByteBuffer.allocateDirect(blockSize);
083    }
084
085    private void maybeFlush() throws IOException {
086        if (!buffer.hasRemaining()) {
087            writeBlock();
088        }
089    }
090
091    private void writeBlock() throws IOException {
092        ((Buffer)buffer).flip();
093        final int i = out.write(buffer);
094        final boolean hasRemaining = buffer.hasRemaining();
095        if (i != blockSize || hasRemaining) {
096            final String msg = String
097                .format("Failed to write %,d bytes atomically. Only wrote  %,d",
098                    blockSize, i);
099            throw new IOException(msg);
100        }
101        ((Buffer)buffer).clear();
102    }
103
104    @Override
105    public void write(final int b) throws IOException {
106        if (!isOpen()) {
107            throw new ClosedChannelException();
108        }
109        buffer.put((byte) b);
110        maybeFlush();
111    }
112
113    @Override
114    public void write(final byte[] b, final int offset, final int length) throws IOException {
115        if (!isOpen()) {
116            throw new ClosedChannelException();
117        }
118        int off = offset;
119        int len = length;
120        while (len > 0) {
121            final int n = Math.min(len, buffer.remaining());
122            buffer.put(b, off, n);
123            maybeFlush();
124            len -= n;
125            off += n;
126        }
127    }
128
129    @Override
130    public int write(final ByteBuffer src) throws IOException {
131        if (!isOpen()) {
132            throw new ClosedChannelException();
133        }
134        final int srcRemaining = src.remaining();
135
136        if (srcRemaining < buffer.remaining()) {
137            // if don't have enough bytes in src to fill up a block we must buffer
138            buffer.put(src);
139        } else {
140            int srcLeft = srcRemaining;
141            final int savedLimit = src.limit();
142            // If we're not at the start of buffer, we have some bytes already  buffered
143            // fill up the reset of buffer and write the block.
144            if (buffer.position() != 0) {
145                final int n = buffer.remaining();
146                ((Buffer)src).limit(src.position() + n);
147                buffer.put(src);
148                writeBlock();
149                srcLeft -= n;
150            }
151            // whilst we have enough bytes in src for complete blocks,
152            // write them directly from src without copying them to buffer
153            while (srcLeft >= blockSize) {
154                ((Buffer)src).limit(src.position() + blockSize);
155                out.write(src);
156                srcLeft -= blockSize;
157            }
158            // copy any remaining bytes into buffer
159            ((Buffer)src).limit(savedLimit);
160            buffer.put(src);
161        }
162        return srcRemaining;
163    }
164
165    @Override
166    public boolean isOpen() {
167        if (!out.isOpen()) {
168            closed.set(true);
169        }
170        return !closed.get();
171    }
172
173    /**
174     * Potentially pads and then writes the current block to the underlying stream.
175     * @throws IOException if writing fails
176     */
177    public void flushBlock() throws IOException {
178        if (buffer.position() != 0) {
179            padBlock();
180            writeBlock();
181        }
182    }
183
184    @Override
185    public void close() throws IOException {
186        if (closed.compareAndSet(false, true)) {
187            try {
188                flushBlock();
189            } finally {
190                out.close();
191            }
192        }
193    }
194
195    private void padBlock() {
196        buffer.order(ByteOrder.nativeOrder());
197        int bytesToWrite = buffer.remaining();
198        if (bytesToWrite > 8) {
199            final int align = buffer.position() & 7;
200            if (align != 0) {
201                final int limit = 8 - align;
202                for (int i = 0; i < limit; i++) {
203                    buffer.put((byte) 0);
204                }
205                bytesToWrite -= limit;
206            }
207
208            while (bytesToWrite >= 8) {
209                buffer.putLong(0L);
210                bytesToWrite -= 8;
211            }
212        }
213        while (buffer.hasRemaining()) {
214            buffer.put((byte) 0);
215        }
216    }
217
218    /**
219     * Helper class to provide channel wrapper for arbitrary output stream that doesn't alter the
220     * size of writes.  We can't use Channels.newChannel, because for non FileOutputStreams, it
221     * breaks up writes into 8KB max chunks. Since the purpose of this class is to always write
222     * complete blocks, we need to write a simple class to take care of it.
223     */
224    private static class BufferAtATimeOutputChannel implements WritableByteChannel {
225
226        private final OutputStream out;
227        private final AtomicBoolean closed = new AtomicBoolean(false);
228
229        private BufferAtATimeOutputChannel(final OutputStream out) {
230            this.out = out;
231        }
232
233        @Override
234        public int write(final ByteBuffer buffer) throws IOException {
235            if (!isOpen()) {
236                throw new ClosedChannelException();
237            }
238            if (!buffer.hasArray()) {
239                throw new IOException("Direct buffer somehow written to BufferAtATimeOutputChannel");
240            }
241
242            try {
243                final int pos = buffer.position();
244                final int len = buffer.limit() - pos;
245                out.write(buffer.array(), buffer.arrayOffset() + pos, len);
246                ((Buffer)buffer).position(buffer.limit());
247                return len;
248            } catch (final IOException e) {
249                try {
250                    close();
251                } catch (final IOException ignored) { //NOSONAR
252                }
253                throw e;
254            }
255        }
256
257        @Override
258        public boolean isOpen() {
259            return !closed.get();
260        }
261
262        @Override
263        public void close() throws IOException {
264            if (closed.compareAndSet(false, true)) {
265                out.close();
266            }
267        }
268
269    }
270
271
272}