mirror of
https://github.com/Freenove/Freenove_ESP32_WROVER_Board.git
synced 2025-07-30 16:57:23 +00:00
Modify the code structure.
This commit is contained in:
parent
1f1910445f
commit
9618c2529b
Binary file not shown.
@ -0,0 +1,323 @@
|
||||
# Picoweb web pico-framework for Pycopy, https://github.com/pfalcon/pycopy
|
||||
# Copyright (c) 2014-2020 Paul Sokolovsky
|
||||
# SPDX-License-Identifier: MIT
|
||||
import sys
|
||||
import gc
|
||||
import micropython
|
||||
import utime
|
||||
import uio
|
||||
import ure as re
|
||||
import uerrno
|
||||
import uasyncio as asyncio
|
||||
import pkg_resources
|
||||
|
||||
from .utils import parse_qs
|
||||
|
||||
SEND_BUFSZ = 128
|
||||
|
||||
|
||||
def get_mime_type(fname):
|
||||
# Provide minimal detection of important file
|
||||
# types to keep browsers happy
|
||||
if fname.endswith(".html"):
|
||||
return "text/html"
|
||||
if fname.endswith(".css"):
|
||||
return "text/css"
|
||||
if fname.endswith(".png") or fname.endswith(".jpg"):
|
||||
return "image"
|
||||
return "text/plain"
|
||||
|
||||
def sendstream(writer, f):
|
||||
buf = bytearray(SEND_BUFSZ)
|
||||
while True:
|
||||
l = f.readinto(buf)
|
||||
if not l:
|
||||
break
|
||||
yield from writer.awrite(buf, 0, l)
|
||||
|
||||
|
||||
def jsonify(writer, dict):
|
||||
import ujson
|
||||
yield from start_response(writer, "application/json")
|
||||
yield from writer.awrite(ujson.dumps(dict))
|
||||
|
||||
def start_response(writer, content_type="text/html; charset=utf-8", status="200", headers=None):
|
||||
yield from writer.awrite("HTTP/1.0 %s NA\r\n" % status)
|
||||
yield from writer.awrite("Content-Type: ")
|
||||
yield from writer.awrite(content_type)
|
||||
if not headers:
|
||||
yield from writer.awrite("\r\n\r\n")
|
||||
return
|
||||
yield from writer.awrite("\r\n")
|
||||
if isinstance(headers, bytes) or isinstance(headers, str):
|
||||
yield from writer.awrite(headers)
|
||||
else:
|
||||
for k, v in headers.items():
|
||||
yield from writer.awrite(k)
|
||||
yield from writer.awrite(": ")
|
||||
yield from writer.awrite(v)
|
||||
yield from writer.awrite("\r\n")
|
||||
yield from writer.awrite("\r\n")
|
||||
|
||||
def http_error(writer, status):
|
||||
yield from start_response(writer, status=status)
|
||||
yield from writer.awrite(status)
|
||||
|
||||
|
||||
class HTTPRequest:
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def read_form_data(self):
|
||||
size = int(self.headers[b"Content-Length"])
|
||||
data = yield from self.reader.readexactly(size)
|
||||
form = parse_qs(data.decode())
|
||||
self.form = form
|
||||
|
||||
def parse_qs(self):
|
||||
form = parse_qs(self.qs)
|
||||
self.form = form
|
||||
|
||||
|
||||
class WebApp:
|
||||
|
||||
def __init__(self, pkg, routes=None, serve_static=True):
|
||||
if routes:
|
||||
self.url_map = routes
|
||||
else:
|
||||
self.url_map = []
|
||||
if pkg and pkg != "__main__":
|
||||
self.pkg = pkg.split(".", 1)[0]
|
||||
else:
|
||||
self.pkg = None
|
||||
if serve_static:
|
||||
self.url_map.append((re.compile("^/(static/.+)"), self.handle_static))
|
||||
self.mounts = []
|
||||
self.inited = False
|
||||
# Instantiated lazily
|
||||
self.template_loader = None
|
||||
self.headers_mode = "parse"
|
||||
|
||||
def parse_headers(self, reader):
|
||||
headers = {}
|
||||
while True:
|
||||
l = yield from reader.readline()
|
||||
if l == b"\r\n":
|
||||
break
|
||||
k, v = l.split(b":", 1)
|
||||
headers[k] = v.strip()
|
||||
return headers
|
||||
|
||||
def _handle(self, reader, writer):
|
||||
if self.debug > 1:
|
||||
micropython.mem_info()
|
||||
|
||||
close = True
|
||||
req = None
|
||||
try:
|
||||
request_line = yield from reader.readline()
|
||||
if request_line == b"":
|
||||
if self.debug >= 0:
|
||||
self.log.error("%s: EOF on request start" % reader)
|
||||
yield from writer.aclose()
|
||||
return
|
||||
req = HTTPRequest()
|
||||
# TODO: bytes vs str
|
||||
request_line = request_line.decode()
|
||||
method, path, proto = request_line.split()
|
||||
if self.debug >= 0:
|
||||
self.log.info('%.3f %s %s "%s %s"' % (utime.time(), req, writer, method, path))
|
||||
path = path.split("?", 1)
|
||||
qs = ""
|
||||
if len(path) > 1:
|
||||
qs = path[1]
|
||||
path = path[0]
|
||||
|
||||
#print("================")
|
||||
#print(req, writer)
|
||||
#print(req, (method, path, qs, proto), req.headers)
|
||||
|
||||
# Find which mounted subapp (if any) should handle this request
|
||||
app = self
|
||||
while True:
|
||||
found = False
|
||||
for subapp in app.mounts:
|
||||
root = subapp.url
|
||||
#print(path, "vs", root)
|
||||
if path[:len(root)] == root:
|
||||
app = subapp
|
||||
found = True
|
||||
path = path[len(root):]
|
||||
if not path.startswith("/"):
|
||||
path = "/" + path
|
||||
break
|
||||
if not found:
|
||||
break
|
||||
|
||||
# We initialize apps on demand, when they really get requests
|
||||
if not app.inited:
|
||||
app.init()
|
||||
|
||||
# Find handler to serve this request in app's url_map
|
||||
found = False
|
||||
for e in app.url_map:
|
||||
pattern = e[0]
|
||||
handler = e[1]
|
||||
extra = {}
|
||||
if len(e) > 2:
|
||||
extra = e[2]
|
||||
|
||||
if path == pattern:
|
||||
found = True
|
||||
break
|
||||
elif not isinstance(pattern, str):
|
||||
# Anything which is non-string assumed to be a ducktype
|
||||
# pattern matcher, whose .match() method is called. (Note:
|
||||
# Django uses .search() instead, but .match() is more
|
||||
# efficient and we're not exactly compatible with Django
|
||||
# URL matching anyway.)
|
||||
m = pattern.match(path)
|
||||
if m:
|
||||
req.url_match = m
|
||||
found = True
|
||||
break
|
||||
|
||||
if not found:
|
||||
headers_mode = "skip"
|
||||
else:
|
||||
headers_mode = extra.get("headers", self.headers_mode)
|
||||
|
||||
if headers_mode == "skip":
|
||||
while True:
|
||||
l = yield from reader.readline()
|
||||
if l == b"\r\n":
|
||||
break
|
||||
elif headers_mode == "parse":
|
||||
req.headers = yield from self.parse_headers(reader)
|
||||
else:
|
||||
assert headers_mode == "leave"
|
||||
|
||||
if found:
|
||||
req.method = method
|
||||
req.path = path
|
||||
req.qs = qs
|
||||
req.reader = reader
|
||||
close = yield from handler(req, writer)
|
||||
else:
|
||||
yield from start_response(writer, status="404")
|
||||
yield from writer.awrite("404\r\n")
|
||||
#print(req, "After response write")
|
||||
except Exception as e:
|
||||
if self.debug >= 0:
|
||||
self.log.exc(e, "%.3f %s %s %r" % (utime.time(), req, writer, e))
|
||||
yield from self.handle_exc(req, writer, e)
|
||||
|
||||
if close is not False:
|
||||
yield from writer.aclose()
|
||||
if __debug__ and self.debug > 1:
|
||||
self.log.debug("%.3f %s Finished processing request", utime.time(), req)
|
||||
|
||||
def handle_exc(self, req, resp, e):
|
||||
# Can be overriden by subclasses. req may be not (fully) initialized.
|
||||
# resp may already have (partial) content written.
|
||||
# NOTE: It's your responsibility to not throw exceptions out of
|
||||
# handle_exc(). If exception is thrown, it will be propagated, and
|
||||
# your webapp will terminate.
|
||||
# This method is a coroutine.
|
||||
return
|
||||
yield
|
||||
|
||||
def mount(self, url, app):
|
||||
"Mount a sub-app at the url of current app."
|
||||
# Inspired by Bottle. It might seem that dispatching to
|
||||
# subapps would rather be handled by normal routes, but
|
||||
# arguably, that's less efficient. Taking into account
|
||||
# that paradigmatically there's difference between handing
|
||||
# an action and delegating responisibilities to another
|
||||
# app, Bottle's way was followed.
|
||||
app.url = url
|
||||
self.mounts.append(app)
|
||||
# TODO: Consider instead to do better subapp prefix matching
|
||||
# in _handle() above.
|
||||
self.mounts.sort(key=lambda app: len(app.url), reverse=True)
|
||||
|
||||
def route(self, url, **kwargs):
|
||||
def _route(f):
|
||||
self.url_map.append((url, f, kwargs))
|
||||
return f
|
||||
return _route
|
||||
|
||||
def add_url_rule(self, url, func, **kwargs):
|
||||
# Note: this method skips Flask's "endpoint" argument,
|
||||
# because it's alleged bloat.
|
||||
self.url_map.append((url, func, kwargs))
|
||||
|
||||
def _load_template(self, tmpl_name):
|
||||
if self.template_loader is None:
|
||||
import utemplate.source
|
||||
self.template_loader = utemplate.source.Loader(self.pkg, "templates")
|
||||
return self.template_loader.load(tmpl_name)
|
||||
|
||||
def render_template(self, writer, tmpl_name, args=()):
|
||||
tmpl = self._load_template(tmpl_name)
|
||||
for s in tmpl(*args):
|
||||
yield from writer.awritestr(s)
|
||||
|
||||
def render_str(self, tmpl_name, args=()):
|
||||
#TODO: bloat
|
||||
tmpl = self._load_template(tmpl_name)
|
||||
return ''.join(tmpl(*args))
|
||||
|
||||
def sendfile(self, writer, fname, content_type=None, headers=None):
|
||||
if not content_type:
|
||||
content_type = get_mime_type(fname)
|
||||
try:
|
||||
with pkg_resources.resource_stream(self.pkg, fname) as f:
|
||||
yield from start_response(writer, content_type, "200", headers)
|
||||
yield from sendstream(writer, f)
|
||||
except OSError as e:
|
||||
if e.args[0] == uerrno.ENOENT:
|
||||
yield from http_error(writer, "404")
|
||||
else:
|
||||
raise
|
||||
|
||||
def handle_static(self, req, resp):
|
||||
path = req.url_match.group(1)
|
||||
print(path)
|
||||
if ".." in path:
|
||||
yield from http_error(resp, "403")
|
||||
return
|
||||
yield from self.sendfile(resp, path)
|
||||
|
||||
def init(self):
|
||||
"""Initialize a web application. This is for overriding by subclasses.
|
||||
This is good place to connect to/initialize a database, for example."""
|
||||
self.inited = True
|
||||
|
||||
def serve(self, loop, host, port):
|
||||
# Actually serve client connections. Subclasses may override this
|
||||
# to e.g. catch and handle exceptions when dealing with server socket
|
||||
# (which are otherwise unhandled and will terminate a Picoweb app).
|
||||
# Note: name and signature of this method may change.
|
||||
loop.create_task(asyncio.start_server(self._handle, host, port))
|
||||
loop.run_forever()
|
||||
|
||||
def run(self, host="127.0.0.1", port=8081, debug=False, lazy_init=False, log=None):
|
||||
if log is None and debug >= 0:
|
||||
import ulogging
|
||||
log = ulogging.getLogger("picoweb")
|
||||
if debug > 0:
|
||||
log.setLevel(ulogging.DEBUG)
|
||||
self.log = log
|
||||
gc.collect()
|
||||
self.debug = int(debug)
|
||||
self.init()
|
||||
if not lazy_init:
|
||||
for app in self.mounts:
|
||||
app.init()
|
||||
loop = asyncio.get_event_loop()
|
||||
if debug > 0:
|
||||
print("* Running on http://%s:%s/" % (host, port))
|
||||
self.serve(loop, host, port)
|
||||
loop.close()
|
@ -0,0 +1,28 @@
|
||||
def unquote_plus(s):
|
||||
# TODO: optimize
|
||||
s = s.replace("+", " ")
|
||||
arr = s.split("%")
|
||||
arr2 = [chr(int(x[:2], 16)) + x[2:] for x in arr[1:]]
|
||||
return arr[0] + "".join(arr2)
|
||||
|
||||
def parse_qs(s):
|
||||
res = {}
|
||||
if s:
|
||||
pairs = s.split("&")
|
||||
for p in pairs:
|
||||
vals = [unquote_plus(x) for x in p.split("=", 1)]
|
||||
if len(vals) == 1:
|
||||
vals.append(True)
|
||||
old = res.get(vals[0])
|
||||
if old is not None:
|
||||
if not isinstance(old, list):
|
||||
old = [old]
|
||||
res[vals[0]] = old
|
||||
old.append(vals[1])
|
||||
else:
|
||||
res[vals[0]] = vals[1]
|
||||
return res
|
||||
|
||||
#print(parse_qs("foo"))
|
||||
#print(parse_qs("fo%41o+bar=+++1"))
|
||||
#print(parse_qs("foo=1&foo=2"))
|
@ -0,0 +1,27 @@
|
||||
import uio
|
||||
|
||||
_c = {}
|
||||
|
||||
def resource_stream(package, resource):
|
||||
if package not in _c:
|
||||
try:
|
||||
if package:
|
||||
p = __import__(package + ".R", None, None, True)
|
||||
else:
|
||||
p = __import__("R")
|
||||
_c[package] = p.R
|
||||
except ImportError:
|
||||
if package:
|
||||
p = __import__(package)
|
||||
d = p.__path__
|
||||
else:
|
||||
d = "."
|
||||
# if d[0] != "/":
|
||||
# import uos
|
||||
# d = uos.getcwd() + "/" + d
|
||||
_c[package] = d + "/"
|
||||
|
||||
p = _c[package]
|
||||
if isinstance(p, dict):
|
||||
return uio.BytesIO(p[resource])
|
||||
return open(p + resource, "rb")
|
94
Python/Python_Codes/05.1_Camera_WebServer/lib/ulogging.py
Normal file
94
Python/Python_Codes/05.1_Camera_WebServer/lib/ulogging.py
Normal file
@ -0,0 +1,94 @@
|
||||
import sys
|
||||
|
||||
CRITICAL = 50
|
||||
ERROR = 40
|
||||
WARNING = 30
|
||||
INFO = 20
|
||||
DEBUG = 10
|
||||
NOTSET = 0
|
||||
|
||||
_level_dict = {
|
||||
CRITICAL: "CRIT",
|
||||
ERROR: "ERROR",
|
||||
WARNING: "WARN",
|
||||
INFO: "INFO",
|
||||
DEBUG: "DEBUG",
|
||||
}
|
||||
|
||||
_stream = sys.stderr
|
||||
|
||||
class Logger:
|
||||
|
||||
level = NOTSET
|
||||
|
||||
def __init__(self, name):
|
||||
self.name = name
|
||||
|
||||
def _level_str(self, level):
|
||||
l = _level_dict.get(level)
|
||||
if l is not None:
|
||||
return l
|
||||
return "LVL%s" % level
|
||||
|
||||
def setLevel(self, level):
|
||||
self.level = level
|
||||
|
||||
def isEnabledFor(self, level):
|
||||
return level >= (self.level or _level)
|
||||
|
||||
def log(self, level, msg, *args):
|
||||
if level >= (self.level or _level):
|
||||
_stream.write("%s:%s:" % (self._level_str(level), self.name))
|
||||
if not args:
|
||||
print(msg, file=_stream)
|
||||
else:
|
||||
print(msg % args, file=_stream)
|
||||
|
||||
def debug(self, msg, *args):
|
||||
self.log(DEBUG, msg, *args)
|
||||
|
||||
def info(self, msg, *args):
|
||||
self.log(INFO, msg, *args)
|
||||
|
||||
def warning(self, msg, *args):
|
||||
self.log(WARNING, msg, *args)
|
||||
|
||||
def error(self, msg, *args):
|
||||
self.log(ERROR, msg, *args)
|
||||
|
||||
def critical(self, msg, *args):
|
||||
self.log(CRITICAL, msg, *args)
|
||||
|
||||
def exc(self, e, msg, *args):
|
||||
self.log(ERROR, msg, *args)
|
||||
sys.print_exception(e, _stream)
|
||||
|
||||
def exception(self, msg, *args):
|
||||
self.exc(sys.exc_info()[1], msg, *args)
|
||||
|
||||
|
||||
_level = INFO
|
||||
_loggers = {}
|
||||
|
||||
def getLogger(name):
|
||||
if name in _loggers:
|
||||
return _loggers[name]
|
||||
l = Logger(name)
|
||||
_loggers[name] = l
|
||||
return l
|
||||
|
||||
def info(msg, *args):
|
||||
getLogger(None).info(msg, *args)
|
||||
|
||||
def debug(msg, *args):
|
||||
getLogger(None).debug(msg, *args)
|
||||
|
||||
def basicConfig(level=INFO, filename=None, stream=None, format=None):
|
||||
global _level, _stream
|
||||
_level = level
|
||||
if stream:
|
||||
_stream = stream
|
||||
if filename is not None:
|
||||
print("logging.basicConfig: filename arg is not supported")
|
||||
if format is not None:
|
||||
print("logging.basicConfig: format arg is not supported")
|
119
Python/Python_Codes/05.1_Camera_WebServer/picoweb_video.py
Normal file
119
Python/Python_Codes/05.1_Camera_WebServer/picoweb_video.py
Normal file
@ -0,0 +1,119 @@
|
||||
# This section uses firmware from Lemariva's Micropython-camera-driver.
|
||||
# for details, please refer to: https://github.com/lemariva/micropython-camera-driver
|
||||
import picoweb
|
||||
import utime
|
||||
import camera
|
||||
import gc
|
||||
|
||||
SSID = "********" # Enter your WiFi name
|
||||
PASSWORD = "********" # Enter your WiFi password
|
||||
|
||||
# Let ESP32 connect to wifi.
|
||||
def wifi_connect():
|
||||
import network
|
||||
wlan = network.WLAN(network.STA_IF)
|
||||
wlan.active(True)
|
||||
if not wlan.isconnected():
|
||||
print('connecting to network...')
|
||||
wlan.connect(SSID, PASSWORD)
|
||||
start = utime.time()
|
||||
while not wlan.isconnected():
|
||||
utime.sleep(1)
|
||||
if utime.time()-start > 5:
|
||||
print("connect timeout!")
|
||||
break
|
||||
if wlan.isconnected():
|
||||
print('network config:', wlan.ifconfig())
|
||||
|
||||
# Initializing the Camera
|
||||
def camera_init():
|
||||
# Disable camera initialization
|
||||
camera.deinit()
|
||||
# Enable camera initialization
|
||||
camera.init(0, d0=4, d1=5, d2=18, d3=19, d4=36, d5=39, d6=34, d7=35,
|
||||
format=camera.JPEG, framesize=camera.FRAME_VGA,
|
||||
xclk_freq=camera.XCLK_20MHz,
|
||||
href=23, vsync=25, reset=-1, pwdn=-1,
|
||||
sioc=27, siod=26, xclk=21, pclk=22, fb_location=camera.PSRAM)
|
||||
|
||||
camera.framesize(camera.FRAME_VGA) # Set the camera resolution
|
||||
# The options are the following:
|
||||
# FRAME_96X96 FRAME_QQVGA FRAME_QCIF FRAME_HQVGA FRAME_240X240
|
||||
# FRAME_QVGA FRAME_CIF FRAME_HVGA FRAME_VGA FRAME_SVGA
|
||||
# FRAME_XGA FRAME_HD FRAME_SXGA FRAME_UXGA
|
||||
# Note: The higher the resolution, the more memory is used.
|
||||
# Note: And too much memory may cause the program to fail.
|
||||
|
||||
camera.flip(1) # Flip up and down window: 0-1
|
||||
camera.mirror(1) # Flip window left and right: 0-1
|
||||
camera.saturation(0) # saturation: -2,2 (default 0). -2 grayscale
|
||||
camera.brightness(0) # brightness: -2,2 (default 0). 2 brightness
|
||||
camera.contrast(0) # contrast: -2,2 (default 0). 2 highcontrast
|
||||
camera.quality(10) # quality: # 10-63 lower number means higher quality
|
||||
# Note: The smaller the number, the sharper the image. The larger the number, the more blurry the image
|
||||
|
||||
camera.speffect(camera.EFFECT_NONE) # special effects:
|
||||
# EFFECT_NONE (default) EFFECT_NEG EFFECT_BW EFFECT_RED EFFECT_GREEN EFFECT_BLUE EFFECT_RETRO
|
||||
camera.whitebalance(camera.WB_NONE) # white balance
|
||||
# WB_NONE (default) WB_SUNNY WB_CLOUDY WB_OFFICE WB_HOME
|
||||
|
||||
# HTTP Response Content
|
||||
index_web="""
|
||||
HTTP/1.0 200 OK\r\n
|
||||
<html>
|
||||
<head>
|
||||
<title>Video Streaming</title>
|
||||
</head>
|
||||
<body>
|
||||
<h1>Video Streaming Demonstration</h1>
|
||||
<img src="/video" margin-top:100px; style="transform:rotate(180deg); "/>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
# HTTP Response
|
||||
def index(req, resp):
|
||||
# You can construct an HTTP response completely yourself, having
|
||||
yield from resp.awrite(index_web)
|
||||
|
||||
# Send camera pictures
|
||||
def send_frame():
|
||||
buf = camera.capture()
|
||||
yield (b'--frame\r\n'
|
||||
b'Content-Type: image/jpeg\r\n\r\n'
|
||||
+ buf + b'\r\n')
|
||||
del buf
|
||||
gc.collect()
|
||||
|
||||
# Video transmission
|
||||
def video(req, resp):
|
||||
yield from picoweb.start_response(resp, content_type="multipart/x-mixed-replace; boundary=frame")
|
||||
while True:
|
||||
yield from resp.awrite(next(send_frame()))
|
||||
gc.collect()
|
||||
|
||||
|
||||
ROUTES = [
|
||||
# You can specify exact URI string matches...
|
||||
("/", index),
|
||||
("/video", video),
|
||||
]
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
import ulogging as logging
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
camera_init()
|
||||
wifi_connect()
|
||||
|
||||
#Create an app object that contains two decorators
|
||||
app = picoweb.WebApp(__name__, ROUTES)
|
||||
|
||||
app.run(debug=1, port=80, host="0.0.0.0")
|
||||
# debug values:
|
||||
# -1 disable all logging
|
||||
# 0 (False) normal logging: requests and errors
|
||||
# 1 (True) debug logging
|
||||
# 2 extra debug logging
|
||||
|
Loading…
x
Reference in New Issue
Block a user