@@ -77,9 +77,7 @@ extension HTTPClient {
77
77
/// - data: Body `Data` representation.
78
78
public static func data( _ data: Data ) -> Body {
79
79
return Body ( length: data. count) { writer in
80
- var buffer = ByteBufferAllocator ( ) . buffer ( capacity: data. count)
81
- buffer. writeBytes ( data)
82
- return writer. write ( . byteBuffer( buffer) )
80
+ writer. write ( . byteBuffer( ByteBuffer ( bytes: data) ) )
83
81
}
84
82
}
85
83
@@ -89,9 +87,7 @@ extension HTTPClient {
89
87
/// - string: Body `String` representation.
90
88
public static func string( _ string: String ) -> Body {
91
89
return Body ( length: string. utf8. count) { writer in
92
- var buffer = ByteBufferAllocator ( ) . buffer ( capacity: string. utf8. count)
93
- buffer. writeString ( string)
94
- return writer. write ( . byteBuffer( buffer) )
90
+ writer. write ( . byteBuffer( ByteBuffer ( string: string) ) )
95
91
}
96
92
}
97
93
}
@@ -641,7 +637,7 @@ internal class TaskHandler<Delegate: HTTPClientResponseDelegate>: RemovableChann
641
637
case head
642
638
case redirected( HTTPResponseHead , URL )
643
639
case body
644
- case end
640
+ case endOrError
645
641
}
646
642
647
643
let task : HTTPClient . Task < Delegate . Response >
@@ -651,6 +647,8 @@ internal class TaskHandler<Delegate: HTTPClientResponseDelegate>: RemovableChann
651
647
let logger : Logger // We are okay to store the logger here because a TaskHandler is just for one request.
652
648
653
649
var state : State = . idle
650
+ var expectedBodyLength : Int ?
651
+ var actualBodyLength : Int = 0
654
652
var pendingRead = false
655
653
var mayRead = true
656
654
var closing = false {
@@ -771,16 +769,21 @@ extension TaskHandler: ChannelDuplexHandler {
771
769
uri: request. uri)
772
770
var headers = request. headers
773
771
774
- if !request. headers. contains ( name: " Host " ) {
775
- headers. add ( name: " Host " , value: request. host)
772
+ if !request. headers. contains ( name: " host " ) {
773
+ let port = request. port
774
+ var host = request. host
775
+ if !( port == 80 && request. scheme == " http " ) , !( port == 443 && request. scheme == " https " ) {
776
+ host += " : \( port) "
777
+ }
778
+ headers. add ( name: " host " , value: host)
776
779
}
777
780
778
781
do {
779
782
try headers. validate ( method: request. method, body: request. body)
780
783
} catch {
781
784
promise? . fail ( error)
782
785
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
783
- self . state = . end
786
+ self . state = . endOrError
784
787
return
785
788
}
786
789
@@ -794,12 +797,23 @@ extension TaskHandler: ChannelDuplexHandler {
794
797
assert ( head. version == HTTPVersion ( major: 1 , minor: 1 ) ,
795
798
" Sending a request in HTTP version \( head. version) which is unsupported by the above `if` " )
796
799
800
+ let contentLengths = head. headers [ canonicalForm: " content-length " ]
801
+ assert ( contentLengths. count <= 1 )
802
+
803
+ self . expectedBodyLength = contentLengths. first. flatMap { Int ( $0) }
804
+
797
805
context. write ( wrapOutboundOut ( . head( head) ) ) . map {
798
806
self . callOutToDelegateFireAndForget ( value: head, self . delegate. didSendRequestHead)
799
807
} . flatMap {
800
808
self . writeBody ( request: request, context: context)
801
809
} . flatMap {
802
810
context. eventLoop. assertInEventLoop ( )
811
+ if let expectedBodyLength = self . expectedBodyLength, expectedBodyLength != self . actualBodyLength {
812
+ self . state = . endOrError
813
+ let error = HTTPClientError . bodyLengthMismatch
814
+ self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
815
+ return context. eventLoop. makeFailedFuture ( error)
816
+ }
803
817
return context. writeAndFlush ( self . wrapOutboundOut ( . end( nil ) ) )
804
818
} . map {
805
819
context. eventLoop. assertInEventLoop ( )
@@ -808,10 +822,10 @@ extension TaskHandler: ChannelDuplexHandler {
808
822
} . flatMapErrorThrowing { error in
809
823
context. eventLoop. assertInEventLoop ( )
810
824
switch self . state {
811
- case . end :
825
+ case . endOrError :
812
826
break
813
827
default :
814
- self . state = . end
828
+ self . state = . endOrError
815
829
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
816
830
}
817
831
throw error
@@ -828,9 +842,11 @@ extension TaskHandler: ChannelDuplexHandler {
828
842
let promise = self . task. eventLoop. makePromise ( of: Void . self)
829
843
// All writes have to be switched to the channel EL if channel and task ELs differ
830
844
if context. eventLoop. inEventLoop {
845
+ self . actualBodyLength += part. readableBytes
831
846
context. writeAndFlush ( self . wrapOutboundOut ( . body( part) ) , promise: promise)
832
847
} else {
833
848
context. eventLoop. execute {
849
+ self . actualBodyLength += part. readableBytes
834
850
context. writeAndFlush ( self . wrapOutboundOut ( . body( part) ) , promise: promise)
835
851
}
836
852
}
@@ -893,12 +909,12 @@ extension TaskHandler: ChannelDuplexHandler {
893
909
case . end:
894
910
switch self . state {
895
911
case . redirected( let head, let redirectURL) :
896
- self . state = . end
912
+ self . state = . endOrError
897
913
self . task. releaseAssociatedConnection ( delegateType: Delegate . self, closing: self . closing) . whenSuccess {
898
914
self . redirectHandler? . redirect ( status: head. status, to: redirectURL, promise: self . task. promise)
899
915
}
900
916
default :
901
- self . state = . end
917
+ self . state = . endOrError
902
918
self . callOutToDelegate ( promise: self . task. promise, self . delegate. didFinishRequest)
903
919
}
904
920
}
@@ -913,14 +929,14 @@ extension TaskHandler: ChannelDuplexHandler {
913
929
context. read ( )
914
930
}
915
931
case . failure( let error) :
916
- self . state = . end
932
+ self . state = . endOrError
917
933
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
918
934
}
919
935
}
920
936
921
937
func userInboundEventTriggered( context: ChannelHandlerContext , event: Any ) {
922
938
if ( event as? IdleStateHandler . IdleStateEvent) == . read {
923
- self . state = . end
939
+ self . state = . endOrError
924
940
let error = HTTPClientError . readTimeout
925
941
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
926
942
} else {
@@ -930,7 +946,7 @@ extension TaskHandler: ChannelDuplexHandler {
930
946
931
947
func triggerUserOutboundEvent( context: ChannelHandlerContext , event: Any , promise: EventLoopPromise < Void > ? ) {
932
948
if ( event as? TaskCancelEvent ) != nil {
933
- self . state = . end
949
+ self . state = . endOrError
934
950
let error = HTTPClientError . cancelled
935
951
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
936
952
promise? . succeed ( ( ) )
@@ -941,10 +957,10 @@ extension TaskHandler: ChannelDuplexHandler {
941
957
942
958
func channelInactive( context: ChannelHandlerContext ) {
943
959
switch self . state {
944
- case . end :
960
+ case . endOrError :
945
961
break
946
962
case . body, . head, . idle, . redirected, . sent:
947
- self . state = . end
963
+ self . state = . endOrError
948
964
let error = HTTPClientError . remoteConnectionClosed
949
965
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
950
966
}
@@ -955,7 +971,7 @@ extension TaskHandler: ChannelDuplexHandler {
955
971
switch error {
956
972
case NIOSSLError . uncleanShutdown:
957
973
switch self . state {
958
- case . end :
974
+ case . endOrError :
959
975
/// Some HTTP Servers can 'forget' to respond with CloseNotify when client is closing connection,
960
976
/// this could lead to incomplete SSL shutdown. But since request is already processed, we can ignore this error.
961
977
break
@@ -964,11 +980,11 @@ extension TaskHandler: ChannelDuplexHandler {
964
980
/// We can also ignore this error like `.end`.
965
981
break
966
982
default :
967
- self . state = . end
983
+ self . state = . endOrError
968
984
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
969
985
}
970
986
default :
971
- self . state = . end
987
+ self . state = . endOrError
972
988
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
973
989
}
974
990
}
0 commit comments