0 Replies Latest reply on Dec 23, 2016 5:01 AM by jerryhh

    MQTT QOS2 lost packet

    jerryhh

      Hi,

      I am working with MQTT, using WICED SDK 4.0.1.

       

      I found a problem with QoS 2:

      After I call wiced_mqtt_publish( mqtt_object, p_topic, p_data, data_len, qos ), the callback function is invoked, but p_event_info->type is WICED_MQTT_EVENT_TYPE_UNKNOWN.

      In addition, some packets are lost.

       

       

      Here is my code:

      /* TLS settings; handed to wiced_mqtt_connect() only when ca_cert_len is non-zero. */
      static wiced_mqtt_security_t security;
      /* Event callback registered with wiced_mqtt_connect(). */
      static wiced_mqtt_callback_t callback = mqtt_connection_event_cb;
      /* Copy of the most recent status event stored by the callback; read after mqtt_wait_for(). */
      static wiced_mqtt_event_info_t expected_event;
      /* Broker address filled in by wiced_hostname_lookup() during sems_mqtt_init(). */
      static wiced_ip_address_t    broker_address;
      /* MQTT client object; heap-allocated in sems_mqtt_init(). */
      static wiced_mqtt_object_t mqtt_object;
      /* Set by the callback when a status event arrives; taken in mqtt_wait_for(). */
      static wiced_semaphore_t semaphore;
      /* Optional handler invoked for received publish messages; NULL means none.
       * NOTE(review): no setter is visible in this snippet - confirm where it is assigned. */
      static sems_mqtt_receive_msg_callback_t receive_msg_handler = NULL;
      wiced_result_t sems_mqtt_init()
      {
       wiced_result_t ret = WICED_SUCCESS;
       security.ca_cert_len = 0;
      // uint32_t  size_out;
          /* Read root CA certificate (self certified) from resources*/
      //    resource_get_readonly_buffer( &resources_apps_DIR_secure_mqtt_DIR_secure_mqtt_root_cacert_cer, 0, MQTT_MAX_RESOURCE_SIZE, &size_out, (const void **) &security.ca_cert );
      //    security.ca_cert_len = size_out;
       /* Memory allocated for mqtt object*/
       mqtt_object = (wiced_mqtt_object_t) malloc( WICED_MQTT_OBJECT_MEMORY_SIZE_REQUIREMENT );
       if ( mqtt_object == NULL )
       {
        WPRINT_APP_ERROR(("Dont have memory to allocate for mqtt object...\n"));
        ret = WICED_OUT_OF_HEAP_SPACE;
        return ret;
       }
          /* Bringup the network interface */
       if (wiced_network_is_ip_up(WICED_STA_INTERFACE) == WICED_FALSE)
       {
        ret = wiced_network_up( WICED_STA_INTERFACE, WICED_USE_EXTERNAL_DHCP_SERVER, NULL );
        if (ret != WICED_SUCCESS)
        {
         free(mqtt_object);
         return ret;
        }
       }
       /* DNS Look up */
          WPRINT_APP_INFO( ( "Resolving IP address of MQTT broker...\n" ) );
          ret = wiced_hostname_lookup( SEMS_MQTT_BROKER_ADDRESS, &broker_address, 10000 );
       WPRINT_APP_INFO(("Resolved Broker IP: %u.%u.%u.%u\n\n", (uint8_t)(GET_IPV4_ADDRESS(broker_address) >> 24),
                           (uint8_t)(GET_IPV4_ADDRESS(broker_address) >> 16),
                           (uint8_t)(GET_IPV4_ADDRESS(broker_address) >> 8),
                           (uint8_t)(GET_IPV4_ADDRESS(broker_address) >> 0)));
       if (ret != WICED_SUCCESS || broker_address.ip.v4 == 0)
       {
        free(mqtt_object);
              WPRINT_APP_INFO(("Error in resolving DNS\n"));
              return ret;
       }
       ret = wiced_mqtt_init( mqtt_object );
       if (ret != WICED_SUCCESS)
       {
        WPRINT_APP_INFO(("Error in init MQTT\n"));
        free(mqtt_object);
        return ret;
       }
          ret = wiced_rtos_init_semaphore( &semaphore );
          if (ret != WICED_SUCCESS)
          {
        WPRINT_APP_INFO(("Error in init semaphore\n"));
        free(mqtt_object);
           wiced_mqtt_deinit(mqtt_object);
          }
          return WICED_SUCCESS;
      }
      
      /*
       * Open an MQTT session with the broker resolved by sems_mqtt_init() and
       * block until the connect status event arrives or WICED_MQTT_TIMEOUT
       * expires.
       *
       * p_client_id  client identifier placed in the CONNECT packet
       * p_password   password placed in the CONNECT packet (no user name is sent)
       * p_err_code   written with the event's error code only when the wait succeeds
       *
       * Returns the result of wiced_mqtt_connect() if that fails, otherwise the
       * result of waiting for WICED_MQTT_EVENT_TYPE_CONNECT_REQ_STATUS.
       */
      wiced_result_t sems_mqtt_connect(uint8_t* p_client_id, uint8_t* p_password, wiced_mqtt_conn_err_code_t* p_err_code)
      {
          wiced_mqtt_pkt_connect_t connect_params;
          /* Security info is supplied only when a CA certificate has been loaded. */
          wiced_mqtt_security_t* tls = ( security.ca_cert_len != 0 ) ? &security : NULL;
          wiced_result_t status;

          memset( &connect_params, 0, sizeof(wiced_mqtt_pkt_connect_t) );
          connect_params.port_number   = 0; /* Default port */
          connect_params.mqtt_version  = WICED_MQTT_PROTOCOL_VER4;
          connect_params.clean_session = 1;
          connect_params.keep_alive    = 120;
          connect_params.client_id     = p_client_id;
          connect_params.username      = NULL;
          connect_params.password      = p_password;
          connect_params.peer_cn       = NULL;

          status = wiced_mqtt_connect( mqtt_object, &broker_address, WICED_STA_INTERFACE, callback, tls, &connect_params );
          if ( status == WICED_SUCCESS )
          {
              status = mqtt_wait_for( WICED_MQTT_EVENT_TYPE_CONNECT_REQ_STATUS, WICED_MQTT_TIMEOUT );
          }

          if ( status == WICED_SUCCESS )
          {
              *p_err_code = expected_event.data.err_code;
          }
          return status;
      }
      
      /*
       * Publish a message and block until the WICED_MQTT_EVENT_TYPE_PUBLISHED
       * event arrives or WICED_MQTT_TIMEOUT expires.
       *
       * qos       quality-of-service level passed to wiced_mqtt_publish()
       * p_topic   topic to publish on
       * p_data    payload bytes
       * data_len  payload length in bytes
       * p_msg_id  out: message id returned by wiced_mqtt_publish(); replaced by
       *           the id carried in the PUBLISHED event once the wait succeeds
       *
       * Returns WICED_BADARG when p_msg_id is NULL, WICED_ERROR when the publish
       * call returns message id 0, otherwise the result of mqtt_wait_for().
       */
      wiced_result_t sems_mqtt_publish(uint8_t qos, uint8_t *p_topic, uint8_t *p_data, uint32_t data_len, wiced_mqtt_msgid_t *p_msg_id)
      {
          wiced_result_t ret;

          if ( p_msg_id == NULL )
          {
              return WICED_BADARG;
          }

          *p_msg_id = wiced_mqtt_publish( mqtt_object, p_topic, p_data, data_len, qos );
          if ( *p_msg_id == 0 )
          {
              return WICED_ERROR;
          }

          ret = mqtt_wait_for( WICED_MQTT_EVENT_TYPE_PUBLISHED, WICED_MQTT_TIMEOUT );
          if ( ret != WICED_SUCCESS )
          {
              /* Timeout or unexpected event: expected_event is stale, so keep
               * the id returned by wiced_mqtt_publish() untouched. */
              return ret;
          }

          /* Only a confirmed PUBLISHED event carries a trustworthy message id. */
          *p_msg_id = expected_event.data.msgid;
          return ret;
      }
      
      /*
       * MQTT event callback registered through wiced_mqtt_connect().
       *
       * Events from an object other than mqtt_object are rejected with
       * WICED_BADARG. A received publish message is forwarded to
       * receive_msg_handler when one is installed. Connect-status, disconnect,
       * published, subscribed and unsubscribed events are copied into
       * expected_event and the semaphore is set so that mqtt_wait_for() can
       * return. Every other event type is ignored. All accepted events return
       * WICED_SUCCESS.
       */
      static wiced_result_t mqtt_connection_event_cb( wiced_mqtt_object_t sender, wiced_mqtt_event_info_t *p_event_info )
      {
          if ( sender != mqtt_object )
          {
              return WICED_BADARG;
          }

          WPRINT_APP_INFO(( "Got MQTT Event. Type: %d\n", p_event_info->type ));

          if ( p_event_info->type == WICED_MQTT_EVENT_TYPE_PUBLISH_MSG_RECEIVED )
          {
              /* Hand the handler a local copy of the received message descriptor. */
              wiced_mqtt_topic_msg_t received = p_event_info->data.pub_recvd;

              if ( receive_msg_handler == NULL )
              {
                  WPRINT_APP_INFO(( "[MQTT] No receve handler \n"));
              }
              else
              {
                  WPRINT_APP_INFO(( "[MQTT] Receve handler invoke \n"));
                  receive_msg_handler( &received );
              }
              return WICED_SUCCESS;
          }

          switch ( p_event_info->type )
          {
              case WICED_MQTT_EVENT_TYPE_CONNECT_REQ_STATUS:
              case WICED_MQTT_EVENT_TYPE_DISCONNECTED:
              case WICED_MQTT_EVENT_TYPE_PUBLISHED:
              case WICED_MQTT_EVENT_TYPE_SUBCRIBED:
              case WICED_MQTT_EVENT_TYPE_UNSUBSCRIBED:
                  /* Record the event, then release whoever is blocked in mqtt_wait_for(). */
                  expected_event = *p_event_info;
                  wiced_rtos_set_semaphore( &semaphore );
                  break;

              default:
                  /* Any other event type is deliberately ignored. */
                  break;
          }

          return WICED_SUCCESS;
      }
      
      /*
       * Block on the module semaphore for at most `timeout`, then compare the
       * event recorded in expected_event with the requested type.
       *
       * Returns WICED_ERROR if the semaphore could not be obtained within the
       * timeout, WICED_BADARG if the recorded event type differs from
       * event_type, and WICED_SUCCESS when it matches.
       */
      static wiced_result_t mqtt_wait_for( wiced_mqtt_event_type_t event_type, uint32_t timeout )
      {
          wiced_result_t wait_status = wiced_rtos_get_semaphore( &semaphore, timeout );

          if ( wait_status != WICED_SUCCESS )
          {
              WPRINT_APP_INFO(( "[MQTT] Time out \n"));
              return WICED_ERROR;
          }

          return ( expected_event.type == event_type ) ? WICED_SUCCESS : WICED_BADARG;
      }