本文将简单介绍MQTT协议,并在Ubuntu上安装mosquitto进行初步实验,最后用 libmosquitto(C库)编写我们的客户端程序。
MQTT简介
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种基于TCP/IP的轻量级协议。客户端1发布消息,经过代理服务器(Broker)发送给订阅主题的客户端2、3、4。
- 提供 3 种消息的 QoS(Quality of Service):
- 至多一次传输(QoS:0);
- 至少一次传输(QoS:1);
- 恰好一次传输(QoS:2);
使用工具推演
安装mosquitto
mosquitto是一个MQTT Broker代理服务器,ubuntu下安装方法如下:
安装完成后生成mosquitto和mosquitto_passwd两个可执行文件;
1
sudo apt-get install mosquitto
安装mosquitto-client
mosquitto-client是MQTT Client客户端,ubuntu下安装方法如下:
安装完成后生成mosquitto_pub和mosquitto_sub两个可执行文件;
1
sudo apt-get install mosquitto-clients
后面我们会使用libmosquitto(C库)编写我们的客户端。
进行推演
-
启动代理服务器
如果报错,查看是否已经启动过;
1 2
# 启动,-v 打印更多信息
mosquitto -v
客户端2、3、4(多开控制台)订阅主题
test/#
#是主题中的通配符;
执行后会占用终端前台;
1 2
# -v 打印更多信息,-t 主题
mosquitto_sub -v -t test/#
-
客户端1发布消息
1 2
# -t 主题,-m 消息
mosquitto_pub -t test/hello -m "haha~"
默认服务质量QoS为0,可在mosquitto_sub和mosquitto_pub的参数中加入-q 1
指定QoS;
编写MQTT客户端
安装成功mosquitto后可以查找到一个动态库libmosquitto.so.1,这就是我们的目标库文件, 在编译程序时需加上-lmosquitto进行链接。
-lmosquitto 需要在 /lib、/usr/lib、/usr/local/lib 其中一个目录下有 libmosquitto.so 文件;如果没有,可以手动执行 ln -s libmosquitto.so.1 /usr/lib/libmosquitto.so 创建软链接,或者用 gcc -L xxx 指定库文件目录。
示例: ` gcc -o mosquitto_client_sub mosquitto_client_sub.c -lmosquitto `
对于api的介绍见官网: libmosquitto api介绍
一般编写流程
以下是订阅客户端的伪代码,很多传参需要去官网查询:
用 gcc -I 指定包含头文件的目录,mosquitto.h 文件可去官网或者 github 进行下载。如果把头文件拷贝到 /usr/include/ 目录中,就不需要指定包含目录。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/* Pseudocode outline of a subscriber client; most call arguments are omitted. */
#include <mosquitto.h>
mosquitto_lib_init();
mosq=mosquitto_new();
mosquitto_connect_callback_set(); // call mosquitto_subscribe() from inside this callback
mosquitto_disconnect_callback_set();
mosquitto_message_callback_set(); // receive and parse messages here, and publish replies with mosquitto_publish()
mosquitto_username_pw_set(mosq, "user", "pw");
mosquitto_connect();
while(1){
mosquitto_loop(mosq, timeout, 1); // the main work is driven by calling this repeatedly
// additional conditions for leaving the loop can be added here
} // to run forever, replace this loop with: mosquitto_loop_forever(mosq, timeout, 1)
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
特别注意:mosquitto_loop_start() 的含义是开个线程不断调用 mosquitto_loop(),这时不应该再去使用多线程编程。如必要使用多线程编程,就不能使用 mosquitto_loop_start()(应该使用 mosquitto_loop()、mosquitto_loop_forever()),且需要在调用 mosquitto_new() 后接着调用 mosquitto_threaded_set(),使其对多线程进行优化。
编写sub客户端
my_mqtt_sub.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#include <stdio.h>
#include <stdlib.h>
#include <mosquitto.h>
#include <string.h>
#define HOST "localhost"
#define PORT 1883
#define KEEP_ALIVE 60
/*
 * Message callback: print the topic and payload of a received message.
 *
 * mosq and userdata are not used. When payloadlen is zero the topic is
 * printed followed by "(null)" and a newline; otherwise the topic and
 * the payload are printed WITHOUT a trailing newline.
 * NOTE(review): the payload is printed with %s, which assumes it is
 * NUL-terminated - confirm against the publisher.
 */
