Make a Publisher from a Callback

Make a Publisher from a callback

It seems that Combine's Future is the correct tool for the job.

func createPersistentContainer(name: String) -> AnyPublisher<NSPersistentContainer, Error> {
let future = Future<NSPersistentContainer, Error> { promise in
let container = NSPersistentContainer(name: name)
container.loadPersistentStores { _, error in
if let error = error {
promise(.failure(error))
} else {
promise(.success(container))
}
}
}
return AnyPublisher(future)
}

Transform callback approach to reactive with Combine

So you have some AccountDetails type:

import Combine
import FirebaseAuth

struct AccountDetails {
var userId: String
var name: String?
var isLoggedIn: Bool
var isPremiumUser: Bool
}

Let's extend it with an init that takes a User, because it will simplify things later:

extension AccountDetails {
init(user: User) {
self.userId = user.uid
self.name = user.displayName
self.isLoggedIn = true
self.isPremiumUser = false
}
}

I think your end goal is a Publisher that emits AccountDetails. But since there isn't always a logged-in user, it should really emit Optional<AccountDetails>, so that it can emit nil when the user logs out.

Let's start by wrapping the addStateDidChangeListener API in a Publisher. We can't use a Future for this, because a Future emits at most one output, but addStateDidChangeListener can emit multiple events. So we'll use a CurrentValueSubject instead. That means we need a place to store the subject and the AuthStateDidChangeListenerHandle. You could store them as globals, or in your AppDelegate, or wherever you feel is appropriate. For this answer, let's create a Demo class to hold them:

class Demo {
static let shared = Demo()

let userPublisher: AnyPublisher<User?, Error>

private let userSubject = CurrentValueSubject<User?, Error>(nil)
private var tickets: [AnyCancellable] = []

private init() {
userPublisher = userSubject.eraseToAnyPublisher()
let handle = Auth.auth().addStateDidChangeListener { [userSubject] (_, user) in
userSubject.send(user)
}
AnyCancellable { Auth.auth().removeStateDidChangeListener(handle) }
.store(in: &tickets)
}
}

So now you can get a Publisher of the logged-in user (or nil if no user is logged in) like this:

let loggedInUserPublisher: AnyPublisher<User?, Error> = Demo.shared.userPublisher

But you really want an AccountDetails? publisher, not a User? publisher, like this:

let accountDetailsPublisher: AnyPublisher<AccountDetails?, Error> = Demo.shared
.accountDetailsPublisher()

So we need to write an accountDetailsPublisher method that maps the User? to an AccountDetails?.

If the User? is nil, we just want to emit nil. But if the User? is .some(user), we need to do more asynchronous actions: we need to check whether the user is in the database, and add the user if not. The flatMap operator lets you chain asynchronous actions, but there's some complexity because we need to take different actions depending on the output of the upstream publisher.

We'd really like to hide the complexity away and just write this:

extension Demo {
func loggedInAccountDetailsPublisher() -> AnyPublisher<AccountDetails?, Error> {
return userPublisher
.flatMap(
ifSome: { $0.accountDetailsPublisher().map { Optional.some($0) } },
ifNone: { Just(nil).setFailureType(to: Error.self) })
.eraseToAnyPublisher()
}
}

But then we need to write flatMap(ifSome:ifNone:). Here it is:

extension Publisher {
func flatMap<Wrapped, Some: Publisher, None: Publisher>(
ifSome: @escaping (Wrapped) -> Some,
ifNone: @escaping () -> None
) -> AnyPublisher<Some.Output, Failure>
where Output == Optional<Wrapped>, Some.Output == None.Output, Some.Failure == Failure, None.Failure == Failure
{
return self
.flatMap { $0.map { ifSome($0).eraseToAnyPublisher() } ?? ifNone().eraseToAnyPublisher() }
.eraseToAnyPublisher()
}
}

Now we need to implement accountDetailsPublisher in a User extension. What does this method need to do? It needs to check whether the User is in the database (an asynchronous action) and, if not, add the User (another asynchronous action). Since we need to chain asynchronous actions, we again need flatMap. But we'd really like to just write this:

