How to Apply a Grace Time Using Rx

How can I apply a grace time using Rx?

If you use switchMap (a flatMap variant where, when the source emits a new item, the subscription to the previously mapped observable is disposed and the subscription switches to the newly mapped one), you could do something like this:

  1. Start from booleanObservable.
  2. switchMap: map true to an observable timer of 2 seconds, and map false to an empty observable.
  3. In onNext, show your message (onNext never fires for the empty observable, and a quick
    false response cuts off the 2-second timer).

Note switchMap is 'switchLatest' in RxSwift.

Could become something like this:

booleanObservable
    .map { inProgress -> Observable<Bool> in
        if inProgress {
            // Report "in progress" only after the 2-second grace period (RxSwift 5 syntax).
            return Observable.just(true)
                .delay(.seconds(2), scheduler: MainScheduler.instance)
        } else {
            return Observable.just(false)
        }
    }
    .switchLatest()

Is it possible to make `ReplaySubject` run a closure when it is subscribed to?

Here are a couple of options:

Adding the expensive operation to a doOn(onSubscribe:) that's in between the Observable and the subscription:

let observable = Observable.of(1, 2)
    .doOn(onSubscribe: { _ in
        expensiveOperation()
    })

observable
    .subscribeNext { e in
        print(e)
    }

Making the Observable connectable and separating the doOn(onSubscribe:):

let observable = Observable.of(1, 2)
    .publish()

observable
    .doOn(onSubscribe: { _ in
        expensiveOperation()
    })
    .subscribe()

observable
    .subscribeNext { e in
        print(e)
    }

observable.connect()

RxJava debounce() operator not working with Observable.range()

debounce prevents the downstream from getting overwhelmed by defining a grace period that must elapse between events before the latest event is passed on; in other words, it emits the most recent element only after some quiet time. Observable.range emits its items as fast as it can, so there is never enough quiet time between items and only the very last one gets through.
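
For illustration, a minimal self-contained sketch (assuming RxJava 2) of the behaviour described above: applied to a fast, finite source, debounce lets only the final element through.

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class DebounceRangeDemo {
    public static void main(String[] args) {
        Observable.range(1, 1000)
                // Items arrive back to back, so the 300 ms quiet period is only
                // ever satisfied for the last item, emitted when the source completes.
                .debounce(300, TimeUnit.MILLISECONDS)
                .blockingSubscribe(System.out::println); // prints: 1000
    }
}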

debounce is simply not the operator this use case requires; the extensions project for RxJava 2.x has the spanout operator for it.

Reactive Streams - batching with timeout

It depends on how you identify the start and end of each buffer, so the following RxJava 2 code is intended as a hint about using the main source's value to open and close the buffer's gate:

TestScheduler scheduler = new TestScheduler();
PublishProcessor<String> pp = PublishProcessor.create();

Function<Flowable<String>, Flowable<List<String>>> f = o ->
        o.buffer(o.filter(v -> v.contains("Start")),
                 v -> Flowable.merge(o.filter(w -> w.contains("End")),
                                     Flowable.timer(5, TimeUnit.MINUTES, scheduler)));

pp.publish(f)
  .subscribe(System.out::println);

pp.onNext("Start");
pp.onNext("A");
pp.onNext("B");
pp.onNext("End");

pp.onNext("Start");
pp.onNext("C");

scheduler.advanceTimeBy(5, TimeUnit.MINUTES);

pp.onNext("Start");
pp.onNext("D");
pp.onNext("End");
pp.onComplete();

Prints:

[Start, A, B, End]
[Start, C]
[Start, D, End]

It works by sharing the source via publish(f), which allows the same upstream values to be reused without running multiple copies of the source at once. The opening of a buffer is governed by the detection of a "Start" string in the line; the closing is governed by either the detection of an "End" string or a timer firing after a grace period.

Edit:

If the "Start" is also the indicator for the next batch, you can replace the "End" check with "Start" and alter the contents of the buffer since it will include the new header in the previous buffer otherwise:

pp.publish(f)
  .doOnNext(v -> {
      int s = v.size();
      if (s > 1 && v.get(s - 1).contains("Start")) {
          v.remove(s - 1);
      }
  })
  .subscribe(System.out::println);

Proper handling of Backpressure and Concurrency with RxJava2

I'm not sure if this actually fits my requirements.

It does not. Apply onBackpressureLatest followed by observeOn in observeSomePacketsOn, and onBackpressureBuffer followed by observeOn in observeAllPacketsOn.

Does the onBackpressureLatest call make it so that the items are no longer multicasted?

The multicasting is done by the PublishProcessor; each subscriber establishes its own independent channel to it, so the onBackpressureXXX and observeOn operators take effect on a per-subscriber basis.
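
As a minimal sketch of what that might look like (MyBroadcaster, observeSomePacketsOn, observeAllPacketsOn and Packet are names from the question; the bodies here are an assumption):

import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.processors.PublishProcessor;

class MyBroadcaster {
    // Stand-in for the question's Packet type.
    static final class Packet { }

    // The shared, multicasting source.
    private final PublishProcessor<Packet> packets = PublishProcessor.create();

    void broadcast(Packet p) {
        packets.onNext(p);
    }

    // Lossy channel: a slow consumer only ever sees the latest packet.
    Flowable<Packet> observeSomePacketsOn(Scheduler scheduler) {
        return packets.onBackpressureLatest().observeOn(scheduler);
    }

    // Lossless channel: packets are buffered until the consumer catches up.
    Flowable<Packet> observeAllPacketsOn(Scheduler scheduler) {
        return packets.onBackpressureBuffer().observeOn(scheduler);
    }
}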

How can I test my requirements?

Subscribe to the lossy or lossless Flowable with a TestSubscriber (via Flowable.test()), feed a known set of Packets into packets, and check whether all of them arrived, either via TestSubscriber.assertValueCount() or TestSubscriber.values(). The lossy one should receive anywhere from 1 to N values, and the lossless one should have all N values after a grace period.
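
A hedged sketch of such a test for the lossless channel, assuming the MyBroadcaster sketch above (awaitCount acts as the grace period):

import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.TestSubscriber;

MyBroadcaster broadcaster = new MyBroadcaster();

TestSubscriber<MyBroadcaster.Packet> lossless =
        broadcaster.observeAllPacketsOn(Schedulers.single()).test();

int n = 1000;
for (int i = 0; i < n; i++) {
    broadcaster.broadcast(new MyBroadcaster.Packet());
}

lossless.awaitCount(n)          // grace period: wait for the observeOn thread to drain
        .assertValueCount(n);   // the lossless channel must deliver all N packets

// The lossy channel, tested the same way, may deliver anywhere from 1 to n values.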

Bonus: if I have multiple such publishers (in the same class or elsewhere), what is the best way to make the same pattern reusable? Create my own Flowable with delegation/extra methods?

You could turn observeAllPacketsOn into a FlowableTransformer and, instead of calling a method on MyBroadcaster, use compose. For example:

class MyTransformers {
    public static <T> FlowableTransformer<T, T> lossyObserveOn(Scheduler s) {
        return f -> f.onBackpressureLatest().observeOn(s);
    }
}

new MyBroadcaster().getPacketFlow()
        .compose(MyTransformers.lossyObserveOn(scheduler))
        .subscribe(/* ... */);

How to implement a compile-time [dispatch] table for AVR?

You can definitely build such a module system if you write your own custom linker script and copy what was done for constructors and destructors (ctors and dtors). The linker script below is based on avr5.x from AVR GCC; I added the dispatch handling to it.

If you look at the output of the build script in the shell session below, you can see that the dispatch table is set up correctly and has symbols pointing to the start and end of it. The shell session includes all the source code and build scripts that I used to compile this example.

$ ls
avr5-x-modules.ld build.sh kernel.c kernel.h module_foo.c

$ cat avr5-x-modules.ld
/* Default linker script, for normal executables */
/* Copyright (C) 2014 Free Software Foundation, Inc.
Copying and distribution of this script, with or without modification,
are permitted in any medium without royalty provided the copyright
notice and this notice are preserved. */
OUTPUT_FORMAT("elf32-avr","elf32-avr","elf32-avr")
OUTPUT_ARCH(avr:5)
MEMORY
{
text (rx) : ORIGIN = 0, LENGTH = 128K
data (rw!x) : ORIGIN = 0x800060, LENGTH = 0xffa0
eeprom (rw!x) : ORIGIN = 0x810000, LENGTH = 64K
fuse (rw!x) : ORIGIN = 0x820000, LENGTH = 1K
lock (rw!x) : ORIGIN = 0x830000, LENGTH = 1K
signature (rw!x) : ORIGIN = 0x840000, LENGTH = 1K
user_signatures (rw!x) : ORIGIN = 0x850000, LENGTH = 1K
}
SECTIONS
{
/* Read-only sections, merged into text segment: */
.hash : { *(.hash) }
.dynsym : { *(.dynsym) }
.dynstr : { *(.dynstr) }
.gnu.version : { *(.gnu.version) }
.gnu.version_d : { *(.gnu.version_d) }
.gnu.version_r : { *(.gnu.version_r) }
.rel.init : { *(.rel.init) }
.rela.init : { *(.rela.init) }
.rel.text :
{
*(.rel.text)
*(.rel.text.*)
*(.rel.gnu.linkonce.t*)
}
.rela.text :
{
*(.rela.text)
*(.rela.text.*)
*(.rela.gnu.linkonce.t*)
}
.rel.fini : { *(.rel.fini) }
.rela.fini : { *(.rela.fini) }
.rel.rodata :
{
*(.rel.rodata)
*(.rel.rodata.*)
*(.rel.gnu.linkonce.r*)
}
.rela.rodata :
{
*(.rela.rodata)
*(.rela.rodata.*)
*(.rela.gnu.linkonce.r*)
}
.rel.data :
{
*(.rel.data)
*(.rel.data.*)
*(.rel.gnu.linkonce.d*)
}
.rela.data :
{
*(.rela.data)
*(.rela.data.*)
*(.rela.gnu.linkonce.d*)
}
.rel.ctors : { *(.rel.ctors) }
.rela.ctors : { *(.rela.ctors) }
.rel.dtors : { *(.rel.dtors) }
.rela.dtors : { *(.rela.dtors) }
.rel.got : { *(.rel.got) }
.rela.got : { *(.rela.got) }
.rel.bss : { *(.rel.bss) }
.rela.bss : { *(.rela.bss) }
.rel.plt : { *(.rel.plt) }
.rela.plt : { *(.rela.plt) }
/* Internal text space or external memory. */
.text :
{
*(.vectors)
KEEP(*(.vectors))
/* For data that needs to reside in the lower 64k of progmem. */
*(.progmem.gcc*)
/* PR 13812: Placing the trampolines here gives a better chance
that they will be in range of the code that uses them. */
. = ALIGN(2);
__trampolines_start = . ;
/* The jump trampolines for the 16-bit limited relocs will reside here. */
*(.trampolines)
*(.trampolines*)
__trampolines_end = . ;
*(.progmem*)
. = ALIGN(2);
/* For future tablejump instruction arrays for 3 byte pc devices.
We don't relax jump/call instructions within these sections. */
*(.jumptables)
*(.jumptables*)
/* For code that needs to reside in the lower 128k progmem. */
*(.lowtext)
*(.lowtext*)
__ctors_start = . ;
*(.ctors)
__ctors_end = . ;
__dtors_start = . ;
*(.dtors)
__dtors_end = . ;
KEEP(SORT(*)(.ctors))
KEEP(SORT(*)(.dtors))
__dispatch_start = . ;
*(.dispatch)
__dispatch_end = . ;
KEEP(SORT(*)(.dispatch))
/* From this point on, we don't bother about wether the insns are
below or above the 16 bits boundary. */
*(.init0) /* Start here after reset. */
KEEP (*(.init0))
*(.init1)
KEEP (*(.init1))
*(.init2) /* Clear __zero_reg__, set up stack pointer. */
KEEP (*(.init2))
*(.init3)
KEEP (*(.init3))
*(.init4) /* Initialize data and BSS. */
KEEP (*(.init4))
*(.init5)
KEEP (*(.init5))
*(.init6) /* C++ constructors. */
KEEP (*(.init6))
*(.init7)
KEEP (*(.init7))
*(.init8)
KEEP (*(.init8))
*(.init9) /* Call main(). */
KEEP (*(.init9))
*(.text)
. = ALIGN(2);
*(.text.*)
. = ALIGN(2);
*(.fini9) /* _exit() starts here. */
KEEP (*(.fini9))
*(.fini8)
KEEP (*(.fini8))
*(.fini7)
KEEP (*(.fini7))
*(.fini6) /* C++ destructors. */
KEEP (*(.fini6))
*(.fini5)
KEEP (*(.fini5))
*(.fini4)
KEEP (*(.fini4))
*(.fini3)
KEEP (*(.fini3))
*(.fini2)
KEEP (*(.fini2))
*(.fini1)
KEEP (*(.fini1))
*(.fini0) /* Infinite loop after program termination. */
KEEP (*(.fini0))
_etext = . ;
} > text
.data :
{
PROVIDE (__data_start = .) ;
*(.data)
*(.data*)
*(.rodata) /* We need to include .rodata here if gcc is used */
*(.rodata*) /* with -fdata-sections. */
*(.gnu.linkonce.d*)
. = ALIGN(2);
_edata = . ;
PROVIDE (__data_end = .) ;
} > data AT> text
.bss ADDR(.data) + SIZEOF (.data) : AT (ADDR (.bss))
{
PROVIDE (__bss_start = .) ;
*(.bss)
*(.bss*)
*(COMMON)
PROVIDE (__bss_end = .) ;
} > data
__data_load_start = LOADADDR(.data);
__data_load_end = __data_load_start + SIZEOF(.data);
/* Global data not cleared after reset. */
.noinit ADDR(.bss) + SIZEOF (.bss) : AT (ADDR (.noinit))
{
PROVIDE (__noinit_start = .) ;
*(.noinit*)
PROVIDE (__noinit_end = .) ;
_end = . ;
PROVIDE (__heap_start = .) ;
} > data
.eeprom :
{
/* See .data above... */
KEEP(*(.eeprom*))
__eeprom_end = . ;
} > eeprom
.fuse :
{
KEEP(*(.fuse))
KEEP(*(.lfuse))
KEEP(*(.hfuse))
KEEP(*(.efuse))
} > fuse
.lock :
{
KEEP(*(.lock*))
} > lock
.signature :
{
KEEP(*(.signature*))
} > signature
.user_signatures :
{
KEEP(*(.user_signatures*))
} > user_signatures
/* Stabs debugging sections. */
.stab 0 : { *(.stab) }
.stabstr 0 : { *(.stabstr) }
.stab.excl 0 : { *(.stab.excl) }
.stab.exclstr 0 : { *(.stab.exclstr) }
.stab.index 0 : { *(.stab.index) }
.stab.indexstr 0 : { *(.stab.indexstr) }
.comment 0 : { *(.comment) }
.note.gnu.build-id : { *(.note.gnu.build-id) }
/* DWARF debug sections.
Symbols in the DWARF debugging sections are relative to the beginning
of the section so we begin them at 0. */
/* DWARF 1 */
.debug 0 : { *(.debug) }
.line 0 : { *(.line) }
/* GNU DWARF 1 extensions */
.debug_srcinfo 0 : { *(.debug_srcinfo) }
.debug_sfnames 0 : { *(.debug_sfnames) }
/* DWARF 1.1 and DWARF 2 */
.debug_aranges 0 : { *(.debug_aranges) }
.debug_pubnames 0 : { *(.debug_pubnames) }
/* DWARF 2 */
.debug_info 0 : { *(.debug_info .gnu.linkonce.wi.*) }
.debug_abbrev 0 : { *(.debug_abbrev) }
.debug_line 0 : { *(.debug_line .debug_line.* .debug_line_end ) }
.debug_frame 0 : { *(.debug_frame) }
.debug_str 0 : { *(.debug_str) }
.debug_loc 0 : { *(.debug_loc) }
.debug_macinfo 0 : { *(.debug_macinfo) }
/* SGI/MIPS DWARF 2 extensions */
.debug_weaknames 0 : { *(.debug_weaknames) }
.debug_funcnames 0 : { *(.debug_funcnames) }
.debug_typenames 0 : { *(.debug_typenames) }
.debug_varnames 0 : { *(.debug_varnames) }
/* DWARF 3 */
.debug_pubtypes 0 : { *(.debug_pubtypes) }
.debug_ranges 0 : { *(.debug_ranges) }
/* DWARF Extension. */
.debug_macro 0 : { *(.debug_macro) }
}

$ cat build.sh
CFLAGS="-std=gnu11 -mmcu=atmega328p"
set -uex
avr-gcc $CFLAGS -c module_foo.c -o module_foo.o
avr-gcc $CFLAGS -c kernel.c -o kernel.o
avr-gcc -T avr5-x-modules.ld kernel.o module_foo.o \
-o program.elf -Wl,-Map=program.map
grep dispatch program.map

$ cat kernel.c
#include "kernel.h"
#include <avr/pgmspace.h>

// The linker script defines these symbols at the start and end of the
// dispatch table, so their addresses delimit the table in program space.
extern const dispatch_item __dispatch_start[];
extern const dispatch_item __dispatch_end[];

int main()
{
    while (1)
    {
        for (const dispatch_item * item = __dispatch_start; item < __dispatch_end; item++)
        {
            // TODO: Insert code here for reading the contents of the
            // dispatch item from program space and using it.  You
            // probably have to use pgm_read_word from avr/pgmspace.h,
            // but with GCC 5 you could probably use the new named
            // memory space feature to just access the dispatch item
            // the same way you would access any other struct:
            // https://gcc.gnu.org/onlinedocs/gcc/Named-Address-Spaces.html
        }
    }
}

$ cat kernel.h
#pragma once

#include <stdint.h>

typedef struct dispatch_item {
    uint16_t func_id;
    void (*func)(void);
} dispatch_item;

#define DISPATCH_ITEM dispatch_item const __attribute__((section (".dispatch")))

$ cat module_foo.c
#include "kernel.h"
#include <avr/io.h>

// This gets called before main.
void __attribute__((constructor)) foo_init()
{
    PINB = 0;
}

// There is a pointer to this in the dispatch table.
void foo()
{
    PINB = 1;
}

// DISPATCH_TABLE_ENTRY(0x12, &foo);

DISPATCH_ITEM foo_dispatch = { 0x12, &foo };

DISPATCH_ITEM foo_dispatch2 = { 0x13, &foo };

$ ./build.sh
++ avr-gcc -std=gnu11 -mmcu=atmega328p -c module_foo.c -o module_foo.o
++ avr-gcc -std=gnu11 -mmcu=atmega328p -c kernel.c -o kernel.o
++ avr-gcc -T avr5-x-modules.ld kernel.o module_foo.o -o program.elf -Wl,-Map=program.map
++ grep dispatch program.map
0x00000002 __dispatch_start = .
*(.dispatch)
.dispatch 0x00000002 0x8 module_foo.o
0x00000002 foo_dispatch
0x00000006 foo_dispatch2
0x0000000a __dispatch_end = .
SORT(*)(.dispatch)