void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
    if (message->payloadlen == 0) {
        printf("%s (null)\n", message->topic);
    } else {
        printf("%s %s", message->topic, (char *)(message->payload));
    }

    /* Push the output out immediately regardless of stdout buffering. */
    fflush(stdout);
}
/*
 * Connect callback: subscribe to the "mytop:" topic at QoS 2 once the
 * connection attempt has completed.
 *
 * mosq     - client instance the callback was registered on
 * userdata - unused
 * result   - 0 when the connection succeeded, non-zero otherwise
 *
 * Failures (of the connection or of the subscribe request) are reported
 * on stderr; nothing is returned to the caller.
 */
void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
    (void)userdata;

    if (result) {
        fprintf(stderr, "Connect failed\n");
        return;
    }

    /* A non-zero return means the subscribe request could not be queued. */
    if (mosquitto_subscribe(mosq, NULL, "mytop:", 2)) {
        fprintf(stderr, "Subscribe failed\n");
    }
}
/*
 * Subscribe callback: print the message id followed by the granted QoS
 * values as a comma-separated list on one line.
 *
 * granted_qos[0] is always read, so the array must hold at least one entry.
 */
void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
    int idx = 1;

    printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
    while (idx < qos_count) {
        printf(", %d", granted_qos[idx]);
        idx++;
    }
    printf("\n");
}
/*
 * Log callback: print every log message on its own line on stdout,
 * regardless of its level. mosq, userdata and level are ignored.
 */
void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
    (void)mosq;
    (void)userdata;
    (void)level;

    printf("%s\n", str);
}
/*
 * Subscriber entry point: create a client, register the connect and
 * message callbacks, connect to HOST:PORT and process network traffic
 * until mosquitto_loop_forever() returns.
 *
 * Returns 0 on normal termination and 1 when the client cannot be
 * created or the connection cannot be established. The client and the
 * library are released on every exit path.
 */
int main()
{
    bool session = true;
    struct mosquitto *mosq = NULL;

    /* Initialise the libmosquitto library. */
    mosquitto_lib_init();

    /* Create the mosquitto client. */
    mosq = mosquitto_new(NULL, session, NULL);
    if (!mosq) {
        printf("create client failed..\n");
        mosquitto_lib_cleanup();
        return 1;
    }

    /*
     * Register callbacks. my_log_callback and my_subscribe_callback can
     * be registered here as well (mosquitto_log_callback_set /
     * mosquitto_subscribe_callback_set) when more output is wanted.
     */
    mosquitto_connect_callback_set(mosq, my_connect_callback);
    mosquitto_message_callback_set(mosq, my_message_callback);

    /* Connect to the broker; release everything if that fails. */
    if (mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE)) {
        fprintf(stderr, "Unable to connect.\n");
        mosquitto_destroy(mosq);
        mosquitto_lib_cleanup();
        return 1;
    }

    /* Process network messages in a loop. */
    mosquitto_loop_forever(mosq, -1, 1);

    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    return 0;
}
编写pub客户端
my_mqtt_pub.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include <stdio.h>
#include <stdlib.h>
#include <mosquitto.h>
#include <string.h>
#define HOST "localhost"
#define PORT 1883
#define KEEP_ALIVE 60
#define MSG_MAX_SIZE 512
/*
 * Publisher entry point: read lines from stdin and publish each one to
 * the "mytop:" topic at QoS 0 until end of input.
 *
 * Network traffic is handled by the background thread started with
 * mosquitto_loop_start(). Each payload includes the terminating NUL
 * (strlen + 1) so a receiver can print it as a C string.
 *
 * Returns 0 after a clean shutdown and 1 when the client cannot be
 * created, the connection fails, or the network thread cannot start.
 * The client and the library are released on every exit path.
 */
int main()
{
    bool session = true;
    char buff[MSG_MAX_SIZE];
    struct mosquitto *mosq = NULL;
    int status = 1;

    /* Initialise the libmosquitto library. */
    mosquitto_lib_init();

    /* Create the mosquitto client. */
    mosq = mosquitto_new(NULL, session, NULL);
    if (!mosq) {
        printf("create client failed..\n");
        mosquitto_lib_cleanup();
        return 1;
    }

    /* Connect to the broker. */
    if (mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE)) {
        fprintf(stderr, "Unable to connect.\n");
        goto out;
    }

    /* Start a thread that keeps calling mosquitto_loop() for us. */
    if (mosquitto_loop_start(mosq) != MOSQ_ERR_SUCCESS) {
        printf("mosquitto loop error\n");
        goto out;
    }

    /* fgets() always NUL-terminates, so the buffer needs no clearing. */
    while (fgets(buff, sizeof(buff), stdin) != NULL) {
        /* Publish the line that was just read. */
        if (mosquitto_publish(mosq, NULL, "mytop:", (int)strlen(buff) + 1, buff, 0, 0)) {
            fprintf(stderr, "Publish failed.\n");
        }
    }

    /*
     * Disconnect first so the network thread leaves its loop, then wait
     * for it to finish before the client is destroyed.
     */
    mosquitto_disconnect(mosq);
    mosquitto_loop_stop(mosq, false);
    status = 0;

