MQTT系列:二、初探和使用库

Ubuntu 下编写 MQTT 应用程序

Posted by Jerry Chen on May 18, 2020

本文将简单介绍MQTT协议,并在Ubuntu上安装mosquitto进行初步实验,最后用 libmosquitto(C库)编写我们的客户端程序。

MQTT简介

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种基于TCP/IP的轻量级协议。客户端1发布消息,经过代理服务器(Broker)发送给订阅主题的客户端2、3、4。

  • 提供 3 种消息的 QoS(Quality of Service):
  • 至多一次传输(Qos:0);
  • 最少一次传输(Qos:1);
  • 只有一次传输(Qos:2);

使用工具推演

安装mosquitto

mosquitto是一个MQTT Broker代理服务器,ubuntu下安装方法如下:

安装完成后生成mosquitto和mosquitto_passwd两个可执行文件;

1
sudo apt-get install mosquitto

安装mosquitto-client

mosquitto-client是MQTT Client客户端,ubuntu下安装方法如下:

安装完成后生成mosquitto_pub和mosquitto_sub两个可执行文件;

1
sudo apt-get install mosquitto-clients

后面我们会使用libmosquitto(C库)编写我们的客户端。

进行推演

  1. 启动代理服务器

    如果报错,查看是否已经启动过;

    1
    2
    
    # 启动,-v 打印更多信息
    mosquitto -v
    

    客户端2、3、4(多开控制台)订阅主题test/#

    #是主题中的通配符;

    执行后会占用终端前台;

    1
    2
    
    # -v 打印更多信息,-t 主题
    mosquitto_sub -v -t test/#
    

  2. 客户端1发布消息

    1
    2
    
    # -t 主题,-m 消息
    mosquitto_pub -t test/hello -m "haha~"
    

默认服务质量QoS为0,可在mosquitto_sub和mosquitto_pub的参数中加入-q 1指定QoS;

编写MQTT客户端

安装成功mosquitto后可以查找到一个动态库libmosquitto.so.1,这就是我们的目标库文件, 在编译程序时需加上-lmosquitto进行链接。

-lmosquitto 需要在 /lib/usr/lib/usr/local/lib其中一个目录下有libmosquitto.so文件,没有就主动ln -s libmosquitto.so.1 /usr/lib/libmosquitto.so一下,或者用gcc -L xxx指定库文件目录。

示例: ` gcc -o mosquitto_client_sub mosquitto_client_sub.c -lmosquitto `

对于api的介绍见官网: libmosquitto api介绍

一般编写流程

以下是订阅客户端的伪代码,很多传参需要去官网查询:

gcc -I指定包含头文件的目录,mosquitto.h文件可去官网或者github进行下载。如果把头文件拷贝到/usr/include/目录中,就不需要指定包含目录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <mosquitto.h>
mosquitto_lib_init();
mosq=mosquitto_new();

mosquitto_connect_callback_set();  // 其中进行mosquitto_subscribe();
mosquitto_disconnect_callback_set();
mosquitto_message_callback_set();  // 接收解析消息 并推送mosquitto_publish()

mosquitto_username_pw_set(mosq, "user", "pw");
mosquitto_connect();
while(1){
    mosquitto_loop(mosq, timeout, 1);  // 主功能需要不断循环判断
    // 这里可加其他的条件进行终止循环
}  // 这段如需要永远运行改为:mosquitto_loop_forever(mosq, timeout, 1)

mosquitto_destroy(mosq);
mosquitto_lib_cleanup();

特别注意:mosquitto_loop_start()的含义是开个线程不断调用mosquitto_loop(),这时不应该再去使用多线程编程。如必要使用多线程编程就不能使用mosquitto_loop_start()(应该使用mosquitto_loop()mosquitto_loop_forever()),且需要在调用mosquitto_new()后接着调用mosquitto_threaded_set()使其对多线程进行优化。

编写sub客户端

my_mqtt_sub.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#include <stdio.h>
#include <stdlib.h>
#include <mosquitto.h>
#include <string.h>

#define HOST "localhost"
#define PORT  1883
#define KEEP_ALIVE 60

void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
    if(message->payloadlen){
        printf("%s %s", message->topic, (char *)(message->payload));
    }else{
        printf("%s (null)\n", message->topic);
    }
    fflush(stdout);
}

void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
    int i;
    if(!result){
        /* Subscribe to broker information topics on successful connect. */
        mosquitto_subscribe(mosq, NULL, "mytop:", 2);
    }else{
        fprintf(stderr, "Connect failed\n");
    }
}

