diff --git a/CHANGELOG.md b/CHANGELOG.md index a56ff7584..d2c63a54b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ All notable changes to this project will be documented in this file. - Added support for Zero-Cross Dimmer on ESP32, changed calculation on EPS8266, high resolution control e.g. Solar: `ZCDimmerSet` - ESP32: Enhanced Shutterbuttons functionality to control tilt position, additionally incr/decr possible to position and tilt. - ESP32: `Shuttersetup` for "Shelly 2.5 pro" automatic calibration and setup (experimental) +- Berry add `tcpclientasync` class for non-blocking TCP client ### Breaking Changed diff --git a/lib/libesp32/berry/default/be_modtab.c b/lib/libesp32/berry/default/be_modtab.c index 5051dd948..c4f548031 100644 --- a/lib/libesp32/berry/default/be_modtab.c +++ b/lib/libesp32/berry/default/be_modtab.c @@ -220,6 +220,7 @@ be_extern_native_class(md5); be_extern_native_class(udp); be_extern_native_class(webclient); be_extern_native_class(tcpclient); +be_extern_native_class(tcpclientasync); be_extern_native_class(tcpserver); be_extern_native_class(energy_struct); // LVGL core classes @@ -267,6 +268,7 @@ BERRY_LOCAL bclass_array be_class_table = { &be_native_class(udp), &be_native_class(webclient), &be_native_class(tcpclient), + &be_native_class(tcpclientasync), #endif // USE_WEBCLIENT #ifdef USE_BERRY_TCPSERVER &be_native_class(tcpserver), diff --git a/lib/libesp32/berry_mapping/src/be_mapping.h b/lib/libesp32/berry_mapping/src/be_mapping.h index 0b0ef2c1e..9876278f3 100644 --- a/lib/libesp32/berry_mapping/src/be_mapping.h +++ b/lib/libesp32/berry_mapping/src/be_mapping.h @@ -88,6 +88,7 @@ extern "C" { void be_raisef(bvm *vm, const char *except, const char *msg, ...); +extern void be_map_insert_nil(bvm *vm, const char *key); extern void be_map_insert_int(bvm *vm, const char *key, bint value); extern void be_map_insert_bool(bvm *vm, const char *key, bbool value); extern void be_map_insert_real(bvm *vm, const char *key, breal value); diff --git a/lib/libesp32/berry_mapping/src/be_mapping_utils.c b/lib/libesp32/berry_mapping/src/be_mapping_utils.c index e3a2cc6e2..51f7388ba 100644 --- a/lib/libesp32/berry_mapping/src/be_mapping_utils.c +++ b/lib/libesp32/berry_mapping/src/be_mapping_utils.c @@ -5,6 +5,14 @@ /*********************************************************************************************\ * Helper functions to create a map with single line calls \*********************************************************************************************/ +/* Insert an nil to a key */ +void be_map_insert_nil(bvm *vm, const char *key) +{ + be_pushstring(vm, key); + be_pushnil(vm); + be_data_insert(vm, -3); + be_pop(vm, 2); +} /* Insert an int to a key */ void be_map_insert_int(bvm *vm, const char *key, bint value) { diff --git a/lib/libesp32/berry_matter/solidify_all.be b/lib/libesp32/berry_matter/solidify_all.be index 81f392870..2d9bf3f45 100755 --- a/lib/libesp32/berry_matter/solidify_all.be +++ b/lib/libesp32/berry_matter/solidify_all.be @@ -14,7 +14,7 @@ import sys sys.path().push('src/embedded') # allow to import from src/embedded # globals that need to exist to make compilation succeed -var globs = "path,ctypes_bytes_dyn,tasmota,ccronexpr,gpio,light,webclient,load,MD5,lv,light_state,udp," +var globs = "path,ctypes_bytes_dyn,tasmota,ccronexpr,gpio,light,webclient,load,MD5,lv,light_state,udp,tcpclientasync," "lv_clock,lv_clock_icon,lv_signal_arcs,lv_signal_bars,lv_wifi_arcs_icon,lv_wifi_arcs," "lv_wifi_bars_icon,lv_wifi_bars," "_lvgl," diff --git a/lib/libesp32/berry_tasmota/src/be_tcpclientasyc_lib.c b/lib/libesp32/berry_tasmota/src/be_tcpclientasyc_lib.c new file mode 100644 index 000000000..16d5f7240 --- /dev/null +++ b/lib/libesp32/berry_tasmota/src/be_tcpclientasyc_lib.c @@ -0,0 +1,46 @@ +/******************************************************************** + * TCP client non-blocking (async) + * + * + *******************************************************************/ +#include "be_constobj.h" + +#ifdef USE_WEBCLIENT + +extern int wc_tcpasync_init(bvm *vm); +extern int wc_tcpasync_deinit(bvm *vm); + +extern int wc_tcpasync_connect(bvm *vm); +extern int wc_tcpasync_connected(bvm *vm); +extern int wc_tcpasync_listening(bvm *vm); +extern int wc_tcpasync_info(bvm *vm); +extern int wc_tcpasync_close(bvm *vm); +extern int wc_tcpasync_available(bvm *vm); + +extern int wc_tcpasync_write(bvm *vm); +extern int wc_tcpasync_read(bvm *vm); +extern int wc_tcpasync_readbytes(bvm *vm); + +#include "be_fixed_be_class_tcpclientasync.h" + +/* @const_object_info_begin + +class be_class_tcpclientasync (scope: global, name: tcpclientasync) { + .p, var + init, func(wc_tcpasync_init) + deinit, func(wc_tcpasync_deinit) + + connect, func(wc_tcpasync_connect) + connected, func(wc_tcpasync_connected) + listening, func(wc_tcpasync_listening) + info, func(wc_tcpasync_info) + close, func(wc_tcpasync_close) + available, func(wc_tcpasync_available) + + write, func(wc_tcpasync_write) + read, func(wc_tcpasync_read) + readbytes, func(wc_tcpasync_readbytes) +} +@const_object_info_end */ + +#endif // USE_WEBCLIENT diff --git a/tasmota/tasmota_xdrv_driver/xdrv_52_3_berry_tcpclientasync.ino b/tasmota/tasmota_xdrv_driver/xdrv_52_3_berry_tcpclientasync.ino new file mode 100644 index 000000000..8c6fa9841 --- /dev/null +++ b/tasmota/tasmota_xdrv_driver/xdrv_52_3_berry_tcpclientasync.ino @@ -0,0 +1,534 @@ +/* + xdrv_52_3_berry_tcpclientasync.ino - Berry scripting language, TCP client non-blocking + + Copyright (C) 2021 Stephan Hadinger, Berry language by Guan Wenliang https://github.com/Skiars/berry + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +// also includes tcp_client + +#ifdef USE_BERRY + +#ifdef USE_WEBCLIENT + +#include +// #include "be_sys.h" +#include +#include + +#define T_IN6_IS_ADDR_V4MAPPED(a) \ + ((((__const uint32_t *) (a))[0] == 0) \ + && (((__const uint32_t *) (a))[1] == 0) \ + && (((__const uint32_t *) (a))[2] == htonl (0xffff))) + +enum class AsyncTCPState { + INPROGRESS, + CONNECTED, + REFUSED, + CLOSED +}; + +class AsyncTCPClient { +public: + + AsyncTCPClient() : sockfd(-1), state(AsyncTCPState::INPROGRESS), _timeout_ms(1), local_port(-1) { + + } + + ~AsyncTCPClient() { + this->stop(); + } + + void stop() { + if (sockfd > 0) { + close(sockfd); + } + sockfd = -1; + state = AsyncTCPState::CLOSED; + } + + int connect(IPAddress ip, uint16_t port) { + struct sockaddr_storage serveraddr = {}; + sockfd = -1; + + if (state != AsyncTCPState::INPROGRESS) { + stop(); + state = AsyncTCPState::INPROGRESS; // reset state + } + + if (ip.type() == IPv6) { + struct sockaddr_in6 *tmpaddr = (struct sockaddr_in6 *)&serveraddr; + sockfd = socket(AF_INET6, SOCK_STREAM, 0); + tmpaddr->sin6_family = AF_INET6; + memcpy(tmpaddr->sin6_addr.un.u8_addr, &ip[0], 16); + tmpaddr->sin6_port = htons(port); + } else { + struct sockaddr_in *tmpaddr = (struct sockaddr_in *)&serveraddr; + sockfd = socket(AF_INET, SOCK_STREAM, 0); + tmpaddr->sin_family = AF_INET; + tmpaddr->sin_addr.s_addr = ip; + tmpaddr->sin_port = htons(port); + } + if (sockfd < 0) { + AddLog(LOG_LEVEL_DEBUG, "BRY: Error: socket: %d", errno); + return 0; + } + // berry_log_C("sockfd=%i", sockfd); + fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFL, 0) | O_NONBLOCK ); // set non-blocking + + fd_set fdset; + FD_ZERO(&fdset); + FD_SET(sockfd, &fdset); + + int res = lwip_connect(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)); + if (res < 0 && errno != EINPROGRESS) { + AddLog(LOG_LEVEL_INFO, "BRY: Error: connect on fd %d, errno: %d, \"%s\"", sockfd, errno, strerror(errno)); + close(sockfd); + return 0; + } + // berry_log_C("lwip_connect res=%i errno=%i (EINPROGRESS=%i)", res, errno, EINPROGRESS); + return 1; + } + + void _update_connection_state(void) { + if (state != AsyncTCPState::INPROGRESS) { + return; + } + + fd_set fdset; + FD_ZERO(&fdset); + FD_SET(sockfd, &fdset); + + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 0; + + int res = ::select(sockfd + 1, nullptr, &fdset, nullptr, &tv); + if (res < 0) { + AddLog(LOG_LEVEL_DEBUG, "BRY: select on fd %d, errno: %d, \"%s\"", sockfd, errno, strerror(errno)); + stop(); + state = AsyncTCPState::REFUSED; + return; + } else if (res == 0) { + // AddLog(LOG_LEVEL_DEBUG, "BRY: select returned due to timeout %d ms for fd %d", _timeout_ms, sockfd); + // stop(); + // state = AsyncTCPState::TIMEOUT; + return; + } else { + int sockerr; + socklen_t len = (socklen_t)sizeof(int); + res = getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &sockerr, &len); + + if (res < 0) { + AddLog(LOG_LEVEL_DEBUG, "BRY: getsockopt on fd %d, errno: %d, \"%s\"", sockfd, errno, strerror(errno)); + stop(); + state = AsyncTCPState::REFUSED; + return; + } + + if (sockerr != 0) { + AddLog(LOG_LEVEL_DEBUG, "BRY: socket error on fd %d, errno: %d, \"%s\"", sockfd, sockerr, strerror(sockerr)); + stop(); + state = AsyncTCPState::REFUSED; + return; + } + + state = AsyncTCPState::CONNECTED; + return; + } + } + + void _update_connected(void) { + if (state == AsyncTCPState::INPROGRESS) { + this->_update_connection_state(); // force an update + } + if (state == AsyncTCPState::CONNECTED) { + uint8_t dummy; + int res = recv(sockfd, &dummy, 0, MSG_DONTWAIT); + // recv only sets errno if res is <= 0 + if (res <= 0){ + switch (errno) { + case EWOULDBLOCK: + case ENOENT: //caused by vfs + // connected, don't change status + break; + case ENOTCONN: + case EPIPE: + case ECONNRESET: + case ECONNREFUSED: + case ECONNABORTED: + stop(); + break; + default: + // AddLog(LOG_LEVEL_DEBUG, "BRY: tcpclientasync unexpected: RES: %d, ERR: %d", res, errno); + stop(); + break; + } + } else { + // connected do nothing + } + } + } + + // connected() + // returns: + // 0: still in progres, unknown + // 1: connected + // -1: refused + int connected(void) { + _update_connected(); + switch (state) { + case AsyncTCPState::CONNECTED: return 1; break; + case AsyncTCPState::INPROGRESS: return 0; break; + // all other values indicated that the connection is down + case AsyncTCPState::REFUSED: + case AsyncTCPState::CLOSED: + default: + return -1; + break; + } + } + + size_t available(void) { + _update_connected(); + if (state == AsyncTCPState::CONNECTED) { + size_t bytes_available = 0; + int res = ioctl(sockfd, FIONREAD, &bytes_available); + if (res >= 0) { + return bytes_available; + } else { + return 0; + } + } else { + return 0; + } + } + + bool listening(void) { + _update_connected(); + if (state == AsyncTCPState::CONNECTED) { + fd_set fdset; + FD_ZERO(&fdset); + FD_SET(sockfd, &fdset); + + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 0; + + int res = ::select(sockfd + 1, nullptr, &fdset, nullptr, &tv); + if (res < 0) { + stop(); + return false; + } else if (res == 0) { + // connection is up but not ready to send + return false; + } else { + // we assume that since we got select answering it's good + return true; + } + } + return false; + } + + size_t write(const char *buffer, size_t size) { + _update_connected(); + if (state == AsyncTCPState::CONNECTED) { + // is the socket ready for writing + fd_set set; + struct timeval tv; + FD_ZERO(&set); // empties the set + FD_SET(sockfd, &set); // adds FD to the set + tv.tv_sec = 0; + tv.tv_usec = 0; // non-blocking + + if (select(sockfd + 1, NULL, &set, NULL, &tv) < 0) { + return 0; // error TODO close connection? + } + + if (FD_ISSET(sockfd, &set)) { + // ready for writes + int res = send(sockfd, (void*) buffer, size, MSG_DONTWAIT); + if (res >= 0) { + return res; + } else { // res < 0 + if (errno != EAGAIN) { + AddLog(LOG_LEVEL_DEBUG, "BRY: tcpclientasync send fail on fd %d, errno: %d, \"%s\"", sockfd, errno, strerror(errno)); + stop(); + return 0; + } + } + } else { + return 0; // not ready for writes + } + + } else { + return 0; + } + return 0; + } + + size_t read(uint8_t* buf, size_t size) { + _update_connected(); + if (state == AsyncTCPState::CONNECTED) { + if (size > 1436) { size = 1436; } // let's keep it reasonable + + int res = recv(sockfd, buf, size, MSG_DONTWAIT); + if (res < 0) { // check error + if (errno != EWOULDBLOCK) { // something went wrong + stop(); + return 0; + } + } + return res; + } + return 0; + } + + void update_local_addr_port(void) { + local_port = -1; // default to unknwon + if (sockfd > 0) { + struct sockaddr_storage local_address; + socklen_t addr_size = sizeof(local_address); + // getpeername(fd, (struct sockaddr*)&addr, &len); + int res = getsockname(sockfd, (struct sockaddr*)&local_address, &addr_size); + if (res != 0) { return; } + + // IPv4 socket, old way + if (((struct sockaddr*)&local_address)->sa_family == AF_INET) { + struct sockaddr_in *s = (struct sockaddr_in *)&local_address; + local_port = ntohs(s->sin_port); + local_addr = IPAddress((uint32_t)(s->sin_addr.s_addr)); + // return IPAddress((uint32_t)(s->sin_addr.s_addr)); + } + + // IPv6, but it might be IPv4 mapped address + if (((struct sockaddr*)&local_address)->sa_family == AF_INET6) { + struct sockaddr_in6 *saddr6 = (struct sockaddr_in6 *)&local_address; + local_port = ntohs(saddr6->sin6_port); + if (T_IN6_IS_ADDR_V4MAPPED(saddr6->sin6_addr.un.u32_addr)) { + local_addr = IPAddress(IPv4, (uint8_t*)saddr6->sin6_addr.s6_addr+12); + // return IPAddress(IPv4, (uint8_t*)saddr6->sin6_addr.s6_addr+12); + } else { + local_addr = IPAddress(IPv6, (uint8_t*)(saddr6->sin6_addr.s6_addr)); + // return IPAddress(IPv6, (uint8_t*)(saddr6->sin6_addr.s6_addr)); + } + } + } + } + +public: + int sockfd; + AsyncTCPState state; + uint32_t _timeout_ms; + String remota_addr; // address in numerical format (after DNS resolution), either IPv4 or IPv6 + uint16_t remote_port; // remote port number + IPAddress local_addr; + int32_t local_port; // -1 if unknown or invalid +}; + +/*********************************************************************************************\ + * Native functions mapped to Berry functions + * + * +\*********************************************************************************************/ +extern "C" { + // same but using `.p` argument + AsyncTCPClient * wc_gettcpclientasync_p(struct bvm *vm) { + be_getmember(vm, 1, ".p"); + void *p = be_tocomptr(vm, -1); + be_pop(vm, 1); + return (AsyncTCPClient*) p; + } + + int32_t wc_tcpasync_init(struct bvm *vm); + int32_t wc_tcpasync_init(struct bvm *vm) { + int32_t argc = be_top(vm); + AsyncTCPClient * wcl; + if (argc >= 2 && be_iscomptr(vm, 2)) { + wcl = (AsyncTCPClient*) be_tocomptr(vm, 2); + } else { + wcl = new AsyncTCPClient(); + } + be_pushcomptr(vm, (void*) wcl); + be_setmember(vm, 1, ".p"); + be_return_nil(vm); + } + + int32_t wc_tcpasync_deinit(struct bvm *vm); + int32_t wc_tcpasync_deinit(struct bvm *vm) { + AsyncTCPClient * wcl = wc_gettcpclientasync_p(vm); + if (wcl != nullptr) { delete wcl; } + be_setmember(vm, 1, ".p"); + be_return_nil(vm); + } + + // tcp.connect(address:string, port:int) -> bool + int32_t wc_tcpasync_connect(struct bvm *vm); + int32_t wc_tcpasync_connect(struct bvm *vm) { + int32_t argc = be_top(vm); + if (argc >= 3 && be_isstring(vm, 2) && be_isint(vm, 3)) { + AsyncTCPClient * tcp = wc_gettcpclientasync_p(vm); + const char * address = be_tostring(vm, 2); + int32_t port = be_toint(vm, 3); + // open connection + IPAddress ipaddr; + bool success = WifiHostByName(address, ipaddr); + if (success) { + success = tcp->connect(ipaddr, port); + } else { + AddLog(LOG_LEVEL_DEBUG, "BRY: tcpclientasync.connect couldn't resolve '%s'", address); + } + be_pushbool(vm, success); + be_return(vm); /* return self */ + } + be_raise(vm, "attribute_error", NULL); + } + + // tcp.close(void) -> nil + int32_t wc_tcpasync_close(struct bvm *vm); + int32_t wc_tcpasync_close(struct bvm *vm) { + AsyncTCPClient * tcp = wc_gettcpclientasync_p(vm); + tcp->stop(); + be_return_nil(vm); + } + + // tcp.available(void) -> int + int32_t wc_tcpasync_available(struct bvm *vm); + int32_t wc_tcpasync_available(struct bvm *vm) { + AsyncTCPClient * tcp = wc_gettcpclientasync_p(vm); + int32_t available = tcp->available(); + be_pushint(vm, available); + be_return(vm); + } + + // tcp.connected(void) -> bool or nil + // new value: returns `nil` if the status is still unknown + int32_t wc_tcpasync_connected(struct bvm *vm); + int32_t wc_tcpasync_connected(struct bvm *vm) { + AsyncTCPClient * tcp = wc_gettcpclientasync_p(vm); + int res = tcp->connected(); + if (res) { + be_pushbool(vm, res > 0); + } else { + be_pushnil(vm); + } + be_return(vm); /* return code */ + } + + // tcp.listening(void) -> bool + // is the socket ready for sending + int32_t wc_tcpasync_listening(struct bvm *vm); + int32_t wc_tcpasync_listening(struct bvm *vm) { + AsyncTCPClient * tcp = wc_gettcpclientasync_p(vm); + bool res = tcp->listening(); + be_pushbool(vm, res); + be_return(vm); /* return code */ + } + + // tcp.info(void) -> map + // get information about the socket + int32_t wc_tcpasync_info(struct bvm *vm); + int32_t wc_tcpasync_info(struct bvm *vm) { + AsyncTCPClient * tcp = wc_gettcpclientasync_p(vm); + be_newobject(vm, "map"); + int connected = tcp->connected(); + if (connected == 0) { + be_map_insert_nil(vm, "connected"); + } else { + be_map_insert_bool(vm, "connected", connected > 0); + } + if (connected >= 0) { + be_map_insert_bool(vm, "available", tcp->available()); + be_map_insert_bool(vm, "listening", tcp->listening()); + be_map_insert_int(vm, "fd", tcp->sockfd); + tcp->update_local_addr_port(); + if (tcp->local_port > 0) { + be_map_insert_int(vm, "local_port", tcp->local_port); + be_map_insert_str(vm, "local_addr", tcp->local_addr.toString().c_str()); + } + } + be_pop(vm, 1); + be_return(vm); + } + + // tcp.write(bytes | string) -> int + int32_t wc_tcpasync_write(struct bvm *vm); + int32_t wc_tcpasync_write(struct bvm *vm) { + int32_t argc = be_top(vm); + if (argc >= 2 && (be_isstring(vm, 2) || be_isbytes(vm, 2))) { + AsyncTCPClient * tcp = wc_gettcpclientasync_p(vm); + const char * buf = nullptr; + size_t buf_len = 0; + if (be_isstring(vm, 2)) { // string + buf = be_tostring(vm, 2); + buf_len = strlen(buf); + } else { // bytes + buf = (const char*) be_tobytes(vm, 2, &buf_len); + } + size_t bw = tcp->write(buf, buf_len); + be_pushint(vm, bw); + be_return(vm); /* return code */ + } + be_raise(vm, kTypeError, nullptr); + } + + // tcp.read() -> string + int32_t wc_tcpasync_read(struct bvm *vm); + int32_t wc_tcpasync_read(struct bvm *vm) { + AsyncTCPClient * tcp = wc_gettcpclientasync_p(vm); + int32_t max_read = -1; // by default read as much as we can + if (be_top(vm) >= 2 && be_isint(vm, 2)) { + max_read = be_toint(vm, 2); + } + int32_t btr = tcp->available(); + if (btr <= 0) { + be_pushstring(vm, ""); + } else { + if ((max_read >= 0) && (btr > max_read)) { + btr = max_read; + } + char * buf = (char*) be_pushbuffer(vm, btr); + int32_t btr2 = tcp->read((uint8_t*) buf, btr); + be_pushnstring(vm, buf, btr2); + } + be_return(vm); /* return code */ + } + + // tcp.readbytes() -> bytes + int32_t wc_tcpasync_readbytes(struct bvm *vm); + int32_t wc_tcpasync_readbytes(struct bvm *vm) { + AsyncTCPClient * tcp = wc_gettcpclientasync_p(vm); + int32_t max_read = -1; // by default read as much as we can + if (be_top(vm) >= 2 && be_isint(vm, 2)) { + max_read = be_toint(vm, 2); + } + int32_t btr = tcp->available(); + if (btr <= 0) { + be_pushbytes(vm, nullptr, 0); + } else { + if ((max_read >= 0) && (btr > max_read)) { + btr = max_read; + } + uint8_t * buf = (uint8_t*) be_pushbuffer(vm, btr); + int32_t btr2 = tcp->read(buf, btr); + be_pushbytes(vm, buf, btr2); + } + be_return(vm); /* return code */ + } + +} + +#endif // USE_WEBCLIENT +#endif // USE_BERRY