@@ -460,6 +460,7 @@ def publish(self, topic, msg, retain=False, qos=0):
460
460
461
461
remaining_length = 2 + len (msg ) + len (topic )
462
462
if qos > 0 :
463
+ pid = self ._pid
463
464
remaining_length += 2
464
465
pub_hdr_var .append (0x00 )
465
466
pub_hdr_var .append (self ._pid )
@@ -478,13 +479,34 @@ def publish(self, topic, msg, retain=False, qos=0):
478
479
else :
479
480
pub_hdr_fixed .append (remaining_length )
480
481
481
- print ('pub_hdr_fixed' , pub_hdr_fixed )
482
- print ('pub_hdr_var' , pub_hdr_var )
483
-
482
+ if self .logger is not None :
483
+ self .logger .debug (
484
+ "Sending PUBLISH\n Topic: {0}\n Msg: {1}\
485
+ \n QoS: {2}\n Retain? {3}" .format (
486
+ topic , msg , qos , retain
487
+ )
488
+ )
484
489
self ._sock .send (pub_hdr_fixed )
485
490
self ._sock .send (pub_hdr_var )
491
+ if self .on_publish is not None :
492
+ self .on_publish (self , self .user_data , topic , self ._pid )
486
493
self ._sock .send (msg )
487
-
494
+ if qos == 1 :
495
+ while True :
496
+ op = self ._wait_for_msg ()
497
+ if op == 0x40 :
498
+ sz = self ._sock .recv (1 )
499
+ assert sz == b"\x02 "
500
+ rcv_pid = self ._sock .recv (2 )
501
+ rcv_pid = rcv_pid [0 ] << 0x08 | rcv_pid [1 ]
502
+ if pid == rcv_pid :
503
+ if self .on_publish is not None :
504
+ self .on_publish (self , self .user_data , topic , rcv_pid )
505
+ return
506
+ elif qos == 2 :
507
+ assert 0
508
+ if self .on_publish is not None :
509
+ self .on_publish (self , self .user_data , topic , rcv_pid )
488
510
489
511
def subscribe (self , topic , qos = 0 ):
490
512
"""Subscribes to a topic on the MQTT Broker.
0 commit comments