out:
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    return status;
}
实验结果
编译:
1
2
gcc my_mqtt_sub.c -lmosquitto -o my_sub
gcc my_mqtt_pub.c -lmosquitto -o my_pub
测试:
这两个执行文件都是客户端,请确保和MQTT代理服务器连接畅通。这里是localhost上的mosquitto作为代理服务器。
写在后面:
好像默认mosquitto会开机自启动,杀掉进程后又会重启,可用以下命令关掉:
1
2
sudo service mosquitto status
sudo service mosquitto stop
模板程序
单线程
单端单收sub.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <stdio.h>
#include <mosquitto.h>
#define HOST "localhost"
#define PORT 1883
#define KEEP_ALIVE 60
#define QOS 1
/*
 * Connect callback: on a successful connection (result == 0) subscribe
 * to every topic ("#") at the configured QOS; otherwise report the
 * failure on stdout. userdata is unused.
 */
void my_conn(struct mosquitto *mosq, void *userdata, int result)
{
    if (result != 0) {
        printf("Connect failed!\n");
        return;
    }

    mosquitto_subscribe(mosq, NULL, "#", QOS);
}
/*
 * Subscribe callback: print the message id and all granted QoS values,
 * comma separated, terminated by a newline.
 *
 * The first granted QoS is read unconditionally, so granted_qos must
 * contain at least one element.
 */
void my_subs(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
    const int *qos = granted_qos;
    const int *end = granted_qos + qos_count;

    printf("Subscribed (mid: %d): %d", mid, *qos);
    /* Remaining entries, if any, follow the first one. */
    for (qos++; qos < end; qos++) {
        printf(", %d", *qos);
    }
    printf("\n");
}
/*
 * Message callback: print "<topic> <payload>" for a non-empty message
 * and "<topic> (null)" for an empty one, then flush stdout.
 * NOTE(review): the payload is printed with %s, which assumes it is
 * NUL-terminated text - confirm.
 */
void my_mess(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
    if (message->payloadlen == 0) {
        printf("%s (null)\n", message->topic);
    } else {
        printf("%s %s\n", message->topic, (char *)(message->payload));
    }
    fflush(stdout);
}
/*
 * Single-threaded subscriber: connect to HOST:PORT, subscribe through
 * the connect callback and process network traffic in this thread.
 *
 * The loop ends as soon as mosquitto_loop() reports an error, instead
 * of calling it again forever, so the cleanup code is reached.
 *
 * Returns 0 after the loop ends, 1 when the client cannot be created or
 * the connection cannot be established.
 */
int main(int argc, char *argv[]){
    (void)argc;
    (void)argv;

    printf("hello sub!\n");

    mosquitto_lib_init();

    struct mosquitto *mosq = mosquitto_new(NULL, true, NULL);
    if (!mosq) {
        fprintf(stderr, "Unable to create client.\n");
        mosquitto_lib_cleanup();
        return 1;
    }

    mosquitto_connect_callback_set(mosq, my_conn);
    mosquitto_subscribe_callback_set(mosq, my_subs);
    mosquitto_message_callback_set(mosq, my_mess);

    if (mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE)) {
        fprintf(stderr, "Unable to connect.\n");
        mosquitto_destroy(mosq);
        mosquitto_lib_cleanup();
        return 1;
    }

    /* A non-zero return is an error; stop rather than spin on it. */
    while (mosquitto_loop(mosq, -1, 1) == 0) {
        /* keep processing */
    }

    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    return 0;
}
单端单发pub.c
该程序使用:mypub <topic> <messages>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mosquitto.h>
#define HOST "localhost"
#define PORT 1883
#define KEEP_ALIVE 60
#define QOS 1
#define RETAIN 0
#define MSG_MAX_SIZE 2048
/*
 * One-shot publisher. Usage: mypub <topic> <messages>
 *
 *   no arguments  - publish an empty message to the default topic "topic"
 *   one argument  - argv[1] is the topic, the message is empty
 *   two or more   - argv[1] is the topic, argv[2] is the message
 *
 * The arguments are used in place rather than copied into fixed-size
 * buffers, so arbitrarily long arguments cannot overflow anything and
 * nothing has to be freed.
 *
 * Returns 0 after publishing, 1 when the client cannot be created or
 * the connection cannot be established.
 */