void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
    int i;
    printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
    for(i=1; i<qos_count; i++){
        printf(", %d", granted_qos[i]);
    }
    printf("\n");
}

void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
    /* Pring all log messages regardless of level. */
    printf("%s\n", str);
}

int main()
{
    bool session = true;
    struct mosquitto *mosq = NULL;
    
    //libmosquitto 库初始化
    mosquitto_lib_init();
    
    //创建mosquitto客户端
    mosq = mosquitto_new(NULL,session,NULL);
    if(!mosq){
        printf("create client failed..\n");
        mosquitto_lib_cleanup();
        return 1;
    }
    
    //设置回调函数,需要时可使用
    //mosquitto_log_callback_set(mosq, my_log_callback);
    mosquitto_connect_callback_set(mosq, my_connect_callback);
    mosquitto_message_callback_set(mosq, my_message_callback);
    //mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
    
    //客户端连接服务器
    if(mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE)){
        fprintf(stderr, "Unable to connect.\n");
        return 1;
    }
    
    //循环处理网络消息
    mosquitto_loop_forever(mosq, -1, 1);

    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();

    return 0;
}

编写pub客户端

my_mqtt_pub.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include <stdio.h>
#include <stdlib.h>
#include <mosquitto.h>
#include <string.h>

#define HOST "localhost"
#define PORT  1883
#define KEEP_ALIVE 60
#define MSG_MAX_SIZE  512

int main()
{
    bool session = true;
    char buff[MSG_MAX_SIZE];
    struct mosquitto *mosq = NULL;
    
    //libmosquitto 库初始化
    mosquitto_lib_init();
    
    //创建mosquitto客户端
    mosq = mosquitto_new(NULL,session,NULL);
    if(!mosq){
        printf("create client failed..\n");
        mosquitto_lib_cleanup();
        return 1;
    }
    
    //连接服务器
    if(mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE)){
        fprintf(stderr, "Unable to connect.\n");
        return 1;
    }
    
    //开启一个线程,在线程里不停的调用 mosquitto_loop() 来处理网络信息
    int loop = mosquitto_loop_start(mosq);
    if(loop != MOSQ_ERR_SUCCESS)
    {
        printf("mosquitto loop error\n");
        return 1;
    }
    
    while(fgets(buff, MSG_MAX_SIZE, stdin) != NULL)
    {
        /*发布消息*/
        mosquitto_publish(mosq,NULL,"mytop:",strlen(buff)+1,buff,0,0);
        memset(buff,0,sizeof(buff));
    }
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    return 0;
}

实验结果

编译:

1
2
gcc my_mqtt_sub.c -lmosquitto -o my_sub
gcc my_mqtt_pub.c -lmosquitto -o my_pub

测试:

这两个执行文件都是客户端,请确保和MQTT代理服务器连接畅通。这里是localhost上的mosquitto作为代理服务器。

写在后面:

好像默认mosquito会开机自启动,杀掉进程后又会重启,可用以下命令关掉:

1
2
sudo service mosquitto status
sudo service mosquitto stop

模板程序

单线程

单端单收sub.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <stdio.h>
#include <mosquitto.h>

#define HOST		"localhost"
#define PORT		1883
#define KEEP_ALIVE	60

#define QOS			1

void my_conn(struct mosquitto *mosq, void *userdata, int result)
{
    if(!result){
        mosquitto_subscribe(mosq, NULL, "#", QOS);
    }else{
        printf("Connect failed!\n");
    }
}

void my_subs(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
    int i;
    printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
    for(i=1; i<qos_count; i++){
        printf(", %d", granted_qos[i]);
    }
    printf("\n");
}

void my_mess(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
    if(message->payloadlen){
        printf("%s %s\n", message->topic, (char *)(message->payload));
    }else{
        printf("%s (null)\n", message->topic);
    }
    fflush(stdout);
}

int main(int argc, char *argv[]){
	printf("hello sub!\n");
	
	struct mosquitto *mosq = NULL;
	
	mosquitto_lib_init();
	mosq = mosquitto_new(NULL, true, NULL);
	
	mosquitto_connect_callback_set(mosq, my_conn);
	mosquitto_subscribe_callback_set(mosq, my_subs);
	mosquitto_message_callback_set(mosq, my_mess);
	
	mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE);
	while(1){
		mosquitto_loop(mosq, -1, 1);
	}
	mosquitto_destroy(mosq);
	mosquitto_lib_cleanup();
	return 0;
}
单端单发pub.c

