-
Notifications
You must be signed in to change notification settings - Fork 125
add response decompression support #86
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
b0f15a6
35fa5e0
cc18c6d
88b674a
a51fe72
6d96880
b577f79
18438fe
0782d81
8e8c30d
69ec369
2891b7d
dbcfd44
5d59a66
2f1959f
8a95c2a
0e08fdb
dd83963
f1848d2
495b27a
4c4ac19
807761e
f2c8b09
646d3c7
1ed16d4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
//===----------------------------------------------------------------------===// | ||
// | ||
// This source file is part of the AsyncHTTPClient open source project | ||
// | ||
// Copyright (c) 2018-2019 Swift Server Working Group and the AsyncHTTPClient project authors | ||
// Licensed under Apache License v2.0 | ||
// | ||
// See LICENSE.txt for license information | ||
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
// | ||
//===----------------------------------------------------------------------===// | ||
|
||
import CNIOExtrasZlib | ||
import NIO | ||
import NIOHTTP1 | ||
|
||
private enum CompressionAlgorithm: String { | ||
case gzip | ||
case deflate | ||
} | ||
|
||
extension z_stream { | ||
public mutating func inflatePart(input: inout ByteBuffer, allocator: ByteBufferAllocator, consumer: (ByteBuffer) -> Void) { | ||
weissi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
input.readWithUnsafeMutableReadableBytes { dataPtr in | ||
let typedPtr = dataPtr.baseAddress!.assumingMemoryBound(to: UInt8.self) | ||
let typedDataPtr = UnsafeMutableBufferPointer(start: typedPtr, count: dataPtr.count) | ||
|
||
self.avail_in = UInt32(typedDataPtr.count) | ||
self.next_in = typedDataPtr.baseAddress! | ||
|
||
repeat { | ||
var buffer = allocator.buffer(capacity: 16384) | ||
self.inflatePart(to: &buffer) | ||
consumer(buffer) | ||
weissi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} while self.avail_out == 0 | ||
|
||
return Int(self.avail_in) | ||
} | ||
weissi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
public mutating func inflatePart(to buffer: inout ByteBuffer) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This API must not be public, as it is extremely unsafe. Please make it private. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed, thanks! |
||
buffer.writeWithUnsafeMutableBytes { outputPtr in | ||
let typedOutputPtr = UnsafeMutableBufferPointer(start: outputPtr.baseAddress!.assumingMemoryBound(to: UInt8.self), count: outputPtr.count) | ||
|
||
self.avail_out = UInt32(typedOutputPtr.count) | ||
self.next_out = typedOutputPtr.baseAddress! | ||
|
||
let rc = inflate(&self, Z_NO_FLUSH) | ||
precondition(rc == Z_OK || rc == Z_STREAM_END, "decompression failed: \(rc)") | ||
weissi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
return typedOutputPtr.count - Int(self.avail_out) | ||
} | ||
} | ||
} | ||
|
||
final class HTTPResponseDecompressor: ChannelDuplexHandler, RemovableChannelHandler { | ||
weissi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
typealias InboundIn = HTTPClientResponsePart | ||
typealias InboundOut = HTTPClientResponsePart | ||
typealias OutboundIn = HTTPClientRequestPart | ||
typealias OutboundOut = HTTPClientRequestPart | ||
|
||
private enum State { | ||
case empty | ||
case compressed(CompressionAlgorithm, Int) | ||
} | ||
|
||
private var state = State.empty | ||
private var stream = z_stream() | ||
|
||
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) { | ||
let request = self.unwrapOutboundIn(data) | ||
switch request { | ||
case .head(var head): | ||
if head.headers.contains(name: "Accept-Encoding") { | ||
context.write(data, promise: promise) | ||
} else { | ||
head.headers.replaceOrAdd(name: "Accept-Encoding", value: "deflate, gzip") | ||
context.write(self.wrapOutboundOut(.head(head)), promise: promise) | ||
} | ||
default: | ||
context.write(data, promise: promise) | ||
} | ||
} | ||
|
||
public func channelRead(context: ChannelHandlerContext, data: NIOAny) { | ||
switch self.unwrapInboundIn(data) { | ||
case .head(let head): | ||
let algorithm: CompressionAlgorithm? | ||
let contentType = head.headers[canonicalForm: "Content-Encoding"].first?.lowercased() | ||
if contentType == "gzip" { | ||
algorithm = .gzip | ||
} else if contentType == "deflate" { | ||
algorithm = .deflate | ||
} else { | ||
algorithm = nil | ||
} | ||
|
||
let length = head.headers[canonicalForm: "Content-Length"].first.flatMap { Int($0) } | ||
|
||
if let algorithm = algorithm, let length = length { | ||
self.initializeDecoder(encoding: algorithm, length: length) | ||
} | ||
|
||
context.fireChannelRead(data) | ||
case .body(var part): | ||
switch self.state { | ||
case .compressed: | ||
self.stream.inflatePart(input: &part, allocator: context.channel.allocator) { output in | ||
context.fireChannelRead(self.wrapInboundOut(.body(output))) | ||
} | ||
default: | ||
context.fireChannelRead(data) | ||
} | ||
case .end: | ||
deflateEnd(&self.stream) | ||
context.fireChannelRead(data) | ||
} | ||
} | ||
|
||
private func initializeDecoder(encoding: CompressionAlgorithm, length: Int) { | ||
self.state = .compressed(encoding, length) | ||
|
||
self.stream.zalloc = nil | ||
self.stream.zfree = nil | ||
self.stream.opaque = nil | ||
weissi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
let window: Int32 | ||
switch encoding { | ||
case .gzip: | ||
window = 15 + 16 | ||
default: | ||
window = 15 | ||
} | ||
|
||
let rc = CNIOExtrasZlib_inflateInit2(&self.stream, window) | ||
precondition(rc == Z_OK, "Unexpected return from zlib init: \(rc)") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should probably throw. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed, thanks! |
||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.