int main(int argc, char *argv[]){
    printf("hello pub!\n");

    const char *topic = "topic";   /* default value */
    const char *msg = "";

    if (argc >= 2) {
        topic = argv[1];
    }
    if (argc >= 3) {
        msg = argv[2];
    }

    mosquitto_lib_init();

    struct mosquitto *mosq = mosquitto_new(NULL, true, NULL);
    if (!mosq) {
        fprintf(stderr, "Unable to create client.\n");
        mosquitto_lib_cleanup();
        return 1;
    }

    if (mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE)) {
        fprintf(stderr, "Unable to connect.\n");
        mosquitto_destroy(mosq);
        mosquitto_lib_cleanup();
        return 1;
    }

    mosquitto_publish(mosq, NULL, topic, (int)strlen(msg), msg, QOS, RETAIN);

    /*
     * With a QoS above 0 the broker's replies have to be processed, so
     * the network loop is run QOS times (an arbitrary count).
     */
    int i = 0;
    while (i++ < QOS) {
        mosquitto_loop(mosq, -1, 1);
    }

    mosquitto_disconnect(mosq);
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    return 0;
}
注意:以上提到的发布qos大于0,需要加mosquitto_loop对代理服务器的消息进行处理;可以通过打印log进行查看,如果不加,就收不到服务器的ack和comp。虽然很多情况下不加通讯也成功,但是失去了qos设定的意义。关于循环的次数,我这里是随便设定了一个,按需设定吧。
多线程
单端自发自收one.c
自收自发程序,主线程发,另一线程收:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <mosquitto.h>
#define PEERID "my_one"
#define SESSION true
#define HOST "localhost"
#define PORT 1883
#define KEEP_ALIVE 60
#define QOS 1
#define RETAIN 0
struct mosquitto *mosq = NULL;
/*
 * Message callback: print the received message, publish the fixed reply
 * "loveresult" to "top/result", sleep for two seconds and finally flush
 * stdout.
 *
 * The reply length is sizeof("loveresult"), i.e. it includes the
 * terminating NUL byte.
 */
void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
    if (message->payloadlen == 0) {
        printf("%s (null)\n", message->topic);
    } else {
        printf("====>recv:%s %s\n", message->topic, (char *)message->payload);
    }

    mosquitto_publish(mosq, NULL, "top/result", sizeof("loveresult"), "loveresult", QOS, RETAIN);

    /* The flush deliberately stays after the delay, as in the original order. */
    sleep(2);
    fflush(stdout);
}
/*
 * Connect callback: subscribe to "top/love" at the configured QOS when
 * the connection succeeded (result == 0); otherwise report the failure
 * on stderr.
 */
void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
    if (result != 0) {
        fprintf(stderr, "Connect failed\n");
        return;
    }

    mosquitto_subscribe(mosq, NULL, "top/love", QOS);
}
/*
 * Subscribe callback: print the message id and the granted QoS values
 * on a single line. granted_qos[0] is read unconditionally.
 */
void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
    int pos = 0;

    printf("Subscribed (mid: %d): %d", mid, granted_qos[pos]);
    /* Entries after the first one are prefixed with a comma. */
    while (++pos < qos_count) {
        printf(", %d", granted_qos[pos]);
    }
    printf("\n");
}
/*
 * Log callback: print each library log line on stdout with a
 * "====>log:" prefix. The level is not inspected.
 */
void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
    (void)mosq;
    (void)userdata;
    (void)level;

    printf("====>log:%s\n", str);
}
/*
 * Initialise the library and the file-level client `mosq`: create the
 * client with id PEERID, register the log, connect, message and
 * subscribe callbacks, and switch the client to threaded mode.
 *
 * Exits the process with status -1 when the client cannot be created.
 * A last-will message could be configured here with mosquitto_will_set().
 */
void mosq_init()
{
    mosquitto_lib_init();

    mosq = mosquitto_new(PEERID, SESSION, NULL);
    if (mosq == NULL) {
        fprintf(stderr, "Error: Out of memory.\n");
        exit(-1);
    }

    mosquitto_log_callback_set(mosq, my_log_callback);
    mosquitto_connect_callback_set(mosq, my_connect_callback);
    mosquitto_message_callback_set(mosq, my_message_callback);
    mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);

    /* Tell the library that the client is used from several threads. */
    mosquitto_threaded_set(mosq, 1);
}
/*
 * Receiver thread: connect the file-level client to HOST:PORT, retrying
 * every 10 seconds, and then run the network loop.
 *
 * After the fourth failed attempt the thread gives up and exits. Once
 * mosquitto_loop_forever() returns, the client is disconnected and
 * destroyed and the library is cleaned up.
 * NOTE(review): the client is shared with the thread that publishes;
 * confirm nothing uses it after this thread destroys it.
 */