该程序使用:mypub <topic> <messages>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mosquitto.h>

#define HOST		"localhost"
#define PORT		1883
#define KEEP_ALIVE	60

#define QOS			1
#define RETAIN		0

#define MSG_MAX_SIZE	2048

int main(int argc, char *argv[]){
	printf("hello pub!\n");
	
	char *topic = malloc(256);
	char *msg = malloc(MSG_MAX_SIZE);
	memset(topic, 0, 256);
	memset(msg, 0, MSG_MAX_SIZE);
	
	if(argc >= 3){
		memcpy(topic, argv[1], strlen(argv[1]));
		memcpy(msg, argv[2], strlen(argv[2]));
	}
	else if(argc == 2){
		memcpy(topic, "topic", 5); //默认值
		memcpy(topic, argv[1], strlen(argv[1]));
	}
	else{
		memcpy(topic, "topic", 5); //默认值
	}
	
	struct mosquitto *mosq = NULL;
	
	mosquitto_lib_init();
	mosq = mosquitto_new(NULL, true, NULL);
	
	mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE);
	mosquitto_publish(mosq, NULL, topic, strlen(msg), msg, QOS, RETAIN);
	//如果是QoS大于0,就需要加循环处理接收
	int i = 0;
	while(i ++ < QOS){
		mosquitto_loop(mosq, -1, 1);
	}
	mosquitto_disconnect(mosq);

	mosquitto_destroy(mosq);
	mosquitto_lib_cleanup();
	free(msg);
	return 0;
}

注意:以上提到的发布qos大于0,需要加mosquitto_loop对代理服务器的消息进行处理;可以通过打印log进行查看,如果不加,就收不到服务器的ack和comp。虽然很多情况下不加通讯也成功,但是失去了qos设定的意义。关于循环的次数,我这里是随便设定了一个,按需设定吧。

多线程

单端自发自收one.c

自收自发程序,主线程发,另一线程收:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <mosquitto.h>

#define PEERID		"my_one"
#define SESSION		true

#define HOST		"localhost"
#define PORT		1883
#define KEEP_ALIVE	60

#define QOS			1
#define RETAIN		0

struct mosquitto *mosq = NULL;

void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
    if(message->payloadlen){
        printf("====>recv:%s %s\n", message->topic, (char *)message->payload);
    }else{
        printf("%s (null)\n", message->topic);
    }

    mosquitto_publish(mosq, NULL, "top/result", sizeof("loveresult"), "loveresult", QOS, RETAIN);
    sleep(2);

    fflush(stdout);
}

void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
    if(!result){
        mosquitto_subscribe(mosq, NULL, "top/love", QOS);
    }else{
        fprintf(stderr, "Connect failed\n");
    }
}

void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
    int i;

    printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
    for(i=1; i<qos_count; i++){
        printf(", %d", granted_qos[i]);
    }
    printf("\n");
}

void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
    printf("====>log:%s\n", str);
}

void mosq_init()
{
    mosquitto_lib_init();
    mosq = mosquitto_new(PEERID, SESSION, NULL);
    if(!mosq){
        fprintf(stderr, "Error: Out of memory.\n");
        exit(-1);
    }
    mosquitto_log_callback_set(mosq, my_log_callback);
    mosquitto_connect_callback_set(mosq, my_connect_callback);
    mosquitto_message_callback_set(mosq, my_message_callback);
    mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
    //mosquitto_will_set(mosq,"top/exit", sizeof("livewill"), "livewill", QOS, RETAIN);
    mosquitto_threaded_set(mosq, 1);  // 多线程优化
}

void *pthread_sub(void *arg)
{
    int toserver = -1;
    int timeout = 0;
    
    while(toserver){
        toserver = mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE);
        if(toserver){
            timeout++;
            fprintf(stderr, "Unable to connect server [%d] times.\n", timeout);
            if(timeout > 3){
                fprintf(stderr, "Unable to connect server, exit.\n" );
                pthread_exit(NULL);
            }
            sleep(10);
        }
    }

    mosquitto_loop_forever(mosq, -1, 1);

    mosquitto_disconnect(mosq);
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();

    pthread_exit(NULL);
}

