-
您的位置:
- 網(wǎng)站首頁(yè)
- > 物聯(lián)百科
- > 行業(yè)動(dòng)態(tài)
您的位置:
網(wǎng)關(guān)與云平臺(tái)之間的通信方式一般都是客戶指定的,就那么幾種(阿里云、華為云、騰訊云、亞馬遜AWS平臺(tái))。一般都要求網(wǎng)關(guān)與云平臺(tái)之間處于長(zhǎng)連接的狀態(tài),這樣云端的各種指令就可以隨時(shí)發(fā)送到網(wǎng)關(guān)。
與云平臺(tái)之間的 MQTT 連接
目前的幾大物聯(lián)網(wǎng)云平臺(tái),都提供了不同的接入方式。對(duì)于網(wǎng)關(guān)來(lái)說(shuō),應(yīng)用最多的就是 MQTT 接入。
我們知道,MQTT 只是一個(gè)協(xié)議而已,不同的編程語(yǔ)言中都有實(shí)現(xiàn),在 C 語(yǔ)言中也有好幾個(gè)實(shí)現(xiàn)。
在網(wǎng)關(guān)內(nèi)部,運(yùn)行著一個(gè)后臺(tái) deamon: MQTT Broker,其實(shí)就是 mosquitto 這個(gè)可執(zhí)行程序,它充當(dāng)著消息總線的功能。這里請(qǐng)大家注意:因?yàn)檫@個(gè)消息總線是運(yùn)行在嵌入式系統(tǒng)的內(nèi)部,接入總線的客戶端就是需要相互通信的那些進(jìn)程。這些進(jìn)程的數(shù)量是有限的,即使是一個(gè)比較復(fù)雜的系統(tǒng),最多十幾個(gè)進(jìn)程也就差不多了。因此,mosquitto 這個(gè)實(shí)現(xiàn)是完全可以支撐系統(tǒng)負(fù)載的。
那么,如果在云端部署一個(gè) MQTT Broker,理論上是可以直接使用 mosquitto 這個(gè)實(shí)現(xiàn)來(lái)作為消息總線的,但是你要評(píng)估接入的客戶端(也就是網(wǎng)關(guān))在一個(gè)什么樣的數(shù)量級(jí),考慮到并發(fā)的問(wèn)題,一定要做壓力測(cè)試。
對(duì)于后臺(tái)開(kāi)發(fā),我的經(jīng)驗(yàn)不多,不敢(也不能)多言,誤導(dǎo)大家就罪過(guò)了。不過(guò),對(duì)于一般的學(xué)習(xí)和測(cè)試來(lái)說(shuō),在云端直接部署 mosquitto 作為消息總線,是沒(méi)有問(wèn)題的。
Proc_Bridge 進(jìn)程:外部和內(nèi)部消息總線之間的橋接器
下面這張圖,說(shuō)明了 Proc_Bridge 進(jìn)程在這個(gè)模型中的作用:
從云平臺(tái)消息總線接收到的消息,需要轉(zhuǎn)發(fā)到內(nèi)部的消息總線;從內(nèi)部消息總線接收到的消息,需要轉(zhuǎn)發(fā)到云平臺(tái)的消息總線;
如果用 mosquitto 來(lái)實(shí)現(xiàn),應(yīng)該如何來(lái)實(shí)現(xiàn)呢?
1. mosquitto 的 API 接口
mosquitto 這個(gè)實(shí)現(xiàn)是基于回調(diào)函數(shù)的機(jī)制來(lái)運(yùn)行的,例如:
// 連接成功時(shí)的回調(diào)函數(shù)void my_connect_callback(struct mosquitto *mosq, void *obj, int rc){ // ...}
// 連接失敗時(shí)的回調(diào)函數(shù)void my_disconnect_callback(struct mosquitto *mosq, void *obj, int result){ // ...}
// 接收到消息時(shí)的回調(diào)函數(shù)void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message){ // ..}
int main(){ // 其他代碼 // ... // 創(chuàng)建一個(gè) mosquitto 對(duì)象 struct mosquitto g_mosq = mosquitto_new("client_name", true, NULL); // 注冊(cè)回調(diào)函數(shù) mosquitto_connect_callback_set(g_mosq, my_connect_callback); mosquitto_disconnect_callback_set(g_mosq, my_disconnect_callback); mosquitto_message_callback_set(g_mosq, my_message_callback); // 這里還有其他的回調(diào)函數(shù)設(shè)置 // 開(kāi)始連接到消息總線 mosquitto_connect(g_mosq, "127.0.0.1", 1883, 60); while(1) { int rc = mosquitto_loop(g_mosq, -1, 1); if (rc) { printf("mqtt_portal: mosquitto_loop rc = %d ", rc); sleep(1); mosquitto_reconnect(g_mosq); } } mosquitto_destroy(g_mosq); mosquitto_lib_cleanup(); return 0;}
以上代碼就是一個(gè) mosquitto 客戶端的最簡(jiǎn)代碼了,使用回調(diào)函數(shù)的機(jī)制,讓程序的開(kāi)發(fā)非常簡(jiǎn)單。
mosquitto 把底層的細(xì)節(jié)問(wèn)題都幫助我們處理了,只要我們注冊(cè)的函數(shù)被調(diào)用了,就說(shuō)明發(fā)生了我們感興趣的事件。
這樣的回調(diào)機(jī)制在各種開(kāi)源軟件中使用的比較多,比如:glib 里的定時(shí)器、libevent通訊處理、libmodbus 里的數(shù)據(jù)處理、linux 內(nèi)核中的驅(qū)動(dòng)開(kāi)發(fā)和定時(shí)器,都是這個(gè)套路,一通百通!
在網(wǎng)關(guān)中的每個(gè)進(jìn)程,只需要添加上面這部分代碼,就可以掛載到消息總線上,從而可以與其它進(jìn)程進(jìn)行收發(fā)數(shù)據(jù)了。
2. 利用 UserData 指針,實(shí)現(xiàn)多個(gè) MQTT 連接
上面的實(shí)例僅僅是連接到一個(gè)消息總線上,對(duì)于一個(gè)普通的進(jìn)程來(lái)說(shuō),達(dá)到了通信的目的。
但是對(duì)于 Proc_Bridge 進(jìn)程來(lái)說(shuō),還沒(méi)有達(dá)到目的,因?yàn)檫@個(gè)進(jìn)程處于橋接的位置,需要同時(shí)連接到遠(yuǎn)程和本地這兩個(gè)消息總線上。那么應(yīng)該如何實(shí)現(xiàn)呢?
看一下 mosquitto_new 這個(gè)函數(shù)的簽名:
* obj - A user pointer that will be passed as an argument to any * callbacks that are specified.最后一個(gè)參數(shù)的作用是:可以設(shè)置一個(gè)用戶自己的數(shù)據(jù)(作為指針傳入),那么mosquitto 在回調(diào)我們的注冊(cè)的任何一個(gè)函數(shù)時(shí),都會(huì)把這個(gè)指針傳入。
因此,我們可以利用這個(gè)參數(shù)來(lái)區(qū)分這個(gè)連接是遠(yuǎn)程連接?還是本地連接。libmosq_EXPORT struct mosquitto *mosquitto_new(const char *id, bool clean_session, void *obj);
所以,我們可以定義一個(gè)結(jié)構(gòu)體變量,把一個(gè) MQTT 連接的所有信息都記錄在這里,然后注冊(cè)給 mosquitto。當(dāng) mosquitto 回調(diào)函數(shù)時(shí),把這個(gè)結(jié)構(gòu)體變量的指針回傳給我們,這樣就拿到了這個(gè)連接的所有數(shù)據(jù),在某種程度上來(lái)說(shuō),這也是一種面向?qū)ο蟮乃枷搿?/p>
// 從來(lái)表示一個(gè) MQTT 連接的結(jié)構(gòu)體typedef struct{ char *id; char *name; char *pw; char *host; int port; pthread_t tHandle; struct mosquitto *mosq; int mqtt_num;}MQData;
完整的代碼已經(jīng)放到網(wǎng)盤(pán)里了,為了讓你先從原理上看明白,我把關(guān)鍵幾個(gè)地方的代碼貼在這里:
// 分配結(jié)構(gòu)體變量MQData userData = (MQData *)malloc(sizeof(MQData));
// 設(shè)置屬于這里連接的參數(shù): id, name 等等
// 創(chuàng)建 mosquitto 對(duì)象時(shí),傳入 userData。struct mosquitto *mosq = mosquitto_new(userData->id, true, userData);
// 在回調(diào)函數(shù)中,把 obj 指針前轉(zhuǎn)成 MQData 指針static void messageCB(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message){ MQData *userData = (MQData *)obj; // 此時(shí)就可以根據(jù) userData 指針中的內(nèi)容分辨出這是哪一個(gè)鏈接了}
另外一個(gè)問(wèn)題:不知道你是否注意到示例中的 mosquitto_loop() 這個(gè)函數(shù)?這個(gè)函數(shù)需要放在 while 死循環(huán)中不停的調(diào)用,才能出發(fā) mosuiqtto 內(nèi)部的事件。(其實(shí)在 mosuiqtto 中,還提供了另一個(gè)簡(jiǎn)化的函數(shù) mosquitto_loop_forever)。
也就是說(shuō):在每個(gè)連接中,需要持續(xù)的觸發(fā) mosquitto 底層的事件,才能讓消息系統(tǒng)順利的收發(fā)。因此,在示例代碼中,使用兩個(gè)線程分別連接到云平臺(tái)的總線和內(nèi)部的總線。