void *pthread_sub(void *arg)
{
    int failures = 0;

    for (;;) {
        if (mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE) == 0) {
            break;
        }

        failures++;
        fprintf(stderr, "Unable to connect server [%d] times.\n", failures);
        if (failures > 3) {
            fprintf(stderr, "Unable to connect server, exit.\n" );
            pthread_exit(NULL);
        }
        sleep(10);
    }

    mosquitto_loop_forever(mosq, -1, 1);

    mosquitto_disconnect(mosq);
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    pthread_exit(NULL);
}
/*
 * Self-send/self-receive demo: the main thread publishes "love" to
 * "top/love" while the pthread_sub thread owns the network loop and
 * receives the messages.
 *
 * The main thread no longer calls mosquitto_loop() itself: the receiver
 * thread already runs mosquitto_loop_forever() on the same client, and
 * driving the network loop from two threads at once races on the
 * connection. Acknowledgements for QoS > 0 are handled by that thread.
 * Publishing is paced at one message per second instead.
 *
 * Returns 1 if the receiver thread cannot be created; otherwise it
 * publishes until the process is terminated.
 */
int main(int argc, char *argv[])
{
    pthread_t sub_id;

    (void)argc;
    (void)argv;

    mosq_init();

    if (pthread_create(&sub_id, NULL, pthread_sub, NULL) != 0) {
        fprintf(stderr, "Unable to create receiver thread.\n");
        return 1;
    }
    pthread_detach(sub_id);

    while (1) {
        mosquitto_publish(mosq, NULL, "top/love", sizeof("love"), "love", QOS, RETAIN);
        /* Pace the publisher so messages do not queue up without bound. */
        sleep(1);
    }
    return 0;
}
单端多收sub.c
多线程接收,每来一条消息都会新建一个线程进行处理,适合任务繁重的情况。
注意,这个慎用或者不能用,消息不密集才可使用,貌似一个进程仅能新建253个线程。但是像tcp那样有固定连接对象的可以使用这种方法,也就是每当客户端连接可以建一个线程,但是不能每个数据包来了都建立一个线程。
topic:top/love
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <mosquitto.h>
#define PEERID "my_sub_thread"
#define SESSION true
#define HOST "localhost"
#define PORT 1883
#define KEEP_ALIVE 60
#define QOS 1
#define RETAIN 0
struct mosquitto *mosq = NULL;
/*
 * Worker thread: print one message and release it.
 *
 * arg points to a struct mosquitto_message whose topic, payload and the
 * struct itself are released here with free(), so the caller must have
 * allocated all three with malloc().
 */
void *pthread_sub_do(void *arg)
{
    struct mosquitto_message *msg = arg;

    if (msg->payloadlen == 0) {
        printf("%s (null)\n", msg->topic);
    } else {
        printf("====>recv:%s %s\n", msg->topic, (char *)(msg->payload));
    }
    fflush(stdout);

    /* This thread owns the copy; release every piece of it. */
    free(msg->topic);
    free(msg->payload);
    free(msg);

    pthread_exit(NULL);
}
/*
 * Message callback: make a private heap copy of the message and hand it
 * to a detached pthread_sub_do worker thread, which prints and frees it.
 *
 * The payload is copied by its declared length (payloadlen) rather than
 * with strlen(), so empty (NULL) and binary payloads are safe; one
 * extra NUL byte is appended so the worker can print it with %s.
 *
 * On allocation or thread-creation failure the copy is released and the
 * message is dropped with a diagnostic on stderr.
 */
void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
    (void)mosq;
    (void)userdata;

    struct mosquitto_message *trans = malloc(sizeof *trans);
    if (!trans) {
        fprintf(stderr, "Error: Out of memory.\n");
        return;
    }
    memcpy(trans, message, sizeof *trans);

    size_t topic_size = strlen(message->topic) + 1;
    size_t payload_len = 0;
    if (message->payload != NULL && message->payloadlen > 0) {
        payload_len = (size_t)message->payloadlen;
    }

    trans->topic = malloc(topic_size);
    trans->payload = malloc(payload_len + 1);
    if (!trans->topic || !trans->payload) {
        fprintf(stderr, "Error: Out of memory.\n");
        free(trans->topic);
        free(trans->payload);
        free(trans);
        return;
    }

    memcpy(trans->topic, message->topic, topic_size);
    if (payload_len > 0) {
        memcpy(trans->payload, message->payload, payload_len);
    }
    ((char *)trans->payload)[payload_len] = '\0';

    pthread_t sub_id;
    if (pthread_create(&sub_id, NULL, pthread_sub_do, (void *)trans) != 0) {
        /* No worker took ownership, so the copy is released here. */
        fprintf(stderr, "Unable to create worker thread.\n");
        free(trans->topic);
        free(trans->payload);
        free(trans);
        return;
    }
    pthread_detach(sub_id);
}
/*
 * Connect callback: a zero result means the connection succeeded, in
 * which case "top/love" is subscribed at the configured QOS. Any other
 * result is reported on stderr.
 */