int main(int argc, char *argv[])
{
	pthread_t sub_id = 0;
	
    mosq_init();

    pthread_create(&sub_id, NULL, pthread_sub, NULL);
    pthread_detach(sub_id);

    while(1){
        mosquitto_publish(mosq, NULL, "top/love", sizeof("love"), "love", QOS, RETAIN);
        //如果是QoS大于0,就需要加循环处理接收
        int i = 0;
        while(i ++ < QOS){
            mosquitto_loop(mosq, -1, 1);
        }
    }
    return 0;
}
单端多收sub.c

多线程接收,每来一条消息都会都会新建一个线程进行处理,适合任务繁重的情况。

注意,这个慎用或者不能用,消息不密集才可使用,貌似一个进程仅能新建253个线程。但是像tcp那样有固定连接对象的可以使用这种方法,也就是每当客户端连接可以建一个线程,但是不能每个数据包来了都建立一个线程

topic:top/love

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <mosquitto.h>

#define PEERID		"my_sub_thread"
#define SESSION		true

#define HOST		"localhost"
#define PORT		1883
#define KEEP_ALIVE	60

#define QOS			1
#define RETAIN		0

struct mosquitto *mosq = NULL;

void *pthread_sub_do(void *arg)
{
	struct mosquitto_message *message = arg;
	
	if(message->payloadlen){
        printf("====>recv:%s %s\n", message->topic, (char *)(message->payload));
    }else{
        printf("%s (null)\n", message->topic);
    }
	
	fflush(stdout);
	free(message->topic);
	free(message->payload);
	free(message);
	pthread_exit(NULL);
}

void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
	struct mosquitto_message *trans = malloc(sizeof(struct mosquitto_message));
	memcpy(trans, message, sizeof(struct mosquitto_message));
	trans->topic = malloc(strlen(message->topic) + 1);
	trans->payload = malloc(strlen(message->payload) + 1);
	memcpy(trans->topic, message->topic, strlen(message->topic) + 1);
	memcpy(trans->payload, message->payload, strlen(message->payload) + 1);
	
	pthread_t sub_id = 0;
	pthread_create(&sub_id, NULL, pthread_sub_do, (void *)trans);
    pthread_detach(sub_id);
}

void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
    if(!result){
        mosquitto_subscribe(mosq, NULL, "top/love", QOS);
    }else{
        fprintf(stderr, "Connect failed\n");
    }
}

void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
    int i;

    printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
    for(i=1; i < qos_count; i++){
        printf(", %d", granted_qos[i]);
    }
    printf("\n");
}

void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
    printf("====>log:%s\n", str);
}

void mosq_init()
{
    mosquitto_lib_init();
    mosq = mosquitto_new(PEERID, SESSION, NULL);
    if(!mosq){
        fprintf(stderr, "Error: Out of memory.\n");
        exit(-1);
    }
    mosquitto_log_callback_set(mosq, my_log_callback);
    mosquitto_connect_callback_set(mosq, my_connect_callback);
    mosquitto_message_callback_set(mosq, my_message_callback);
    mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
    //mosquitto_will_set(mosq,"top/exit", sizeof("livewill"), "livewill", QOS, RETAIN);
    mosquitto_threaded_set(mosq, 1);
}

int main(int argc, char *argv[])
{
    mosq_init();
	mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE);
	
	mosquitto_loop_forever(mosq, -1, 1);

    mosquitto_disconnect(mosq);
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    return 0;
}
单端多发pub.c

多线程发布,主线程mosquitto_loop处理回复,其他线程发布消息。

topic:top/love

payload:love

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <mosquitto.h>

#define PEERID		"my_pub_thread"
#define SESSION		true

#define HOST		"localhost"
#define PORT		1883
#define KEEP_ALIVE	60

#define QOS			1
#define RETAIN		0

struct mosquitto *mosq = NULL;

void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
    printf("====>log:%s\n", str);
}

void mosq_init()
{
    mosquitto_lib_init();
    mosq = mosquitto_new(PEERID, SESSION, NULL);
    if(!mosq){
        fprintf(stderr, "Error: Out of memory.\n");
        exit(-1);
    }
    mosquitto_log_callback_set(mosq, my_log_callback);
    mosquitto_threaded_set(mosq, 1);  // 多线程优化
}

void *pthread_pub(void *arg)
{
    while(1){
        mosquitto_publish(mosq, NULL, "top/love", sizeof("love"), "love", QOS, RETAIN);
        //如果是QoS大于0,就需要加循环处理接收
        int i = 0;
        while(i ++ < QOS){
            mosquitto_loop(mosq, -1, 1);
        }
    }
    pthread_exit(NULL);
}

