Unverified Commit 23c1cdb1 authored by Zach Knox's avatar Zach Knox
Browse files

notifications work—still work to do but it's progress

also updated pods
parent fa006df2
......@@ -6,7 +6,7 @@
//
// The MIT License (MIT)
//
// Copyright (c) 2014-2016 Hearst
// Copyright (c) 2014-2018 Tristan Himmelman
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
......
......@@ -6,7 +6,7 @@
//
// The MIT License (MIT)
//
// Copyright (c) 2014-2016 Hearst
// Copyright (c) 2014-2018 Tristan Himmelman
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
......
......@@ -117,7 +117,7 @@ bool ResultsNotifier::need_to_run()
void ResultsNotifier::calculate_changes()
{
size_t table_ndx = m_query->get_table()->get_index_in_group();
if (has_run()) {
if (has_run() && have_callbacks()) {
CollectionChangeBuilder* changes = nullptr;
if (table_ndx == npos)
changes = &m_changes;
......
......@@ -49,6 +49,10 @@ ReadOnlyPropertyException::ReadOnlyPropertyException(const std::string& object_t
: std::logic_error(util::format("Cannot modify read-only property '%1.%2'", object_type, property_name))
, object_type(object_type), property_name(property_name) {}
ModifyPrimaryKeyException::ModifyPrimaryKeyException(const std::string& object_type, const std::string& property_name)
: std::logic_error(util::format("Cannot modify primary key after creation: '%1.%2'", object_type, property_name))
, object_type(object_type), property_name(property_name) {}
Object::Object(SharedRealm r, ObjectSchema const& s, RowExpr const& o)
: m_realm(std::move(r)), m_object_schema(&s), m_row(o) { }
......
......@@ -57,7 +57,7 @@ const size_t c_zeroRowIndex = 0;
const char c_object_table_prefix[] = "class_";
void create_metadata_tables(Group& group, bool partial_realm) {
void create_metadata_tables(Group& group) {
// The tables 'pk' and 'metadata' are treated specially by Sync. The 'pk' table
// is populated by `sync::create_table` and friends, while the 'metadata' table
// is simply ignored.
......@@ -76,14 +76,6 @@ void create_metadata_tables(Group& group, bool partial_realm) {
pk_table->insert_column(c_primaryKeyPropertyNameColumnIndex, type_String, c_primaryKeyPropertyNameColumnName);
}
pk_table->add_search_index(c_primaryKeyObjectClassColumnIndex);
#if REALM_ENABLE_SYNC
// Only add __ResultSets if Realm is a partial Realm
if (partial_realm)
_impl::initialize_schema(group);
#else
(void)partial_realm;
#endif
}
void set_schema_version(Group& group, uint64_t version) {
......@@ -258,7 +250,7 @@ void validate_primary_column_uniqueness(Group const& group)
} // anonymous namespace
void ObjectStore::set_schema_version(Group& group, uint64_t version) {
::create_metadata_tables(group, false);
::create_metadata_tables(group);
::set_schema_version(group, version);
}
......@@ -719,6 +711,7 @@ static void create_default_permissions(Group& group, std::vector<SchemaChange> c
static_cast<void>(changes);
static_cast<void>(sync_user_id);
#else
_impl::initialize_schema(group);
sync::set_up_basic_permissions(group, true);
// Ensure that this user exists so that local privileges checks work immediately
......@@ -786,7 +779,7 @@ void ObjectStore::apply_schema_changes(Group& group, uint64_t schema_version,
util::Optional<std::string> sync_user_id,
std::function<void()> migration_function)
{
create_metadata_tables(group, sync_user_id != util::none);
create_metadata_tables(group);
if (mode == SchemaMode::Additive) {
bool target_schema_is_newer = (schema_version < target_schema_version
......
......@@ -133,7 +133,7 @@ size_t Results::size()
case Mode::Query:
m_query.sync_view_if_needed();
if (!m_descriptor_ordering.will_apply_distinct())
return m_query.count();
return m_query.count(m_descriptor_ordering);
REALM_FALLTHROUGH;
case Mode::TableView:
evaluate_query_if_needed();
......@@ -259,17 +259,12 @@ void Results::evaluate_query_if_needed(bool wants_notifications)
return;
case Mode::Query:
m_query.sync_view_if_needed();
m_table_view = m_query.find_all();
if (!m_descriptor_ordering.is_empty()) {
m_table_view.apply_descriptor_ordering(m_descriptor_ordering);
}
m_table_view = m_query.find_all(m_descriptor_ordering);
m_mode = Mode::TableView;
REALM_FALLTHROUGH;
case Mode::TableView:
if (wants_notifications && !m_notifier && !m_realm->is_in_transaction() && m_realm->can_deliver_notifications()) {
m_notifier = std::make_shared<_impl::ResultsNotifier>(*this);
_impl::RealmCoordinator::register_notifier(m_notifier);
}
if (wants_notifications)
prepare_async(ForCallback{false});
m_has_used_table_view = true;
m_table_view.sync_if_needed();
break;
......@@ -612,23 +607,31 @@ Results Results::sort(SortDescriptor&& sort) const
Results Results::filter(Query&& q) const
{
if (m_descriptor_ordering.will_apply_limit())
throw UnimplementedOperationException("Filtering a Results with a limit is not yet implemented");
return Results(m_realm, get_query().and_query(std::move(q)), m_descriptor_ordering);
}
Results Results::limit(size_t max_count) const
{
auto new_order = m_descriptor_ordering;
new_order.append_limit(max_count);
return Results(m_realm, get_query(), std::move(new_order));
}
Results Results::apply_ordering(DescriptorOrdering&& ordering)
{
DescriptorOrdering new_order = m_descriptor_ordering;
for (size_t i = 0; i < ordering.size(); ++i) {
const CommonDescriptor* desc = ordering[i];
if (const SortDescriptor* sort = dynamic_cast<const SortDescriptor*>(desc)) {
auto desc = ordering[i];
if (auto sort = dynamic_cast<const SortDescriptor*>(desc))
new_order.append_sort(std::move(*sort));
continue;
}
if (const DistinctDescriptor* distinct = dynamic_cast<const DistinctDescriptor*>(desc)) {
else if (auto distinct = dynamic_cast<const DistinctDescriptor*>(desc))
new_order.append_distinct(std::move(*distinct));
continue;
}
REALM_COMPILER_HINT_UNREACHABLE();
else if (auto limit = dynamic_cast<const LimitDescriptor*>(desc))
new_order.append_limit(std::move(*limit));
else
REALM_COMPILER_HINT_UNREACHABLE();
}
return Results(m_realm, get_query(), std::move(new_order));
}
......@@ -691,19 +694,34 @@ Results Results::snapshot() &&
REALM_COMPILER_HINT_UNREACHABLE();
}
void Results::prepare_async()
void Results::prepare_async(ForCallback force)
{
if (m_notifier) {
return;
}
if (m_realm->config().immutable()) {
throw InvalidTransactionException("Cannot create asynchronous query for immutable Realms");
if (force)
throw InvalidTransactionException("Cannot create asynchronous query for immutable Realms");
return;
}
if (m_realm->is_in_transaction()) {
throw InvalidTransactionException("Cannot create asynchronous query while in a write transaction");
if (force)
throw InvalidTransactionException("Cannot create asynchronous query while in a write transaction");
return;
}
if (m_update_policy == UpdatePolicy::Never) {
throw std::logic_error("Cannot create asynchronous query for snapshotted Results.");
if (force)
throw std::logic_error("Cannot create asynchronous query for snapshotted Results.");
return;
}
if (!force) {
// Don't do implicit background updates if we can't actually deliver them
if (!m_realm->can_deliver_notifications())
return;
// Don't do implicit background updates if there isn't actually anything
// that needs to be run.
if (!m_query.get_table() && m_descriptor_ordering.is_empty())
return;
}
m_wants_background_updates = true;
......@@ -713,7 +731,7 @@ void Results::prepare_async()
NotificationToken Results::add_notification_callback(CollectionChangeCallback cb) &
{
prepare_async();
prepare_async(ForCallback{true});
return {m_notifier, m_notifier->add_callback(std::move(cb))};
}
......@@ -748,7 +766,6 @@ void Results::Internal::set_table_view(Results& results, TableView &&tv)
REALM_ASSERT(results.m_table_view.is_in_sync());
REALM_ASSERT(results.m_table_view.is_attached());
}
#define REALM_RESULTS_TYPE(T) \
template T Results::get<T>(size_t); \
template util::Optional<T> Results::first<T>(); \
......@@ -795,4 +812,15 @@ Results::UnsupportedColumnTypeException::UnsupportedColumnTypeException(size_t c
{
}
Results::InvalidPropertyException::InvalidPropertyException(const std::string& object_type, const std::string& property_name)
: std::logic_error(util::format("Property '%1.%2' does not exist", object_type, property_name))
, object_type(object_type), property_name(property_name)
{
}
Results::UnimplementedOperationException::UnimplementedOperationException(const char* msg)
: std::logic_error(msg)
{
}
} // namespace realm
......@@ -41,10 +41,12 @@
#include <realm/sync/history.hpp>
#include <realm/sync/permissions.hpp>
#include <realm/sync/version.hpp>
#else
namespace realm {
namespace sync {
struct PermissionsCache {};
struct TableInfoCache {};
}
}
#endif
......@@ -712,6 +714,7 @@ void Realm::invalidate()
}
m_permissions_cache = nullptr;
m_table_info_cache = nullptr;
m_shared_group->end_read();
m_group = nullptr;
}
......@@ -727,11 +730,11 @@ bool Realm::compact()
throw InvalidTransactionException("Can't compact a Realm within a write transaction");
}
Group& group = read_group();
for (auto &object_schema : m_schema) {
ObjectStore::table_for_object_type(group, object_schema.name)->optimize();
verify_open();
// FIXME: when enum columns are ready, optimise all tables in a write transaction
if (m_group) {
m_shared_group->end_read();
}
m_shared_group->end_read();
m_group = nullptr;
return m_shared_group->compact();
......@@ -889,6 +892,7 @@ void Realm::close()
}
m_permissions_cache = nullptr;
m_table_info_cache = nullptr;
m_group = nullptr;
m_shared_group = nullptr;
m_history = nullptr;
......@@ -1020,7 +1024,13 @@ bool Realm::init_permission_cache()
// Admin users bypass permissions checks outside of the logic in PermissionsCache
if (m_config.sync_config && m_config.sync_config->is_partial && !m_config.sync_config->user->is_admin()) {
#if REALM_SYNC_VER_MAJOR == 3 && (REALM_SYNC_VER_MINOR < 13 || (REALM_SYNC_VER_MINOR == 13 && REALM_SYNC_VER_PATCH < 3))
m_permissions_cache = std::make_unique<sync::PermissionsCache>(read_group(), m_config.sync_config->user->identity());
#else
m_table_info_cache = std::make_unique<sync::TableInfoCache>(read_group());
m_permissions_cache = std::make_unique<sync::PermissionsCache>(read_group(), *m_table_info_cache,
m_config.sync_config->user->identity());
#endif
return true;
}
return false;
......@@ -1097,3 +1107,8 @@ Group& RealmFriend::read_group_to(Realm& realm, VersionID version)
realm.begin_read(version);
return *realm.m_group;
}
std::size_t Realm::compute_size() {
Group& group = read_group();
return group.compute_aggregated_byte_size();
}
......@@ -18,9 +18,8 @@
#include "sync/impl/sync_file.hpp"
#include "util/time.hpp"
#include <realm/util/file.hpp>
#include <realm/util/time.hpp>
#include <realm/util/scope_exit.hpp>
#include <iomanip>
......@@ -175,7 +174,7 @@ std::string create_timestamped_template(const std::string& prefix, int wildcard_
wildcard_count = std::min(WILDCARD_MAX, std::max(WILDCARD_MIN, wildcard_count));
std::time_t time = std::time(nullptr);
std::stringstream stream;
stream << prefix << "-" << util::put_time(time, "%Y%m%d-%H%M%S") << "-" << std::string(wildcard_count, 'X');
stream << prefix << "-" << util::format_local_time(time, "%Y%m%d-%H%M%S") << "-" << std::string(wildcard_count, 'X');
return stream.str();
}
......@@ -206,7 +205,6 @@ constexpr const char SyncFileManager::c_utility_directory[];
constexpr const char SyncFileManager::c_recovery_directory[];
constexpr const char SyncFileManager::c_metadata_directory[];
constexpr const char SyncFileManager::c_metadata_realm[];
constexpr const char SyncFileManager::c_user_info_file[];
std::string SyncFileManager::get_special_directory(std::string directory_name) const
{
......@@ -226,8 +224,7 @@ std::string SyncFileManager::get_base_sync_directory() const
return sync_path;
}
std::string SyncFileManager::user_directory(const std::string& local_identity,
util::Optional<SyncUserIdentifier> user_info) const
std::string SyncFileManager::user_directory(const std::string& local_identity) const
{
REALM_ASSERT(local_identity.length() > 0);
std::string escaped = util::make_percent_encoded_string(local_identity);
......@@ -237,18 +234,7 @@ std::string SyncFileManager::user_directory(const std::string& local_identity,
auto user_path = file_path_by_appending_component(get_base_sync_directory(),
escaped,
util::FilePathType::Directory);
bool dir_created = util::try_make_dir(user_path);
if (dir_created && user_info) {
// Add a text file in the user directory containing the user identity, for backup purposes.
// Only do this the first time the directory is created.
auto info_path = util::file_path_by_appending_component(user_path, c_user_info_file);
std::ofstream info_file;
info_file.open(info_path.c_str());
if (info_file.is_open()) {
info_file << user_info->user_id << "\n" << user_info->auth_server_url << "\n";
info_file.close();
}
}
util::try_make_dir(user_path);
return user_path;
}
......@@ -335,8 +321,7 @@ bool SyncFileManager::remove_realm(const std::string& local_identity, const std:
return remove_realm(realm_path);
}
std::string SyncFileManager::path(const std::string& local_identity, const std::string& raw_realm_path,
util::Optional<SyncUserIdentifier> user_info) const
std::string SyncFileManager::path(const std::string& local_identity, const std::string& raw_realm_path) const
{
REALM_ASSERT(local_identity.length() > 0);
REALM_ASSERT(raw_realm_path.length() > 0);
......@@ -344,8 +329,7 @@ std::string SyncFileManager::path(const std::string& local_identity, const std::
throw std::invalid_argument("A user or Realm can't have an identifier reserved by the filesystem.");
auto escaped = util::make_percent_encoded_string(raw_realm_path);
auto realm_path = util::file_path_by_appending_component(user_directory(local_identity, user_info), escaped);
return realm_path;
return util::file_path_by_appending_component(user_directory(local_identity), escaped);
}
std::string SyncFileManager::metadata_path() const
......
......@@ -33,13 +33,10 @@
#include <realm/lang_bind_helper.hpp>
#include <realm/util/scope_exit.hpp>
namespace {
constexpr const char* result_sets_type_name = "__ResultSets";
}
namespace realm {
namespace _impl {
using namespace ::realm::partial_sync;
void initialize_schema(Group& group)
{
......@@ -157,6 +154,14 @@ struct RowHandover {
namespace partial_sync {
InvalidRealmStateException::InvalidRealmStateException(const std::string& msg)
: std::logic_error(msg)
{}
ExistingSubscriptionException::ExistingSubscriptionException(const std::string& msg)
: std::runtime_error(msg)
{}
namespace {
template<typename F>
......@@ -169,28 +174,6 @@ void with_open_shared_group(Realm::Config const& config, F&& function)
function(*sg);
}
void update_schema(Group& group, Property matches_property)
{
Schema current_schema;
std::string table_name = ObjectStore::table_name_for_object_type(result_sets_type_name);
if (group.has_table(table_name))
current_schema = {ObjectSchema{group, result_sets_type_name}};
Schema desired_schema({
ObjectSchema(result_sets_type_name, {
{"name", PropertyType::String, Property::IsPrimary{false}, Property::IsIndexed{true}},
{"matches_property", PropertyType::String},
{"query", PropertyType::String},
{"status", PropertyType::Int},
{"error_message", PropertyType::String},
{"query_parse_counter", PropertyType::Int},
std::move(matches_property)
})
});
auto required_changes = current_schema.compare(desired_schema);
if (!required_changes.empty())
ObjectStore::apply_additive_changes(group, required_changes, true);
}
struct ResultSetsColumns {
ResultSetsColumns(Table& table, std::string const& matches_property_name)
......@@ -214,26 +197,62 @@ struct ResultSetsColumns {
size_t matches_property;
};
bool validate_existing_subscription(Table& table, ResultSetsColumns const& columns, std::string const& name,
std::string const& query, std::string const& matches_property)
// Validate the subscription about to be created against existing subscription.
// If an existing subscription already exists that matches the one we are about to create, the
// index of that Subscription is returned. If no current matching subscription exists `npos` is
// returned.
size_t validate_existing_subscription(Table& table, ResultSetsColumns const& columns, std::string const& name,
std::string const& query, std::string const& matches_property)
{
auto existing_row_ndx = table.find_first_string(columns.name, name);
if (existing_row_ndx == npos)
return false;
return npos;
StringData existing_query = table.get_string(columns.query, existing_row_ndx);
if (existing_query != query)
throw std::runtime_error(util::format("An existing subscription exists with the same name, "
"but a different query ('%1' vs '%2').",
existing_query, query));
throw ExistingSubscriptionException(util::format("An existing subscription exists with the same name, "
"but a different query ('%1' vs '%2').",
existing_query, query));
StringData existing_matches_property = table.get_string(columns.matches_property_name, existing_row_ndx);
if (existing_matches_property != matches_property)
throw std::runtime_error(util::format("An existing subscription exists with the same name, "
"but a different result type ('%1' vs '%2').",
existing_matches_property, matches_property));
throw ExistingSubscriptionException(util::format("An existing subscription exists with the same name, "
"but a different result type ('%1' vs '%2').",
existing_matches_property, matches_property));
return existing_row_ndx;
}
// Performs the logic of actually writing the subscription (if needed) to the Realm and making sure
// that the `matches_property` field is setup correctly. This method will throw if the query cannot
// be serialized or the name is already used by another subscription.
//
// The row of the resulting subscription is returned. If an old subscription exists that matches
// the one about to be created, a new subscription is not created, but the old one is returned
// instead.
RowExpr write_subscription(std::string const& object_type, std::string const& name, std::string const& query, Group& group)
{
auto matches_property = std::string(object_type) + "_matches";
auto table = ObjectStore::table_for_object_type(group, result_sets_type_name);
ResultSetsColumns columns(*table, matches_property);
// Update schema if needed.
if (columns.matches_property == npos) {
auto target_table = ObjectStore::table_for_object_type(group, object_type);
columns.matches_property = table->add_column_link(type_LinkList, matches_property, *target_table);
} else {
// FIXME: Validate that the column type and link target are correct.
}
return true;
size_t row_ndx = validate_existing_subscription(*table, columns, name, query, matches_property);
if (row_ndx == npos) {
row_ndx = sync::create_object(group, *table);
table->set_string(columns.name, row_ndx, name);
table->set_string(columns.query, row_ndx, query);
table->set_string(columns.matches_property_name, row_ndx, matches_property);
}
return table->get(row_ndx);
}
void enqueue_registration(Realm& realm, std::string object_type, std::string query, std::string name,
......@@ -247,27 +266,7 @@ void enqueue_registration(Realm& realm, std::string object_type, std::string que
try {
with_open_shared_group(config, [&](SharedGroup& sg) {
_impl::WriteTransactionNotifyingSync write(config, sg);
auto matches_property = std::string(object_type) + "_matches";
auto table = ObjectStore::table_for_object_type(write.get_group(), result_sets_type_name);
ResultSetsColumns columns(*table, matches_property);
// Update schema if needed.
if (columns.matches_property == npos) {
auto target_table = ObjectStore::table_for_object_type(write.get_group(), object_type);
columns.matches_property = table->add_column_link(type_LinkList, matches_property, *target_table);
} else {
// FIXME: Validate that the column type and link target are correct.
}
if (!validate_existing_subscription(*table, columns, name, query, matches_property)) {
auto row_ndx = sync::create_object(write.get_group(), *table);
table->set_string(columns.name, row_ndx, name);
table->set_string(columns.query, row_ndx, query);
table->set_string(columns.matches_property_name, row_ndx, matches_property);
}
write_subscription(object_type, name, query, write.get_group());
write.commit();
});
} catch (...) {
......@@ -318,77 +317,6 @@ std::string default_name_for_query(const std::string& query, const std::string&
} // unnamed namespace
void register_query(std::shared_ptr<Realm> realm, const std::string &object_class, const std::string &query,
std::function<void (Results, std::exception_ptr)> callback)
{
auto sync_config = realm->config().sync_config;
if (!sync_config || !sync_config->is_partial)
throw std::logic_error("A partial sync query can only be registered in a partially synced Realm");
if (realm->schema().find(object_class) == realm->schema().end())
throw std::logic_error("A partial sync query can only be registered for a type that exists in the Realm's schema");
auto matches_property = object_class + "_matches";
// The object schema must outlive `object` below.
std::unique_ptr<ObjectSchema> result_sets_schema;
Object raw_object;
{
realm->begin_transaction();
auto cleanup = util::make_scope_exit([&]() noexcept {
if (realm->is_in_transaction())
realm->cancel_transaction();
});
update_schema(realm->read_group(),
Property(matches_property, PropertyType::Object|PropertyType::Array, object_class));
result_sets_schema = std::make_unique<ObjectSchema>(realm->read_group(), result_sets_type_name);
CppContext context;
raw_object = Object::create<util::Any>(context, realm, *result_sets_schema,
AnyDict{
{"name", query},
{"matches_property", matches_property},
{"query", query},
{"status", int64_t(0)},
{"error_message", std::string()},
{"query_parse_counter", int64_t(0)},
}, false);
realm->commit_transaction();
}
auto object = std::make_shared<_impl::NotificationWrapper<Object>>(std::move(raw_object));
// Observe the new object and notify listener when the results are complete (status != 0).