void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
    if (result == 0) {
        mosquitto_subscribe(mosq, NULL, "top/love", QOS);
        return;
    }

    fprintf(stderr, "Connect failed\n");
}
/*
 * Subscribe callback: report the message id and every granted QoS value
 * on one line. The first granted QoS is read unconditionally.
 */
void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
    int k = 1;

    printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
    while (k < qos_count) {
        printf(", %d", granted_qos[k++]);
    }
    printf("\n");
}
/*
 * Log callback: echo the log text to stdout behind a "====>log:" prefix.
 * All levels are printed; mosq and userdata are unused.
 */
void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
    (void)mosq;
    (void)userdata;
    (void)level;

    printf("====>log:%s\n", str);
}
/*
 * Set up the library and the file-level client `mosq`: create it with
 * id PEERID, install the log, connect, message and subscribe callbacks
 * and enable threaded mode.
 *
 * Terminates the process with status -1 if the client cannot be
 * created. A last-will message could be configured here with
 * mosquitto_will_set().
 */
void mosq_init()
{
    mosquitto_lib_init();

    mosq = mosquitto_new(PEERID, SESSION, NULL);
    if (mosq == NULL) {
        fprintf(stderr, "Error: Out of memory.\n");
        exit(-1);
    }

    mosquitto_log_callback_set(mosq, my_log_callback);
    mosquitto_connect_callback_set(mosq, my_connect_callback);
    mosquitto_message_callback_set(mosq, my_message_callback);
    mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);

    /* Messages are handled on worker threads, so enable threaded mode. */
    mosquitto_threaded_set(mosq, 1);
}
/*
 * Multi-threaded receiver: initialise the client, connect to HOST:PORT
 * and run the network loop in the main thread. Every incoming message
 * is handed to its own worker thread by the message callback.
 *
 * Returns 0 after the loop ends, 1 when the connection cannot be
 * established. The client and library are released on both paths.
 */
int main(int argc, char *argv[])
{
    (void)argc;
    (void)argv;

    mosq_init();

    if (mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE)) {
        fprintf(stderr, "Unable to connect.\n");
        mosquitto_destroy(mosq);
        mosquitto_lib_cleanup();
        return 1;
    }

    mosquitto_loop_forever(mosq, -1, 1);

    mosquitto_disconnect(mosq);
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    return 0;
}
单端多发pub.c
多线程发布,主线程mosquitto_loop处理回复,其他线程发布消息。
topic:top/love
payload:love
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <mosquitto.h>
#define PEERID "my_pub_thread"
#define SESSION true
#define HOST "localhost"
#define PORT 1883
#define KEEP_ALIVE 60
#define QOS 1
#define RETAIN 0
struct mosquitto *mosq = NULL;
/*
 * Log callback: write every library log message to stdout, prefixed
 * with "====>log:". The level argument is not used for filtering.
 */
void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
    (void)mosq;
    (void)userdata;
    (void)level;

    printf("====>log:%s\n", str);
}
/*
 * Initialise the library and create the file-level client `mosq` with
 * id PEERID, install the log callback and enable threaded mode.
 *
 * Exits the process with status -1 when the client cannot be created.
 */
void mosq_init()
{
    mosquitto_lib_init();

    mosq = mosquitto_new(PEERID, SESSION, NULL);
    if (mosq == NULL) {
        fprintf(stderr, "Error: Out of memory.\n");
        exit(-1);
    }

    mosquitto_log_callback_set(mosq, my_log_callback);

    /* The client is shared by several publisher threads. */
    mosquitto_threaded_set(mosq, 1);
}
/*
 * Publisher thread: publish "love" (including its terminating NUL, as
 * sizeof is used for the length) to "top/love" once per second on the
 * shared file-level client.
 *
 * The thread only publishes. The network loop is driven exclusively by
 * the main thread, which also processes the broker's acknowledgements;
 * calling mosquitto_loop() here as well would run the loop on one
 * client from several threads at once.
 *
 * arg is unused. The function never returns under normal operation.
 */
