lyquor/
lyquor.rs

1use std::path::PathBuf;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use actix::prelude::*;
6use actix::{Actor, Addr};
7use alloy_node_bindings::{Anvil, AnvilInstance};
8use lyquor_oci::pack::{DigestAlgorithm, LyquidPackDigest};
9use lyquor_oci::registry::{OCIReference, OCIRegistry, OCIRegistryAuth, Registry};
10use lyquor_primitives::{LyquidNumber, debug_struct_name};
11use serde_json::{Map, Value};
12use tokio::signal::unix::{SignalKind, signal};
13use tokio::sync::Notify;
14
15use lyquor_api::{
16    actor::Stop,
17    anyhow::{self, Context as _Context},
18    config::{DB, NodeConfig},
19    kvstore::{KVStore, Key, PrefixedKVStore, ShadowKVStore},
20    profile::{Devnet, LyquorProfile, NetworkType},
21    subkey_builder,
22};
23use lyquor_db::{MemDB, RocksDB};
24use lyquor_image_store::{DirectoryRepo, LyquidImageRepo};
25use lyquor_jsonrpc::client::{ClientConfig, ClientHandle};
26use lyquor_jsonrpc::types::{EthGetChainId, EthGetChainIdResp};
27use lyquor_lib::{http, lyquid, utils};
28use lyquor_primitives::B256;
29use lyquor_primitives::{Address, Bytes, LyquidID, LyteLog, RegisterEvent};
30use lyquor_seq::{
31    SequenceBackend, eth,
32    fco::FCO,
33    side_effects::{SideEffectProviderHandle, SideEffectStore},
34};
35use lyquor_vm::log::Log;
36
37subkey_builder!(RootSubKey(
38    ([0x01])-lyquid() => Key,
39    ([0x02])-fco() => Key,
40    ([0x03])-log() => Key,
41    ([0x04])-side_effect_store() => Key,
42));
43
44fn subkey() -> &'static RootSubKey {
45    static KEY: std::sync::OnceLock<RootSubKey> = std::sync::OnceLock::new();
46    KEY.get_or_init(|| RootSubKey::new(Bytes::new().into()))
47}
48
49const ANVIL_STATE_FILENAME: &str = "anvil.state";
50
51static ANVIL_PID: std::sync::OnceLock<u32> = std::sync::OnceLock::new();
52
53#[derive(Debug, Message)]
54#[rtype(result = "()")]
55struct LoadLyquid {
56    id: LyquidID,
57    deps: Vec<LyquidID>,
58}
59
60#[derive(Debug, Message)]
61#[rtype(result = "()")]
62struct SetPool {
63    pool: Addr<lyquid::LyquidPool>,
64}
65
66struct LyquidDiscovery {
67    pool: Option<Addr<lyquid::LyquidPool>>,
68    register: Arc<Notify>,
69    bartender_id: LyquidID,
70    bartender_addr: Address,
71}
72
73lyquor_primitives::debug_struct_name!(LyquidDiscovery);
74
75impl Handler<lyquor_seq::eth::GetContract> for LyquidDiscovery {
76    type Result = ResponseFuture<Address>;
77    #[tracing::instrument(level = "trace", skip(self, msg, _ctx))]
78    fn handle(&mut self, msg: lyquor_seq::eth::GetContract, _ctx: &mut Context<Self>) -> Self::Result {
79        let lyquor_seq::eth::GetContract { id, idx } = msg;
80        if id == self.bartender_id && idx == Some(0) {
81            return Box::pin(std::future::ready(self.bartender_addr));
82        }
83        let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
84        let reg = self.register.clone();
85        Box::pin(async move {
86            loop {
87                // FIXME: improve the efficiency of this by filtering the reigster event for
88                // the specific lyquid in bartender
89                // idx=None means latest; else nth deployment
90                if let Some(n) = idx {
91                    match pool.send(crate::lyquid::GetLyquidDeploymentInfo { id, idx: n }).await {
92                        Ok(Some(info)) => return info.contract,
93                        Ok(None) => reg.notified().await,
94                        Err(e) => {
95                            tracing::error!("Failed to get lyquid deployment info: {}", e);
96                            // Stall forever for safety
97                            std::future::pending::<()>().await;
98                        }
99                    }
100                } else {
101                    match pool.send(crate::lyquid::GetLatestLyquidDeploymentInfo { id }).await {
102                        Ok(Some(info)) => return info.contract,
103                        Ok(None) => reg.notified().await,
104                        Err(e) => {
105                            tracing::error!("Failed to get latest lyquid deployment info: {}", e);
106                            std::future::pending::<()>().await;
107                        }
108                    }
109                }
110            }
111        })
112    }
113}
114
115// latest variant handled in GetContract via idx=None
116
117impl Handler<lyquor_vm::log::LogItem> for LyquidDiscovery {
118    type Result = ();
119
120    #[tracing::instrument(level = "trace", skip(msg))]
121    fn handle(&mut self, msg: lyquor_vm::log::LogItem, ctx: &mut Context<Self>) -> Self::Result {
122        let event: Option<RegisterEvent> = lyquor_primitives::decode_object(&msg.0.data);
123        if let Some(event) = event {
124            ctx.spawn(self.load(event.id, event.deps).into_actor(self));
125        }
126    }
127}
128
129impl Handler<LoadLyquid> for LyquidDiscovery {
130    type Result = ResponseFuture<()>;
131
132    #[tracing::instrument(level = "trace")]
133    fn handle(&mut self, msg: LoadLyquid, _ctx: &mut Context<Self>) -> Self::Result {
134        Box::pin(self.load(msg.id, msg.deps))
135    }
136}
137
138impl Handler<SetPool> for LyquidDiscovery {
139    type Result = ();
140
141    #[tracing::instrument(level = "trace")]
142    fn handle(&mut self, msg: SetPool, ctx: &mut Context<Self>) -> Self::Result {
143        if self.pool.is_some() {
144            return;
145        }
146        let pool = msg.pool;
147        let me = ctx.address();
148        self.pool = Some(pool.clone());
149        ctx.spawn(
150            async move {
151                // TODO: right now we just subscribe to every lyquid known in history, need to change this
152                // to user-configured set (or groups)
153                let lyquids_with_deps = match pool.send(crate::lyquid::GetRegisteredLyquidList).await.ok().flatten() {
154                    None => {
155                        // FIXME: this could be triggered when bartender has only executed one slot
156                        // upon recovery. There is a race that before bartender repeat the
157                        // execution of the first slot, its state is without image so this call
158                        // will fail.
159                        tracing::error!("Failed to obtain the registered Lyquid list.");
160                        return;
161                    }
162                    Some(lyquids_with_deps) => lyquids_with_deps,
163                };
164                // Now we have the actual dependency list from the bartender
165                for (id, deps) in lyquids_with_deps.into_iter() {
166                    //
167                    //  --- NB: We assume we already have all dependency binaries. ---
168                    //
169                    if let Err(e) = me.send(LoadLyquid { id, deps }).await {
170                        tracing::error!("Failed to send LoadLyquid message: {}", e);
171                    }
172                }
173            }
174            .into_actor(self),
175        );
176    }
177}
178
179impl LyquidDiscovery {
180    fn load(&mut self, id: LyquidID, deps: Vec<LyquidID>) -> impl Future<Output = ()> + 'static {
181        let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
182        let register = self.register.clone();
183        async move {
184            // NOTE: repo_hint is only a retrieval hint; image_hash in slots remains authoritative.
185            let (repo_hint, image_digest) = match pool
186                .send(crate::lyquid::GetLatestLyquidDeploymentInfo { id })
187                .await
188                .ok()
189                .flatten()
190            {
191                Some(info) => (info.repo_hint, info.image_digest),
192                None => (None, B256::ZERO),
193            };
194
195            if let Some(repo_hint) = repo_hint {
196                match pool.send(crate::lyquid::ImageRepo).await {
197                    Ok(repo) => {
198                        Self::pull_image_from_repo_hint(repo.0, repo_hint, image_digest)
199                            .await
200                            .unwrap_or_else(|e| {
201                                tracing::error!("Failed to pull lyquid image from repo hint: {e}");
202                            });
203                    }
204                    Err(e) => tracing::error!("LyquidDiscovery: failed to get ImageRepo: {e}"),
205                }
206            }
207
208            match pool.send(crate::lyquid::LoadLyquid { id, deps }).await {
209                Ok(result) => match result {
210                    Ok(already_exists) => {
211                        if !already_exists {
212                            tracing::info!("Discovered {}", id);
213                        }
214                    }
215                    Err(e) => tracing::error!("Failed to register discovered Lyquid: {e}"),
216                },
217                Err(e) => tracing::error!("Failed to send LoadLyquid message: {}", e),
218            }
219            register.notify_waiters();
220        }
221    }
222
223    async fn pull_image_from_repo_hint(
224        image_repo: Arc<dyn LyquidImageRepo>, repo_hint: String, image_digest: B256,
225    ) -> Result<(), String> {
226        let oci_reference = match OCIReference::from_str(&repo_hint) {
227            Ok(reference) => reference,
228            Err(primary_err) => {
229                // OCI parser can reject bare repos without a tag or digest.
230                let with_latest = format!("{repo_hint}:latest");
231                OCIReference::from_str(&with_latest)
232                    .map_err(|_| format!("Unsupported repo hint format (expected OCI reference): {primary_err}"))?
233            }
234        };
235        let host = match oci_reference.registry().split_once('/').map(|(host, _)| host) {
236            Some(h) => h,
237            None => oci_reference.registry(),
238        };
239        let auth = match OCIRegistry::get_docker_credential(host) {
240            Ok(auth) => auth,
241            Err(e) => {
242                tracing::trace!(
243                    "No docker credential found (reason: {}), falling back to anonymous auth.",
244                    e
245                );
246                OCIRegistryAuth::Anonymous
247            }
248        };
249        let oci = OCIRegistry::new(auth, oci_reference.registry());
250        let digest = LyquidPackDigest::new(image_digest, DigestAlgorithm::Sha256);
251
252        let has_image = image_repo
253            .contains(image_digest)
254            .await
255            .map_err(|e| format!("Failed to check local ImageRepo: {e}"))?;
256        if has_image {
257            tracing::debug!("LyquidDiscovery: image already exists in local ImageRepo: {image_digest:?}");
258            return Ok(());
259        }
260
261        tracing::info!("LyquidDiscovery: downloading image from repo hint: {}", repo_hint);
262
263        // Get image
264        let pack = oci
265            .pull_full(oci_reference.repository(), None, Some(&digest))
266            .await
267            .map_err(|e| e.to_string())?;
268
269        if *pack.digest().digest() != image_digest {
270            return Err(format!(
271                "Pulled image digest mismatch. expected={}, got={}",
272                digest.to_oci_digest(),
273                pack.digest().to_oci_digest()
274            ));
275        }
276
277        // Push image into repo
278        match image_repo.put(pack).await {
279            Ok(()) => (),
280            Err(e) => return Err(e.to_string()),
281        }
282
283        tracing::info!("Image digest: {:?}", image_digest);
284
285        // Is this necessary to check the hash here?
286
287        Ok(())
288    }
289}
290
291impl Handler<Stop> for LyquidDiscovery {
292    type Result = ();
293
294    #[tracing::instrument(level = "trace")]
295    fn handle(&mut self, _: Stop, ctx: &mut Context<Self>) -> Self::Result {
296        ctx.stop();
297    }
298}
299
300impl Actor for LyquidDiscovery {
301    type Context = actix::Context<Self>;
302
303    #[tracing::instrument(level = "trace", skip(_ctx))]
304    fn started(&mut self, _ctx: &mut Self::Context) {}
305
306    #[tracing::instrument(level = "trace", skip(_ctx))]
307    fn stopped(&mut self, _ctx: &mut Self::Context) {}
308}
309
310struct DBFlusher {
311    last_bn: u64,
312    db_dir: PathBuf,
313    store: Arc<ShadowKVStore<Box<dyn KVStore>>>,
314    fco: Addr<FCO>,
315    anvil_chain: Option<ClientHandle>,
316    flush_interval_secs: u64,
317}
318
319lyquor_primitives::debug_struct_name!(DBFlusher);
320
321impl DBFlusher {
322    fn prepare_flush(&self) -> impl Future<Output = anyhow::Result<()>> + 'static {
323        let anvil_chain = self.anvil_chain.clone();
324        let fco = self.fco.clone();
325        let db_dir = self.db_dir.clone();
326        async move {
327            fco.send(lyquor_seq::fco::Commit)
328                .await
329                .map_err(anyhow::Error::from)
330                .and_then(|e| e.map_err(anyhow::Error::from))?;
331
332            if let Some(client) = &anvil_chain {
333                // If running devnet (anvil), dump chain state.
334                //
335                // We dump the anvil chain state *after* the commit of FCO, so FCO's progress will
336                // not outpace the dumped chain state (i.e., includes execution caused by the
337                // blocks later). Of course, this somehow assume Anvil's API handling has an
338                // ordering (the dump chain state is more recent than that handled previous API
339                // responses).
340                use lyquor_jsonrpc::types::{AnvilDumpState, AnvilDumpStateResp};
341                let dump = client
342                    .request::<AnvilDumpState, AnvilDumpStateResp>(AnvilDumpState)
343                    .await
344                    .map(|r| r.0);
345
346                match dump {
347                    Ok(state) => {
348                        // Write the anvil state to file
349                        let devnet_path = db_dir.join("devnet");
350                        std::fs::create_dir_all(&devnet_path)?;
351                        std::fs::write(devnet_path.join(ANVIL_STATE_FILENAME), &state)?;
352                    }
353                    Err(_) => {
354                        tracing::error!(
355                            "DBFlusher: Failed to dump state from anvil. This could cause issue due to the loss of local chain state upon next startup."
356                        );
357                    }
358                }
359            }
360
361            Ok(())
362        }
363    }
364
365    fn flush(&mut self, res: anyhow::Result<()>) {
366        if let Err(e) = res.and_then(|_| self.store.flush().map_err(|e| e.into())) {
367            tracing::error!("Failed to write: {e}");
368        } else {
369            tracing::info!("Written to the storage backend (block={}).", self.last_bn);
370        }
371    }
372}
373
374#[derive(Message, Debug)]
375#[rtype(result = "()")]
376struct FlushDB;
377
378impl Handler<FlushDB> for DBFlusher {
379    type Result = ResponseActFuture<Self, ()>;
380
381    #[tracing::instrument(level = "trace", skip(self, _ctx))]
382    fn handle(&mut self, _: FlushDB, _ctx: &mut actix::Context<Self>) -> Self::Result {
383        Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
384            actor.flush(res);
385        }))
386    }
387}
388
389impl Handler<Stop> for DBFlusher {
390    type Result = ResponseActFuture<Self, ()>;
391
392    #[tracing::instrument(level = "trace")]
393    fn handle(&mut self, _: Stop, _ctx: &mut actix::Context<Self>) -> Self::Result {
394        Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
395            actor.flush(res);
396        }))
397    }
398}
399
400impl Actor for DBFlusher {
401    type Context = actix::Context<Self>;
402
403    #[tracing::instrument(level = "trace", skip_all)]
404    fn started(&mut self, ctx: &mut Self::Context) {
405        if self.flush_interval_secs > 0 {
406            let periodic_write = tokio::time::Duration::from_secs(self.flush_interval_secs);
407            ctx.run_interval(periodic_write, |_act, ctx| {
408                ctx.address().do_send(FlushDB);
409            });
410        }
411    }
412
413    #[tracing::instrument(level = "trace", skip_all)]
414    fn stopped(&mut self, _ctx: &mut Self::Context) {}
415}
416
417async fn setup_eth_backend(
418    profile: &LyquorProfile, reg: lyquor_seq::eth::ContractRegistry, jsonrpc_client: ClientHandle,
419) -> anyhow::Result<eth::Backend> {
420    let finality_tag: lyquor_jsonrpc::types::BlockNumber =
421        serde_json::from_str(profile.finality()).context("Invalid finality tag.")?;
422
423    Ok(eth::Backend::new(
424        eth::Config::builder()
425            .finality_tag(finality_tag)
426            .registry(reg)
427            .blocks_per_request(std::num::NonZeroUsize::new(10).context("Block per request should be >0.")?)
428            .build(),
429        jsonrpc_client,
430    ))
431}
432
433#[derive(Default)]
434struct Subsystems {
435    fco: Option<Addr<FCO>>,
436    log: Option<Addr<Log>>,
437    pool: Option<Addr<lyquid::LyquidPool>>,
438    api: Option<http::HttpServer>,
439    db: Option<Addr<DBFlusher>>,
440    dis: Option<Addr<LyquidDiscovery>>,
441    client: Option<ClientHandle>,
442    eth_backend: Option<lyquor_seq::eth::Backend>,
443}
444debug_struct_name!(Subsystems);
445
446impl Subsystems {
447    #[tracing::instrument(level = "trace")]
448    async fn stop(&mut self) {
449        if let Some(api) = self.api.take() {
450            api.stop();
451        }
452        if let Some(eth_backend) = self.eth_backend.take() {
453            eth_backend.stop().await;
454        }
455        if let Some(log) = self.log.take() {
456            log.send(Stop).await.ok();
457        }
458        if let Some(dis) = self.dis.take() {
459            dis.send(Stop).await.ok();
460        }
461        if let Some(pool) = self.pool.take() {
462            pool.send(Stop).await.ok();
463        }
464        if let Some(db) = self.db.take() {
465            db.send(Stop).await.ok();
466        }
467        if let Some(dis) = self.dis.take() {
468            dis.send(Stop).await.ok();
469        }
470        if let Some(fco) = self.fco.take() {
471            fco.send(Stop).await.ok();
472        }
473    }
474}
475
476/// TODO: refactor this with actix's Actor model.
477struct Node {
478    sys: Subsystems,
479    config: Arc<NodeConfig>,
480    profile: Arc<LyquorProfile>,
481    seq_eth_url: String,
482    started_anvil: bool,
483    network: Option<lyquor_net::hub::Hub>,
484}
485
486lyquor_primitives::debug_struct_name!(Node);
487
488impl Handler<Stop> for Node {
489    type Result = AtomicResponse<Self, ()>;
490
491    #[tracing::instrument(level = "trace")]
492    fn handle(&mut self, _: Stop, _ctx: &mut Context<Self>) -> Self::Result {
493        let mut sys = std::mem::replace(&mut self.sys, Subsystems::default());
494        AtomicResponse::new(Box::pin(
495            async move {
496                sys.stop().await;
497            }
498            .into_actor(self)
499            .map(|_, _act, ctx| {
500                ctx.stop();
501            }),
502        ))
503    }
504}
505
506impl Actor for Node {
507    type Context = Context<Self>;
508
509    #[tracing::instrument(level = "trace", skip(ctx))]
510    fn started(&mut self, ctx: &mut Self::Context) {
511        // NOTE: use spawn here instead of wait so Stop can be handled
512        ctx.spawn(
513            Self::init(
514                self.config.clone(),
515                self.profile.clone(),
516                self.seq_eth_url.clone(),
517                self.started_anvil,
518                self.network.take().expect("Node: Network not found."),
519            )
520            .into_actor(self)
521            .map(|sys, act, ctx| {
522                match sys {
523                    Ok(sys) => {
524                        act.sys = sys;
525                    }
526                    Err(e) => {
527                        tracing::error!("Failed to initialize node: {e:?}");
528                        ctx.stop();
529                    }
530                };
531            }),
532        );
533    }
534
535    #[tracing::instrument(level = "trace", skip(_ctx))]
536    fn stopped(&mut self, _ctx: &mut Self::Context) {}
537}
538
539impl Node {
540    #[tracing::instrument(level = "trace", skip_all)]
541    async fn init(
542        config: Arc<NodeConfig>, profile: Arc<LyquorProfile>, seq_eth_url: String, started_anvil: bool,
543        network: lyquor_net::hub::Hub,
544    ) -> anyhow::Result<Subsystems> {
545        let mut sys = Subsystems::default();
546
547        // Set up Ethereum sequence backend.
548        let jsonrpc_client = ClientConfig::builder().url(seq_eth_url.parse()?).build().into_client();
549        sys.client = Some(jsonrpc_client.clone());
550        let sequence_backend_chain_id: u64 = jsonrpc_client
551            .request::<EthGetChainId, EthGetChainIdResp>(EthGetChainId)
552            .await
553            .context("Failed to query sequence backend chain ID.")?
554            .0
555            .to();
556        let sequence_backend_bartender_addr =
557            Address::from_str(profile.bartender_addr()).context("Invalid bartender address in profile.")?;
558
559        //// 1. DB & Storage
560        let (store, lyq_store, anvil_chain): (
561            Arc<ShadowKVStore<Box<dyn KVStore>>>,
562            Arc<dyn utils::LVMStoreFactory>,
563            Option<ClientHandle>,
564        ) = match &config.storage.db {
565            DB::MemDb => {
566                tracing::warn!("using MemDB, all data will be lost upon exit");
567                let store = Arc::new(ShadowKVStore::new(Box::new(MemDB::new()) as Box<dyn KVStore>));
568                (store, Arc::new(utils::LVMMemStore), None)
569            }
570            DB::RocksDb { db_dir } => {
571                let base_dir = db_dir.clone();
572                let store = Arc::new(ShadowKVStore::new(
573                    Box::new(RocksDB::new(&base_dir.join(profile.id()))?) as Box<dyn KVStore>,
574                ));
575                let lyq_store = Arc::new(utils::LVMDBStore(Arc::new(PrefixedKVStore::new(
576                    store.clone(),
577                    subkey().lyquid(),
578                ))));
579
580                // Load anvil state if we started our own anvil and state file exists
581                let anvil_chain = if started_anvil {
582                    let state_file = db_dir.join(profile.id()).join(ANVIL_STATE_FILENAME);
583                    if state_file.exists() {
584                        use lyquor_jsonrpc::types::{AnvilLoadState, AnvilLoadStateResp};
585                        let state_data = std::fs::read(&state_file)?;
586                        let load_result = jsonrpc_client
587                            .request::<AnvilLoadState, AnvilLoadStateResp>(AnvilLoadState(state_data.into()))
588                            .await?;
589                        tracing::info!("Loaded anvil state from {}: {}.", state_file.display(), load_result.0);
590                    } else {
591                        tracing::info!(
592                            "No existing anvil state file found at {}, starting with fresh state.",
593                            state_file.display()
594                        );
595                    }
596                    Some(jsonrpc_client.clone())
597                } else {
598                    None
599                };
600
601                (store, lyq_store, anvil_chain)
602            }
603        };
604        ////
605
606        //// 2. Fate-Constrainted Ordering Subsystem.
607        // Start discovering Lyquid and sequencing contracts.
608        let dis = LyquidDiscovery {
609            pool: None,
610            register: Arc::new(Notify::new()),
611            bartender_id: *profile.bartender_id(),
612            bartender_addr: sequence_backend_bartender_addr,
613        }
614        .start();
615        sys.dis = Some(dis.clone());
616
617        let eth_backend = setup_eth_backend(&profile, dis.clone().recipient(), jsonrpc_client.clone()).await?;
618        let side_effect_store = Arc::new(SideEffectStore::new(
619            Arc::new(PrefixedKVStore::new(store.clone(), subkey().side_effect_store())),
620            1000,
621        ));
622        let side_effect_provider = SideEffectProviderHandle::local(side_effect_store.clone());
623
624        let fco_store = PrefixedKVStore::new(store.clone(), subkey().fco());
625        let fco = FCO::new(
626            eth_backend.clone(),
627            Arc::new(fco_store),
628            Some(profile.init_chain_position()),
629            Some(side_effect_provider),
630        )?
631        .start();
632        sys.fco = Some(fco.clone());
633        // Keep a separate reference for shutdown management
634        sys.eth_backend = Some(eth_backend);
635        ////
636
637        //// 3. Lyquor Virtual Machine Subsystem.
638        // Configure Log subsystem.
639        let log_store = PrefixedKVStore::new(store.clone(), subkey().log());
640        let log = Log::new(Arc::new(lyquor_state::DBSimpleStateStore::new(log_store))).start();
641        sys.log = Some(log.clone());
642
643        let mut lvm = lyquid::LVM::new(
644            log.clone(),
645            network,
646            10,
647            Some(lyquor_vm::Sequencer {
648                inter: fco.clone().recipient(),
649                submit: fco.clone().recipient(),
650                get_oracle_epoch: fco.clone().recipient(),
651            }),
652        );
653        let api_subs = http::Subscriptions::new().start();
654        ////
655
656        //// 4. UPC peers
657        for peer in &config.peers {
658            lvm.upc_mut()
659                .add_remote(peer.node_id, Some(peer.upc_addr.clone()))
660                .await
661                .with_context(|| format!("Failed to add UPC peer {}", peer.node_id))?;
662        }
663        ////
664
665        //// 5. Pool of all Lyquids
666        let image_repo: Arc<dyn LyquidImageRepo> = Arc::new(
667            DirectoryRepo::new(&config.storage.image_repo_dir)
668                .await
669                .context("Failed to initialize image store.")?,
670        );
671        let bartender_id = *profile.bartender_id();
672        let pool = lyquid::LyquidPool::new(
673            bartender_id,
674            lvm,
675            lyquid::LyquidPoolSetup {
676                fco: fco.clone(),
677                store_factory: lyq_store,
678                image_repo,
679                side_effect_store,
680                api_subs: api_subs.clone(),
681                sequence_backend_chain_id,
682                sequence_backend_bartender_addr,
683                config: lyquid::LyquidConfig::builder().build(),
684            },
685        )
686        .await?
687        .start();
688        sys.pool = Some(pool.clone());
689        ////
690
691        // Configure bartender.
692        let bartender_setup = {
693            let register_topic = LyteLog::tagged_value_topic("Register");
694            match log
695                .send(lyquor_vm::log::GetNumberOfRecords {
696                    id: bartender_id,
697                    topic: register_topic,
698                })
699                .await
700            {
701                Ok(count) if count > 0 => None,
702                Ok(_) => Some(log.send(lyquor_vm::log::WaitNext {
703                    id: bartender_id,
704                    topic: register_topic,
705                })),
706                Err(e) => return Err(e).context("Failed to get number of records during bartender setup."),
707            }
708        };
709
710        // Start the API server - use the same jsonrpc client as sequencer
711        let mut api = lyquor_lib::http::HttpServer::new(
712            &config.network.api_addr,
713            jsonrpc_client.clone(),
714            api_subs,
715            fco.clone(),
716            pool.clone(),
717        )
718        .await?;
719        api.start();
720        sys.api = Some(api);
721
722        // Wait for bartender to be registered.
723        if let Some(reg) = bartender_setup {
724            tracing::info!("Waiting for bartender to be registered.");
725            reg.await.context("Failed to wait for bartender registration.")?;
726        }
727        // Bartender, as a normal Lyquid, may have some re-execution, let's make sure it is done.
728        let bartender_ins = pool
729            .send(lyquid::GetBartender)
730            .await
731            .expect("Failed to obtain bartender.");
732        let bartender_ln = bartender_ins.instance().max_number().unwrap_or(LyquidNumber::ZERO);
733        tracing::debug!("Waiting for bartender to reach {bartender_ln}.");
734        bartender_ins
735            .wait_until(bartender_ln)
736            .await
737            .context("Failed to wait for the initial re-execution of bartender.")?;
738
739        // Finish up LyquidDiscovery.
740        log.send(lyquor_vm::log::Subscribe {
741            id: bartender_id,
742            topic: LyteLog::tagged_value_topic("Register"),
743            subscriber: dis.clone().recipient(),
744        })
745        .await
746        .context("Failed to subscribe to log system for LyquidDiscovery.")?;
747        dis.send(SetPool { pool: pool.clone() })
748            .await
749            .context("Failed to set pool in LyquidDiscovery.")?;
750
751        tracing::info!("INIT COMPLETE (bartender={bartender_id})");
752
753        // Start a DB writer that persist when node is idle.
754        let db = DBFlusher {
755            last_bn: 0,
756            db_dir: config.resolved_db_dir(),
757            store,
758            fco,
759            anvil_chain,
760            flush_interval_secs: config.flush_interval_secs,
761        }
762        .start();
763        sys.db = Some(db);
764
765        Ok(sys)
766    }
767}
768
769fn generate_unused_port() -> anyhow::Result<u16> {
770    let listener = std::net::TcpListener::bind("0.0.0.0:0").context("Failed to bind to local address.")?;
771    let port = listener.local_addr().context("Failed to get local address.")?.port();
772    Ok(port)
773}
774
775fn start_devnet_anvil() -> anyhow::Result<AnvilInstance> {
776    let anvil_port = generate_unused_port()?;
777    // Mask the SIGINT signal so anvil process won't terminate on SIGINT (it'll only
778    // terminate when dropped)
779    use nix::sys::signal::{self, SigSet, SigmaskHow, Signal};
780    let mut sigset = SigSet::empty();
781    sigset.add(Signal::SIGINT);
782    let mut old_mask = SigSet::empty();
783    signal::sigprocmask(SigmaskHow::SIG_BLOCK, Some(&sigset), Some(&mut old_mask))
784        .context("Failed to mask SIGINT for anvil.")?;
785
786    let mut anvil = Anvil::new().port(anvil_port).keep_stdout().try_spawn()?;
787
788    let stdout_reader = std::io::BufReader::new(
789        anvil
790            .child_mut()
791            .stdout
792            .take()
793            .context("Failed to read from anvil stdout.")?,
794    );
795
796    tokio::task::spawn_blocking(move || {
797        use std::io::BufRead;
798        let mut stdout_lines = stdout_reader.lines();
799        while let Some(line) = stdout_lines.next() {
800            tracing::debug!(target: "lyquor_anvil", "{}", line.unwrap_or_else(|_| "".to_string()));
801        }
802    });
803
804    tracing::info!("Anvil started at port {}.", anvil_port);
805    signal::sigprocmask(SigmaskHow::SIG_SETMASK, Some(&old_mask), None)
806        .context("Failed to restore old signal mask for anvil.")?;
807
808    Ok(anvil)
809}
810
811fn parse_override_value(raw: &str) -> Value {
812    if let Ok(v) = raw.parse::<bool>() {
813        return Value::from(v);
814    }
815    if let Ok(v) = raw.parse::<u64>() {
816        return Value::from(v);
817    }
818    if let Ok(v) = raw.parse::<i64>() {
819        return Value::from(v);
820    }
821    if let Ok(v) = raw.parse::<f64>() {
822        return Value::from(v);
823    }
824    Value::from(raw.to_string())
825}
826
827fn insert_override_path(overrides: &mut Map<String, Value>, key: &str, value: Value) -> anyhow::Result<()> {
828    let mut segments = key.split('.').peekable();
829    let mut current = overrides;
830
831    while let Some(segment) = segments.next() {
832        if segments.peek().is_none() {
833            current.insert(segment.to_string(), value);
834            return Ok(());
835        }
836
837        let entry = current
838            .entry(segment.to_string())
839            .or_insert_with(|| Value::Object(Map::<String, Value>::new()));
840
841        match entry {
842            Value::Object(dict) => {
843                current = dict;
844            }
845            _ => anyhow::bail!("Override path '{key}' conflicts with non-table value."),
846        }
847    }
848
849    Ok(())
850}
851
852fn build_config_overrides(matches: &clap::ArgMatches) -> anyhow::Result<Map<String, Value>> {
853    let mut overrides: Map<String, Value> = Map::new();
854
855    if let Some(entries) = matches.get_many::<String>("config-override") {
856        for entry in entries {
857            let (key, value) = entry
858                .split_once('=')
859                .ok_or_else(|| anyhow::anyhow!("Invalid --config-override '{entry}', expected key=value"))?;
860            insert_override_path(&mut overrides, key, parse_override_value(value))?;
861        }
862    }
863
864    Ok(overrides)
865}
866
867#[actix::main]
868async fn main() -> anyhow::Result<()> {
869    // Install panic hook to cleanup anvil on panic
870    std::panic::set_hook(Box::new(|_| {
871        if let Some(&pid) = ANVIL_PID.get() {
872            tracing::warn!("Panic detected, killing anvil process {}", pid);
873            use nix::sys::signal::{Signal, kill};
874            let _ = kill(nix::unistd::Pid::from_raw(pid as i32), Signal::SIGTERM);
875        }
876    }));
877
878    lyquor_cli::setup_tracing()?;
879
880    println!("{}", lyquor_cli::format_logo_banner());
881
882    let matches = clap::command!()
883        .propagate_version(true)
884        .arg(clap::arg!(--config <PATH> "Path to the Lyquor config file (toml, yaml, json).").required(false))
885        .arg(
886            clap::arg!(--"config-override" <KEY_VALUE> "Override config with key=value (repeatable).")
887                .required(false)
888                .action(clap::ArgAction::Append)
889                .value_parser(clap::builder::NonEmptyStringValueParser::new()),
890        )
891        .get_matches();
892
893    let config_path = matches.get_one::<String>("config").map(PathBuf::from);
894    let overrides = build_config_overrides(&matches)?;
895    let config = NodeConfig::load(config_path, overrides)?;
896
897    let node_seed = config.node_key.seed_bytes()?;
898    let (_, tls_config) = lyquor_tls::generator::single_node_config_with_seed(&node_seed);
899    let mut network = lyquor_net::hub::HubBuilder::new(tls_config)
900        .listen_addr(config.network.upc_addr.clone())
901        .build()
902        .context("Failed to setup network.")?;
903    network.start().await.context("Failed to start network.")?;
904
905    let mut _anvil: Option<AnvilInstance> = None;
906    let profile: Arc<LyquorProfile> = Arc::new(match config.profile {
907        NetworkType::Devnet => {
908            let ws_url = match &config.force_seq_eth {
909                None => {
910                    let anvil = start_devnet_anvil()?;
911                    // Store anvil PID for panic hook
912                    ANVIL_PID.set(anvil.child().id()).ok();
913                    let ep = anvil.ws_endpoint();
914                    _anvil = Some(anvil);
915                    ep
916                }
917                Some(url) => url.to_string(),
918            };
919
920            Devnet::new(ws_url).into()
921        }
922        _ => return Err(anyhow::anyhow!("Network is not supported.")),
923    });
924    let seq_eth_url = config
925        .force_seq_eth
926        .clone()
927        .unwrap_or_else(|| profile.sequencer().to_string());
928
929    // Signal handlers should be registered before starting any actors, so they'll be stopped
930    // gracefully.
931    let mut sigint = signal(SignalKind::interrupt()).context("Failed to register SIGINT handler")?;
932    let mut sigterm = signal(SignalKind::terminate()).context("Failed to register SIGTERM handler")?;
933
934    let node = Node {
935        sys: Subsystems::default(),
936        config: Arc::new(config),
937        profile: profile.clone(),
938        seq_eth_url,
939        started_anvil: _anvil.is_some(),
940        network: Some(network),
941    }
942    .start();
943
944    tokio::select! {
945        _ = sigint.recv() => {
946            tracing::info!("received SIGINT");
947        }
948        _ = sigterm.recv() => {
949            tracing::info!("received SIGTERM");
950        }
951    }
952
953    Ok(node.send(Stop).await?)
954}