extension User {
func accountDetailsPublisher() -> AnyPublisher<AccountDetails, Error> {
return isInDatabasePublisher()
.flatMap(
ifTrue: { Just(AccountDetails(user: self)).setFailureType(to: Error.self) },
ifFalse: { self.addToDatabase().map { AccountDetails(user: self) } })
}
}

Here is flatMap(ifTrue:ifFalse:):

extension Publisher where Output == Bool {
func flatMap<True: Publisher, False: Publisher>(
ifTrue: @escaping () -> True,
ifFalse: @escaping () -> False
) -> AnyPublisher<True.Output, Failure>
where True.Output == False.Output, True.Failure == Failure, False.Failure == Failure
{
return self
.flatMap { return $0 ? ifTrue().eraseToAnyPublisher() : ifFalse().eraseToAnyPublisher() }
.eraseToAnyPublisher()
}
}

Now we need to write isInDatabasePublisher and addToDatabase methods on User. I don't have the source code to your checkIfUserIsInDatabase and createEmptyUser functions, so I can't convert them to publishers directly. But we can wrap them using Future:

extension User {
func isInDatabasePublisher() -> AnyPublisher<Bool, Error> {
return Future { promise in
checkIfUserIsInDatabase(user: self.uid, completion: promise)
}.eraseToAnyPublisher()
}

func addToDatabase() -> AnyPublisher<Void, Error> {
return Future { promise in
createEmptyUser(user: self.uid, email: self.email, completion: promise)
} //
.map { _ in } // convert Bool to Void
.eraseToAnyPublisher()
}
}

Note that, since your example code ignores the Bool output of createEmptyUser, I wrote addToDatabase to output Void instead.

Attach a callback to a publish

I don't know of a "correct" way to do this but you could try using a separate channel and enforcing the connection "by convention":

dojo.subscribe('fooChannel', function(){
....
dojo.publish('fooChannelComplete', [...]);
});

A helper function to make this more seamless:

function add_to_foo(f){
dojo.subscribe('fooChannel', function(){
var ret = f.apply(this, arguments);
dojo.publish('fooChannelComplete', [ret]);
});
}

ROS - How do I publish a message and get the subscribed callback immediately

I think you need the ROS_Services (client/server) pattern instead of the publisher/subscriber.


Here is a simple example to do that in Python:

Client code snippet:

import rospy
from test_service.srv import MySrvFile

rospy.wait_for_service('a_topic')
try:
send_hi = rospy.ServiceProxy('a_topic', MySrvFile)
print('Client: Hi, do you hear me?')
resp = send_hi('Hi, do you hear me?')
print("Server: {}".format(resp.response))

except rospy.ServiceException, e:
print("Service call failed: %s"%e)

Server code snippet:

import rospy
from test_service.srv import MySrvFile, MySrvFileResponse

def callback_function(req):
print(req)
return MySrvFileResponse('Hello client, your message received.')

rospy.init_node('server')
rospy.Service('a_topic', MySrvFile, callback_function)
rospy.spin()

MySrvFile.srv

string request
---
string response

Server out:

request: "Hi, do you hear me?"

Client out:

Client: Hi, do you hear me?
Server: Hello client, your message received.


Learn more in ros-wiki

  • Project repo on GitHub.

[UPDATE]

  • If you are looking for fast communication, TCP-ROS communication is not your purpose because it is slower than a broker-less communicator like ZeroMQ (it has low latency and high throughput):
  1. ROS-Service pattern equivalent in ZeroMQ is REQ/REP (client/server)
  2. ROS publisher/subscriber pattern equivalent in ZeroMQ is PUB/SUB
  3. ROS publisher/subscriber with waitformessage equivalent in ZeroMQ is PUSH/PULL

ZeroMQ is available in both Python and C++

  • Also, to transfer huge amounts of data (e.g. pointcloud), there is a mechanism in ROS called nodelet which is supported only in C++. This communication is based on shared memory on a machine instead of TCP-ROS socket.

    What exactly is a nodelet?

Callback function does not change value of local variables (ROS)

I think it is because you are using a loop inside the callback, in the line while round(msg.ranges[360],3) != rounded_min. Then, the callback gets "stuck", and it is called only once, causing msg.ranges to remain constant. However, I am not able to test your code just now.



Related Topics



Leave a reply



Submit