Andrew Leech 7e14680a83 py/objringio: Add micropython.RingIO() interface for general use.
This commit adds a new `RingIO` type which exposes the internal ring-buffer
code for general use in Python programs.  It has the stream interface
making it similar to `StringIO` and `BytesIO`, except `RingIO` has a fixed
buffer size and is automatically safe when reads and writes are in
different threads or an IRQ.

This new type is enabled at the "extra features" ROM level.

Signed-off-by: Andrew Leech <andrew.leech@planetinnovation.com.au>
2024-09-19 18:00:44 +10:00

37 lines
659 B
Python

# Check that micropython.RingIO works correctly with asyncio.Stream.
import micropython
try:
import asyncio
asyncio.StreamWriter
micropython.RingIO
except (AttributeError, ImportError):
print("SKIP")
raise SystemExit
rb = micropython.RingIO(16)
rba = asyncio.StreamWriter(rb)
data = b"ABC123" * 20
print("w", len(data))
async def data_writer():
global data
rba.write(data)
await rba.drain()
async def main():
task = asyncio.create_task(data_writer())
await asyncio.sleep_ms(10)
read = await rba.readexactly(len(data))
print(read)
print("r", len(read))
print(read == data)
asyncio.run(main())