Berry add tcpclientasync class for non-blocking TCP client (#18584)

This commit is contained in:
s-hadinger 2023-05-05 15:13:22 +02:00 committed by GitHub
parent 07a9f33ae7
commit 069278f966
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 593 additions and 1 deletions

View File

@ -16,6 +16,7 @@ All notable changes to this project will be documented in this file.
- Added support for Zero-Cross Dimmer on ESP32, changed calculation on EPS8266, high resolution control e.g. Solar: `ZCDimmerSet` - Added support for Zero-Cross Dimmer on ESP32, changed calculation on EPS8266, high resolution control e.g. Solar: `ZCDimmerSet`
- ESP32: Enhanced Shutterbuttons functionality to control tilt position, additionally incr/decr possible to position and tilt. - ESP32: Enhanced Shutterbuttons functionality to control tilt position, additionally incr/decr possible to position and tilt.
- ESP32: `Shuttersetup` for "Shelly 2.5 pro" automatic calibration and setup (experimental) - ESP32: `Shuttersetup` for "Shelly 2.5 pro" automatic calibration and setup (experimental)
- Berry add `tcpclientasync` class for non-blocking TCP client
### Breaking Changed ### Breaking Changed

View File

@ -220,6 +220,7 @@ be_extern_native_class(md5);
be_extern_native_class(udp); be_extern_native_class(udp);
be_extern_native_class(webclient); be_extern_native_class(webclient);
be_extern_native_class(tcpclient); be_extern_native_class(tcpclient);
be_extern_native_class(tcpclientasync);
be_extern_native_class(tcpserver); be_extern_native_class(tcpserver);
be_extern_native_class(energy_struct); be_extern_native_class(energy_struct);
// LVGL core classes // LVGL core classes
@ -267,6 +268,7 @@ BERRY_LOCAL bclass_array be_class_table = {
&be_native_class(udp), &be_native_class(udp),
&be_native_class(webclient), &be_native_class(webclient),
&be_native_class(tcpclient), &be_native_class(tcpclient),
&be_native_class(tcpclientasync),
#endif // USE_WEBCLIENT #endif // USE_WEBCLIENT
#ifdef USE_BERRY_TCPSERVER #ifdef USE_BERRY_TCPSERVER
&be_native_class(tcpserver), &be_native_class(tcpserver),

View File

@ -88,6 +88,7 @@ extern "C" {
void be_raisef(bvm *vm, const char *except, const char *msg, ...); void be_raisef(bvm *vm, const char *except, const char *msg, ...);
extern void be_map_insert_nil(bvm *vm, const char *key);
extern void be_map_insert_int(bvm *vm, const char *key, bint value); extern void be_map_insert_int(bvm *vm, const char *key, bint value);
extern void be_map_insert_bool(bvm *vm, const char *key, bbool value); extern void be_map_insert_bool(bvm *vm, const char *key, bbool value);
extern void be_map_insert_real(bvm *vm, const char *key, breal value); extern void be_map_insert_real(bvm *vm, const char *key, breal value);

View File

@ -5,6 +5,14 @@
/*********************************************************************************************\ /*********************************************************************************************\
* Helper functions to create a map with single line calls * Helper functions to create a map with single line calls
\*********************************************************************************************/ \*********************************************************************************************/
/* Insert an nil to a key */
void be_map_insert_nil(bvm *vm, const char *key)
{
be_pushstring(vm, key);
be_pushnil(vm);
be_data_insert(vm, -3);
be_pop(vm, 2);
}
/* Insert an int to a key */ /* Insert an int to a key */
void be_map_insert_int(bvm *vm, const char *key, bint value) void be_map_insert_int(bvm *vm, const char *key, bint value)
{ {

View File

@ -14,7 +14,7 @@ import sys
sys.path().push('src/embedded') # allow to import from src/embedded sys.path().push('src/embedded') # allow to import from src/embedded
# globals that need to exist to make compilation succeed # globals that need to exist to make compilation succeed
var globs = "path,ctypes_bytes_dyn,tasmota,ccronexpr,gpio,light,webclient,load,MD5,lv,light_state,udp," var globs = "path,ctypes_bytes_dyn,tasmota,ccronexpr,gpio,light,webclient,load,MD5,lv,light_state,udp,tcpclientasync,"
"lv_clock,lv_clock_icon,lv_signal_arcs,lv_signal_bars,lv_wifi_arcs_icon,lv_wifi_arcs," "lv_clock,lv_clock_icon,lv_signal_arcs,lv_signal_bars,lv_wifi_arcs_icon,lv_wifi_arcs,"
"lv_wifi_bars_icon,lv_wifi_bars," "lv_wifi_bars_icon,lv_wifi_bars,"
"_lvgl," "_lvgl,"

View File

@ -0,0 +1,46 @@
/********************************************************************
* TCP client non-blocking (async)
*
*
*******************************************************************/
#include "be_constobj.h"
#ifdef USE_WEBCLIENT
extern int wc_tcpasync_init(bvm *vm);
extern int wc_tcpasync_deinit(bvm *vm);
extern int wc_tcpasync_connect(bvm *vm);
extern int wc_tcpasync_connected(bvm *vm);
extern int wc_tcpasync_listening(bvm *vm);
extern int wc_tcpasync_info(bvm *vm);
extern int wc_tcpasync_close(bvm *vm);
extern int wc_tcpasync_available(bvm *vm);
extern int wc_tcpasync_write(bvm *vm);
extern int wc_tcpasync_read(bvm *vm);
extern int wc_tcpasync_readbytes(bvm *vm);
#include "be_fixed_be_class_tcpclientasync.h"
/* @const_object_info_begin
class be_class_tcpclientasync (scope: global, name: tcpclientasync) {
.p, var
init, func(wc_tcpasync_init)
deinit, func(wc_tcpasync_deinit)
connect, func(wc_tcpasync_connect)
connected, func(wc_tcpasync_connected)
listening, func(wc_tcpasync_listening)
info, func(wc_tcpasync_info)
close, func(wc_tcpasync_close)
available, func(wc_tcpasync_available)
write, func(wc_tcpasync_write)
read, func(wc_tcpasync_read)
readbytes, func(wc_tcpasync_readbytes)
}
@const_object_info_end */
#endif // USE_WEBCLIENT

View File

@ -0,0 +1,534 @@
/*
xdrv_52_3_berry_tcpclientasync.ino - Berry scripting language, TCP client non-blocking
Copyright (C) 2021 Stephan Hadinger, Berry language by Guan Wenliang https://github.com/Skiars/berry
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// also includes tcp_client
#ifdef USE_BERRY
#ifdef USE_WEBCLIENT
#include <berry.h>
// #include "be_sys.h"
#include <lwip/sockets.h>
#include <lwip/netdb.h>
#define T_IN6_IS_ADDR_V4MAPPED(a) \
((((__const uint32_t *) (a))[0] == 0) \
&& (((__const uint32_t *) (a))[1] == 0) \
&& (((__const uint32_t *) (a))[2] == htonl (0xffff)))
enum class AsyncTCPState {
INPROGRESS,
CONNECTED,
REFUSED,
CLOSED
};
class AsyncTCPClient {
public:
AsyncTCPClient() : sockfd(-1), state(AsyncTCPState::INPROGRESS), _timeout_ms(1), local_port(-1) {
}
~AsyncTCPClient() {
this->stop();
}
void stop() {
if (sockfd > 0) {
close(sockfd);
}
sockfd = -1;
state = AsyncTCPState::CLOSED;
}
int connect(IPAddress ip, uint16_t port) {
struct sockaddr_storage serveraddr = {};
sockfd = -1;
if (state != AsyncTCPState::INPROGRESS) {
stop();
state = AsyncTCPState::INPROGRESS; // reset state
}
if (ip.type() == IPv6) {
struct sockaddr_in6 *tmpaddr = (struct sockaddr_in6 *)&serveraddr;
sockfd = socket(AF_INET6, SOCK_STREAM, 0);
tmpaddr->sin6_family = AF_INET6;
memcpy(tmpaddr->sin6_addr.un.u8_addr, &ip[0], 16);
tmpaddr->sin6_port = htons(port);
} else {
struct sockaddr_in *tmpaddr = (struct sockaddr_in *)&serveraddr;
sockfd = socket(AF_INET, SOCK_STREAM, 0);
tmpaddr->sin_family = AF_INET;
tmpaddr->sin_addr.s_addr = ip;
tmpaddr->sin_port = htons(port);
}
if (sockfd < 0) {
AddLog(LOG_LEVEL_DEBUG, "BRY: Error: socket: %d", errno);
return 0;
}
// berry_log_C("sockfd=%i", sockfd);
fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFL, 0) | O_NONBLOCK ); // set non-blocking
fd_set fdset;
FD_ZERO(&fdset);
FD_SET(sockfd, &fdset);
int res = lwip_connect(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr));
if (res < 0 && errno != EINPROGRESS) {
AddLog(LOG_LEVEL_INFO, "BRY: Error: connect on fd %d, errno: %d, \"%s\"", sockfd, errno, strerror(errno));
close(sockfd);
return 0;
}
// berry_log_C("lwip_connect res=%i errno=%i (EINPROGRESS=%i)", res, errno, EINPROGRESS);
return 1;
}
void _update_connection_state(void) {
if (state != AsyncTCPState::INPROGRESS) {
return;
}
fd_set fdset;
FD_ZERO(&fdset);
FD_SET(sockfd, &fdset);
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 0;
int res = ::select(sockfd + 1, nullptr, &fdset, nullptr, &tv);
if (res < 0) {
AddLog(LOG_LEVEL_DEBUG, "BRY: select on fd %d, errno: %d, \"%s\"", sockfd, errno, strerror(errno));
stop();
state = AsyncTCPState::REFUSED;
return;
} else if (res == 0) {
// AddLog(LOG_LEVEL_DEBUG, "BRY: select returned due to timeout %d ms for fd %d", _timeout_ms, sockfd);
// stop();
// state = AsyncTCPState::TIMEOUT;
return;
} else {
int sockerr;
socklen_t len = (socklen_t)sizeof(int);
res = getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &sockerr, &len);
if (res < 0) {
AddLog(LOG_LEVEL_DEBUG, "BRY: getsockopt on fd %d, errno: %d, \"%s\"", sockfd, errno, strerror(errno));
stop();
state = AsyncTCPState::REFUSED;
return;
}
if (sockerr != 0) {
AddLog(LOG_LEVEL_DEBUG, "BRY: socket error on fd %d, errno: %d, \"%s\"", sockfd, sockerr, strerror(sockerr));
stop();
state = AsyncTCPState::REFUSED;
return;
}
state = AsyncTCPState::CONNECTED;
return;
}
}
void _update_connected(void) {
if (state == AsyncTCPState::INPROGRESS) {
this->_update_connection_state(); // force an update
}
if (state == AsyncTCPState::CONNECTED) {
uint8_t dummy;
int res = recv(sockfd, &dummy, 0, MSG_DONTWAIT);
// recv only sets errno if res is <= 0
if (res <= 0){
switch (errno) {
case EWOULDBLOCK:
case ENOENT: //caused by vfs
// connected, don't change status
break;
case ENOTCONN:
case EPIPE:
case ECONNRESET:
case ECONNREFUSED:
case ECONNABORTED:
stop();
break;
default:
// AddLog(LOG_LEVEL_DEBUG, "BRY: tcpclientasync unexpected: RES: %d, ERR: %d", res, errno);
stop();
break;
}
} else {
// connected do nothing
}
}
}
// connected()
// returns:
// 0: still in progres, unknown
// 1: connected
// -1: refused
int connected(void) {
_update_connected();
switch (state) {
case AsyncTCPState::CONNECTED: return 1; break;
case AsyncTCPState::INPROGRESS: return 0; break;
// all other values indicated that the connection is down
case AsyncTCPState::REFUSED:
case AsyncTCPState::CLOSED:
default:
return -1;
break;
}
}
size_t available(void) {
_update_connected();
if (state == AsyncTCPState::CONNECTED) {
size_t bytes_available = 0;
int res = ioctl(sockfd, FIONREAD, &bytes_available);
if (res >= 0) {
return bytes_available;
} else {
return 0;
}
} else {
return 0;
}
}
bool listening(void) {
_update_connected();
if (state == AsyncTCPState::CONNECTED) {
fd_set fdset;
FD_ZERO(&fdset);
FD_SET(sockfd, &fdset);
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 0;
int res = ::select(sockfd + 1, nullptr, &fdset, nullptr, &tv);
if (res < 0) {
stop();
return false;
} else if (res == 0) {
// connection is up but not ready to send
return false;
} else {
// we assume that since we got select answering it's good
return true;
}
}
return false;
}
size_t write(const char *buffer, size_t size) {
_update_connected();
if (state == AsyncTCPState::CONNECTED) {
// is the socket ready for writing
fd_set set;
struct timeval tv;
FD_ZERO(&set); // empties the set
FD_SET(sockfd, &set); // adds FD to the set
tv.tv_sec = 0;
tv.tv_usec = 0; // non-blocking
if (select(sockfd + 1, NULL, &set, NULL, &tv) < 0) {
return 0; // error TODO close connection?
}
if (FD_ISSET(sockfd, &set)) {
// ready for writes
int res = send(sockfd, (void*) buffer, size, MSG_DONTWAIT);
if (res >= 0) {
return res;
} else { // res < 0
if (errno != EAGAIN) {
AddLog(LOG_LEVEL_DEBUG, "BRY: tcpclientasync send fail on fd %d, errno: %d, \"%s\"", sockfd, errno, strerror(errno));
stop();
return 0;
}
}
} else {
return 0; // not ready for writes
}
} else {
return 0;
}
return 0;
}
size_t read(uint8_t* buf, size_t size) {
_update_connected();
if (state == AsyncTCPState::CONNECTED) {
if (size > 1436) { size = 1436; } // let's keep it reasonable
int res = recv(sockfd, buf, size, MSG_DONTWAIT);
if (res < 0) { // check error
if (errno != EWOULDBLOCK) { // something went wrong
stop();
return 0;
}
}
return res;
}
return 0;
}
void update_local_addr_port(void) {
local_port = -1; // default to unknwon
if (sockfd > 0) {
struct sockaddr_storage local_address;
socklen_t addr_size = sizeof(local_address);
// getpeername(fd, (struct sockaddr*)&addr, &len);
int res = getsockname(sockfd, (struct sockaddr*)&local_address, &addr_size);
if (res != 0) { return; }
// IPv4 socket, old way
if (((struct sockaddr*)&local_address)->sa_family == AF_INET) {
struct sockaddr_in *s = (struct sockaddr_in *)&local_address;
local_port = ntohs(s->sin_port);
local_addr = IPAddress((uint32_t)(s->sin_addr.s_addr));
// return IPAddress((uint32_t)(s->sin_addr.s_addr));
}
// IPv6, but it might be IPv4 mapped address
if (((struct sockaddr*)&local_address)->sa_family == AF_INET6) {
struct sockaddr_in6 *saddr6 = (struct sockaddr_in6 *)&local_address;
local_port = ntohs(saddr6->sin6_port);
if (T_IN6_IS_ADDR_V4MAPPED(saddr6->sin6_addr.un.u32_addr)) {
local_addr = IPAddress(IPv4, (uint8_t*)saddr6->sin6_addr.s6_addr+12);
// return IPAddress(IPv4, (uint8_t*)saddr6->sin6_addr.s6_addr+12);
} else {
local_addr = IPAddress(IPv6, (uint8_t*)(saddr6->sin6_addr.s6_addr));
// return IPAddress(IPv6, (uint8_t*)(saddr6->sin6_addr.s6_addr));
}
}
}
}
public:
int sockfd;
AsyncTCPState state;
uint32_t _timeout_ms;
String remota_addr; // address in numerical format (after DNS resolution), either IPv4 or IPv6
uint16_t remote_port; // remote port number
IPAddress local_addr;
int32_t local_port; // -1 if unknown or invalid
};
/*********************************************************************************************\
* Native functions mapped to Berry functions
*
*
\*********************************************************************************************/
extern "C" {
// same but using `.p` argument
AsyncTCPClient * wc_gettcpclientasync_p(struct bvm *vm) {
be_getmember(vm, 1, ".p");
void *p = be_tocomptr(vm, -1);
be_pop(vm, 1);
return (AsyncTCPClient*) p;
}
int32_t wc_tcpasync_init(struct bvm *vm);
int32_t wc_tcpasync_init(struct bvm *vm) {
int32_t argc = be_top(vm);
AsyncTCPClient * wcl;
if (argc >= 2 && be_iscomptr(vm, 2)) {
wcl = (AsyncTCPClient*) be_tocomptr(vm, 2);
} else {
wcl = new AsyncTCPClient();
}
be_pushcomptr(vm, (void*) wcl);
be_setmember(vm, 1, ".p");
be_return_nil(vm);
}
int32_t wc_tcpasync_deinit(struct bvm *vm);
int32_t wc_tcpasync_deinit(struct bvm *vm) {
AsyncTCPClient * wcl = wc_gettcpclientasync_p(vm);
if (wcl != nullptr) { delete wcl; }
be_setmember(vm, 1, ".p");
be_return_nil(vm);
}
// tcp.connect(address:string, port:int) -> bool
int32_t wc_tcpasync_connect(struct bvm *vm);
int32_t wc_tcpasync_connect(struct bvm *vm) {
int32_t argc = be_top(vm);
if (argc >= 3 && be_isstring(vm, 2) && be_isint(vm, 3)) {
AsyncTCPClient * tcp = wc_gettcpclientasync_p(vm);
const char * address = be_tostring(vm, 2);
int32_t port = be_toint(vm, 3);
// open connection
IPAddress ipaddr;
bool success = WifiHostByName(address, ipaddr);
if (success) {
success = tcp->connect(ipaddr, port);
} else {
AddLog(LOG_LEVEL_DEBUG, "BRY: tcpclientasync.connect couldn't resolve '%s'", address);
}
be_pushbool(vm, success);
be_return(vm); /* return self */
}
be_raise(vm, "attribute_error", NULL);
}
// tcp.close(void) -> nil
int32_t wc_tcpasync_close(struct bvm *vm);
int32_t wc_tcpasync_close(struct bvm *vm) {
AsyncTCPClient * tcp = wc_gettcpclientasync_p(vm);
tcp->stop();
be_return_nil(vm);
}
// tcp.available(void) -> int
int32_t wc_tcpasync_available(struct bvm *vm);
int32_t wc_tcpasync_available(struct bvm *vm) {
AsyncTCPClient * tcp = wc_gettcpclientasync_p(vm);
int32_t available = tcp->available();
be_pushint(vm, available);
be_return(vm);
}
// tcp.connected(void) -> bool or nil
// new value: returns `nil` if the status is still unknown
int32_t wc_tcpasync_connected(struct bvm *vm);
int32_t wc_tcpasync_connected(struct bvm *vm) {
AsyncTCPClient * tcp = wc_gettcpclientasync_p(vm);
int res = tcp->connected();
if (res) {
be_pushbool(vm, res > 0);
} else {
be_pushnil(vm);
}
be_return(vm); /* return code */
}
// tcp.listening(void) -> bool
// is the socket ready for sending
int32_t wc_tcpasync_listening(struct bvm *vm);
int32_t wc_tcpasync_listening(struct bvm *vm) {
AsyncTCPClient * tcp = wc_gettcpclientasync_p(vm);
bool res = tcp->listening();
be_pushbool(vm, res);
be_return(vm); /* return code */
}
// tcp.info(void) -> map
// get information about the socket
int32_t wc_tcpasync_info(struct bvm *vm);
int32_t wc_tcpasync_info(struct bvm *vm) {
AsyncTCPClient * tcp = wc_gettcpclientasync_p(vm);
be_newobject(vm, "map");
int connected = tcp->connected();
if (connected == 0) {
be_map_insert_nil(vm, "connected");
} else {
be_map_insert_bool(vm, "connected", connected > 0);
}
if (connected >= 0) {
be_map_insert_bool(vm, "available", tcp->available());
be_map_insert_bool(vm, "listening", tcp->listening());
be_map_insert_int(vm, "fd", tcp->sockfd);
tcp->update_local_addr_port();
if (tcp->local_port > 0) {
be_map_insert_int(vm, "local_port", tcp->local_port);
be_map_insert_str(vm, "local_addr", tcp->local_addr.toString().c_str());
}
}
be_pop(vm, 1);
be_return(vm);
}
// tcp.write(bytes | string) -> int
int32_t wc_tcpasync_write(struct bvm *vm);
int32_t wc_tcpasync_write(struct bvm *vm) {
int32_t argc = be_top(vm);
if (argc >= 2 && (be_isstring(vm, 2) || be_isbytes(vm, 2))) {
AsyncTCPClient * tcp = wc_gettcpclientasync_p(vm);
const char * buf = nullptr;
size_t buf_len = 0;
if (be_isstring(vm, 2)) { // string
buf = be_tostring(vm, 2);
buf_len = strlen(buf);
} else { // bytes
buf = (const char*) be_tobytes(vm, 2, &buf_len);
}
size_t bw = tcp->write(buf, buf_len);
be_pushint(vm, bw);
be_return(vm); /* return code */
}
be_raise(vm, kTypeError, nullptr);
}
// tcp.read() -> string
int32_t wc_tcpasync_read(struct bvm *vm);
int32_t wc_tcpasync_read(struct bvm *vm) {
AsyncTCPClient * tcp = wc_gettcpclientasync_p(vm);
int32_t max_read = -1; // by default read as much as we can
if (be_top(vm) >= 2 && be_isint(vm, 2)) {
max_read = be_toint(vm, 2);
}
int32_t btr = tcp->available();
if (btr <= 0) {
be_pushstring(vm, "");
} else {
if ((max_read >= 0) && (btr > max_read)) {
btr = max_read;
}
char * buf = (char*) be_pushbuffer(vm, btr);
int32_t btr2 = tcp->read((uint8_t*) buf, btr);
be_pushnstring(vm, buf, btr2);
}
be_return(vm); /* return code */
}
// tcp.readbytes() -> bytes
int32_t wc_tcpasync_readbytes(struct bvm *vm);
int32_t wc_tcpasync_readbytes(struct bvm *vm) {
AsyncTCPClient * tcp = wc_gettcpclientasync_p(vm);
int32_t max_read = -1; // by default read as much as we can
if (be_top(vm) >= 2 && be_isint(vm, 2)) {
max_read = be_toint(vm, 2);
}
int32_t btr = tcp->available();
if (btr <= 0) {
be_pushbytes(vm, nullptr, 0);
} else {
if ((max_read >= 0) && (btr > max_read)) {
btr = max_read;
}
uint8_t * buf = (uint8_t*) be_pushbuffer(vm, btr);
int32_t btr2 = tcp->read(buf, btr);
be_pushbytes(vm, buf, btr2);
}
be_return(vm); /* return code */
}
}
#endif // USE_WEBCLIENT
#endif // USE_BERRY