Make a Publisher from a callback
It seems that Combine's Future
is the correct tool for the job.
func createPersistentContainer(name: String) -> AnyPublisher<NSPersistentContainer, Error> {
let future = Future<NSPersistentContainer, Error> { promise in
let container = NSPersistentContainer(name: name)
container.loadPersistentStores { _, error in
if let error = error {
promise(.failure(error))
} else {
promise(.success(container))
}
}
}
return AnyPublisher(future)
}
Transform callback approach to reactive with Combine
So you have some AccountDetails
type:
import Combine
import FirebaseAuth
struct AccountDetails {
var userId: String
var name: String?
var isLoggedIn: Bool
var isPremiumUser: Bool
}
Let's extend it with an init
that takes a User
, because it will simplify things later:
extension AccountDetails {
init(user: User) {
self.userId = user.uid
self.name = user.displayName
self.isLoggedIn = true
self.isPremiumUser = false
}
}
I think your end goal is a Publisher
that emits AccountDetails
. But since there isn't always a logged-in user, it should really emit Optional<AccountDetails>
, so that it can emit nil
when the user logs out.
Let's start by wrapping the addStateDidChangeListener
API in a Publisher
. We can't use a Future
for this, because a Future
emits at most one output, but addStateDidChangeListener
can emit multiple events. So we'll use a CurrentValueSubject
instead. That means we need a place to store the subject and the AuthStateDidChangeListenerHandle
. You could store them as globals, or in your AppDelegate
, or wherever you feel is appropriate. For this answer, let's create a Demo
class to hold them:
class Demo {
static let shared = Demo()
let userPublisher: AnyPublisher<User?, Error>
private let userSubject = CurrentValueSubject<User?, Error>(nil)
private var tickets: [AnyCancellable] = []
private init() {
userPublisher = userSubject.eraseToAnyPublisher()
let handle = Auth.auth().addStateDidChangeListener { [userSubject] (_, user) in
userSubject.send(user)
}
AnyCancellable { Auth.auth().removeStateDidChangeListener(handle) }
.store(in: &tickets)
}
}
So now you can get a Publisher
of the logged-in user (or nil if no user is logged in) like this:
let loggedInUserPublisher: AnyPublisher<User?, Error> = Demo.shared.userPublisher
But you really want an AccountDetails?
publisher, not a User?
publisher, like this:
let accountDetailsPublisher: AnyPublisher<AccountDetails?, Error> = Demo.shared
.accountDetailsPublisher()
So we need to write an accountDetailsPublisher
method that maps the User?
to an AccountDetails?
.
If the User?
is nil, we just want to emit nil
. But if the User?
is .some(user)
, we need to do more asynchronous actions: we need to check whether the user is in the database, and add the user if not. The flatMap
operator lets you chain asynchronous actions, but there's some complexity because we need to take different actions depending on the output of the upstream publisher.
We'd really like to hide the complexity away and just write this:
extension Demo {
func loggedInAccountDetailsPublisher() -> AnyPublisher<AccountDetails?, Error> {
return userPublisher
.flatMap(
ifSome: { $0.accountDetailsPublisher().map { Optional.some($0) } },
ifNone: { Just(nil).setFailureType(to: Error.self) })
.eraseToAnyPublisher()
}
}
But then we need to write flatMap(ifSome:ifNone:)
. Here it is:
extension Publisher {
func flatMap<Wrapped, Some: Publisher, None: Publisher>(
ifSome: @escaping (Wrapped) -> Some,
ifNone: @escaping () -> None
) -> AnyPublisher<Some.Output, Failure>
where Output == Optional<Wrapped>, Some.Output == None.Output, Some.Failure == Failure, None.Failure == Failure
{
return self
.flatMap { $0.map { ifSome($0).eraseToAnyPublisher() } ?? ifNone().eraseToAnyPublisher() }
.eraseToAnyPublisher()
}
}
Now we need to implement accountDetailsPublisher
in a User
extension. What does this method need to do? It needs to check whether the User
is in the database (an asynchronous action) and, if not, add the User
(another asynchronous action). Since we need to chain asynchronous actions, we again need flatMap
. But we'd really like to just write this:
extension User {
func accountDetailsPublisher() -> AnyPublisher<AccountDetails, Error> {
return isInDatabasePublisher()
.flatMap(
ifTrue: { Just(AccountDetails(user: self)).setFailureType(to: Error.self) },
ifFalse: { self.addToDatabase().map { AccountDetails(user: self) } })
}
}
Here is flatMap(ifTrue:ifFalse:)
:
extension Publisher where Output == Bool {
func flatMap<True: Publisher, False: Publisher>(
ifTrue: @escaping () -> True,
ifFalse: @escaping () -> False
) -> AnyPublisher<True.Output, Failure>
where True.Output == False.Output, True.Failure == Failure, False.Failure == Failure
{
return self
.flatMap { return $0 ? ifTrue().eraseToAnyPublisher() : ifFalse().eraseToAnyPublisher() }
.eraseToAnyPublisher()
}
}
Now we need to write isInDatabasePublisher
and addToDatabase
methods on User
. I don't have the source code to your checkIfUserIsInDatabase
and createEmptyUser
functions, so I can't convert them to publishers directly. But we can wrap them using Future
:
extension User {
func isInDatabasePublisher() -> AnyPublisher<Bool, Error> {
return Future { promise in
checkIfUserIsInDatabase(user: self.uid, completion: promise)
}.eraseToAnyPublisher()
}
func addToDatabase() -> AnyPublisher<Void, Error> {
return Future { promise in
createEmptyUser(user: self.uid, email: self.email, completion: promise)
} //
.map { _ in } // convert Bool to Void
.eraseToAnyPublisher()
}
}
Note that, since your example code ignores the Bool
output of createEmptyUser
, I wrote addToDatabase
to output Void
instead.
Attach a callback to a publish
I don't know of a "correct" way to do this but you could try using a separate channel and enforcing the connection "by convention":
dojo.subscribe('fooChannel', function(){
....
dojo.publish('fooChannelComplete', [...]);
});
A helper function to make this more seamless:
function add_to_foo(f){
dojo.subscribe('fooChannel', function(){
var ret = f.apply(this, arguments);
dojo.publish('fooChannelComplete', [ret]);
});
}
ROS - How do I publish a message and get the subscribed callback immediately
I think you need the ROS_Services (client/server) pattern instead of the publisher/subscriber.
Here is a simple example to do that in Python:
Client code snippet:
import rospy
from test_service.srv import MySrvFile
rospy.wait_for_service('a_topic')
try:
send_hi = rospy.ServiceProxy('a_topic', MySrvFile)
print('Client: Hi, do you hear me?')
resp = send_hi('Hi, do you hear me?')
print("Server: {}".format(resp.response))
except rospy.ServiceException, e:
print("Service call failed: %s"%e)
Server code snippet:
import rospy
from test_service.srv import MySrvFile, MySrvFileResponse
def callback_function(req):
print(req)
return MySrvFileResponse('Hello client, your message received.')
rospy.init_node('server')
rospy.Service('a_topic', MySrvFile, callback_function)
rospy.spin()
MySrvFile.srv
string request
---
string response
Server out:
request: "Hi, do you hear me?"
Client out:
Client: Hi, do you hear me?
Server: Hello client, your message received.
Learn more in ros-wiki
- Project repo on GitHub.
[UPDATE]
- If you are looking for fast communication, TCP-ROS communication is not your purpose because it is slower than a broker-less communicator like ZeroMQ (it has low latency and high throughput):
- ROS-Service pattern equivalent in ZeroMQ is REQ/REP (client/server)
- ROS publisher/subscriber pattern equivalent in ZeroMQ is PUB/SUB
- ROS publisher/subscriber with
waitformessage
equivalent in ZeroMQ is PUSH/PULL
ZeroMQ is available in both Python and C++
- Also, to transfer huge amounts of data (e.g.
pointcloud
), there is a mechanism in ROS callednodelet
which is supported only in C++. This communication is based on shared memory on a machine instead of TCP-ROS socket.What exactly is a nodelet?
Callback function does not change value of local variables (ROS)
I think it is because you are using a loop inside the callback, in the line while round(msg.ranges[360],3) != rounded_min
. Then, the callback gets "stuck", and it is called only once, causing msg.ranges
to remain constant. However, I am not able to test your code just now.
Related Topics
How to Update UIviewrepresentable with Observableobject
How to Get .Adjustsfontsizetofitwidth to Function Properly
Access Class Property from Instance
Debug View Hierarchy Does Not Render UI
Nsdatepicker in Nsstatusbar Nssmenuitem Not Receiving Input
Sknode Subclass Generates Error: Cannot Invoke Initializer for Type "X" with No Arguments
How to Call Completionhandler for Performfetchwithcompletionhandler in Swift
Xcode 7 Cast from Xcuielement to Unrelated Type 'string' Always Fails While Fetching JSON
Swift 2.0 Replicate Objc_Association_Retain
Make a Publisher from a Callback
Why Swift Call Too Shallow Here
How to Completely Remove All Xcode Program and Cache Files
How to Test Whether an Array's Type Is Optional