@@ -505,7 +505,7 @@ def disconnect(self):
505
505
self ._sock .send (MQTT_DISCONNECT )
506
506
if self .logger :
507
507
self .logger .debug ("Closing socket" )
508
- self ._sock . close ()
508
+ self ._free_sockets ()
509
509
self ._is_connected = False
510
510
self ._subscribed_topics = None
511
511
if self .on_disconnect is not None :
@@ -516,6 +516,7 @@ def ping(self):
516
516
there is an active network connection.
517
517
"""
518
518
self .is_connected ()
519
+ buf = self ._rx_buffer
519
520
if self .logger :
520
521
self .logger .debug ("Sending PINGREQ" )
521
522
self ._sock .send (MQTT_PINGREQ )
@@ -524,8 +525,8 @@ def ping(self):
524
525
while True :
525
526
op = self ._wait_for_msg ()
526
527
if op == 208 :
527
- ping_resp = self ._sock . recv ( 2 )
528
- if ping_resp [0 ] != 0x00 :
528
+ self ._recv_into ( buf , 2 )
529
+ if buf [0 ] != 0x00 :
529
530
raise MMQTTException ("PINGRESP not returned from broker." )
530
531
return
531
532
@@ -579,6 +580,7 @@ def publish(self, topic, msg, retain=False, qos=0):
579
580
0 <= qos <= 1
580
581
), "Quality of Service Level 2 is unsupported by this library."
581
582
583
+ buf = self ._rx_buffer
582
584
# fixed header. [3.3.1.2], [3.3.1.3]
583
585
pub_hdr_fixed = bytearray ([0x30 | retain | qos << 1 ])
584
586
@@ -621,13 +623,14 @@ def publish(self, topic, msg, retain=False, qos=0):
621
623
while True :
622
624
op = self ._wait_for_msg ()
623
625
if op == 0x40 :
624
- sz = self ._sock .recv (1 )
625
- assert sz == b"\x02 "
626
- rcv_pid = self ._sock .recv (2 )
627
- rcv_pid = rcv_pid [0 ] << 0x08 | rcv_pid [1 ]
628
- if pid == rcv_pid :
626
+ self ._recv_into (buf , 1 )
627
+ assert buf == b"\x02 "
628
+ # rcv_pid = self._sock.recv(2)
629
+ self ._recv_into (buf , 2 )
630
+ buf = buf [0 ] << 0x08 | buf [1 ]
631
+ if pid == buf :
629
632
if self .on_publish is not None :
630
- self .on_publish (self , self ._user_data , topic , rcv_pid )
633
+ self .on_publish (self , self ._user_data , topic , buf )
631
634
return
632
635
633
636
def subscribe (self , topic , qos = 0 ):
@@ -686,6 +689,8 @@ def subscribe(self, topic, qos=0):
686
689
self ._check_qos (q )
687
690
self ._check_topic (t )
688
691
topics .append ((t , q ))
692
+ # Rx buffer
693
+ buf = self ._rx_buffer
689
694
# Assemble packet
690
695
packet_length = 2 + (2 * len (topics )) + (1 * len (topics ))
691
696
packet_length += sum (len (topic ) for topic , qos in topics )
@@ -707,9 +712,9 @@ def subscribe(self, topic, qos=0):
707
712
while True :
708
713
op = self ._wait_for_msg ()
709
714
if op == 0x90 :
710
- rc = self ._sock . recv ( 4 )
711
- assert rc [1 ] == packet [2 ] and rc [2 ] == packet [3 ]
712
- if rc [3 ] == 0x80 :
715
+ self ._recv_into ( buf , 4 )
716
+ assert buf [1 ] == packet [2 ] and buf [2 ] == packet [3 ]
717
+ if buf [3 ] == 0x80 :
713
718
raise MMQTTException ("SUBACK Failure!" )
714
719
for t , q in topics :
715
720
if self .on_subscribe is not None :
@@ -751,6 +756,8 @@ def unsubscribe(self, topic):
751
756
raise MMQTTException (
752
757
"Topic must be subscribed to before attempting unsubscribe."
753
758
)
759
+ # Rx buffer
760
+ buf = self ._rx_buffer
754
761
# Assemble packet
755
762
packet_length = 2 + (2 * len (topics ))
756
763
packet_length += sum (len (topic ) for topic in topics )
@@ -770,12 +777,12 @@ def unsubscribe(self, topic):
770
777
while True :
771
778
op = self ._wait_for_msg ()
772
779
if op == 176 :
773
- return_code = self ._sock . recv ( 3 )
774
- assert return_code [0 ] == 0x02
780
+ self ._recv_into ( buf , 3 )
781
+ assert buf [0 ] == 0x02
775
782
# [MQTT-3.32]
776
783
assert (
777
- return_code [1 ] == packet_id_bytes [0 ]
778
- and return_code [2 ] == packet_id_bytes [1 ]
784
+ buf [1 ] == packet_id_bytes [0 ]
785
+ and buf [2 ] == packet_id_bytes [1 ]
779
786
)
780
787
for t in topics :
781
788
if self .on_unsubscribe is not None :
0 commit comments