int main(int argc, char *argv[])
{
	int i = 0;
	pthread_t *ppub_id = malloc(sizeof(pthread_t));
	
    mosq_init();
	mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE);

	for(i = 0; i < 5; i++){  // 多个线程
		pthread_t pub_id;
		pthread_create(ppub_id + i, NULL, pthread_pub, NULL);
		pthread_detach(*(ppub_id + i));
	}

    while(1){
		mosquitto_loop(mosq, -1, 1);  // 必须加这个,否则收不到代理回复
    }
	
	mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
	free(ppub_id);
	ppub_id = NULL;
    return 0;
}
多端单收nsub.c

多线程模拟多个sub客户端:

topic:top/love

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <mosquitto.h>

#define SESSION		true

#define HOST		"localhost"
#define PORT		1883
#define KEEP_ALIVE	60

#define QOS			1
#define RETAIN		0

void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
	if(message->payloadlen){
        printf("%s %s\n", message->topic, (char *)(message->payload));
    }else{
        printf("%s (null)\n", message->topic);
    }
    fflush(stdout);
}

void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
    if(!result){
        mosquitto_subscribe(mosq, NULL, "top/love", QOS);
    }else{
        fprintf(stderr, "Connect failed\n");
    }
}

void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
    int i;

    printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
    for(i=1; i < qos_count; i++){
        printf(", %d", granted_qos[i]);
    }
    printf("\n");
}

void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
    printf("====>log:%s\n", str);
}

void *pthread_nsub(void *arg)
{
	struct mosquitto *mosq = NULL;
	mosq = mosquitto_new(NULL, SESSION, NULL);
    if(!mosq){
        fprintf(stderr, "Error: Out of memory.\n");
    }
	
    //mosquitto_log_callback_set(mosq, my_log_callback);
    mosquitto_connect_callback_set(mosq, my_connect_callback);
    mosquitto_message_callback_set(mosq, my_message_callback);
    mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
    //mosquitto_will_set(mosq,"top/exit", sizeof("livewill"), "livewill", QOS, RETAIN);
    //mosquitto_threaded_set(mosq, 1);//对于单个客户端而言是单线程的故不用加这句话
	
	mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE);
	
	mosquitto_loop_forever(mosq, -1, 1);

    mosquitto_disconnect(mosq);
    mosquitto_destroy(mosq);
	pthread_exit(NULL);
}

int main(int argc, char *argv[])
{
	int i = 0;
	mosquitto_lib_init();
	for(i = 0; i < 5; i++){
		pthread_t sub_id = 0;
		pthread_create(&sub_id, NULL, pthread_nsub, NULL);
		pthread_detach(sub_id);
	}
	while(1);
	mosquitto_lib_cleanup();
    return 0;
}
多端单发npub.c

多线程模拟多个发送客户端:

topic:top/love

payload:love

这里的npub(5客户端每个发1条),如nsub(5客户端订阅)已连接,那么broker会发送25条给nsub。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <mosquitto.h>

#define SESSION		true

#define HOST		"localhost"
#define PORT		1883
#define KEEP_ALIVE	60

#define QOS			1
#define RETAIN		0

void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
    printf("====>log:%s\n", str);
}

void *pthread_npub(void *arg)
{
    struct mosquitto *mosq = NULL;

	mosq = mosquitto_new(NULL, SESSION, NULL);
    if(!mosq){
        fprintf(stderr, "Error: Out of memory.\n");
    }
	
	//mosquitto_log_callback_set(mosq, my_log_callback);
    //mosquitto_threaded_set(mosq, 1);//对于单个客户端而言是单线程的故不用加这句话
	
	mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE);
	
	mosquitto_publish(mosq, NULL, "top/love", sizeof("love"), "love", QOS, RETAIN);
	//如果是QoS大于0,就需要加循环处理接收
	int i = 0;
	while(i ++ < QOS){
		mosquitto_loop(mosq, -1, 1);
	}

    mosquitto_disconnect(mosq);
    mosquitto_destroy(mosq);
	pthread_exit(NULL);
}

int main(int argc, char *argv[])
{
	int i = 0;
	pthread_t *ppub_id = malloc(5 * sizeof(pthread_t));
	
	mosquitto_lib_init();
	
	for(i = 0; i < 5; i++){
		pthread_create(ppub_id + i, NULL, pthread_npub, NULL);
	}
	for(i = 0; i < 5; i++){
		pthread_join(*(ppub_id + i), NULL);
	}
	
	mosquitto_lib_cleanup();
	free(ppub_id);
	ppub_id = NULL;
    return 0;
}