void *pthread_pub(void *arg)
{
    (void)arg;

    while (1) {
        mosquitto_publish(mosq, NULL, "top/love", sizeof("love"), "love", QOS, RETAIN);
        /* Pace the publisher so messages do not queue up without bound. */
        sleep(1);
    }
    pthread_exit(NULL);
}
/*
 * Multi-threaded publisher: start NUM_PUBLISHERS detached publisher
 * threads that share one client, then run the network loop in the main
 * thread so the broker's replies are processed.
 *
 * The thread-id array is sized for all publishers; previously room for
 * a single pthread_t was allocated while five ids were written.
 *
 * Returns 1 on allocation or connection failure. Otherwise the network
 * loop runs until the process is terminated.
 */
int main(int argc, char *argv[])
{
    enum { NUM_PUBLISHERS = 5 };
    int i = 0;

    (void)argc;
    (void)argv;

    pthread_t *ppub_id = malloc(NUM_PUBLISHERS * sizeof *ppub_id);
    if (!ppub_id) {
        fprintf(stderr, "Error: Out of memory.\n");
        return 1;
    }

    mosq_init();

    if (mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE)) {
        fprintf(stderr, "Unable to connect.\n");
        mosquitto_destroy(mosq);
        mosquitto_lib_cleanup();
        free(ppub_id);
        return 1;
    }

    /* Start the publisher threads. */
    for (i = 0; i < NUM_PUBLISHERS; i++) {
        if (pthread_create(ppub_id + i, NULL, pthread_pub, NULL) != 0) {
            fprintf(stderr, "Unable to create publisher thread %d.\n", i);
            continue;
        }
        pthread_detach(*(ppub_id + i));
    }

    while (1) {
        /* Required: without it the broker's replies are never received. */
        mosquitto_loop(mosq, -1, 1);
    }

    /* Not reached while the loop above is unconditional. */
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    free(ppub_id);
    ppub_id = NULL;
    return 0;
}
多端单收nsub.c
多线程模拟多个sub客户端:
topic:top/love
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <mosquitto.h>
#define SESSION true
#define HOST "localhost"
#define PORT 1883
#define KEEP_ALIVE 60
#define QOS 1
#define RETAIN 0
/*
 * Message callback: print "<topic> <payload>" for a message with a
 * payload, "<topic> (null)" for an empty one, then flush stdout.
 * NOTE(review): printing the payload with %s assumes NUL-terminated
 * text - confirm.
 */
void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
    if (message->payloadlen == 0) {
        printf("%s (null)\n", message->topic);
    } else {
        printf("%s %s\n", message->topic, (char *)(message->payload));
    }
    fflush(stdout);
}
/*
 * Connect callback: when result is 0 the client subscribes to
 * "top/love" at the configured QOS; a non-zero result is reported on
 * stderr.
 */
void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
    if (result != 0) {
        fprintf(stderr, "Connect failed\n");
        return;
    }

    mosquitto_subscribe(mosq, NULL, "top/love", QOS);
}
/*
 * Subscribe callback: print the message id followed by the granted QoS
 * list, comma separated. granted_qos[0] is read unconditionally.
 */
void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
    int n;

    printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
    for (n = 1; n != qos_count && n < qos_count; ++n) {
        printf(", %d", granted_qos[n]);
    }
    printf("\n");
}
/*
 * Log callback: print the log text on stdout with a "====>log:" prefix,
 * whatever its level.
 */
void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
    (void)mosq;
    (void)userdata;
    (void)level;

    printf("====>log:%s\n", str);
}
/*
 * Subscriber thread: create a private client, register the connect,
 * message and subscribe callbacks, connect to HOST:PORT and run the
 * network loop until it returns.
 *
 * Each thread owns its own client, so mosquitto_threaded_set() is not
 * needed. The thread exits early, with a diagnostic on stderr, when the
 * client cannot be created or the connection fails; previously it went
 * on to use a NULL client.
 *
 * arg is unused.
 */
void *pthread_nsub(void *arg)
{
    (void)arg;

    struct mosquitto *mosq = mosquitto_new(NULL, SESSION, NULL);
    if (!mosq) {
        fprintf(stderr, "Error: Out of memory.\n");
        pthread_exit(NULL);
    }

    mosquitto_connect_callback_set(mosq, my_connect_callback);
    mosquitto_message_callback_set(mosq, my_message_callback);
    mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);

    if (mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE)) {
        fprintf(stderr, "Unable to connect.\n");
        mosquitto_destroy(mosq);
        pthread_exit(NULL);
    }

    mosquitto_loop_forever(mosq, -1, 1);

    mosquitto_disconnect(mosq);
    mosquitto_destroy(mosq);
    pthread_exit(NULL);
}
/*
 * Simulate several subscriber clients: start NUM_CLIENTS pthread_nsub
 * threads, each with its own client, and wait for all of them.
 *
 * The threads are joined instead of detached, so the main thread blocks
 * without consuming CPU (the former `while(1);` spun at full load) and
 * the library cleanup is reached once every subscriber has finished.
 *
 * Returns 0 if at least one subscriber thread was started, 1 otherwise.
 */
