1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
extern crate futures;
use action::{Action, ActionWrapper};
use agent::actions::commit::commit_entry;
use context::Context;
use futures::{executor::block_on, future, Async, Future};
use holochain_core_types::entry::ToEntry;
use holochain_dna::Dna;
use instance::dispatch_action_and_wait;
use nucleus::{
    ribosome::callback::{genesis::genesis, CallbackParams, CallbackResult},
    state::NucleusStatus,
};
use std::{sync::Arc, thread, time::*};

/// Timeout in seconds for initialization process.
/// Future will resolve to an error after this duration.
const INITIALIZATION_TIMEOUT: u64 = 30;

/// Initialize Application, Action Creator
/// This is the high-level initialization function that wraps the whole process of initializing an
/// instance. It creates both InitApplication and ReturnInitializationResult actions asynchronously.
///
/// Returns a future that resolves to an Ok(NucleusStatus) or an Err(String) which carries either
/// the Dna error or errors from the genesis callback.
///
/// Use futures::executor::block_on to wait for an initialized instance.
pub fn initialize_application(
    dna: Dna,
    context: Arc<Context>,
) -> Box<dyn Future<Item = NucleusStatus, Error = String>> {
    if context.state().unwrap().nucleus().status != NucleusStatus::New {
        return Box::new(future::err(
            "Can't trigger initialization: Nucleus status is not New".to_string(),
        ));
    }

    let context_clone = context.clone();

    thread::spawn(move || {
        let action_wrapper = ActionWrapper::new(Action::InitApplication(dna.clone()));
        dispatch_action_and_wait(
            &context_clone.action_channel,
            &context_clone.observer_channel,
            action_wrapper.clone(),
        );

        // Commit DNA to chain
        let dna_entry = dna.to_entry();
        let dna_commit = block_on(commit_entry(
            dna_entry,
            &context_clone.action_channel.clone(),
            &context_clone,
        ));

        // Let initialization fail if DNA could not be committed.
        // Currently this cannot happen since ToEntry for Dna always creates
        // an entry from a Dna object. So I can't create a test for the code below.
        // Hence skipping it for codecov for now but leaving it in for resilience.
        #[cfg_attr(tarpaulin, skip)]
        {
            if dna_commit.is_err() {
                context_clone
                    .action_channel
                    .send(ActionWrapper::new(Action::ReturnInitializationResult(
                        Some(dna_commit.map_err(|e| e.to_string()).err().unwrap()),
                    )))
                    .expect("Action channel not usable in initialize_application()");
                return;
            };
        }

        // Commit AgentId to chain
        let agent_id_entry = context_clone.agent.to_entry();
        let agent_id_commit = block_on(commit_entry(
            agent_id_entry,
            &context_clone.action_channel.clone(),
            &context_clone,
        ));

        // Let initialization fail if AgentId could not be committed.
        // Currently this cannot happen since ToEntry for Agent always creates
        // an entry from an Agent object. So I can't create a test for the code below.
        // Hence skipping it for codecov for now but leaving it in for resilience.
        #[cfg_attr(tarpaulin, skip)]
        {
            if agent_id_commit.is_err() {
                context_clone
                    .action_channel
                    .send(ActionWrapper::new(Action::ReturnInitializationResult(
                        Some(agent_id_commit.map_err(|e| e.to_string()).err().unwrap()),
                    )))
                    .expect("Action channel not usable in initialize_application()");
                return;
            };
        }

        // map genesis across every zome
        let results: Vec<_> = dna
            .zomes
            .keys()
            .map(|zome_name| genesis(context_clone.clone(), zome_name, &CallbackParams::Genesis))
            .collect();

        let fail_result = results.iter().find(|ref r| match r {
            CallbackResult::Fail(_) => true,
            _ => false,
        });

        let maybe_error = match fail_result {
            Some(result) => match result {
                CallbackResult::Fail(error_string) => Some(error_string.clone()),
                _ => None,
            },
            None => None,
        };

        context_clone
            .action_channel
            .send(ActionWrapper::new(Action::ReturnInitializationResult(
                maybe_error,
            )))
            .expect("Action channel not usable in initialize_application()");
    });

    Box::new(InitializationFuture {
        context: context.clone(),
        created_at: Instant::now(),
    })
}

/// InitializationFuture resolves to an Ok(NucleusStatus) or an Err(String).
/// Tracks the nucleus status.
pub struct InitializationFuture {
    context: Arc<Context>,
    created_at: Instant,
}

impl Future for InitializationFuture {
    type Item = NucleusStatus;
    type Error = String;

    fn poll(
        &mut self,
        cx: &mut futures::task::Context<'_>,
    ) -> Result<Async<Self::Item>, Self::Error> {
        //
        // TODO: connect the waker to state updates for performance reasons
        // See: https://github.com/holochain/holochain-rust/issues/314
        //
        cx.waker().wake();

        if Instant::now().duration_since(self.created_at)
            > Duration::from_secs(INITIALIZATION_TIMEOUT)
        {
            return Err("Timeout while initializing".to_string());
        }
        if let Some(state) = self.context.state() {
            match state.nucleus().status {
                NucleusStatus::New => Ok(futures::Async::Pending),
                NucleusStatus::Initializing => Ok(futures::Async::Pending),
                NucleusStatus::Initialized => Ok(futures::Async::Ready(NucleusStatus::Initialized)),
                NucleusStatus::InitializationFailed(ref error) => Err(error.clone()),
            }
        } else {
            Ok(futures::Async::Pending)
        }
    }
}