int main(int argc, char *argv[])
{
    enum { NUM_CLIENTS = 5 };
    pthread_t sub_ids[NUM_CLIENTS];
    int started = 0;
    int i = 0;

    (void)argc;
    (void)argv;

    mosquitto_lib_init();

    for (i = 0; i < NUM_CLIENTS; i++) {
        if (pthread_create(&sub_ids[started], NULL, pthread_nsub, NULL) != 0) {
            fprintf(stderr, "Unable to create subscriber thread %d.\n", i);
            continue;
        }
        started++;
    }

    /* Only ids that were actually created are joined. */
    for (i = 0; i < started; i++) {
        pthread_join(sub_ids[i], NULL);
    }

    mosquitto_lib_cleanup();
    return started > 0 ? 0 : 1;
}
多端单发npub.c
多线程模拟多个发送客户端:
topic:top/love
payload:love
这里的npub(5客户端每个发1条),如nsub(5客户端订阅)已连接,那么broker会发送25条给nsub。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <mosquitto.h>
#define SESSION true
#define HOST "localhost"
#define PORT 1883
#define KEEP_ALIVE 60
#define QOS 1
#define RETAIN 0
/*
 * Log callback: print each log message to stdout after a "====>log:"
 * prefix. No filtering by level is done.
 */
void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
    (void)mosq;
    (void)userdata;
    (void)level;

    printf("====>log:%s\n", str);
}
/*
 * Publisher thread: create a private client, connect to HOST:PORT,
 * publish "love" (with its terminating NUL, since sizeof supplies the
 * length) to "top/love" once, then disconnect and destroy the client.
 *
 * Each thread owns its own client, so mosquitto_threaded_set() is not
 * needed. The thread exits early, with a diagnostic on stderr, when the
 * client cannot be created or the connection fails; previously it went
 * on to use a NULL client.
 *
 * arg is unused.
 */
void *pthread_npub(void *arg)
{
    (void)arg;

    struct mosquitto *mosq = mosquitto_new(NULL, SESSION, NULL);
    if (!mosq) {
        fprintf(stderr, "Error: Out of memory.\n");
        pthread_exit(NULL);
    }

    if (mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE)) {
        fprintf(stderr, "Unable to connect.\n");
        mosquitto_destroy(mosq);
        pthread_exit(NULL);
    }

    mosquitto_publish(mosq, NULL, "top/love", sizeof("love"), "love", QOS, RETAIN);

    /*
     * With a QoS above 0 the broker's replies have to be processed, so
     * the network loop is run QOS times (an arbitrary count).
     */
    int i = 0;
    while (i++ < QOS) {
        mosquitto_loop(mosq, -1, 1);
    }

    mosquitto_disconnect(mosq);
    mosquitto_destroy(mosq);
    pthread_exit(NULL);
}
/*
 * Simulate several publisher clients: start NUM_CLIENTS pthread_npub
 * threads, wait for all of them and clean up the library.
 *
 * Thread ids are stored contiguously and only for threads that were
 * really created, so pthread_join() is never called on an
 * uninitialised id.
 *
 * Returns 0 on completion, 1 if the id array cannot be allocated.
 */
int main(int argc, char *argv[])
{
    enum { NUM_CLIENTS = 5 };
    int i = 0;
    int created = 0;

    (void)argc;
    (void)argv;

    pthread_t *ppub_id = malloc(NUM_CLIENTS * sizeof *ppub_id);
    if (!ppub_id) {
        fprintf(stderr, "Error: Out of memory.\n");
        return 1;
    }

    mosquitto_lib_init();

    for (i = 0; i < NUM_CLIENTS; i++) {
        if (pthread_create(ppub_id + created, NULL, pthread_npub, NULL) != 0) {
            fprintf(stderr, "Unable to create publisher thread %d.\n", i);
            continue;
        }
        created++;
    }

    for (i = 0; i < created; i++) {
        pthread_join(*(ppub_id + i), NULL);
    }

    mosquitto_lib_cleanup();
    free(ppub_id);
    ppub_id = NULL;
    return 0;
}